summaryrefslogtreecommitdiffstats
path: root/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
diff options
context:
space:
mode:
Diffstat (limited to 'vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java')
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java72
1 files changed, 67 insertions, 5 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
index 8180bfd84ea..ada4cfb1725 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
@@ -5,16 +5,22 @@ import com.google.inject.Inject;
import com.yahoo.clientmetrics.RouteMetricSet;
import com.yahoo.cloud.config.ClusterListConfig;
import com.yahoo.cloud.config.SlobroksConfig;
+import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.container.jdisc.EmptyResponse;
import com.yahoo.container.jdisc.HttpRequest;
import com.yahoo.container.jdisc.HttpResponse;
import com.yahoo.container.protect.Error;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.TestAndSetCondition;
import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.feedapi.DocprocMessageProcessor;
import com.yahoo.feedapi.FeedContext;
import com.yahoo.feedapi.Feeder;
import com.yahoo.feedapi.JsonFeeder;
import com.yahoo.feedapi.MessagePropertyProcessor;
+import com.yahoo.feedapi.SimpleFeedAccess;
import com.yahoo.feedapi.SingleSender;
import com.yahoo.feedapi.XMLFeeder;
import com.yahoo.jdisc.Metric;
@@ -23,6 +29,10 @@ import com.yahoo.vespaclient.config.FeederConfig;
import java.util.List;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -63,10 +73,59 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase {
@Override
public HttpResponse handle(HttpRequest request) {
- return handle(request, (RouteMetricSet.ProgressCallback)null);
+ return handle(request, null, 1);
}
- public HttpResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback) {
+ static final class ThreadedFeedAccess implements SimpleFeedAccess {
+
+ private final SimpleFeedAccess simpleFeedAccess;
+ private final ExecutorService executor;
+ ThreadedFeedAccess(int numThreads, SimpleFeedAccess simpleFeedAccess) {
+ this.simpleFeedAccess = simpleFeedAccess;
+ if (numThreads <= 0) {
+ numThreads = Runtime.getRuntime().availableProcessors();
+ }
+ executor = new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(false),
+ ThreadFactoryFactory.getDaemonThreadFactory("feeder"), new ThreadPoolExecutor.CallerRunsPolicy());
+ }
+ @Override
+ public void put(Document doc) {
+ executor.execute(() -> simpleFeedAccess.put(doc));
+ }
+
+ @Override
+ public void remove(DocumentId docId) {
+ executor.execute(() -> simpleFeedAccess.remove(docId));
+ }
+
+ @Override
+ public void update(DocumentUpdate update) {
+ executor.execute(() -> simpleFeedAccess.update(update));
+ }
+
+ @Override
+ public void put(Document doc, TestAndSetCondition condition) {
+ executor.execute(() -> simpleFeedAccess.put(doc, condition));
+ }
+
+ @Override
+ public void remove(DocumentId docId, TestAndSetCondition condition) {
+ executor.execute(() -> simpleFeedAccess.remove(docId, condition));
+ }
+
+ @Override
+ public void update(DocumentUpdate update, TestAndSetCondition condition) {
+ executor.execute(() -> simpleFeedAccess.update(update, condition));
+ }
+
+ @Override
+ public boolean isAborted() {
+ return simpleFeedAccess.isAborted();
+ }
+ }
+
+ public HttpResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback, int numThreads) {
if (request.getProperty("status") != null) {
return new MetricResponse(context.getMetrics().getMetricSet());
}
@@ -85,8 +144,11 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase {
SingleSender sender = new SingleSender(response, getSharedSender(route), !asynchronous);
sender.addMessageProcessor(properties);
sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request)));
-
- Feeder feeder = createFeeder(sender, request);
+ SimpleFeedAccess feedAccess = sender;
+ if (numThreads != 1) {
+ feedAccess = new ThreadedFeedAccess(numThreads, feedAccess);
+ }
+ Feeder feeder = createFeeder(feedAccess, request);
feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError());
feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent());
response.setAbortOnFeedError(properties.getAbortOnFeedError());
@@ -116,7 +178,7 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase {
}
}
- private Feeder createFeeder(SingleSender sender, HttpRequest request) {
+ private Feeder createFeeder(SimpleFeedAccess sender, HttpRequest request) {
String contentType = request.getHeader("Content-Type");
if (Boolean.valueOf(request.getProperty(JSON_INPUT)) || (contentType != null && contentType.startsWith("application/json"))) {
return new JsonFeeder(getDocumentTypeManager(), sender, getRequestInputStream(request));