From 4b97ebddad6965f9dfa299b8932a4396decfc721 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 23 Oct 2018 23:43:48 +0200 Subject: Add support for using more than 1 sender thread. --- .../com/yahoo/feedapi/DocprocMessageProcessor.java | 8 +-- .../src/main/java/com/yahoo/feedapi/Feeder.java | 14 ++--- .../java/com/yahoo/feedapi/VespaFeedSender.java | 4 -- .../com/yahoo/feedhandler/VespaFeedHandler.java | 72 ++++++++++++++++++++-- .../yahoo/feedhandler/VespaFeedHandlerBase.java | 20 +++--- .../main/java/com/yahoo/vespafeeder/Arguments.java | 12 +++- .../java/com/yahoo/vespafeeder/VespaFeeder.java | 14 +++-- .../com/yahoo/vespafeeder/VespaFeederTestCase.java | 8 +++ 8 files changed, 115 insertions(+), 37 deletions(-) diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/DocprocMessageProcessor.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/DocprocMessageProcessor.java index 195604b067f..c6974cff5c1 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/DocprocMessageProcessor.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/DocprocMessageProcessor.java @@ -6,18 +6,16 @@ import com.yahoo.docproc.CallStack; import com.yahoo.docproc.DocprocService; import com.yahoo.docproc.DocumentProcessor; import com.yahoo.docproc.Processing; -import com.yahoo.document.*; +import com.yahoo.document.DocumentOperation; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; import com.yahoo.messagebus.Message; import com.yahoo.messagebus.routing.Route; -import com.yahoo.vdslib.Entry; import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; public class DocprocMessageProcessor implements MessageProcessor { private final DocprocService docproc; @@ -31,7 +29,7 @@ public class DocprocMessageProcessor implements MessageProcessor { @Override public void process(Message m) { try { - List documentBases = new ArrayList(); + List documentBases = new ArrayList<>(1); if (m.getType() == DocumentProtocol.MESSAGE_PUTDOCUMENT) { documentBases.add(((PutDocumentMessage) m).getDocumentPut()); @@ -49,7 +47,7 @@ public class DocprocMessageProcessor implements MessageProcessor { } } - public void processDocumentOperations(List documentOperations, Message m) throws Exception { + private void processDocumentOperations(List documentOperations, Message m) throws Exception { Processing processing = Processing.createProcessingFromDocumentOperations(docproc.getName(), documentOperations, new CallStack(docproc.getCallStack())); processing.setServiceName(docproc.getName()); processing.setDocprocServiceRegistry(docprocServiceRegistry); diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/Feeder.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/Feeder.java index 523ea0605a4..4ad5c86b663 100644 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/Feeder.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/Feeder.java @@ -24,11 +24,11 @@ public abstract class Feeder { protected final InputStream stream; protected final DocumentTypeManager docMan; - protected List errors = new LinkedList(); - protected boolean doAbort = true; - protected boolean createIfNonExistent = false; - protected final VespaFeedSender sender; - private final int MAX_ERRORS = 10; + protected List errors = new LinkedList<>(); + private boolean doAbort = true; + private boolean createIfNonExistent = false; + private final VespaFeedSender sender; + private static final int MAX_ERRORS = 10; protected Feeder(DocumentTypeManager docMan, VespaFeedSender sender, InputStream stream) { this.docMan = docMan; @@ -44,7 +44,7 @@ public abstract class Feeder { this.createIfNonExistent = value; } - public void addException(Exception e) { + private void addException(Exception e) { String message; if (e.getMessage() != null) { message = e.getMessage().replaceAll("\"", "'"); @@ -69,7 +69,7 @@ public abstract class Feeder { protected abstract FeedReader createReader() throws Exception; public List parse() { - FeedReader reader = null; + FeedReader reader; try { reader = createReader(); diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/VespaFeedSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/VespaFeedSender.java index d7329264bc0..b441e81a829 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/VespaFeedSender.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/VespaFeedSender.java @@ -3,10 +3,6 @@ package com.yahoo.feedapi; import com.yahoo.vespaxmlparser.VespaXMLFeedReader; -import java.util.Date; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Logger; - /** * Wrapper class for SimpleFeedAccess to send various XML operations. */ diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java index 8180bfd84ea..ada4cfb1725 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java @@ -5,16 +5,22 @@ import com.google.inject.Inject; import com.yahoo.clientmetrics.RouteMetricSet; import com.yahoo.cloud.config.ClusterListConfig; import com.yahoo.cloud.config.SlobroksConfig; +import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.container.jdisc.EmptyResponse; import com.yahoo.container.jdisc.HttpRequest; import com.yahoo.container.jdisc.HttpResponse; import com.yahoo.container.protect.Error; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.TestAndSetCondition; import com.yahoo.document.config.DocumentmanagerConfig; import com.yahoo.feedapi.DocprocMessageProcessor; import com.yahoo.feedapi.FeedContext; import com.yahoo.feedapi.Feeder; import com.yahoo.feedapi.JsonFeeder; import com.yahoo.feedapi.MessagePropertyProcessor; +import com.yahoo.feedapi.SimpleFeedAccess; import com.yahoo.feedapi.SingleSender; import com.yahoo.feedapi.XMLFeeder; import com.yahoo.jdisc.Metric; @@ -23,6 +29,10 @@ import com.yahoo.vespaclient.config.FeederConfig; import java.util.List; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -63,10 +73,59 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { @Override public HttpResponse handle(HttpRequest request) { - return handle(request, (RouteMetricSet.ProgressCallback)null); + return handle(request, null, 1); } - public HttpResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback) { + static final class ThreadedFeedAccess implements SimpleFeedAccess { + + private final SimpleFeedAccess simpleFeedAccess; + private final ExecutorService executor; + ThreadedFeedAccess(int numThreads, SimpleFeedAccess simpleFeedAccess) { + this.simpleFeedAccess = simpleFeedAccess; + if (numThreads <= 0) { + numThreads = Runtime.getRuntime().availableProcessors(); + } + executor = new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS, + new SynchronousQueue<>(false), + ThreadFactoryFactory.getDaemonThreadFactory("feeder"), new ThreadPoolExecutor.CallerRunsPolicy()); + } + @Override + public void put(Document doc) { + executor.execute(() -> simpleFeedAccess.put(doc)); + } + + @Override + public void remove(DocumentId docId) { + executor.execute(() -> simpleFeedAccess.remove(docId)); + } + + @Override + public void update(DocumentUpdate update) { + executor.execute(() -> simpleFeedAccess.update(update)); + } + + @Override + public void put(Document doc, TestAndSetCondition condition) { + executor.execute(() -> simpleFeedAccess.put(doc, condition)); + } + + @Override + public void remove(DocumentId docId, TestAndSetCondition condition) { + executor.execute(() -> simpleFeedAccess.remove(docId, condition)); + } + + @Override + public void update(DocumentUpdate update, TestAndSetCondition condition) { + executor.execute(() -> simpleFeedAccess.update(update, condition)); + } + + @Override + public boolean isAborted() { + return simpleFeedAccess.isAborted(); + } + } + + public HttpResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback, int numThreads) { if (request.getProperty("status") != null) { return new MetricResponse(context.getMetrics().getMetricSet()); } @@ -85,8 +144,11 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { SingleSender sender = new SingleSender(response, getSharedSender(route), !asynchronous); sender.addMessageProcessor(properties); sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request))); - - Feeder feeder = createFeeder(sender, request); + SimpleFeedAccess feedAccess = sender; + if (numThreads != 1) { + feedAccess = new ThreadedFeedAccess(numThreads, feedAccess); + } + Feeder feeder = createFeeder(feedAccess, request); feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError()); feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent()); response.setAbortOnFeedError(properties.getAbortOnFeedError()); @@ -116,7 +178,7 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { } } - private Feeder createFeeder(SingleSender sender, HttpRequest request) { + private Feeder createFeeder(SimpleFeedAccess sender, HttpRequest request) { String contentType = request.getHeader("Content-Type"); if (Boolean.valueOf(request.getProperty(JSON_INPUT)) || (contentType != null && contentType.startsWith("application/json"))) { return new JsonFeeder(getDocumentTypeManager(), sender, getRequestInputStream(request)); diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java index 05d0c2b81bd..7dc852bfd6d 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandlerBase.java @@ -36,35 +36,35 @@ public abstract class VespaFeedHandlerBase extends ThreadedHttpRequestHandler { SlobroksConfig slobroksConfig, ClusterListConfig clusterListConfig, Executor executor, - Metric metric) throws Exception { + Metric metric) { this(FeedContext.getInstance(feederConfig, loadTypeConfig, documentmanagerConfig, slobroksConfig, clusterListConfig, metric), executor, (long)feederConfig.timeout() * 1000); } - public VespaFeedHandlerBase(FeedContext context, Executor executor) throws Exception { + VespaFeedHandlerBase(FeedContext context, Executor executor) { this(context, executor, context.getPropertyProcessor().getDefaultTimeoutMillis()); } - public VespaFeedHandlerBase(FeedContext context, Executor executor, long defaultTimeoutMillis) throws Exception { + private VespaFeedHandlerBase(FeedContext context, Executor executor, long defaultTimeoutMillis) { super(executor, context.getMetricAPI()); this.context = context; this.defaultTimeoutMillis = defaultTimeoutMillis; } - public SharedSender getSharedSender(String route) { + SharedSender getSharedSender(String route) { return context.getSharedSender(route); } - public DocprocService getDocprocChain(HttpRequest request) { + DocprocService getDocprocChain(HttpRequest request) { return context.getPropertyProcessor().getDocprocChain(request); } - public ComponentRegistry getDocprocServiceRegistry(HttpRequest request) { + ComponentRegistry getDocprocServiceRegistry(HttpRequest request) { return context.getPropertyProcessor().getDocprocServiceRegistry(request); } - public MessagePropertyProcessor getPropertyProcessor() { + MessagePropertyProcessor getPropertyProcessor() { return context.getPropertyProcessor(); } @@ -74,7 +74,7 @@ public abstract class VespaFeedHandlerBase extends ThreadedHttpRequestHandler { * original data stream. * @throws IllegalArgumentException if GZIP stream creation failed */ - public InputStream getRequestInputStream(HttpRequest request) { + InputStream getRequestInputStream(HttpRequest request) { if ("gzip".equals(request.getHeader("Content-Encoding"))) { try { return new GZIPInputStream(request.getData()); @@ -86,7 +86,7 @@ public abstract class VespaFeedHandlerBase extends ThreadedHttpRequestHandler { } } - protected DocumentTypeManager getDocumentTypeManager() { + DocumentTypeManager getDocumentTypeManager() { return context.getDocumentTypeManager(); } @@ -94,7 +94,7 @@ public abstract class VespaFeedHandlerBase extends ThreadedHttpRequestHandler { return context.getMetrics(); } - protected long getTimeoutMillis(HttpRequest request) { + long getTimeoutMillis(HttpRequest request) { return ParameterParser.asMilliSeconds(request.getProperty("timeout"), defaultTimeoutMillis); } diff --git a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/Arguments.java b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/Arguments.java index 86512dfab73..0d23af1fec5 100644 --- a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/Arguments.java +++ b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/Arguments.java @@ -39,14 +39,15 @@ public class Arguments { } private FeederConfig.Builder feederConfigBuilder = new FeederConfig.Builder(); - private List files = new ArrayList(); + private List files = new ArrayList<>(); private String dumpDocumentsFile = null; private String mode = "standard"; private boolean validateOnly = false; private boolean verbose = false; - SessionFactory sessionFactory = null; + SessionFactory sessionFactory; MessagePropertyProcessor propertyProcessor = null; private String priority = null; + private int numThreads = 1; public MessagePropertyProcessor getPropertyProcessor() { return propertyProcessor; @@ -83,6 +84,7 @@ public class Arguments { " feeding them.\n" + " --dumpDocuments Specify a file where documents in the put are serialized.\n" + " --priority arg Specify priority of sent messages (see documentation for priority values)\n" + + " --numthreads arg Specify how many threads to use for sending. Default is 1.\n" + " --create-if-non-existent Enable setting of create-if-non-existent to true on all document updates in the given xml feed.\n" + " -v [ --verbose ] Enable verbose output of progress.\n"); } @@ -152,6 +154,8 @@ public class Arguments { verbose = true; } else if ("--priority".equals(arg)) { priority = getParam(args, arg); + } else if ("--numthreads".equals(arg)) { + numThreads = Integer.parseInt(getParam(args, arg)); } else { files.add(arg); } @@ -183,6 +187,10 @@ public class Arguments { return priority; } + public int getNumThreads() { + return numThreads; + } + public SessionFactory getSessionFactory() { return sessionFactory; } diff --git a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java index 0a926f6aae2..f80567709c4 100755 --- a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java +++ b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java @@ -13,8 +13,14 @@ import com.yahoo.log.LogSetup; import com.yahoo.concurrent.SystemTimer; import com.yahoo.vespaclient.ClusterList; -import java.io.*; -import java.util.*; +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -82,7 +88,7 @@ public class VespaFeeder { if (args.getFiles().isEmpty()) { InputStreamRequest req = new InputStreamRequest(input); setProperties(req, input); - FeedResponse response = (FeedResponse)handler.handle(req.toRequest(), createProgressCallback(output)); + FeedResponse response = (FeedResponse)handler.handle(req.toRequest(), createProgressCallback(output), args.getNumThreads()); if ( ! response.isSuccess()) { throw renderErrors(response.getErrorList()); } @@ -100,7 +106,7 @@ public class VespaFeeder { final BufferedInputStream inputSnooper = new BufferedInputStream(new FileInputStream(fileName)); setProperties(req, inputSnooper); inputSnooper.close(); - FeedResponse response = (FeedResponse)handler.handle(req.toRequest(), createProgressCallback(output)); + FeedResponse response = (FeedResponse)handler.handle(req.toRequest(), createProgressCallback(output), args.getNumThreads()); if (!response.isSuccess()) { throw renderErrors(response.getErrorList()); } diff --git a/vespaclient-java/src/test/java/com/yahoo/vespafeeder/VespaFeederTestCase.java b/vespaclient-java/src/test/java/com/yahoo/vespafeeder/VespaFeederTestCase.java index d1b7397de34..4de286398e9 100644 --- a/vespaclient-java/src/test/java/com/yahoo/vespafeeder/VespaFeederTestCase.java +++ b/vespaclient-java/src/test/java/com/yahoo/vespafeeder/VespaFeederTestCase.java @@ -67,6 +67,14 @@ public class VespaFeederTestCase { assertTrue(arguments.getFeederConfig().createifnonexistent()); } + @Test + public void requireThatnumThreadsBeParsed() throws Exception { + String argsS="--numthreads 5"; + Arguments arguments = new Arguments(argsS.split(" "), DummySessionFactory.createWithAutoReply()); + assertEquals(5, arguments.getNumThreads()); + assertEquals(1, new Arguments("".split(" "), DummySessionFactory.createWithAutoReply()).getNumThreads()); + } + @Test public void testHelp() throws Exception { String argsS="-h"; -- cgit v1.2.3