From 8e62cd1238a7c98009d8695eac4c85a8da1549b5 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Mon, 15 Apr 2019 13:33:56 +0200 Subject: Add control for number of feed threads. --- .../java/com/yahoo/vespaxmlparser/FeedReader.java | 2 +- .../yahoo/vespaxmlparser/VespaXMLFeedReader.java | 1 - .../java/com/yahoo/messagebus/RPCMessageBus.java | 5 --- .../main/java/com/yahoo/messagebus/Sequencer.java | 2 +- .../com/yahoo/vespa/feed/perf/FeederParams.java | 7 ++++ .../com/yahoo/vespa/feed/perf/SimpleFeeder.java | 49 ++++++++++++++++------ .../yahoo/vespa/feed/perf/FeederParamsTest.java | 6 +++ .../java/com/yahoo/vespafeeder/VespaFeeder.java | 1 - 8 files changed, 51 insertions(+), 22 deletions(-) diff --git a/document/src/main/java/com/yahoo/vespaxmlparser/FeedReader.java b/document/src/main/java/com/yahoo/vespaxmlparser/FeedReader.java index 28028f1d7de..2c130cae782 100644 --- a/document/src/main/java/com/yahoo/vespaxmlparser/FeedReader.java +++ b/document/src/main/java/com/yahoo/vespaxmlparser/FeedReader.java @@ -16,6 +16,6 @@ public interface FeedReader { * Reads the next operation from the stream. * @param operation The operation to fill in. Operation is unchanged if none was found. */ - public abstract void read(Operation operation) throws Exception; + void read(Operation operation) throws Exception; } diff --git a/document/src/main/java/com/yahoo/vespaxmlparser/VespaXMLFeedReader.java b/document/src/main/java/com/yahoo/vespaxmlparser/VespaXMLFeedReader.java index 49652b53da8..a24f1abd22b 100644 --- a/document/src/main/java/com/yahoo/vespaxmlparser/VespaXMLFeedReader.java +++ b/document/src/main/java/com/yahoo/vespaxmlparser/VespaXMLFeedReader.java @@ -12,7 +12,6 @@ import javax.xml.stream.XMLStreamReader; import java.io.InputStream; import java.util.ArrayList; import java.util.List; -import java.util.Locale; import java.util.Optional; /** diff --git a/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java b/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java index 4bbe88ffc2c..5ea278c410b 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/RPCMessageBus.java @@ -1,16 +1,12 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.messagebus; -import com.yahoo.log.LogLevel; import com.yahoo.messagebus.network.Identity; -import com.yahoo.messagebus.network.Network; import com.yahoo.messagebus.network.rpc.RPCNetwork; import com.yahoo.messagebus.network.rpc.RPCNetworkParams; import java.util.Arrays; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Logger; /** * The RPCMessageBus class wraps a MessageBus with an RPCNetwork and handles reconfiguration. Please note that according @@ -20,7 +16,6 @@ import java.util.logging.Logger; */ public class RPCMessageBus extends NetworkMessageBus { - private static final Logger log = Logger.getLogger(RPCMessageBus.class.getName()); private final ConfigAgent configAgent; /** diff --git a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java index 35218364c8f..6d1ec7586b3 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/Sequencer.java @@ -69,7 +69,7 @@ public class Sequencer implements MessageHandler, ReplyHandler { if (seqMap.containsKey(seqId)) { Queue queue = seqMap.get(seqId); if (queue == null) { - queue = new LinkedList(); + queue = new LinkedList<>(); seqMap.put(seqId, queue); } if (msg.getTrace().shouldTrace(TraceLevel.COMPONENT)) { diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java index af8522f4fc2..e7738d92818 100644 --- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java @@ -21,6 +21,7 @@ public class FeederParams { private Route route = Route.parse("default"); private String configId = "client"; private boolean serialTransferEnabled = false; + private int numDispatchThreads = 1; public InputStream getStdIn() { return stdIn; @@ -76,12 +77,18 @@ public class FeederParams { return this; } + public int getNumDispatchThreads() { return numDispatchThreads; } + public FeederParams parseArgs(String... args) throws ParseException { Options opts = new Options(); opts.addOption("s", "serial", false, "use serial transfer mode, at most 1 pending operation"); + opts.addOption("n", "numthreads", true, "Number of clients for sending messages."); CommandLine cmd = new DefaultParser().parse(opts, args); serialTransferEnabled = cmd.hasOption("s"); + if (cmd.hasOption('n')) { + numDispatchThreads = Integer.valueOf(cmd.getOptionValue('n').trim()); + } route = newRoute(cmd.getArgs()); return this; } diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java index e10dce31adf..e55cd27f7da 100644 --- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.feed.perf; +import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.document.DocumentPut; import com.yahoo.document.DocumentTypeManager; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; @@ -23,6 +24,9 @@ import com.yahoo.vespaxmlparser.VespaXMLFeedReader; import java.io.IOException; import java.io.InputStream; import java.io.PrintStream; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -42,13 +46,14 @@ public class SimpleFeeder implements ReplyHandler { private final Route route; private final SourceSession session; private final long startTime = System.currentTimeMillis(); - private AtomicReference failure = new AtomicReference<>(null); - private AtomicLong numReplies = new AtomicLong(0); + private final AtomicReference failure = new AtomicReference<>(null); + private final AtomicLong numReplies = new AtomicLong(0); private long maxLatency = Long.MIN_VALUE; private long minLatency = Long.MAX_VALUE; private long nextHeader = startTime + HEADER_INTERVAL; private long nextReport = startTime + REPORT_INTERVAL; private long sumLatency = 0; + private final int numThreads; public static void main(String[] args) throws Throwable { new SimpleFeeder(new FeederParams().parseArgs(args)).run().close(); @@ -59,31 +64,49 @@ public class SimpleFeeder implements ReplyHandler { this.out = params.getStdOut(); this.err = params.getStdErr(); this.route = params.getRoute(); + this.numThreads = params.getNumDispatchThreads(); this.mbus = newMessageBus(docTypeMgr, params.getConfigId()); this.session = newSession(mbus, this, params.isSerialTransferEnabled()); this.docTypeMgr.configure(params.getConfigId()); } + private void sendOperation(VespaXMLFeedReader.Operation op) { + Message msg = newMessage(op); + if (msg == null) { + err.println("ignoring operation; " + op.getType()); + return; + } + msg.setContext(System.currentTimeMillis()); + msg.setRoute(route); + try { + Error err = session.sendBlocking(msg).getError(); + if (err != null) { + failure.set(new IOException(err.toString())); + } + } catch (InterruptedException e) {} + } + SimpleFeeder run() throws Throwable { + ExecutorService executor = (numThreads > 1) + ? new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS, + new SynchronousQueue<>(false), + ThreadFactoryFactory.getDaemonThreadFactory("perf-feeder"), + new ThreadPoolExecutor.CallerRunsPolicy()) + : null; VespaXMLFeedReader reader = new VespaXMLFeedReader(in, docTypeMgr); - VespaXMLFeedReader.Operation op = new VespaXMLFeedReader.Operation(); + printHeader(); long numMessages = 0; while (failure.get() == null) { + VespaXMLFeedReader.Operation op = new VespaXMLFeedReader.Operation(); reader.read(op); if (op.getType() == VespaXMLFeedReader.OperationType.INVALID) { break; } - Message msg = newMessage(op); - if (msg == null) { - err.println("ignoring operation; " + op.getType()); - continue; // ignore - } - msg.setContext(System.currentTimeMillis()); - msg.setRoute(route); - Error err = session.sendBlocking(msg).getError(); - if (err != null) { - throw new IOException(err.toString()); + if (executor != null) { + executor.execute(() -> sendOperation(op)); + } else { + sendOperation(op); } ++numMessages; } diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java index 38542c1c6b0..f08e494a717 100644 --- a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java +++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java @@ -85,4 +85,10 @@ public class FeederParamsTest { assertEquals(Route.parse("default"), new FeederParams().parseArgs("-s").getRoute()); } + @Test + public void requireThatNumThreadsAreParsed() throws ParseException { + assertEquals(1, new FeederParams().getNumDispatchThreads()); + assertEquals(17, new FeederParams().parseArgs("-n 17").getNumDispatchThreads()); + } + } diff --git a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java index 4a5c451c5bc..557caf21a89 100755 --- a/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java +++ b/vespaclient-java/src/main/java/com/yahoo/vespafeeder/VespaFeeder.java @@ -11,7 +11,6 @@ import com.yahoo.feedhandler.NullFeedMetric; import com.yahoo.feedhandler.VespaFeedHandler; import com.yahoo.log.LogSetup; import com.yahoo.concurrent.SystemTimer; -import com.yahoo.vespaclient.ClusterList; import java.io.BufferedInputStream; import java.io.File; -- cgit v1.2.3