summaryrefslogtreecommitdiffstats
path: root/vespaclient-core
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2018-10-23 23:43:48 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2018-10-23 23:43:48 +0200
commit4b97ebddad6965f9dfa299b8932a4396decfc721 (patch)
tree08b17e71aacf2c12ceacf90bcb2e5984bd4c8365 /vespaclient-core
parent4522ba150e4033fcfad6b8231c3a9557339c1502 (diff)
Add support for using more than 1 sender thread.
Diffstat (limited to 'vespaclient-core')
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/DocprocMessageProcessor.java8
-rw-r--r--vespaclient-core/src/main/java/com/yahoo/feedapi/Feeder.java14
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/VespaFeedSender.java4
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java72
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java20
5 files changed, 87 insertions, 31 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/DocprocMessageProcessor.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/DocprocMessageProcessor.java
index 195604b067f..c6974cff5c1 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/DocprocMessageProcessor.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/DocprocMessageProcessor.java
@@ -6,18 +6,16 @@ import com.yahoo.docproc.CallStack;
import com.yahoo.docproc.DocprocService;
import com.yahoo.docproc.DocumentProcessor;
import com.yahoo.docproc.Processing;
-import com.yahoo.document.*;
+import com.yahoo.document.DocumentOperation;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.routing.Route;
-import com.yahoo.vdslib.Entry;
import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
public class DocprocMessageProcessor implements MessageProcessor {
private final DocprocService docproc;
@@ -31,7 +29,7 @@ public class DocprocMessageProcessor implements MessageProcessor {
@Override
public void process(Message m) {
try {
- List<DocumentOperation> documentBases = new ArrayList<DocumentOperation>();
+ List<DocumentOperation> documentBases = new ArrayList<>(1);
if (m.getType() == DocumentProtocol.MESSAGE_PUTDOCUMENT) {
documentBases.add(((PutDocumentMessage) m).getDocumentPut());
@@ -49,7 +47,7 @@ public class DocprocMessageProcessor implements MessageProcessor {
}
}
- public void processDocumentOperations(List<DocumentOperation> documentOperations, Message m) throws Exception {
+ private void processDocumentOperations(List<DocumentOperation> documentOperations, Message m) throws Exception {
Processing processing = Processing.createProcessingFromDocumentOperations(docproc.getName(), documentOperations, new CallStack(docproc.getCallStack()));
processing.setServiceName(docproc.getName());
processing.setDocprocServiceRegistry(docprocServiceRegistry);
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/Feeder.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/Feeder.java
index 523ea0605a4..4ad5c86b663 100644
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/Feeder.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/Feeder.java
@@ -24,11 +24,11 @@ public abstract class Feeder {
protected final InputStream stream;
protected final DocumentTypeManager docMan;
- protected List<String> errors = new LinkedList<String>();
- protected boolean doAbort = true;
- protected boolean createIfNonExistent = false;
- protected final VespaFeedSender sender;
- private final int MAX_ERRORS = 10;
+ protected List<String> errors = new LinkedList<>();
+ private boolean doAbort = true;
+ private boolean createIfNonExistent = false;
+ private final VespaFeedSender sender;
+ private static final int MAX_ERRORS = 10;
protected Feeder(DocumentTypeManager docMan, VespaFeedSender sender, InputStream stream) {
this.docMan = docMan;
@@ -44,7 +44,7 @@ public abstract class Feeder {
this.createIfNonExistent = value;
}
- public void addException(Exception e) {
+ private void addException(Exception e) {
String message;
if (e.getMessage() != null) {
message = e.getMessage().replaceAll("\"", "'");
@@ -69,7 +69,7 @@ public abstract class Feeder {
protected abstract FeedReader createReader() throws Exception;
public List<String> parse() {
- FeedReader reader = null;
+ FeedReader reader;
try {
reader = createReader();
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/VespaFeedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/VespaFeedSender.java
index d7329264bc0..b441e81a829 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/VespaFeedSender.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/VespaFeedSender.java
@@ -3,10 +3,6 @@ package com.yahoo.feedapi;
import com.yahoo.vespaxmlparser.VespaXMLFeedReader;
-import java.util.Date;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.logging.Logger;
-
/**
* Wrapper class for SimpleFeedAccess to send various XML operations.
*/
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
index 8180bfd84ea..ada4cfb1725 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
@@ -5,16 +5,22 @@ import com.google.inject.Inject;
import com.yahoo.clientmetrics.RouteMetricSet;
import com.yahoo.cloud.config.ClusterListConfig;
import com.yahoo.cloud.config.SlobroksConfig;
+import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.container.jdisc.EmptyResponse;
import com.yahoo.container.jdisc.HttpRequest;
import com.yahoo.container.jdisc.HttpResponse;
import com.yahoo.container.protect.Error;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.TestAndSetCondition;
import com.yahoo.document.config.DocumentmanagerConfig;
import com.yahoo.feedapi.DocprocMessageProcessor;
import com.yahoo.feedapi.FeedContext;
import com.yahoo.feedapi.Feeder;
import com.yahoo.feedapi.JsonFeeder;
import com.yahoo.feedapi.MessagePropertyProcessor;
+import com.yahoo.feedapi.SimpleFeedAccess;
import com.yahoo.feedapi.SingleSender;
import com.yahoo.feedapi.XMLFeeder;
import com.yahoo.jdisc.Metric;
@@ -23,6 +29,10 @@ import com.yahoo.vespaclient.config.FeederConfig;
import java.util.List;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -63,10 +73,59 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase {
@Override
public HttpResponse handle(HttpRequest request) {
- return handle(request, (RouteMetricSet.ProgressCallback)null);
+ return handle(request, null, 1);
}
- public HttpResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback) {
+ static final class ThreadedFeedAccess implements SimpleFeedAccess {
+
+ private final SimpleFeedAccess simpleFeedAccess;
+ private final ExecutorService executor;
+ ThreadedFeedAccess(int numThreads, SimpleFeedAccess simpleFeedAccess) {
+ this.simpleFeedAccess = simpleFeedAccess;
+ if (numThreads <= 0) {
+ numThreads = Runtime.getRuntime().availableProcessors();
+ }
+ executor = new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(false),
+ ThreadFactoryFactory.getDaemonThreadFactory("feeder"), new ThreadPoolExecutor.CallerRunsPolicy());
+ }
+ @Override
+ public void put(Document doc) {
+ executor.execute(() -> simpleFeedAccess.put(doc));
+ }
+
+ @Override
+ public void remove(DocumentId docId) {
+ executor.execute(() -> simpleFeedAccess.remove(docId));
+ }
+
+ @Override
+ public void update(DocumentUpdate update) {
+ executor.execute(() -> simpleFeedAccess.update(update));
+ }
+
+ @Override
+ public void put(Document doc, TestAndSetCondition condition) {
+ executor.execute(() -> simpleFeedAccess.put(doc, condition));
+ }
+
+ @Override
+ public void remove(DocumentId docId, TestAndSetCondition condition) {
+ executor.execute(() -> simpleFeedAccess.remove(docId, condition));
+ }
+
+ @Override
+ public void update(DocumentUpdate update, TestAndSetCondition condition) {
+ executor.execute(() -> simpleFeedAccess.update(update, condition));
+ }
+
+ @Override
+ public boolean isAborted() {
+ return simpleFeedAccess.isAborted();
+ }
+ }
+
+ public HttpResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback, int numThreads) {
if (request.getProperty("status") != null) {
return new MetricResponse(context.getMetrics().getMetricSet());
}
@@ -85,8 +144,11 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase {
SingleSender sender = new SingleSender(response, getSharedSender(route), !asynchronous);
sender.addMessageProcessor(properties);
sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request)));
-
- Feeder feeder = createFeeder(sender, request);
+ SimpleFeedAccess feedAccess = sender;
+ if (numThreads != 1) {
+ feedAccess = new ThreadedFeedAccess(numThreads, feedAccess);
+ }
+ Feeder feeder = createFeeder(feedAccess, request);
feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError());
feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent());
response.setAbortOnFeedError(properties.getAbortOnFeedError());
@@ -116,7 +178,7 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase {
}
}
- private Feeder createFeeder(SingleSender sender, HttpRequest request) {
+ private Feeder createFeeder(SimpleFeedAccess sender, HttpRequest request) {
String contentType = request.getHeader("Content-Type");
if (Boolean.valueOf(request.getProperty(JSON_INPUT)) || (contentType != null && contentType.startsWith("application/json"))) {
return new JsonFeeder(getDocumentTypeManager(), sender, getRequestInputStream(request));
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java
index 05d0c2b81bd..7dc852bfd6d 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java
@@ -36,35 +36,35 @@ public abstract class VespaFeedHandlerBase extends ThreadedHttpRequestHandler {
SlobroksConfig slobroksConfig,
ClusterListConfig clusterListConfig,
Executor executor,
- Metric metric) throws Exception {
+ Metric metric) {
this(FeedContext.getInstance(feederConfig, loadTypeConfig, documentmanagerConfig,
slobroksConfig, clusterListConfig, metric),
executor, (long)feederConfig.timeout() * 1000);
}
- public VespaFeedHandlerBase(FeedContext context, Executor executor) throws Exception {
+ VespaFeedHandlerBase(FeedContext context, Executor executor) {
this(context, executor, context.getPropertyProcessor().getDefaultTimeoutMillis());
}
- public VespaFeedHandlerBase(FeedContext context, Executor executor, long defaultTimeoutMillis) throws Exception {
+ private VespaFeedHandlerBase(FeedContext context, Executor executor, long defaultTimeoutMillis) {
super(executor, context.getMetricAPI());
this.context = context;
this.defaultTimeoutMillis = defaultTimeoutMillis;
}
- public SharedSender getSharedSender(String route) {
+ SharedSender getSharedSender(String route) {
return context.getSharedSender(route);
}
- public DocprocService getDocprocChain(HttpRequest request) {
+ DocprocService getDocprocChain(HttpRequest request) {
return context.getPropertyProcessor().getDocprocChain(request);
}
- public ComponentRegistry<DocprocService> getDocprocServiceRegistry(HttpRequest request) {
+ ComponentRegistry<DocprocService> getDocprocServiceRegistry(HttpRequest request) {
return context.getPropertyProcessor().getDocprocServiceRegistry(request);
}
- public MessagePropertyProcessor getPropertyProcessor() {
+ MessagePropertyProcessor getPropertyProcessor() {
return context.getPropertyProcessor();
}
@@ -74,7 +74,7 @@ public abstract class VespaFeedHandlerBase extends ThreadedHttpRequestHandler {
* original data stream.
* @throws IllegalArgumentException if GZIP stream creation failed
*/
- public InputStream getRequestInputStream(HttpRequest request) {
+ InputStream getRequestInputStream(HttpRequest request) {
if ("gzip".equals(request.getHeader("Content-Encoding"))) {
try {
return new GZIPInputStream(request.getData());
@@ -86,7 +86,7 @@ public abstract class VespaFeedHandlerBase extends ThreadedHttpRequestHandler {
}
}
- protected DocumentTypeManager getDocumentTypeManager() {
+ DocumentTypeManager getDocumentTypeManager() {
return context.getDocumentTypeManager();
}
@@ -94,7 +94,7 @@ public abstract class VespaFeedHandlerBase extends ThreadedHttpRequestHandler {
return context.getMetrics();
}
- protected long getTimeoutMillis(HttpRequest request) {
+ long getTimeoutMillis(HttpRequest request) {
return ParameterParser.asMilliSeconds(request.getProperty("timeout"), defaultTimeoutMillis);
}