From 39a9c842e64c55e62c17e07a4e08c5f7b4c7b1e2 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Sun, 29 Mar 2020 21:59:19 +0000 Subject: Add an option to send onlyfirst N messages. --- .../main/java/com/yahoo/vespa/feed/perf/FeederParams.java | 12 ++++++++---- .../main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java | 12 ++++++++---- .../java/com/yahoo/vespa/feed/perf/FeederParamsTest.java | 9 ++++++++- 3 files changed, 24 insertions(+), 9 deletions(-) (limited to 'vespa_feed_perf/src') diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java index d4c6c706b2a..c1e164f7fe8 100644 --- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java @@ -35,6 +35,7 @@ class FeederParams { private int numDispatchThreads = 1; private int maxPending = 0; private int numConnectionsPerTarget = 1; + private long numMessagesToSend = Long.MAX_VALUE; private List inputStreams = new ArrayList<>(); FeederParams() { @@ -84,10 +85,9 @@ class FeederParams { } int getNumConnectionsPerTarget() { return numConnectionsPerTarget; } - FeederParams setNumConnectionsPerTarget(int numConnectionsPerTarget) { - this.numConnectionsPerTarget = numConnectionsPerTarget; - return this; - } + + long getNumMessagesToSend() { return numMessagesToSend; } + boolean isSerialTransferEnabled() { return maxPending == 1; } @@ -116,6 +116,7 @@ class FeederParams { opts.addOption("b", "mode", true, "Mode for benchmarking."); opts.addOption("o", "output", true, "File to write to. Extensions gives format (.xml, .json, .vespa) json will be produced if no extension."); opts.addOption("c", "numconnections", true, "Number of connections per host."); + opts.addOption("l", "nummessages", true, "Number of messages to send (all is default)."); CommandLine cmd = new DefaultParser().parse(opts, args); @@ -142,6 +143,9 @@ class FeederParams { if (cmd.hasOption('s')) { setSerialTransfer(); } + if (cmd.hasOption('l')) { + numMessagesToSend = Long.valueOf(cmd.getOptionValue('l').trim()); + } if ( !cmd.getArgList().isEmpty()) { inputStreams.clear(); diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java index 2925ea08de9..556d9bd60c7 100644 --- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java @@ -65,6 +65,7 @@ public class SimpleFeeder implements ReplyHandler { private final RPCMessageBus mbus; private final SourceSession session; private final int numThreads; + private final long numMessagesToSend; private final Destination destination; private final boolean benchmarkMode; private final static long REPORT_INTERVAL = TimeUnit.SECONDS.toMillis(10); @@ -81,18 +82,20 @@ public class SimpleFeeder implements ReplyHandler { private final Destination destination; private final FeedReader reader; private final Executor executor; - AtomicReference failure; + private final long messagesToSend; + private final AtomicReference failure; - Metrics(Destination destination, FeedReader reader, Executor executor, AtomicReference failure) { + Metrics(Destination destination, FeedReader reader, Executor executor, AtomicReference failure, long messagesToSend) { this.destination = destination; this.reader = reader; this.executor = executor; + this.messagesToSend = messagesToSend; this.failure = failure; } long feed() throws Throwable { long numMessages = 0; - while (failure.get() == null) { + while ((failure.get() == null) && (numMessages < messagesToSend)) { FeedOperation op = reader.read(); if (op.getType() == FeedOperation.Type.INVALID) { break; @@ -341,6 +344,7 @@ public class SimpleFeeder implements ReplyHandler { inputStreams = params.getInputStreams(); out = params.getStdOut(); numThreads = params.getNumDispatchThreads(); + numMessagesToSend = params.getNumMessagesToSend(); mbus = newMessageBus(docTypeMgr, params); session = newSession(mbus, this, params.getMaxPending()); docTypeMgr.configure(params.getConfigId()); @@ -380,7 +384,7 @@ public class SimpleFeeder implements ReplyHandler { printHeader(out); long numMessagesSent = 0; for (InputStream in : inputStreams) { - Metrics m = new Metrics(destination, createFeedReader(in), executor, failure); + Metrics m = new Metrics(destination, createFeedReader(in), executor, failure, numMessagesToSend); numMessagesSent += m.feed(); } while (failure.get() == null && numReplies.get() < numMessagesSent) { diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java index 1768b3680b1..13e307d9973 100644 --- a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java +++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java @@ -89,10 +89,17 @@ public class FeederParamsTest { @Test public void requireThatNumConnectionsAreParsed() throws ParseException, FileNotFoundException { assertEquals(1, new FeederParams().getNumConnectionsPerTarget()); - assertEquals(17, new FeederParams().parseArgs("-c 17").getNumConnectionsPerTarget()); + assertEquals(16, new FeederParams().parseArgs("-c 16").getNumConnectionsPerTarget()); assertEquals(17, new FeederParams().parseArgs("--numconnections", "17").getNumConnectionsPerTarget()); } + @Test + public void requireThatNumMessagesToSendAreParsed() throws ParseException, FileNotFoundException { + assertEquals(Long.MAX_VALUE, new FeederParams().getNumMessagesToSend()); + assertEquals(18, new FeederParams().parseArgs("-l 18").getNumMessagesToSend()); + assertEquals(19, new FeederParams().parseArgs("--nummessages", "19").getNumMessagesToSend()); + } + @Test public void requireThatDumpStreamAreParsed() throws ParseException, IOException { assertNull(new FeederParams().getDumpStream()); -- cgit v1.2.3