diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2018-10-24 10:08:17 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2018-10-24 10:08:17 +0200 |
commit | 2be52546297baac4cd2b2b3f180ef762e942dcd0 (patch) | |
tree | 036d19e269e53c493aef0edf7b3d4edd48fd8725 /vespaclient-core | |
parent | 0e3dea0ed702cd8e63a6856474e9dbb111b85d47 (diff) |
Move ThreadedFeedAccess out as independent class.
Diffstat (limited to 'vespaclient-core')
-rw-r--r-- | vespaclient-core/src/main/java/com/yahoo/feedhandler/ThreadedFeedAccess.java | 82 | ||||
-rwxr-xr-x | vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java | 78 |
2 files changed, 82 insertions, 78 deletions
diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/ThreadedFeedAccess.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/ThreadedFeedAccess.java new file mode 100644 index 00000000000..3ad3e0b7f42 --- /dev/null +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/ThreadedFeedAccess.java @@ -0,0 +1,82 @@ +package com.yahoo.feedhandler; + +import com.yahoo.concurrent.ThreadFactoryFactory; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.TestAndSetCondition; +import com.yahoo.feedapi.SimpleFeedAccess; + +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +final class ThreadedFeedAccess implements SimpleFeedAccess { + + private final SimpleFeedAccess simpleFeedAccess; + private final ExecutorService executorService; + private final Executor executor; + ThreadedFeedAccess(int numThreads, SimpleFeedAccess simpleFeedAccess) { + this.simpleFeedAccess = simpleFeedAccess; + if (numThreads <= 0) { + numThreads = Runtime.getRuntime().availableProcessors(); + } + if (numThreads > 1) { + executorService = new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS, + new SynchronousQueue<>(false), + ThreadFactoryFactory.getDaemonThreadFactory("feeder"), + new ThreadPoolExecutor.CallerRunsPolicy()); + executor = executorService; + } else { + executorService = null; + executor = new Executor() { + @Override + public void execute(Runnable command) { + command.run(); + } + }; + } + } + @Override + public void put(Document doc) { + executor.execute(() -> simpleFeedAccess.put(doc)); + } + + @Override + public void remove(DocumentId docId) { + executor.execute(() -> simpleFeedAccess.remove(docId)); + } + + @Override + public void update(DocumentUpdate update) { + executor.execute(() -> simpleFeedAccess.update(update)); + } + + @Override + public void put(Document doc, TestAndSetCondition condition) { + executor.execute(() -> simpleFeedAccess.put(doc, condition)); + } + + @Override + public void remove(DocumentId docId, TestAndSetCondition condition) { + executor.execute(() -> simpleFeedAccess.remove(docId, condition)); + } + + @Override + public void update(DocumentUpdate update, TestAndSetCondition condition) { + executor.execute(() -> simpleFeedAccess.update(update, condition)); + } + + @Override + public boolean isAborted() { + return simpleFeedAccess.isAborted(); + } + @Override + public void close() { + if (executorService != null) { + executorService.shutdown(); + } + } +} diff --git a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java index c94cc10b098..32c2d848b82 100755 --- a/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java +++ b/vespaclient-core/src/main/java/com/yahoo/feedhandler/VespaFeedHandler.java @@ -5,15 +5,10 @@ import com.google.inject.Inject; import com.yahoo.clientmetrics.RouteMetricSet; import com.yahoo.cloud.config.ClusterListConfig; import com.yahoo.cloud.config.SlobroksConfig; -import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.container.jdisc.EmptyResponse; import com.yahoo.container.jdisc.HttpRequest; import com.yahoo.container.jdisc.HttpResponse; import com.yahoo.container.protect.Error; -import com.yahoo.document.Document; -import com.yahoo.document.DocumentId; -import com.yahoo.document.DocumentUpdate; -import com.yahoo.document.TestAndSetCondition; import com.yahoo.document.config.DocumentmanagerConfig; import com.yahoo.feedapi.DocprocMessageProcessor; import com.yahoo.feedapi.FeedContext; @@ -29,11 +24,6 @@ import com.yahoo.vespaclient.config.FeederConfig; import java.util.List; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -77,74 +67,6 @@ public final class VespaFeedHandler extends VespaFeedHandlerBase { return handle(request, null, 1); } - static final class ThreadedFeedAccess implements SimpleFeedAccess { - - private final SimpleFeedAccess simpleFeedAccess; - private final ExecutorService executorService; - private final Executor executor; - ThreadedFeedAccess(int numThreads, SimpleFeedAccess simpleFeedAccess) { - this.simpleFeedAccess = simpleFeedAccess; - if (numThreads <= 0) { - numThreads = Runtime.getRuntime().availableProcessors(); - } - if (numThreads > 1) { - executorService = new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS, - new SynchronousQueue<>(false), - ThreadFactoryFactory.getDaemonThreadFactory("feeder"), - new ThreadPoolExecutor.CallerRunsPolicy()); - executor = executorService; - } else { - executorService = null; - executor = new Executor() { - @Override - public void execute(Runnable command) { - command.run(); - } - }; - } - } - @Override - public void put(Document doc) { - executor.execute(() -> simpleFeedAccess.put(doc)); - } - - @Override - public void remove(DocumentId docId) { - executor.execute(() -> simpleFeedAccess.remove(docId)); - } - - @Override - public void update(DocumentUpdate update) { - executor.execute(() -> simpleFeedAccess.update(update)); - } - - @Override - public void put(Document doc, TestAndSetCondition condition) { - executor.execute(() -> simpleFeedAccess.put(doc, condition)); - } - - @Override - public void remove(DocumentId docId, TestAndSetCondition condition) { - executor.execute(() -> simpleFeedAccess.remove(docId, condition)); - } - - @Override - public void update(DocumentUpdate update, TestAndSetCondition condition) { - executor.execute(() -> simpleFeedAccess.update(update, condition)); - } - - @Override - public boolean isAborted() { - return simpleFeedAccess.isAborted(); - } - @Override - public void close() { - if (executorService != null) { - executorService.shutdown(); - } - } - } - public HttpResponse handle(HttpRequest request, RouteMetricSet.ProgressCallback callback, int numThreads) { if (request.getProperty("status") != null) { return new MetricResponse(context.getMetrics().getMetricSet()); |