diff options
author | Henning Baldersheim <balder@verizonmedia.com> | 2020-04-16 00:26:23 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@verizonmedia.com> | 2020-04-16 00:26:23 +0000 |
commit | f71fe343a717f4ea934af2dfc783dc665e054cfa (patch) | |
tree | 1b67c641f5859eccfd878960cd5b581590839458 /vespa_feed_perf/src | |
parent | bdb570a9e21410108bbb56f183bad1603c45c1fc (diff) |
Use a smoother blocking Q and block when full.
Diffstat (limited to 'vespa_feed_perf/src')
-rw-r--r-- | vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java | 22 |
1 files changed, 16 insertions, 6 deletions
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java index 14123c7a73c..aac7ab750bd 100644 --- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java @@ -46,9 +46,10 @@ import java.io.PrintStream; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -154,7 +155,7 @@ public class SimpleFeeder implements ReplyHandler { } } catch (InterruptedException e) {} } - public void close() throws Exception { + public void close() { session.destroy(); } } @@ -277,7 +278,7 @@ public class SimpleFeeder implements ReplyHandler { } } - class LazyDocumentOperation extends ConditionalFeedOperation { + static class LazyDocumentOperation extends ConditionalFeedOperation { private final DocumentDeserializer deserializer; LazyDocumentOperation(DocumentDeserializer deserializer, TestAndSetCondition condition) { super(Type.DOCUMENT, condition); @@ -289,7 +290,7 @@ public class SimpleFeeder implements ReplyHandler { return new Document(deserializer); } } - class LazyUpdateOperation extends ConditionalFeedOperation { + static class LazyUpdateOperation extends ConditionalFeedOperation { private final DocumentDeserializer deserializer; LazyUpdateOperation(DocumentDeserializer deserializer, TestAndSetCondition condition) { super(Type.UPDATE, condition); @@ -377,13 +378,22 @@ public class SimpleFeeder implements ReplyHandler { } + static class RetryExecutionhandler implements RejectedExecutionHandler { + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + executor.getQueue().put(r); + } catch (InterruptedException e) {} + } + } SimpleFeeder run() throws Throwable { ExecutorService executor = (numThreads > 1) ? new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS, - new SynchronousQueue<>(false), + new ArrayBlockingQueue<>(numThreads*100), ThreadFactoryFactory.getDaemonThreadFactory("perf-feeder"), - new ThreadPoolExecutor.CallerRunsPolicy()) + new RetryExecutionhandler()) : null; printHeader(out); long numMessagesSent = 0; |