diff options
Diffstat (limited to 'vespaclient-core/src/main/java/com/yahoo/feedhandler/ThreadedFeedAccess.java')
-rw-r--r-- | vespaclient-core/src/main/java/com/yahoo/feedhandler/ThreadedFeedAccess.java | 82 |
1 files changed, 82 insertions, 0 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/ThreadedFeedAccess.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/ThreadedFeedAccess.java new file mode 100644 index 00000000000..3ad3e0b7f42 --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/ThreadedFeedAccess.java @@ -0,0 +1,82 @@ +package com.yahoo.feedhandler; + +import com.yahoo.concurrent.ThreadFactoryFactory; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.TestAndSetCondition; +import com.yahoo.feedapi.SimpleFeedAccess; + +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +final class ThreadedFeedAccess implements SimpleFeedAccess { + + private final SimpleFeedAccess simpleFeedAccess; + private final ExecutorService executorService; + private final Executor executor; + ThreadedFeedAccess(int numThreads, SimpleFeedAccess simpleFeedAccess) { + this.simpleFeedAccess = simpleFeedAccess; + if (numThreads <= 0) { + numThreads = Runtime.getRuntime().availableProcessors(); + } + if (numThreads > 1) { + executorService = new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS, + new SynchronousQueue<>(false), + ThreadFactoryFactory.getDaemonThreadFactory("feeder"), + new ThreadPoolExecutor.CallerRunsPolicy()); + executor = executorService; + } else { + executorService = null; + executor = new Executor() { + @Override + public void execute(Runnable command) { + command.run(); + } + }; + } + } + @Override + public void put(Document doc) { + executor.execute(() -> simpleFeedAccess.put(doc)); + } + + @Override + public void remove(DocumentId docId) { + executor.execute(() -> simpleFeedAccess.remove(docId)); + } + + @Override + public void update(DocumentUpdate update) { + executor.execute(() -> simpleFeedAccess.update(update)); + } + + @Override + public void put(Document doc, TestAndSetCondition condition) { + executor.execute(() -> simpleFeedAccess.put(doc, condition)); + } + + @Override + public void remove(DocumentId docId, TestAndSetCondition condition) { + executor.execute(() -> simpleFeedAccess.remove(docId, condition)); + } + + @Override + public void update(DocumentUpdate update, TestAndSetCondition condition) { + executor.execute(() -> simpleFeedAccess.update(update, condition)); + } + + @Override + public boolean isAborted() { + return simpleFeedAccess.isAborted(); + } + @Override + public void close() { + if (executorService != null) { + executorService.shutdown(); + } + } +} |