aboutsummaryrefslogtreecommitdiffstats
path: root/vespa_feed_perf
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@verizonmedia.com>2020-04-16 00:26:23 +0000
committerHenning Baldersheim <balder@verizonmedia.com>2020-04-16 00:26:23 +0000
commitf71fe343a717f4ea934af2dfc783dc665e054cfa (patch)
tree1b67c641f5859eccfd878960cd5b581590839458 /vespa_feed_perf
parentbdb570a9e21410108bbb56f183bad1603c45c1fc (diff)
Use a smoother blocking Q and block when full.
Diffstat (limited to 'vespa_feed_perf')
-rw-r--r--vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java22
1 files changed, 16 insertions, 6 deletions
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
index 14123c7a73c..aac7ab750bd 100644
--- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
+++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
@@ -46,9 +46,10 @@ import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -154,7 +155,7 @@ public class SimpleFeeder implements ReplyHandler {
}
} catch (InterruptedException e) {}
}
- public void close() throws Exception {
+ public void close() {
session.destroy();
}
}
@@ -277,7 +278,7 @@ public class SimpleFeeder implements ReplyHandler {
}
}
- class LazyDocumentOperation extends ConditionalFeedOperation {
+ static class LazyDocumentOperation extends ConditionalFeedOperation {
private final DocumentDeserializer deserializer;
LazyDocumentOperation(DocumentDeserializer deserializer, TestAndSetCondition condition) {
super(Type.DOCUMENT, condition);
@@ -289,7 +290,7 @@ public class SimpleFeeder implements ReplyHandler {
return new Document(deserializer);
}
}
- class LazyUpdateOperation extends ConditionalFeedOperation {
+ static class LazyUpdateOperation extends ConditionalFeedOperation {
private final DocumentDeserializer deserializer;
LazyUpdateOperation(DocumentDeserializer deserializer, TestAndSetCondition condition) {
super(Type.UPDATE, condition);
@@ -377,13 +378,22 @@ public class SimpleFeeder implements ReplyHandler {
}
+ static class RetryExecutionhandler implements RejectedExecutionHandler {
+
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ try {
+ executor.getQueue().put(r);
+ } catch (InterruptedException e) {}
+ }
+ }
SimpleFeeder run() throws Throwable {
ExecutorService executor = (numThreads > 1)
? new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS,
- new SynchronousQueue<>(false),
+ new ArrayBlockingQueue<>(numThreads*100),
ThreadFactoryFactory.getDaemonThreadFactory("perf-feeder"),
- new ThreadPoolExecutor.CallerRunsPolicy())
+ new RetryExecutionhandler())
: null;
printHeader(out);
long numMessagesSent = 0;