diff options
-rw-r--r-- | vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java | 22 |
1 files changed, 16 insertions, 6 deletions
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java index 14123c7a73c..aac7ab750bd 100644 --- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java @@ -46,9 +46,10 @@ import java.io.PrintStream; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; -import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -154,7 +155,7 @@ public class SimpleFeeder implements ReplyHandler { } } catch (InterruptedException e) {} } - public void close() throws Exception { + public void close() { session.destroy(); } } @@ -277,7 +278,7 @@ public class SimpleFeeder implements ReplyHandler { } } - class LazyDocumentOperation extends ConditionalFeedOperation { + static class LazyDocumentOperation extends ConditionalFeedOperation { private final DocumentDeserializer deserializer; LazyDocumentOperation(DocumentDeserializer deserializer, TestAndSetCondition condition) { super(Type.DOCUMENT, condition); @@ -289,7 +290,7 @@ public class SimpleFeeder implements ReplyHandler { return new Document(deserializer); } } - class LazyUpdateOperation extends ConditionalFeedOperation { + static class LazyUpdateOperation extends ConditionalFeedOperation { private final DocumentDeserializer deserializer; LazyUpdateOperation(DocumentDeserializer deserializer, TestAndSetCondition condition) { super(Type.UPDATE, condition); @@ -377,13 +378,22 @@ public class SimpleFeeder implements ReplyHandler { } + static class RetryExecutionhandler implements RejectedExecutionHandler { + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + executor.getQueue().put(r); + } catch (InterruptedException e) {} + } + } SimpleFeeder run() throws Throwable { ExecutorService executor = (numThreads > 1) ? new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS, - new SynchronousQueue<>(false), + new ArrayBlockingQueue<>(numThreads*100), ThreadFactoryFactory.getDaemonThreadFactory("perf-feeder"), - new ThreadPoolExecutor.CallerRunsPolicy()) + new RetryExecutionhandler()) : null; printHeader(out); long numMessagesSent = 0; |