diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-15 13:33:56 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-15 13:33:56 +0200 |
commit | 8e62cd1238a7c98009d8695eac4c85a8da1549b5 (patch) | |
tree | a60ddc5c93bac2ece9cd5d1e9c2b07592c2a24d7 /vespa_feed_perf | |
parent | 85962336458e1a33aa5f0f0130dc018d4bee7d72 (diff) |
Add control for number of feed threads.
Diffstat (limited to 'vespa_feed_perf')
3 files changed, 49 insertions, 13 deletions
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java index af8522f4fc2..e7738d92818 100644 --- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java @@ -21,6 +21,7 @@ public class FeederParams { private Route route = Route.parse("default"); private String configId = "client"; private boolean serialTransferEnabled = false; + private int numDispatchThreads = 1; public InputStream getStdIn() { return stdIn; @@ -76,12 +77,18 @@ public class FeederParams { return this; } + public int getNumDispatchThreads() { return numDispatchThreads; } + public FeederParams parseArgs(String... args) throws ParseException { Options opts = new Options(); opts.addOption("s", "serial", false, "use serial transfer mode, at most 1 pending operation"); + opts.addOption("n", "numthreads", true, "Number of clients for sending messages."); CommandLine cmd = new DefaultParser().parse(opts, args); serialTransferEnabled = cmd.hasOption("s"); + if (cmd.hasOption('n')) { + numDispatchThreads = Integer.valueOf(cmd.getOptionValue('n').trim()); + } route = newRoute(cmd.getArgs()); return this; } diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java index e10dce31adf..e55cd27f7da 100644 --- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.feed.perf; +import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.document.DocumentPut; import com.yahoo.document.DocumentTypeManager; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; @@ -23,6 +24,9 @@ import com.yahoo.vespaxmlparser.VespaXMLFeedReader; import java.io.IOException; import java.io.InputStream; import java.io.PrintStream; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -42,13 +46,14 @@ public class SimpleFeeder implements ReplyHandler { private final Route route; private final SourceSession session; private final long startTime = System.currentTimeMillis(); - private AtomicReference<Throwable> failure = new AtomicReference<>(null); - private AtomicLong numReplies = new AtomicLong(0); + private final AtomicReference<Throwable> failure = new AtomicReference<>(null); + private final AtomicLong numReplies = new AtomicLong(0); private long maxLatency = Long.MIN_VALUE; private long minLatency = Long.MAX_VALUE; private long nextHeader = startTime + HEADER_INTERVAL; private long nextReport = startTime + REPORT_INTERVAL; private long sumLatency = 0; + private final int numThreads; public static void main(String[] args) throws Throwable { new SimpleFeeder(new FeederParams().parseArgs(args)).run().close(); @@ -59,31 +64,49 @@ public class SimpleFeeder implements ReplyHandler { this.out = params.getStdOut(); this.err = params.getStdErr(); this.route = params.getRoute(); + this.numThreads = params.getNumDispatchThreads(); this.mbus = newMessageBus(docTypeMgr, params.getConfigId()); this.session = newSession(mbus, this, params.isSerialTransferEnabled()); this.docTypeMgr.configure(params.getConfigId()); } + private void sendOperation(VespaXMLFeedReader.Operation op) { + Message msg = newMessage(op); + if (msg == null) { + err.println("ignoring operation; " + op.getType()); + return; + } + msg.setContext(System.currentTimeMillis()); + msg.setRoute(route); + try { + Error err = session.sendBlocking(msg).getError(); + if (err != null) { + failure.set(new IOException(err.toString())); + } + } catch (InterruptedException e) {} + } + SimpleFeeder run() throws Throwable { + ExecutorService executor = (numThreads > 1) + ? new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS, + new SynchronousQueue<>(false), + ThreadFactoryFactory.getDaemonThreadFactory("perf-feeder"), + new ThreadPoolExecutor.CallerRunsPolicy()) + : null; VespaXMLFeedReader reader = new VespaXMLFeedReader(in, docTypeMgr); - VespaXMLFeedReader.Operation op = new VespaXMLFeedReader.Operation(); + printHeader(); long numMessages = 0; while (failure.get() == null) { + VespaXMLFeedReader.Operation op = new VespaXMLFeedReader.Operation(); reader.read(op); if (op.getType() == VespaXMLFeedReader.OperationType.INVALID) { break; } - Message msg = newMessage(op); - if (msg == null) { - err.println("ignoring operation; " + op.getType()); - continue; // ignore - } - msg.setContext(System.currentTimeMillis()); - msg.setRoute(route); - Error err = session.sendBlocking(msg).getError(); - if (err != null) { - throw new IOException(err.toString()); + if (executor != null) { + executor.execute(() -> sendOperation(op)); + } else { + sendOperation(op); } ++numMessages; } diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java index 38542c1c6b0..f08e494a717 100644 --- a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java +++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java @@ -85,4 +85,10 @@ public class FeederParamsTest { assertEquals(Route.parse("default"), new FeederParams().parseArgs("-s").getRoute()); } + @Test + public void requireThatNumThreadsAreParsed() throws ParseException { + assertEquals(1, new FeederParams().getNumDispatchThreads()); + assertEquals(17, new FeederParams().parseArgs("-n 17").getNumDispatchThreads()); + } + } |