aboutsummaryrefslogtreecommitdiffstats
path: root/vespa_feed_perf
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-04-15 13:33:56 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2019-04-15 13:33:56 +0200
commit8e62cd1238a7c98009d8695eac4c85a8da1549b5 (patch)
treea60ddc5c93bac2ece9cd5d1e9c2b07592c2a24d7 /vespa_feed_perf
parent85962336458e1a33aa5f0f0130dc018d4bee7d72 (diff)
Add control for number of feed threads.
Diffstat (limited to 'vespa_feed_perf')
-rw-r--r--vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java7
-rw-r--r--vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java49
-rw-r--r--vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java6
3 files changed, 49 insertions, 13 deletions
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java
index af8522f4fc2..e7738d92818 100644
--- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java
+++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java
@@ -21,6 +21,7 @@ public class FeederParams {
private Route route = Route.parse("default");
private String configId = "client";
private boolean serialTransferEnabled = false;
+ private int numDispatchThreads = 1;
public InputStream getStdIn() {
return stdIn;
@@ -76,12 +77,18 @@ public class FeederParams {
return this;
}
+ public int getNumDispatchThreads() { return numDispatchThreads; }
+
public FeederParams parseArgs(String... args) throws ParseException {
Options opts = new Options();
opts.addOption("s", "serial", false, "use serial transfer mode, at most 1 pending operation");
+ opts.addOption("n", "numthreads", true, "Number of clients for sending messages.");
CommandLine cmd = new DefaultParser().parse(opts, args);
serialTransferEnabled = cmd.hasOption("s");
+ if (cmd.hasOption('n')) {
+ numDispatchThreads = Integer.valueOf(cmd.getOptionValue('n').trim());
+ }
route = newRoute(cmd.getArgs());
return this;
}
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
index e10dce31adf..e55cd27f7da 100644
--- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
+++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.feed.perf;
+import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.document.DocumentPut;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
@@ -23,6 +24,9 @@ import com.yahoo.vespaxmlparser.VespaXMLFeedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -42,13 +46,14 @@ public class SimpleFeeder implements ReplyHandler {
private final Route route;
private final SourceSession session;
private final long startTime = System.currentTimeMillis();
- private AtomicReference<Throwable> failure = new AtomicReference<>(null);
- private AtomicLong numReplies = new AtomicLong(0);
+ private final AtomicReference<Throwable> failure = new AtomicReference<>(null);
+ private final AtomicLong numReplies = new AtomicLong(0);
private long maxLatency = Long.MIN_VALUE;
private long minLatency = Long.MAX_VALUE;
private long nextHeader = startTime + HEADER_INTERVAL;
private long nextReport = startTime + REPORT_INTERVAL;
private long sumLatency = 0;
+ private final int numThreads;
public static void main(String[] args) throws Throwable {
new SimpleFeeder(new FeederParams().parseArgs(args)).run().close();
@@ -59,31 +64,49 @@ public class SimpleFeeder implements ReplyHandler {
this.out = params.getStdOut();
this.err = params.getStdErr();
this.route = params.getRoute();
+ this.numThreads = params.getNumDispatchThreads();
this.mbus = newMessageBus(docTypeMgr, params.getConfigId());
this.session = newSession(mbus, this, params.isSerialTransferEnabled());
this.docTypeMgr.configure(params.getConfigId());
}
+ private void sendOperation(VespaXMLFeedReader.Operation op) {
+ Message msg = newMessage(op);
+ if (msg == null) {
+ err.println("ignoring operation; " + op.getType());
+ return;
+ }
+ msg.setContext(System.currentTimeMillis());
+ msg.setRoute(route);
+ try {
+ Error err = session.sendBlocking(msg).getError();
+ if (err != null) {
+ failure.set(new IOException(err.toString()));
+ }
+ } catch (InterruptedException e) {}
+ }
+
SimpleFeeder run() throws Throwable {
+ ExecutorService executor = (numThreads > 1)
+ ? new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS,
+ new SynchronousQueue<>(false),
+ ThreadFactoryFactory.getDaemonThreadFactory("perf-feeder"),
+ new ThreadPoolExecutor.CallerRunsPolicy())
+ : null;
VespaXMLFeedReader reader = new VespaXMLFeedReader(in, docTypeMgr);
- VespaXMLFeedReader.Operation op = new VespaXMLFeedReader.Operation();
+
printHeader();
long numMessages = 0;
while (failure.get() == null) {
+ VespaXMLFeedReader.Operation op = new VespaXMLFeedReader.Operation();
reader.read(op);
if (op.getType() == VespaXMLFeedReader.OperationType.INVALID) {
break;
}
- Message msg = newMessage(op);
- if (msg == null) {
- err.println("ignoring operation; " + op.getType());
- continue; // ignore
- }
- msg.setContext(System.currentTimeMillis());
- msg.setRoute(route);
- Error err = session.sendBlocking(msg).getError();
- if (err != null) {
- throw new IOException(err.toString());
+ if (executor != null) {
+ executor.execute(() -> sendOperation(op));
+ } else {
+ sendOperation(op);
}
++numMessages;
}
diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java
index 38542c1c6b0..f08e494a717 100644
--- a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java
+++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java
@@ -85,4 +85,10 @@ public class FeederParamsTest {
assertEquals(Route.parse("default"), new FeederParams().parseArgs("-s").getRoute());
}
+ @Test
+ public void requireThatNumThreadsAreParsed() throws ParseException {
+ assertEquals(1, new FeederParams().getNumDispatchThreads());
+ assertEquals(17, new FeederParams().parseArgs("-n 17").getNumDispatchThreads());
+ }
+
}