From c478da73cc5a0290a43f55229bc28d0ecbc4fcf4 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Wed, 24 Oct 2018 08:28:23 +0200 Subject: Hide the details in the ThreadedFeedAccess to avoid ifs on the outside. --- .../java/com/yahoo/feedapi/SimpleFeedAccess.java | 2 +- .../main/java/com/yahoo/feedapi/SingleSender.java | 6 ++-- .../com/yahoo/feedhandler/VespaFeedHandler.java | 40 ++++++++++++++-------- 3 files changed, 29 insertions(+), 19 deletions(-) (limited to 'vespaclient-core/src/main') diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SimpleFeedAccess.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SimpleFeedAccess.java index 79690d14486..98609650432 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SimpleFeedAccess.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SimpleFeedAccess.java @@ -15,5 +15,5 @@ public interface SimpleFeedAccess { void remove(DocumentId docId, TestAndSetCondition condition); void update(DocumentUpdate update, TestAndSetCondition condition); boolean isAborted(); - + void close(); } diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java index 49f252b10f4..e0e12b26ae6 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java @@ -97,12 +97,10 @@ public class SingleSender implements SimpleFeedAccess { // empty } - public void waitForPending() { - waitForPending(-1); - } - public boolean waitForPending(long timeoutMs) { return sender.waitForPending(owner, timeoutMs); } + @Override + public void close() { } } diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java index 8661a4e4db1..c94cc10b098 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java @@ -30,6 +30,7 @@ import com.yahoo.vespaclient.config.FeederConfig; import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -79,15 +80,28 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { static final class ThreadedFeedAccess implements SimpleFeedAccess { private final SimpleFeedAccess simpleFeedAccess; - private final ExecutorService executor; + private final ExecutorService executorService; + private final Executor executor; ThreadedFeedAccess(int numThreads, SimpleFeedAccess simpleFeedAccess) { this.simpleFeedAccess = simpleFeedAccess; if (numThreads <= 0) { numThreads = Runtime.getRuntime().availableProcessors(); } - executor = new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS, - new SynchronousQueue<>(false), - ThreadFactoryFactory.getDaemonThreadFactory("feeder"), new ThreadPoolExecutor.CallerRunsPolicy()); + if (numThreads > 1) { + executorService = new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS, + new SynchronousQueue<>(false), + ThreadFactoryFactory.getDaemonThreadFactory("feeder"), + new ThreadPoolExecutor.CallerRunsPolicy()); + executor = executorService; + } else { + executorService = null; + executor = new Executor() { + @Override + public void execute(Runnable command) { + command.run(); + } + }; + } } @Override public void put(Document doc) { @@ -123,8 +137,11 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { public boolean isAborted() { return simpleFeedAccess.isAborted(); } - void close() { - executor.shutdown(); + @Override + public void close() { + if (executorService != null) { + executorService.shutdown(); + } } } @@ -147,11 +164,8 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { SingleSender sender = new SingleSender(response, getSharedSender(route), !asynchronous); sender.addMessageProcessor(properties); sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request))); - SimpleFeedAccess feedAccess = sender; - if (numThreads != 1) { - feedAccess = new ThreadedFeedAccess(numThreads, feedAccess); - } - Feeder feeder = createFeeder(feedAccess, request); + ThreadedFeedAccess feedAccess = new ThreadedFeedAccess(numThreads, sender); + Feeder feeder = createFeeder(sender, request); feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError()); feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent()); response.setAbortOnFeedError(properties.getAbortOnFeedError()); @@ -165,9 +179,7 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { } sender.done(); - if (feedAccess instanceof ThreadedFeedAccess) { - ((ThreadedFeedAccess)feedAccess).close(); - } + feedAccess.close(); if (asynchronous) { return response; -- cgit v1.2.3