summaryrefslogtreecommitdiffstats
path: root/vespa_feed_perf
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-04-24 15:33:05 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2019-04-24 21:47:37 +0200
commit7ad9d874ba3f3fa003e7a60c0980967e01273272 (patch)
tree4cba4d302573deea84a75035998161224eb31222 /vespa_feed_perf
parent3975dbc206434999e1b7f262b7dd58749e29a013 (diff)
Add support for dumping as json
Diffstat (limited to 'vespa_feed_perf')
-rw-r--r--vespa_feed_perf/pom.xml5
-rw-r--r--vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java63
-rw-r--r--vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java128
-rw-r--r--vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java45
-rw-r--r--vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java87
5 files changed, 238 insertions, 90 deletions
diff --git a/vespa_feed_perf/pom.xml b/vespa_feed_perf/pom.xml
index c6cf0cb9edf..a4edd9cda2c 100644
--- a/vespa_feed_perf/pom.xml
+++ b/vespa_feed_perf/pom.xml
@@ -34,11 +34,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>com.yahoo.vespa</groupId>
<artifactId>component</artifactId>
<version>${project.version}</version>
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java
index e7738d92818..0e1b038e6cd 100644
--- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java
+++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java
@@ -7,100 +7,103 @@ import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.InputStream;
+import java.io.OutputStream;
import java.io.PrintStream;
/**
* @author Simon Thoresen Hult
*/
-public class FeederParams {
+class FeederParams {
private InputStream stdIn = System.in;
private PrintStream stdErr = System.err;
private PrintStream stdOut = System.out;
private Route route = Route.parse("default");
private String configId = "client";
+ private OutputStream dumpStream = null;
private boolean serialTransferEnabled = false;
private int numDispatchThreads = 1;
- public InputStream getStdIn() {
+ InputStream getStdIn() {
return stdIn;
}
- public FeederParams setStdIn(InputStream stdIn) {
+ FeederParams setStdIn(InputStream stdIn) {
this.stdIn = stdIn;
return this;
}
- public PrintStream getStdErr() {
+ PrintStream getStdErr() {
return stdErr;
}
- public FeederParams setStdErr(PrintStream stdErr) {
+ FeederParams setStdErr(PrintStream stdErr) {
this.stdErr = stdErr;
return this;
}
- public PrintStream getStdOut() {
+ PrintStream getStdOut() {
return stdOut;
}
- public FeederParams setStdOut(PrintStream stdOut) {
+ FeederParams setStdOut(PrintStream stdOut) {
this.stdOut = stdOut;
return this;
}
- public Route getRoute() {
+ Route getRoute() {
return route;
}
-
- public FeederParams setRoute(Route route) {
- this.route = new Route(route);
+ OutputStream getDumpStream() { return dumpStream; }
+ FeederParams setDumpStream(OutputStream dumpStream) {
+ this.dumpStream = dumpStream;
return this;
}
- public String getConfigId() {
+ String getConfigId() {
return configId;
}
- public FeederParams setConfigId(String configId) {
+ FeederParams setConfigId(String configId) {
this.configId = configId;
return this;
}
- public boolean isSerialTransferEnabled() {
+ boolean isSerialTransferEnabled() {
return serialTransferEnabled;
}
- public FeederParams setSerialTransfer(boolean serial) {
+ FeederParams setSerialTransfer(boolean serial) {
this.serialTransferEnabled = serial;
return this;
}
- public int getNumDispatchThreads() { return numDispatchThreads; }
+ int getNumDispatchThreads() { return numDispatchThreads; }
- public FeederParams parseArgs(String... args) throws ParseException {
+ FeederParams parseArgs(String... args) throws ParseException, FileNotFoundException {
Options opts = new Options();
opts.addOption("s", "serial", false, "use serial transfer mode, at most 1 pending operation");
- opts.addOption("n", "numthreads", true, "Number of clients for sending messages.");
+ opts.addOption("n", "numthreads", true, "Number of clients for sending messages. Anything, but 1 will bypass sequencing by document id.");
+ opts.addOption("r", "route", true, "Route for sending messages. default is 'default'....");
+ opts.addOption("o", "output", true, "File to write to. Extensions gives format (.xml, .json, .v8) json will be produced if no extension.");
CommandLine cmd = new DefaultParser().parse(opts, args);
- serialTransferEnabled = cmd.hasOption("s");
+ serialTransferEnabled = cmd.hasOption('s');
if (cmd.hasOption('n')) {
numDispatchThreads = Integer.valueOf(cmd.getOptionValue('n').trim());
}
- route = newRoute(cmd.getArgs());
- return this;
- }
-
- private static Route newRoute(String... args) {
- if (args.length == 0) {
- return Route.parse("default");
+ if (cmd.hasOption('r')) {
+ route = Route.parse(cmd.getOptionValue('r').trim());
}
- StringBuilder out = new StringBuilder();
- for (String arg : args) {
- out.append(arg).append(' ');
+ if (cmd.hasOption('o')) {
+ dumpStream = new FileOutputStream(new File(cmd.getOptionValue('o').trim()));
}
- return Route.parse(out.toString());
+
+ return this;
}
+
}
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
index 0b6604fed1e..b4520c0d9e3 100644
--- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
+++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
@@ -5,6 +5,7 @@ import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.document.DocumentPut;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.document.json.JsonFeedReader;
+import com.yahoo.document.json.JsonWriter;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
@@ -25,6 +26,7 @@ import com.yahoo.vespaxmlparser.VespaXMLFeedReader;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.io.PrintStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
@@ -43,9 +45,7 @@ public class SimpleFeeder implements ReplyHandler {
private final DocumentTypeManager docTypeMgr = new DocumentTypeManager();
private final InputStream in;
private final PrintStream out;
- private final PrintStream err;
private final RPCMessageBus mbus;
- private final Route route;
private final SourceSession session;
private final long startTime = System.currentTimeMillis();
private final AtomicReference<Throwable> failure = new AtomicReference<>(null);
@@ -56,43 +56,115 @@ public class SimpleFeeder implements ReplyHandler {
private long nextReport = startTime + REPORT_INTERVAL;
private long sumLatency = 0;
private final int numThreads;
+ private final Destination destination;
public static void main(String[] args) throws Throwable {
new SimpleFeeder(new FeederParams().parseArgs(args)).run().close();
}
- SimpleFeeder(FeederParams params) {
- this.in = params.getStdIn();
- this.out = params.getStdOut();
- this.err = params.getStdErr();
- this.route = params.getRoute();
- this.numThreads = params.getNumDispatchThreads();
- this.mbus = newMessageBus(docTypeMgr, params.getConfigId());
- this.session = newSession(mbus, this, params.isSerialTransferEnabled());
- this.docTypeMgr.configure(params.getConfigId());
+ private interface Destination {
+ void send(VespaXMLFeedReader.Operation op);
+ void close() throws Exception;
}
- private void sendOperation(VespaXMLFeedReader.Operation op) {
- Message msg = newMessage(op);
- if (msg == null) {
- err.println("ignoring operation; " + op.getType());
- return;
+ private static class MbusDestination implements Destination {
+ private final PrintStream err;
+ private final Route route;
+ private final SourceSession session;
+ private final AtomicReference<Throwable> failure;
+ MbusDestination(SourceSession session, Route route, AtomicReference<Throwable> failure, PrintStream err) {
+ this.route = route;
+ this.err = err;
+ this.session = session;
+ this.failure = failure;
+ }
+ public void send(VespaXMLFeedReader.Operation op) {
+ Message msg = newMessage(op);
+ if (msg == null) {
+ err.println("ignoring operation; " + op.getType());
+ return;
+ }
+ msg.setContext(System.currentTimeMillis());
+ msg.setRoute(route);
+ try {
+ Error err = session.sendBlocking(msg).getError();
+ if (err != null) {
+ failure.set(new IOException(err.toString()));
+ }
+ } catch (InterruptedException e) {}
+ }
+ public void close() throws Exception {
+ session.destroy();
+ }
+ }
+
+ private static class JsonDestination implements Destination {
+ private final OutputStream outputStream;
+ private final JsonWriter writer;
+ private final AtomicLong numReplies;
+ private final AtomicReference<Throwable> failure;
+ private boolean isFirst = true;
+ JsonDestination(OutputStream outputStream, AtomicReference<Throwable> failure, AtomicLong numReplies) {
+ this.outputStream = outputStream;
+ writer = new JsonWriter(outputStream);
+ this.numReplies = numReplies;
+ this.failure = failure;
+ try {
+ outputStream.write('[');
+ outputStream.write('\n');
+ } catch (IOException e) {
+ failure.set(e);
+ }
}
- msg.setContext(System.currentTimeMillis());
- msg.setRoute(route);
- try {
- Error err = session.sendBlocking(msg).getError();
- if (err != null) {
- failure.set(new IOException(err.toString()));
+ public void send(VespaXMLFeedReader.Operation op) {
+ if (op.getType() == VespaXMLFeedReader.OperationType.DOCUMENT) {
+ if (!isFirst) {
+ try {
+ outputStream.write(',');
+ outputStream.write('\n');
+ } catch (IOException e) {
+ failure.set(e);
+ }
+ } else {
+ isFirst = false;
+ }
+ writer.write(op.getDocument());
}
- } catch (InterruptedException e) {}
+ numReplies.incrementAndGet();
+ }
+ public void close() throws Exception {
+ outputStream.write('\n');
+ outputStream.write(']');
+ outputStream.close();
+ }
+
+ }
+
+ SimpleFeeder(FeederParams params) {
+ in = params.getStdIn();
+ out = params.getStdOut();
+ numThreads = params.getNumDispatchThreads();
+ mbus = newMessageBus(docTypeMgr, params.getConfigId());
+ session = newSession(mbus, this, params.isSerialTransferEnabled());
+ docTypeMgr.configure(params.getConfigId());
+ destination = (params.getDumpStream() != null)
+ ? new JsonDestination(params.getDumpStream(), failure, numReplies)
+ : new MbusDestination(session, params.getRoute(), failure, params.getStdErr());
}
+ private void sendOperation(VespaXMLFeedReader.Operation op) {
+ destination.send(op);
+ }
+
+ SourceSession getSourceSession() { return session; }
private FeedReader createFeedReader() throws Exception {
in.mark(8);
- byte b[] = new byte[1];
- in.read(b);
+ byte [] b = new byte[1];
+ int numRead = in.read(b);
in.reset();
+ if (numRead != b.length) {
+ throw new IllegalArgumentException("Need to read " + b.length + " bytes to detect format. Got " + numRead + " bytes.");
+ }
if (b[0] == '[') {
return new JsonFeedReader(in, docTypeMgr);
} else {
@@ -134,12 +206,12 @@ public class SimpleFeeder implements ReplyHandler {
return this;
}
- void close() {
- session.destroy();
+ void close() throws Exception {
+ destination.close();
mbus.destroy();
}
- private Message newMessage(VespaXMLFeedReader.Operation op) {
+ private static Message newMessage(VespaXMLFeedReader.Operation op) {
switch (op.getType()) {
case DOCUMENT: {
PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(op.getDocument()));
diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java
index f08e494a717..b2800110a39 100644
--- a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java
+++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java
@@ -4,14 +4,20 @@ package com.yahoo.vespa.feed.perf;
import com.yahoo.messagebus.routing.Route;
import org.apache.commons.cli.ParseException;
import org.junit.Test;
-import org.mockito.Mockito;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.io.PrintStream;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
@@ -19,28 +25,24 @@ import static org.junit.Assert.assertTrue;
* @author Simon Thoresen Hult
*/
public class FeederParamsTest {
+ static final String TESTFILE = "test.json";
@Test
public void requireThatAccessorsWork() {
FeederParams params = new FeederParams();
- InputStream stdIn = Mockito.mock(InputStream.class);
+ InputStream stdIn = new ByteArrayInputStream(new byte[1]);
params.setStdIn(stdIn);
assertSame(stdIn, params.getStdIn());
- PrintStream stdErr = Mockito.mock(PrintStream.class);
+ PrintStream stdErr = new PrintStream(new ByteArrayOutputStream());
params.setStdErr(stdErr);
assertSame(stdErr, params.getStdErr());
- PrintStream stdOut = Mockito.mock(PrintStream.class);
+ PrintStream stdOut = new PrintStream(new ByteArrayOutputStream());
params.setStdOut(stdOut);
assertSame(stdOut, params.getStdOut());
- Route route = Route.parse("my_route");
- params.setRoute(route);
- assertEquals(route, params.getRoute());
- assertNotSame(route, params.getRoute());
-
params.setConfigId("my_config_id");
assertEquals("my_config_id", params.getConfigId());
@@ -62,7 +64,7 @@ public class FeederParamsTest {
}
@Test
- public void requireThatSerialTransferOptionIsParsed() throws ParseException {
+ public void requireThatSerialTransferOptionIsParsed() throws ParseException, FileNotFoundException {
assertTrue(new FeederParams().parseArgs("-s").isSerialTransferEnabled());
assertTrue(new FeederParams().parseArgs("foo", "-s").isSerialTransferEnabled());
assertTrue(new FeederParams().parseArgs("-s", "foo").isSerialTransferEnabled());
@@ -72,23 +74,30 @@ public class FeederParamsTest {
}
@Test
- public void requireThatArgumentsAreParsedAsRoute() throws ParseException {
- assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("foo", "bar").getRoute());
- assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("-s", "foo", "bar").getRoute());
- assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("foo", "-s", "bar").getRoute());
- assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("foo", "bar", "-s").getRoute());
+ public void requireThatArgumentsAreParsedAsRoute() throws ParseException, FileNotFoundException {
+ assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("-r foo bar").getRoute());
+ assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("--route","foo bar").getRoute());
}
@Test
- public void requireThatRouteIsAnOptionalArgument() throws ParseException {
+ public void requireThatRouteIsAnOptionalArgument() throws ParseException, FileNotFoundException {
assertEquals(Route.parse("default"), new FeederParams().parseArgs().getRoute());
assertEquals(Route.parse("default"), new FeederParams().parseArgs("-s").getRoute());
}
@Test
- public void requireThatNumThreadsAreParsed() throws ParseException {
+ public void requireThatNumThreadsAreParsed() throws ParseException, FileNotFoundException {
assertEquals(1, new FeederParams().getNumDispatchThreads());
assertEquals(17, new FeederParams().parseArgs("-n 17").getNumDispatchThreads());
}
+ @Test
+ public void requireThatDumpStreamAreParsed() throws ParseException, IOException {
+ assertNull(new FeederParams().getDumpStream());
+ OutputStream dumpStream = new FeederParams().parseArgs("-o " + TESTFILE).getDumpStream();
+ assertNotNull(dumpStream);
+ dumpStream.close();
+ assertTrue(new File(TESTFILE).delete());
+ }
+
}
diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java
index 25c56a5cc57..f93657138ca 100644
--- a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java
+++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java
@@ -61,6 +61,76 @@ public class SimpleFeederTest {
}
@Test
+ public void requireThatXML2JsonFeederWorks() throws Throwable {
+ ByteArrayOutputStream dump = new ByteArrayOutputStream();
+ assertFeed(new FeederParams().setDumpStream(dump),
+ "<vespafeed>" +
+ " <document documenttype='simple' documentid='id:simple:simple::0'>" +
+ " <my_str>foo</my_str>" +
+ " </document>" +
+ " <update documenttype='simple' documentid='id:simple:simple::1'>" +
+ " <assign field='my_str'>bar</assign>" +
+ " </update>" +
+ " <remove documenttype='simple' documentid='id:simple:simple::2'/>" +
+ "</vespafeed>",
+ new MessageHandler() {
+
+ @Override
+ public void handleMessage(Message msg) {
+ Reply reply = ((DocumentMessage)msg).createReply();
+ reply.swapState(msg);
+ reply.popHandler().handleReply(reply);
+ }
+ },
+ "",
+ "(.+\n)+" +
+ "\\s*\\d+,\\s*3,.+\n");
+ assertEquals(58, dump.size());
+ assertEquals("[\n{\"id\":\"id:simple:simple::0\",\"fields\":{\"my_str\":\"foo\"}}\n]", dump.toString());
+ }
+
+ @Test
+ public void requireThatDualPutXML2JsonFeederWorks() throws Throwable {
+ ByteArrayOutputStream dump = new ByteArrayOutputStream();
+ assertFeed(new FeederParams().setDumpStream(dump),
+ "<vespafeed>" +
+ " <document documenttype='simple' documentid='id:simple:simple::0'>" +
+ " <my_str>foo</my_str>" +
+ " </document>" +
+ " <document documenttype='simple' documentid='id:simple:simple::1'>" +
+ " <my_str>bar</my_str>" +
+ " </document>" +
+ " <remove documenttype='simple' documentid='id:simple:simple::2'/>" +
+ "</vespafeed>",
+ new MessageHandler() {
+
+ @Override
+ public void handleMessage(Message msg) {
+ Reply reply = ((DocumentMessage)msg).createReply();
+ reply.swapState(msg);
+ reply.popHandler().handleReply(reply);
+ }
+ },
+ "",
+ "(.+\n)+" +
+ "\\s*\\d+,\\s*3,.+\n");
+ assertEquals(115, dump.size());
+ assertEquals("[\n{\"id\":\"id:simple:simple::0\",\"fields\":{\"my_str\":\"foo\"}},\n {\"id\":\"id:simple:simple::1\",\"fields\":{\"my_str\":\"bar\"}}\n]", dump.toString());
+ assertFeed(dump.toString(),
+ new MessageHandler() {
+ @Override
+ public void handleMessage(Message msg) {
+ Reply reply = ((DocumentMessage)msg).createReply();
+ reply.swapState(msg);
+ reply.popHandler().handleReply(reply);
+ }
+ },
+ "",
+ "(.+\n)+" +
+ "\\s*\\d+,\\s*2,.+\n");
+ }
+
+ @Test
public void requireThatJsonFeederWorks() throws Throwable {
assertFeed("[" +
" { \"put\": \"id:simple:simple::0\", \"fields\": { \"my_str\":\"foo\"}}," +
@@ -105,7 +175,7 @@ public class SimpleFeederTest {
" <document documenttype='simple' documentid='doc:scheme:0'/>" +
"</vespafeed>",
null);
- getSourceSession(driver).close();
+ driver.feeder.getSourceSession().close();
try {
driver.run();
fail();
@@ -156,12 +226,8 @@ public class SimpleFeederTest {
assertTrue(driver.close());
}
- private static SourceSession getSourceSession(TestDriver driver) {
- return (SourceSession)getField(driver.feeder, "session");
- }
-
private static ThrottlePolicy getThrottlePolicy(TestDriver driver) {
- return (ThrottlePolicy)getField(getSourceSession(driver), "throttlePolicy");
+ return (ThrottlePolicy)getField(driver.feeder.getSourceSession(), "throttlePolicy");
}
private static Object getField(Object obj, String fieldName) {
@@ -174,9 +240,12 @@ public class SimpleFeederTest {
}
}
- private static void assertFeed(String in, MessageHandler validator, String expectedErr, String expectedOut)
+ private static void assertFeed(String in, MessageHandler validator, String expectedErr, String expectedOut) throws Throwable {
+ assertFeed(new FeederParams(), in, validator, expectedErr, expectedOut);
+ }
+ private static void assertFeed(FeederParams params, String in, MessageHandler validator, String expectedErr, String expectedOut)
throws Throwable {
- TestDriver driver = new TestDriver(new FeederParams(), in, validator);
+ TestDriver driver = new TestDriver(params, in, validator);
driver.run();
assertMatches(expectedErr, new String(driver.err.toByteArray(), StandardCharsets.UTF_8));
assertMatches(expectedOut, new String(driver.out.toByteArray(), StandardCharsets.UTF_8));
@@ -209,7 +278,7 @@ public class SimpleFeederTest {
feeder.run();
}
- boolean close() {
+ boolean close() throws Exception {
feeder.close();
server.close();
return true;