diff options
Diffstat (limited to 'vespaclient-core/src/main/java/com/yahoo/feedhandler')
-rwxr-xr-x | vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java | 40 |
1 files changed, 26 insertions, 14 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java index 8661a4e4db1..c94cc10b098 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java @@ -30,6 +30,7 @@ import com.yahoo.vespaclient.config.FeederConfig; import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -79,15 +80,28 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { static final class ThreadedFeedAccess implements SimpleFeedAccess { private final SimpleFeedAccess simpleFeedAccess; - private final ExecutorService executor; + private final ExecutorService executorService; + private final Executor executor; ThreadedFeedAccess(int numThreads, SimpleFeedAccess simpleFeedAccess) { this.simpleFeedAccess = simpleFeedAccess; if (numThreads <= 0) { numThreads = Runtime.getRuntime().availableProcessors(); } - executor = new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS, - new SynchronousQueue<>(false), - ThreadFactoryFactory.getDaemonThreadFactory("feeder"), new ThreadPoolExecutor.CallerRunsPolicy()); + if (numThreads > 1) { + executorService = new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS, + new SynchronousQueue<>(false), + ThreadFactoryFactory.getDaemonThreadFactory("feeder"), + new ThreadPoolExecutor.CallerRunsPolicy()); + executor = executorService; + } else { + executorService = null; + executor = new Executor() { + @Override + public void execute(Runnable command) { + command.run(); + } + }; + } } @Override public void put(Document doc) { @@ -123,8 +137,11 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { public boolean isAborted() { return simpleFeedAccess.isAborted(); } - void close() { - executor.shutdown(); + @Override + public void close() { + if (executorService != null) { + executorService.shutdown(); + } } } @@ -147,11 +164,8 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { SingleSender sender = new SingleSender(response, getSharedSender(route), !asynchronous); sender.addMessageProcessor(properties); sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request))); - SimpleFeedAccess feedAccess = sender; - if (numThreads != 1) { - feedAccess = new ThreadedFeedAccess(numThreads, feedAccess); - } - Feeder feeder = createFeeder(feedAccess, request); + ThreadedFeedAccess feedAccess = new ThreadedFeedAccess(numThreads, sender); + Feeder feeder = createFeeder(sender, request); feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError()); feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent()); response.setAbortOnFeedError(properties.getAbortOnFeedError()); @@ -165,9 +179,7 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { } sender.done(); - if (feedAccess instanceof ThreadedFeedAccess) { - ((ThreadedFeedAccess)feedAccess).close(); - } + feedAccess.close(); if (asynchronous) { return response; |