diff options
Diffstat (limited to 'vespaclient-core')
5 files changed, 87 insertions, 31 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/DocprocMessageProcessor.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/DocprocMessageProcessor.java index 195604b067f..c6974cff5c1 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/DocprocMessageProcessor.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/DocprocMessageProcessor.java @@ -6,18 +6,16 @@ import com.yahoo.docproc.CallStack; import com.yahoo.docproc.DocprocService; import com.yahoo.docproc.DocumentProcessor; import com.yahoo.docproc.Processing; -import com.yahoo.document.*; +import com.yahoo.document.DocumentOperation; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; import com.yahoo.messagebus.Message; import com.yahoo.messagebus.routing.Route; -import com.yahoo.vdslib.Entry; import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; public class DocprocMessageProcessor implements MessageProcessor { private final DocprocService docproc; @@ -31,7 +29,7 @@ public class DocprocMessageProcessor implements MessageProcessor { @Override public void process(Message m) { try { - List<DocumentOperation> documentBases = new ArrayList<DocumentOperation>(); + List<DocumentOperation> documentBases = new ArrayList<>(1); if (m.getType() == DocumentProtocol.MESSAGE_PUTDOCUMENT) { documentBases.add(((PutDocumentMessage) m).getDocumentPut()); @@ -49,7 +47,7 @@ public class DocprocMessageProcessor implements MessageProcessor { } } - public void processDocumentOperations(List<DocumentOperation> documentOperations, Message m) throws Exception { + private void processDocumentOperations(List<DocumentOperation> documentOperations, Message m) throws Exception { Processing processing = Processing.createProcessingFromDocumentOperations(docproc.getName(), documentOperations, new CallStack(docproc.getCallStack())); processing.setServiceName(docproc.getName()); processing.setDocprocServiceRegistry(docprocServiceRegistry); diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/Feeder.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/Feeder.java index 523ea0605a4..4ad5c86b663 100644 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/Feeder.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/Feeder.java @@ -24,11 +24,11 @@ public abstract class Feeder { protected final InputStream stream; protected final DocumentTypeManager docMan; - protected List<String> errors = new LinkedList<String>(); - protected boolean doAbort = true; - protected boolean createIfNonExistent = false; - protected final VespaFeedSender sender; - private final int MAX_ERRORS = 10; + protected List<String> errors = new LinkedList<>(); + private boolean doAbort = true; + private boolean createIfNonExistent = false; + private final VespaFeedSender sender; + private static final int MAX_ERRORS = 10; protected Feeder(DocumentTypeManager docMan, VespaFeedSender sender, InputStream stream) { this.docMan = docMan; @@ -44,7 +44,7 @@ public abstract class Feeder { this.createIfNonExistent = value; } - public void addException(Exception e) { + private void addException(Exception e) { String message; if (e.getMessage() != null) { message = e.getMessage().replaceAll("\"", "'"); @@ -69,7 +69,7 @@ public abstract class Feeder { protected abstract FeedReader createReader() throws Exception; public List<String> parse() { - FeedReader reader = null; + FeedReader reader; try { reader = createReader(); diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/VespaFeedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/VespaFeedSender.java index d7329264bc0..b441e81a829 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/VespaFeedSender.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/VespaFeedSender.java @@ -3,10 +3,6 @@ package com.yahoo.feedapi; import com.yahoo.vespaxmlparser.VespaXMLFeedReader; -import java.util.Date; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Logger; - /** * Wrapper class for SimpleFeedAccess to send various XML operations. */ diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java index 8180bfd84ea..ada4cfb1725 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java @@ -5,16 +5,22 @@ import com.google.inject.Inject; import com.yahoo.clientmetrics.RouteMetricSet; import com.yahoo.cloud.config.ClusterListConfig; import com.yahoo.cloud.config.SlobroksConfig; +import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.container.jdisc.EmptyResponse; import com.yahoo.container.jdisc.HttpRequest; import com.yahoo.container.jdisc.HttpResponse; import com.yahoo.container.protect.Error; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.TestAndSetCondition; import com.yahoo.document.config.DocumentmanagerConfig; import com.yahoo.feedapi.DocprocMessageProcessor; import com.yahoo.feedapi.FeedContext; import com.yahoo.feedapi.Feeder; import com.yahoo.feedapi.JsonFeeder; import com.yahoo.feedapi.MessagePropertyProcessor; +import com.yahoo.feedapi.SimpleFeedAccess; import com.yahoo.feedapi.SingleSender; import com.yahoo.feedapi.XMLFeeder; import com.yahoo.jdisc.Metric; @@ -23,6 +29,10 @@ import com.yahoo.vespaclient.config.FeederConfig; import java.util.List; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -63,10 +73,59 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { @Override public HttpResponse handle(HttpRequest request) { - return handle(request, (RouteMetricSet.ProgressCallback)null); + return handle(request, null, 1); } - public HttpResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback) { + static final class ThreadedFeedAccess implements SimpleFeedAccess { + + private final SimpleFeedAccess simpleFeedAccess; + private final ExecutorService executor; + ThreadedFeedAccess(int numThreads, SimpleFeedAccess simpleFeedAccess) { + this.simpleFeedAccess = simpleFeedAccess; + if (numThreads <= 0) { + numThreads = Runtime.getRuntime().availableProcessors(); + } + executor = new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS, + new SynchronousQueue<>(false), + ThreadFactoryFactory.getDaemonThreadFactory("feeder"), new ThreadPoolExecutor.CallerRunsPolicy()); + } + @Override + public void put(Document doc) { + executor.execute(() -> simpleFeedAccess.put(doc)); + } + + @Override + public void remove(DocumentId docId) { + executor.execute(() -> simpleFeedAccess.remove(docId)); + } + + @Override + public void update(DocumentUpdate update) { + executor.execute(() -> simpleFeedAccess.update(update)); + } + + @Override + public void put(Document doc, TestAndSetCondition condition) { + executor.execute(() -> simpleFeedAccess.put(doc, condition)); + } + + @Override + public void remove(DocumentId docId, TestAndSetCondition condition) { + executor.execute(() -> simpleFeedAccess.remove(docId, condition)); + } + + @Override + public void update(DocumentUpdate update, TestAndSetCondition condition) { + executor.execute(() -> simpleFeedAccess.update(update, condition)); + } + + @Override + public boolean isAborted() { + return simpleFeedAccess.isAborted(); + } + } + + public HttpResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback, int numThreads) { if (request.getProperty("status") != null) { return new MetricResponse(context.getMetrics().getMetricSet()); } @@ -85,8 +144,11 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { SingleSender sender = new SingleSender(response, getSharedSender(route), !asynchronous); sender.addMessageProcessor(properties); sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request))); - - Feeder feeder = createFeeder(sender, request); + SimpleFeedAccess feedAccess = sender; + if (numThreads != 1) { + feedAccess = new ThreadedFeedAccess(numThreads, feedAccess); + } + Feeder feeder = createFeeder(feedAccess, request); feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError()); feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent()); response.setAbortOnFeedError(properties.getAbortOnFeedError()); @@ -116,7 +178,7 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { } } - private Feeder createFeeder(SingleSender sender, HttpRequest request) { + private Feeder createFeeder(SimpleFeedAccess sender, HttpRequest request) { String contentType = request.getHeader("Content-Type"); if (Boolean.valueOf(request.getProperty(JSON_INPUT)) || (contentType != null && contentType.startsWith("application/json"))) { return new JsonFeeder(getDocumentTypeManager(), sender, getRequestInputStream(request)); diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java index 05d0c2b81bd..7dc852bfd6d 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java @@ -36,35 +36,35 @@ public abstract class VespaFeedHandlerBase extends ThreadedHttpRequestHandler { SlobroksConfig slobroksConfig, ClusterListConfig clusterListConfig, Executor executor, - Metric metric) throws Exception { + Metric metric) { this(FeedContext.getInstance(feederConfig, loadTypeConfig, documentmanagerConfig, slobroksConfig, clusterListConfig, metric), executor, (long)feederConfig.timeout() * 1000); } - public VespaFeedHandlerBase(FeedContext context, Executor executor) throws Exception { + VespaFeedHandlerBase(FeedContext context, Executor executor) { this(context, executor, context.getPropertyProcessor().getDefaultTimeoutMillis()); } - public VespaFeedHandlerBase(FeedContext context, Executor executor, long defaultTimeoutMillis) throws Exception { + private VespaFeedHandlerBase(FeedContext context, Executor executor, long defaultTimeoutMillis) { super(executor, context.getMetricAPI()); this.context = context; this.defaultTimeoutMillis = defaultTimeoutMillis; } - public SharedSender getSharedSender(String route) { + SharedSender getSharedSender(String route) { return context.getSharedSender(route); } - public DocprocService getDocprocChain(HttpRequest request) { + DocprocService getDocprocChain(HttpRequest request) { return context.getPropertyProcessor().getDocprocChain(request); } - public ComponentRegistry<DocprocService> getDocprocServiceRegistry(HttpRequest request) { + ComponentRegistry<DocprocService> getDocprocServiceRegistry(HttpRequest request) { return context.getPropertyProcessor().getDocprocServiceRegistry(request); } - public MessagePropertyProcessor getPropertyProcessor() { + MessagePropertyProcessor getPropertyProcessor() { return context.getPropertyProcessor(); } @@ -74,7 +74,7 @@ public abstract class VespaFeedHandlerBase extends ThreadedHttpRequestHandler { * original data stream. * @throws IllegalArgumentException if GZIP stream creation failed */ - public InputStream getRequestInputStream(HttpRequest request) { + InputStream getRequestInputStream(HttpRequest request) { if ("gzip".equals(request.getHeader("Content-Encoding"))) { try { return new GZIPInputStream(request.getData()); @@ -86,7 +86,7 @@ public abstract class VespaFeedHandlerBase extends ThreadedHttpRequestHandler { } } - protected DocumentTypeManager getDocumentTypeManager() { + DocumentTypeManager getDocumentTypeManager() { return context.getDocumentTypeManager(); } @@ -94,7 +94,7 @@ public abstract class VespaFeedHandlerBase extends ThreadedHttpRequestHandler { return context.getMetrics(); } - protected long getTimeoutMillis(HttpRequest request) { + long getTimeoutMillis(HttpRequest request) { return ParameterParser.asMilliSeconds(request.getProperty("timeout"), defaultTimeoutMillis); } |