diff options
Diffstat (limited to 'vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java')
-rwxr-xr-x | vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java | 72 |
1 files changed, 67 insertions, 5 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java index 8180bfd84ea..ada4cfb1725 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java @@ -5,16 +5,22 @@ import com.google.inject.Inject; import com.yahoo.clientmetrics.RouteMetricSet; import com.yahoo.cloud.config.ClusterListConfig; import com.yahoo.cloud.config.SlobroksConfig; +import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.container.jdisc.EmptyResponse; import com.yahoo.container.jdisc.HttpRequest; import com.yahoo.container.jdisc.HttpResponse; import com.yahoo.container.protect.Error; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.TestAndSetCondition; import com.yahoo.document.config.DocumentmanagerConfig; import com.yahoo.feedapi.DocprocMessageProcessor; import com.yahoo.feedapi.FeedContext; import com.yahoo.feedapi.Feeder; import com.yahoo.feedapi.JsonFeeder; import com.yahoo.feedapi.MessagePropertyProcessor; +import com.yahoo.feedapi.SimpleFeedAccess; import com.yahoo.feedapi.SingleSender; import com.yahoo.feedapi.XMLFeeder; import com.yahoo.jdisc.Metric; @@ -23,6 +29,10 @@ import com.yahoo.vespaclient.config.FeederConfig; import java.util.List; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -63,10 +73,59 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { @Override public HttpResponse handle(HttpRequest request) { - return handle(request, (RouteMetricSet.ProgressCallback)null); + return handle(request, null, 1); } - public HttpResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback) { + static final class ThreadedFeedAccess implements SimpleFeedAccess { + + private final SimpleFeedAccess simpleFeedAccess; + private final ExecutorService executor; + ThreadedFeedAccess(int numThreads, SimpleFeedAccess simpleFeedAccess) { + this.simpleFeedAccess = simpleFeedAccess; + if (numThreads <= 0) { + numThreads = Runtime.getRuntime().availableProcessors(); + } + executor = new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS, + new SynchronousQueue<>(false), + ThreadFactoryFactory.getDaemonThreadFactory("feeder"), new ThreadPoolExecutor.CallerRunsPolicy()); + } + @Override + public void put(Document doc) { + executor.execute(() -> simpleFeedAccess.put(doc)); + } + + @Override + public void remove(DocumentId docId) { + executor.execute(() -> simpleFeedAccess.remove(docId)); + } + + @Override + public void update(DocumentUpdate update) { + executor.execute(() -> simpleFeedAccess.update(update)); + } + + @Override + public void put(Document doc, TestAndSetCondition condition) { + executor.execute(() -> simpleFeedAccess.put(doc, condition)); + } + + @Override + public void remove(DocumentId docId, TestAndSetCondition condition) { + executor.execute(() -> simpleFeedAccess.remove(docId, condition)); + } + + @Override + public void update(DocumentUpdate update, TestAndSetCondition condition) { + executor.execute(() -> simpleFeedAccess.update(update, condition)); + } + + @Override + public boolean isAborted() { + return simpleFeedAccess.isAborted(); + } + } + + public HttpResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback, int numThreads) { if (request.getProperty("status") != null) { return new MetricResponse(context.getMetrics().getMetricSet()); } @@ -85,8 +144,11 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { SingleSender sender = new SingleSender(response, getSharedSender(route), !asynchronous); sender.addMessageProcessor(properties); sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request))); - - Feeder feeder = createFeeder(sender, request); + SimpleFeedAccess feedAccess = sender; + if (numThreads != 1) { + feedAccess = new ThreadedFeedAccess(numThreads, feedAccess); + } + Feeder feeder = createFeeder(feedAccess, request); feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError()); feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent()); response.setAbortOnFeedError(properties.getAbortOnFeedError()); @@ -116,7 +178,7 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { } } - private Feeder createFeeder(SingleSender sender, HttpRequest request) { + private Feeder createFeeder(SimpleFeedAccess sender, HttpRequest request) { String contentType = request.getHeader("Content-Type"); if (Boolean.valueOf(request.getProperty(JSON_INPUT)) || (contentType != null && contentType.startsWith("application/json"))) { return new JsonFeeder(getDocumentTypeManager(), sender, getRequestInputStream(request)); |