diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-29 07:51:54 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-29 07:51:54 +0200 |
commit | 1f2395a64721868956a11184e73d962aaa394f86 (patch) | |
tree | 738a0d14f8cec0548da31a49439d48f79ce58134 /vespa_feed_perf/src | |
parent | 7ebc3fe27ab60427ca9b0842730cd9732ada34b5 (diff) |
Add support for files, not only stdin
Diffstat (limited to 'vespa_feed_perf/src')
4 files changed, 104 insertions, 68 deletions
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java index b4549fe495c..1fcd5d72a00 100644 --- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java @@ -8,11 +8,14 @@ import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import java.io.File; +import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.InputStream; import java.io.OutputStream; import java.io.PrintStream; +import java.util.ArrayList; +import java.util.List; /** * @author Simon Thoresen Hult @@ -20,7 +23,6 @@ import java.io.PrintStream; class FeederParams { enum DumpFormat {JSON, VESPA}; - private InputStream stdIn = System.in; private PrintStream stdErr = System.err; private PrintStream stdOut = System.out; private Route route = Route.parse("default"); @@ -30,14 +32,10 @@ class FeederParams { private boolean benchmarkMode = false; private int numDispatchThreads = 1; private int maxPending = 0; + private List<InputStream> inputStreams = new ArrayList<>(); - InputStream getStdIn() { - return stdIn; - } - - FeederParams setStdIn(InputStream stdIn) { - this.stdIn = stdIn; - return this; + FeederParams() { + inputStreams.add(System.in); } PrintStream getStdErr() { @@ -82,11 +80,6 @@ class FeederParams { return this; } - FeederParams setMaxPending(int maxPending) { - this.maxPending = maxPending; - return this; - } - boolean isSerialTransferEnabled() { return maxPending == 1; } @@ -96,6 +89,11 @@ class FeederParams { numDispatchThreads = 1; return this; } + List<InputStream> getInputStreams() { return inputStreams; } + FeederParams setInputStreams(List<InputStream> inputStreams) { + this.inputStreams = inputStreams; + return this; + } int getNumDispatchThreads() { return numDispatchThreads; } int getMaxPending() { return maxPending; } @@ -133,6 +131,13 @@ class FeederParams { setSerialTransfer(); } + if ( !cmd.getArgList().isEmpty()) { + inputStreams.clear(); + for (String fileName : cmd.getArgList()) { + inputStreams.add(new FileInputStream(new File(fileName))); + } + } + return this; } diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java index 36e5cc37ea5..70bb2c78ce7 100644 --- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java @@ -44,6 +44,8 @@ import java.io.OutputStream; import java.io.PrintStream; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -56,24 +58,59 @@ import java.util.concurrent.atomic.AtomicReference; */ public class SimpleFeeder implements ReplyHandler { - private final static long REPORT_INTERVAL = TimeUnit.SECONDS.toMillis(10); - private final static long HEADER_INTERVAL = REPORT_INTERVAL * 24; + private final DocumentTypeManager docTypeMgr = new DocumentTypeManager(); - private final InputStream in; + private final List<InputStream> inputStreams; private final PrintStream out; private final RPCMessageBus mbus; private final SourceSession session; + private final int numThreads; + private final Destination destination; + private final boolean benchmarkMode; + private final static long REPORT_INTERVAL = TimeUnit.SECONDS.toMillis(10); private final long startTime = System.currentTimeMillis(); private final AtomicReference<Throwable> failure = new AtomicReference<>(null); private final AtomicLong numReplies = new AtomicLong(0); private long maxLatency = Long.MIN_VALUE; private long minLatency = Long.MAX_VALUE; - private long nextHeader = startTime + HEADER_INTERVAL; private long nextReport = startTime + REPORT_INTERVAL; private long sumLatency = 0; - private final int numThreads; - private final Destination destination; - private final boolean benchmarkMode; + + static class Metrics { + + private final Destination destination; + private final FeedReader reader; + private final Executor executor; + AtomicReference<Throwable> failure; + + Metrics(Destination destination, FeedReader reader, Executor executor, AtomicReference<Throwable> failure) { + this.destination = destination; + this.reader = reader; + this.executor = executor; + this.failure = failure; + } + + long feed() throws Throwable { + long numMessages = 0; + while (failure.get() == null) { + FeedOperation op = reader.read(); + if (op.getType() == FeedOperation.Type.INVALID) { + break; + } + if (executor != null) { + executor.execute(() -> sendOperation(op)); + } else { + sendOperation(op); + } + ++numMessages; + } + return numMessages; + } + private void sendOperation(FeedOperation op) { + destination.send(op); + } + } + public static void main(String[] args) throws Throwable { new SimpleFeeder(new FeederParams().parseArgs(args)).run().close(); @@ -301,7 +338,7 @@ public class SimpleFeeder implements ReplyHandler { return new JsonDestination(params.getDumpStream(), failure, numReplies); } SimpleFeeder(FeederParams params) { - in = params.getStdIn(); + inputStreams = params.getInputStreams(); out = params.getStdOut(); numThreads = params.getNumDispatchThreads(); mbus = newMessageBus(docTypeMgr, params.getConfigId()); @@ -313,12 +350,8 @@ public class SimpleFeeder implements ReplyHandler { : new MbusDestination(session, params.getRoute(), failure, params.getStdErr()); } - private void sendOperation(FeedOperation op) { - destination.send(op); - } - SourceSession getSourceSession() { return session; } - private FeedReader createFeedReader() throws Exception { + private FeedReader createFeedReader(InputStream in) throws Exception { in.mark(8); byte [] b = new byte[2]; int numRead = readExact(in, b); @@ -335,6 +368,8 @@ public class SimpleFeeder implements ReplyHandler { } } + + SimpleFeeder run() throws Throwable { ExecutorService executor = (numThreads > 1) ? new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS, @@ -342,29 +377,19 @@ public class SimpleFeeder implements ReplyHandler { ThreadFactoryFactory.getDaemonThreadFactory("perf-feeder"), new ThreadPoolExecutor.CallerRunsPolicy()) : null; - FeedReader reader = createFeedReader(); - - printHeader(); - long numMessages = 0; - while (failure.get() == null) { - FeedOperation op = reader.read(); - if (op.getType() == FeedOperation.Type.INVALID) { - break; - } - if (executor != null) { - executor.execute(() -> sendOperation(op)); - } else { - sendOperation(op); - } - ++numMessages; + printHeader(out); + long numMessagesSent = 0; + for (InputStream in : inputStreams) { + Metrics m = new Metrics(destination, createFeedReader(in), executor, failure); + numMessagesSent += m.feed(); } - while (failure.get() == null && numReplies.get() < numMessages) { + while (failure.get() == null && numReplies.get() < numMessagesSent) { Thread.sleep(100); } + printReport(out); if (failure.get() != null) { throw failure.get(); } - printReport(); return this; } @@ -414,23 +439,18 @@ public class SimpleFeeder implements ReplyHandler { maxLatency = Math.max(maxLatency, latency); sumLatency += latency; if (benchmarkMode) { return; } - if (now > nextHeader) { - printHeader(); - nextHeader += HEADER_INTERVAL; - } if (now > nextReport) { - printReport(); + printReport(out); nextReport += REPORT_INTERVAL; } } - - private void printHeader() { + private static void printHeader(PrintStream out) { out.println("# Time used, num ok, num error, min latency, max latency, average latency"); } - private void printReport() { + private void printReport(PrintStream out) { out.format("%10d, %12d, %11d, %11d, %11d\n", System.currentTimeMillis() - startTime, - numReplies.get(), minLatency, maxLatency, sumLatency / numReplies.get()); + numReplies.get(), minLatency, maxLatency, sumLatency / numReplies.get()); } private static String formatErrors(Reply reply) { diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java index d44cf41f9ab..5cc5d0bc018 100644 --- a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java +++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java @@ -5,12 +5,12 @@ import com.yahoo.messagebus.routing.Route; import org.apache.commons.cli.ParseException; import org.junit.Test; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.FileInputStream; import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; -import java.io.InputStream; import java.io.PrintStream; import static org.junit.Assert.assertEquals; @@ -24,18 +24,14 @@ import static org.junit.Assert.assertTrue; * @author Simon Thoresen Hult */ public class FeederParamsTest { - static final String TESTFILE_JSON = "test.json"; - static final String TESTFILE_VESPA = "test.vespa"; - static final String TESTFILE_UNKNOWN = "test.xyz"; + private static final String TESTFILE_JSON = "test.json"; + private static final String TESTFILE_VESPA = "test.vespa"; + private static final String TESTFILE_UNKNOWN = "test.xyz"; @Test public void requireThatAccessorsWork() { FeederParams params = new FeederParams(); - InputStream stdIn = new ByteArrayInputStream(new byte[1]); - params.setStdIn(stdIn); - assertSame(stdIn, params.getStdIn()); - PrintStream stdErr = new PrintStream(new ByteArrayOutputStream()); params.setStdErr(stdErr); assertSame(stdErr, params.getStdErr()); @@ -55,7 +51,7 @@ public class FeederParamsTest { @Test public void requireThatParamsHaveReasonableDefaults() { FeederParams params = new FeederParams(); - assertSame(System.in, params.getStdIn()); + assertSame(System.in, params.getInputStreams().get(0)); assertSame(System.err, params.getStdErr()); assertSame(System.out, params.getStdOut()); assertEquals(Route.parse("default"), params.getRoute()); @@ -66,16 +62,14 @@ public class FeederParamsTest { @Test public void requireThatSerialTransferOptionIsParsed() throws ParseException, FileNotFoundException { assertTrue(new FeederParams().parseArgs("-s").isSerialTransferEnabled()); - assertTrue(new FeederParams().parseArgs("foo", "-s").isSerialTransferEnabled()); - assertTrue(new FeederParams().parseArgs("-s", "foo").isSerialTransferEnabled()); assertTrue(new FeederParams().parseArgs("--serial").isSerialTransferEnabled()); - assertTrue(new FeederParams().parseArgs("foo", "--serial").isSerialTransferEnabled()); - assertTrue(new FeederParams().parseArgs("--serial", "foo").isSerialTransferEnabled()); + assertEquals(1, new FeederParams().parseArgs("-s").getMaxPending()); + assertEquals(1, new FeederParams().parseArgs("-s").getNumDispatchThreads()); } @Test public void requireThatArgumentsAreParsedAsRoute() throws ParseException, FileNotFoundException { - assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("-r foo bar").getRoute()); + assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("-r", "foo bar").getRoute()); assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("--route","foo bar").getRoute()); } @@ -115,4 +109,20 @@ public class FeederParamsTest { assertTrue(new File(TESTFILE_UNKNOWN).delete()); } + @Test + public void requireThatInputFilesAreAggregated() throws ParseException, IOException { + File json = new File(TESTFILE_JSON); + File vespa = new File(TESTFILE_VESPA); + new FileOutputStream(json).close(); + new FileOutputStream(vespa).close(); + FeederParams p = new FeederParams(); + p.parseArgs("-n", "3", TESTFILE_JSON, TESTFILE_VESPA); + assertEquals(3, p.getNumDispatchThreads()); + assertEquals(2, p.getInputStreams().size()); + assertTrue(p.getInputStreams().get(0) instanceof FileInputStream); + assertTrue(p.getInputStreams().get(1) instanceof FileInputStream); + json.delete(); + vespa.delete(); + } + } diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java index 2de7e831d04..8af4dd5dac9 100644 --- a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java +++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java @@ -22,6 +22,7 @@ import java.io.InputStream; import java.io.PrintStream; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; +import java.util.Arrays; import java.util.regex.Pattern; import static org.junit.Assert.assertEquals; @@ -313,7 +314,7 @@ public class SimpleFeederTest { server = new SimpleServer(CONFIG_DIR, validator); feeder = new SimpleFeeder(params.setConfigId("dir:" + CONFIG_DIR) .setStdErr(new PrintStream(err)) - .setStdIn(in) + .setInputStreams(Arrays.asList(in)) .setStdOut(new PrintStream(out))); } |