summaryrefslogtreecommitdiffstats
path: root/vespaclient-core
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2018-10-24 08:28:23 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2018-10-24 08:28:23 +0200
commitc478da73cc5a0290a43f55229bc28d0ecbc4fcf4 (patch)
tree097794c117ee346b440b15127f717cea7812bd8b /vespaclient-core
parent9f88cfb2ac36ea523919aa115d7038cee12c19ad (diff)
Hide the details in the ThreadedFeedAccess to avoid ifs on the outside.
Diffstat (limited to 'vespaclient-core')
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SimpleFeedAccess.java2
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java6
-rwxr-xr-xvespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java40
3 files changed, 29 insertions, 19 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SimpleFeedAccess.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SimpleFeedAccess.java
index 79690d14486..98609650432 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SimpleFeedAccess.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SimpleFeedAccess.java
@@ -15,5 +15,5 @@ public interface SimpleFeedAccess {
void remove(DocumentId docId, TestAndSetCondition condition);
void update(DocumentUpdate update, TestAndSetCondition condition);
boolean isAborted();
-
+ void close();
}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java b/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java
index 49f252b10f4..e0e12b26ae6 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedapi/SingleSender.java
@@ -97,12 +97,10 @@ public class SingleSender implements SimpleFeedAccess {
// empty
}
- public void waitForPending() {
- waitForPending(-1);
- }
-
public boolean waitForPending(long timeoutMs) {
return sender.waitForPending(owner, timeoutMs);
}
+ @Override
+ public void close() { }
}
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
index 8661a4e4db1..c94cc10b098 100755
--- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
+++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java
@@ -30,6 +30,7 @@ import com.yahoo.vespaclient.config.FeederConfig;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -79,15 +80,28 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase {
static final class ThreadedFeedAccess implements SimpleFeedAccess {
private final SimpleFeedAccess simpleFeedAccess;
- private final ExecutorService executor;
+ private final ExecutorService executorService;
+ private final Executor executor;
ThreadedFeedAccess(int numThreads, SimpleFeedAccess simpleFeedAccess) {
this.simpleFeedAccess = simpleFeedAccess;
if (numThreads <= 0) {
numThreads = Runtime.getRuntime().availableProcessors();
}
- executor = new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS,
- new SynchronousQueue<>(false),
- ThreadFactoryFactory.getDaemonThreadFactory("feeder"), new ThreadPoolExecutor.CallerRunsPolicy());
+ if (numThreads > 1) {
+ executorService = new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(false),
+ ThreadFactoryFactory.getDaemonThreadFactory("feeder"),
+ new ThreadPoolExecutor.CallerRunsPolicy());
+ executor = executorService;
+ } else {
+ executorService = null;
+ executor = new Executor() {
+ @Override
+ public void execute(Runnable command) {
+ command.run();
+ }
+ };
+ }
}
@Override
public void put(Document doc) {
@@ -123,8 +137,11 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase {
public boolean isAborted() {
return simpleFeedAccess.isAborted();
}
- void close() {
- executor.shutdown();
+ @Override
+ public void close() {
+ if (executorService != null) {
+ executorService.shutdown();
+ }
}
}
@@ -147,11 +164,8 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase {
SingleSender sender = new SingleSender(response, getSharedSender(route), !asynchronous);
sender.addMessageProcessor(properties);
sender.addMessageProcessor(new DocprocMessageProcessor(getDocprocChain(request), getDocprocServiceRegistry(request)));
- SimpleFeedAccess feedAccess = sender;
- if (numThreads != 1) {
- feedAccess = new ThreadedFeedAccess(numThreads, feedAccess);
- }
- Feeder feeder = createFeeder(feedAccess, request);
+ ThreadedFeedAccess feedAccess = new ThreadedFeedAccess(numThreads, sender);
+ Feeder feeder = createFeeder(sender, request);
feeder.setAbortOnDocumentError(properties.getAbortOnDocumentError());
feeder.setCreateIfNonExistent(properties.getCreateIfNonExistent());
response.setAbortOnFeedError(properties.getAbortOnFeedError());
@@ -165,9 +179,7 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase {
}
sender.done();
- if (feedAccess instanceof ThreadedFeedAccess) {
- ((ThreadedFeedAccess)feedAccess).close();
- }
+ feedAccess.close();
if (asynchronous) {
return response;