diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-24 15:33:05 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-24 21:47:37 +0200 |
commit | 7ad9d874ba3f3fa003e7a60c0980967e01273272 (patch) | |
tree | 4cba4d302573deea84a75035998161224eb31222 /vespa_feed_perf/src | |
parent | 3975dbc206434999e1b7f262b7dd58749e29a013 (diff) |
Add support for dumping as json
Diffstat (limited to 'vespa_feed_perf/src')
4 files changed, 238 insertions, 85 deletions
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java index e7738d92818..0e1b038e6cd 100644 --- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java @@ -7,100 +7,103 @@ import org.apache.commons.cli.DefaultParser; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.InputStream; +import java.io.OutputStream; import java.io.PrintStream; /** * @author Simon Thoresen Hult */ -public class FeederParams { +class FeederParams { private InputStream stdIn = System.in; private PrintStream stdErr = System.err; private PrintStream stdOut = System.out; private Route route = Route.parse("default"); private String configId = "client"; + private OutputStream dumpStream = null; private boolean serialTransferEnabled = false; private int numDispatchThreads = 1; - public InputStream getStdIn() { + InputStream getStdIn() { return stdIn; } - public FeederParams setStdIn(InputStream stdIn) { + FeederParams setStdIn(InputStream stdIn) { this.stdIn = stdIn; return this; } - public PrintStream getStdErr() { + PrintStream getStdErr() { return stdErr; } - public FeederParams setStdErr(PrintStream stdErr) { + FeederParams setStdErr(PrintStream stdErr) { this.stdErr = stdErr; return this; } - public PrintStream getStdOut() { + PrintStream getStdOut() { return stdOut; } - public FeederParams setStdOut(PrintStream stdOut) { + FeederParams setStdOut(PrintStream stdOut) { this.stdOut = stdOut; return this; } - public Route getRoute() { + Route getRoute() { return route; } - - public FeederParams setRoute(Route route) { - this.route = new Route(route); + OutputStream getDumpStream() { return dumpStream; } + FeederParams setDumpStream(OutputStream dumpStream) { + this.dumpStream = dumpStream; return this; } - public String getConfigId() { + String getConfigId() { return configId; } - public FeederParams setConfigId(String configId) { + FeederParams setConfigId(String configId) { this.configId = configId; return this; } - public boolean isSerialTransferEnabled() { + boolean isSerialTransferEnabled() { return serialTransferEnabled; } - public FeederParams setSerialTransfer(boolean serial) { + FeederParams setSerialTransfer(boolean serial) { this.serialTransferEnabled = serial; return this; } - public int getNumDispatchThreads() { return numDispatchThreads; } + int getNumDispatchThreads() { return numDispatchThreads; } - public FeederParams parseArgs(String... args) throws ParseException { + FeederParams parseArgs(String... args) throws ParseException, FileNotFoundException { Options opts = new Options(); opts.addOption("s", "serial", false, "use serial transfer mode, at most 1 pending operation"); - opts.addOption("n", "numthreads", true, "Number of clients for sending messages."); + opts.addOption("n", "numthreads", true, "Number of clients for sending messages. Anything, but 1 will bypass sequencing by document id."); + opts.addOption("r", "route", true, "Route for sending messages. default is 'default'...."); + opts.addOption("o", "output", true, "File to write to. Extensions gives format (.xml, .json, .v8) json will be produced if no extension."); CommandLine cmd = new DefaultParser().parse(opts, args); - serialTransferEnabled = cmd.hasOption("s"); + serialTransferEnabled = cmd.hasOption('s'); if (cmd.hasOption('n')) { numDispatchThreads = Integer.valueOf(cmd.getOptionValue('n').trim()); } - route = newRoute(cmd.getArgs()); - return this; - } - - private static Route newRoute(String... args) { - if (args.length == 0) { - return Route.parse("default"); + if (cmd.hasOption('r')) { + route = Route.parse(cmd.getOptionValue('r').trim()); } - StringBuilder out = new StringBuilder(); - for (String arg : args) { - out.append(arg).append(' '); + if (cmd.hasOption('o')) { + dumpStream = new FileOutputStream(new File(cmd.getOptionValue('o').trim())); } - return Route.parse(out.toString()); + + return this; } + } diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java index 0b6604fed1e..b4520c0d9e3 100644 --- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java @@ -5,6 +5,7 @@ import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.document.DocumentPut; import com.yahoo.document.DocumentTypeManager; import com.yahoo.document.json.JsonFeedReader; +import com.yahoo.document.json.JsonWriter; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; @@ -25,6 +26,7 @@ import com.yahoo.vespaxmlparser.VespaXMLFeedReader; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.io.PrintStream; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; @@ -43,9 +45,7 @@ public class SimpleFeeder implements ReplyHandler { private final DocumentTypeManager docTypeMgr = new DocumentTypeManager(); private final InputStream in; private final PrintStream out; - private final PrintStream err; private final RPCMessageBus mbus; - private final Route route; private final SourceSession session; private final long startTime = System.currentTimeMillis(); private final AtomicReference<Throwable> failure = new AtomicReference<>(null); @@ -56,43 +56,115 @@ public class SimpleFeeder implements ReplyHandler { private long nextReport = startTime + REPORT_INTERVAL; private long sumLatency = 0; private final int numThreads; + private final Destination destination; public static void main(String[] args) throws Throwable { new SimpleFeeder(new FeederParams().parseArgs(args)).run().close(); } - SimpleFeeder(FeederParams params) { - this.in = params.getStdIn(); - this.out = params.getStdOut(); - this.err = params.getStdErr(); - this.route = params.getRoute(); - this.numThreads = params.getNumDispatchThreads(); - this.mbus = newMessageBus(docTypeMgr, params.getConfigId()); - this.session = newSession(mbus, this, params.isSerialTransferEnabled()); - this.docTypeMgr.configure(params.getConfigId()); + private interface Destination { + void send(VespaXMLFeedReader.Operation op); + void close() throws Exception; } - private void sendOperation(VespaXMLFeedReader.Operation op) { - Message msg = newMessage(op); - if (msg == null) { - err.println("ignoring operation; " + op.getType()); - return; + private static class MbusDestination implements Destination { + private final PrintStream err; + private final Route route; + private final SourceSession session; + private final AtomicReference<Throwable> failure; + MbusDestination(SourceSession session, Route route, AtomicReference<Throwable> failure, PrintStream err) { + this.route = route; + this.err = err; + this.session = session; + this.failure = failure; + } + public void send(VespaXMLFeedReader.Operation op) { + Message msg = newMessage(op); + if (msg == null) { + err.println("ignoring operation; " + op.getType()); + return; + } + msg.setContext(System.currentTimeMillis()); + msg.setRoute(route); + try { + Error err = session.sendBlocking(msg).getError(); + if (err != null) { + failure.set(new IOException(err.toString())); + } + } catch (InterruptedException e) {} + } + public void close() throws Exception { + session.destroy(); + } + } + + private static class JsonDestination implements Destination { + private final OutputStream outputStream; + private final JsonWriter writer; + private final AtomicLong numReplies; + private final AtomicReference<Throwable> failure; + private boolean isFirst = true; + JsonDestination(OutputStream outputStream, AtomicReference<Throwable> failure, AtomicLong numReplies) { + this.outputStream = outputStream; + writer = new JsonWriter(outputStream); + this.numReplies = numReplies; + this.failure = failure; + try { + outputStream.write('['); + outputStream.write('\n'); + } catch (IOException e) { + failure.set(e); + } } - msg.setContext(System.currentTimeMillis()); - msg.setRoute(route); - try { - Error err = session.sendBlocking(msg).getError(); - if (err != null) { - failure.set(new IOException(err.toString())); + public void send(VespaXMLFeedReader.Operation op) { + if (op.getType() == VespaXMLFeedReader.OperationType.DOCUMENT) { + if (!isFirst) { + try { + outputStream.write(','); + outputStream.write('\n'); + } catch (IOException e) { + failure.set(e); + } + } else { + isFirst = false; + } + writer.write(op.getDocument()); } - } catch (InterruptedException e) {} + numReplies.incrementAndGet(); + } + public void close() throws Exception { + outputStream.write('\n'); + outputStream.write(']'); + outputStream.close(); + } + + } + + SimpleFeeder(FeederParams params) { + in = params.getStdIn(); + out = params.getStdOut(); + numThreads = params.getNumDispatchThreads(); + mbus = newMessageBus(docTypeMgr, params.getConfigId()); + session = newSession(mbus, this, params.isSerialTransferEnabled()); + docTypeMgr.configure(params.getConfigId()); + destination = (params.getDumpStream() != null) + ? new JsonDestination(params.getDumpStream(), failure, numReplies) + : new MbusDestination(session, params.getRoute(), failure, params.getStdErr()); } + private void sendOperation(VespaXMLFeedReader.Operation op) { + destination.send(op); + } + + SourceSession getSourceSession() { return session; } private FeedReader createFeedReader() throws Exception { in.mark(8); - byte b[] = new byte[1]; - in.read(b); + byte [] b = new byte[1]; + int numRead = in.read(b); in.reset(); + if (numRead != b.length) { + throw new IllegalArgumentException("Need to read " + b.length + " bytes to detect format. Got " + numRead + " bytes."); + } if (b[0] == '[') { return new JsonFeedReader(in, docTypeMgr); } else { @@ -134,12 +206,12 @@ public class SimpleFeeder implements ReplyHandler { return this; } - void close() { - session.destroy(); + void close() throws Exception { + destination.close(); mbus.destroy(); } - private Message newMessage(VespaXMLFeedReader.Operation op) { + private static Message newMessage(VespaXMLFeedReader.Operation op) { switch (op.getType()) { case DOCUMENT: { PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(op.getDocument())); diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java index f08e494a717..b2800110a39 100644 --- a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java +++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java @@ -4,14 +4,20 @@ package com.yahoo.vespa.feed.perf; import com.yahoo.messagebus.routing.Route; import org.apache.commons.cli.ParseException; import org.junit.Test; -import org.mockito.Mockito; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.io.PrintStream; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; @@ -19,28 +25,24 @@ import static org.junit.Assert.assertTrue; * @author Simon Thoresen Hult */ public class FeederParamsTest { + static final String TESTFILE = "test.json"; @Test public void requireThatAccessorsWork() { FeederParams params = new FeederParams(); - InputStream stdIn = Mockito.mock(InputStream.class); + InputStream stdIn = new ByteArrayInputStream(new byte[1]); params.setStdIn(stdIn); assertSame(stdIn, params.getStdIn()); - PrintStream stdErr = Mockito.mock(PrintStream.class); + PrintStream stdErr = new PrintStream(new ByteArrayOutputStream()); params.setStdErr(stdErr); assertSame(stdErr, params.getStdErr()); - PrintStream stdOut = Mockito.mock(PrintStream.class); + PrintStream stdOut = new PrintStream(new ByteArrayOutputStream()); params.setStdOut(stdOut); assertSame(stdOut, params.getStdOut()); - Route route = Route.parse("my_route"); - params.setRoute(route); - assertEquals(route, params.getRoute()); - assertNotSame(route, params.getRoute()); - params.setConfigId("my_config_id"); assertEquals("my_config_id", params.getConfigId()); @@ -62,7 +64,7 @@ public class FeederParamsTest { } @Test - public void requireThatSerialTransferOptionIsParsed() throws ParseException { + public void requireThatSerialTransferOptionIsParsed() throws ParseException, FileNotFoundException { assertTrue(new FeederParams().parseArgs("-s").isSerialTransferEnabled()); assertTrue(new FeederParams().parseArgs("foo", "-s").isSerialTransferEnabled()); assertTrue(new FeederParams().parseArgs("-s", "foo").isSerialTransferEnabled()); @@ -72,23 +74,30 @@ public class FeederParamsTest { } @Test - public void requireThatArgumentsAreParsedAsRoute() throws ParseException { - assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("foo", "bar").getRoute()); - assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("-s", "foo", "bar").getRoute()); - assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("foo", "-s", "bar").getRoute()); - assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("foo", "bar", "-s").getRoute()); + public void requireThatArgumentsAreParsedAsRoute() throws ParseException, FileNotFoundException { + assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("-r foo bar").getRoute()); + assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("--route","foo bar").getRoute()); } @Test - public void requireThatRouteIsAnOptionalArgument() throws ParseException { + public void requireThatRouteIsAnOptionalArgument() throws ParseException, FileNotFoundException { assertEquals(Route.parse("default"), new FeederParams().parseArgs().getRoute()); assertEquals(Route.parse("default"), new FeederParams().parseArgs("-s").getRoute()); } @Test - public void requireThatNumThreadsAreParsed() throws ParseException { + public void requireThatNumThreadsAreParsed() throws ParseException, FileNotFoundException { assertEquals(1, new FeederParams().getNumDispatchThreads()); assertEquals(17, new FeederParams().parseArgs("-n 17").getNumDispatchThreads()); } + @Test + public void requireThatDumpStreamAreParsed() throws ParseException, IOException { + assertNull(new FeederParams().getDumpStream()); + OutputStream dumpStream = new FeederParams().parseArgs("-o " + TESTFILE).getDumpStream(); + assertNotNull(dumpStream); + dumpStream.close(); + assertTrue(new File(TESTFILE).delete()); + } + } diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java index 25c56a5cc57..f93657138ca 100644 --- a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java +++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java @@ -61,6 +61,76 @@ public class SimpleFeederTest { } @Test + public void requireThatXML2JsonFeederWorks() throws Throwable { + ByteArrayOutputStream dump = new ByteArrayOutputStream(); + assertFeed(new FeederParams().setDumpStream(dump), + "<vespafeed>" + + " <document documenttype='simple' documentid='id:simple:simple::0'>" + + " <my_str>foo</my_str>" + + " </document>" + + " <update documenttype='simple' documentid='id:simple:simple::1'>" + + " <assign field='my_str'>bar</assign>" + + " </update>" + + " <remove documenttype='simple' documentid='id:simple:simple::2'/>" + + "</vespafeed>", + new MessageHandler() { + + @Override + public void handleMessage(Message msg) { + Reply reply = ((DocumentMessage)msg).createReply(); + reply.swapState(msg); + reply.popHandler().handleReply(reply); + } + }, + "", + "(.+\n)+" + + "\\s*\\d+,\\s*3,.+\n"); + assertEquals(58, dump.size()); + assertEquals("[\n{\"id\":\"id:simple:simple::0\",\"fields\":{\"my_str\":\"foo\"}}\n]", dump.toString()); + } + + @Test + public void requireThatDualPutXML2JsonFeederWorks() throws Throwable { + ByteArrayOutputStream dump = new ByteArrayOutputStream(); + assertFeed(new FeederParams().setDumpStream(dump), + "<vespafeed>" + + " <document documenttype='simple' documentid='id:simple:simple::0'>" + + " <my_str>foo</my_str>" + + " </document>" + + " <document documenttype='simple' documentid='id:simple:simple::1'>" + + " <my_str>bar</my_str>" + + " </document>" + + " <remove documenttype='simple' documentid='id:simple:simple::2'/>" + + "</vespafeed>", + new MessageHandler() { + + @Override + public void handleMessage(Message msg) { + Reply reply = ((DocumentMessage)msg).createReply(); + reply.swapState(msg); + reply.popHandler().handleReply(reply); + } + }, + "", + "(.+\n)+" + + "\\s*\\d+,\\s*3,.+\n"); + assertEquals(115, dump.size()); + assertEquals("[\n{\"id\":\"id:simple:simple::0\",\"fields\":{\"my_str\":\"foo\"}},\n {\"id\":\"id:simple:simple::1\",\"fields\":{\"my_str\":\"bar\"}}\n]", dump.toString()); + assertFeed(dump.toString(), + new MessageHandler() { + @Override + public void handleMessage(Message msg) { + Reply reply = ((DocumentMessage)msg).createReply(); + reply.swapState(msg); + reply.popHandler().handleReply(reply); + } + }, + "", + "(.+\n)+" + + "\\s*\\d+,\\s*2,.+\n"); + } + + @Test public void requireThatJsonFeederWorks() throws Throwable { assertFeed("[" + " { \"put\": \"id:simple:simple::0\", \"fields\": { \"my_str\":\"foo\"}}," + @@ -105,7 +175,7 @@ public class SimpleFeederTest { " <document documenttype='simple' documentid='doc:scheme:0'/>" + "</vespafeed>", null); - getSourceSession(driver).close(); + driver.feeder.getSourceSession().close(); try { driver.run(); fail(); @@ -156,12 +226,8 @@ public class SimpleFeederTest { assertTrue(driver.close()); } - private static SourceSession getSourceSession(TestDriver driver) { - return (SourceSession)getField(driver.feeder, "session"); - } - private static ThrottlePolicy getThrottlePolicy(TestDriver driver) { - return (ThrottlePolicy)getField(getSourceSession(driver), "throttlePolicy"); + return (ThrottlePolicy)getField(driver.feeder.getSourceSession(), "throttlePolicy"); } private static Object getField(Object obj, String fieldName) { @@ -174,9 +240,12 @@ public class SimpleFeederTest { } } - private static void assertFeed(String in, MessageHandler validator, String expectedErr, String expectedOut) + private static void assertFeed(String in, MessageHandler validator, String expectedErr, String expectedOut) throws Throwable { + assertFeed(new FeederParams(), in, validator, expectedErr, expectedOut); + } + private static void assertFeed(FeederParams params, String in, MessageHandler validator, String expectedErr, String expectedOut) throws Throwable { - TestDriver driver = new TestDriver(new FeederParams(), in, validator); + TestDriver driver = new TestDriver(params, in, validator); driver.run(); assertMatches(expectedErr, new String(driver.err.toByteArray(), StandardCharsets.UTF_8)); assertMatches(expectedOut, new String(driver.out.toByteArray(), StandardCharsets.UTF_8)); @@ -209,7 +278,7 @@ public class SimpleFeederTest { feeder.run(); } - boolean close() { + boolean close() throws Exception { feeder.close(); server.close(); return true; |