summaryrefslogtreecommitdiffstats
path: root/vespa_feed_perf
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-04-25 15:06:55 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2019-04-25 15:06:55 +0200
commit77f8522cf3a0a8a62fd3971455042815380b7fc7 (patch)
tree0a9dc0c0074949bf96b4cf0b7a1084073e716095 /vespa_feed_perf
parenta8949c869c613d671886b87ab684b2dfef9d9ca5 (diff)
Add a binary format too.
Diffstat (limited to 'vespa_feed_perf')
-rw-r--r--vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java16
-rw-r--r--vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java123
-rw-r--r--vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java28
-rw-r--r--vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java59
4 files changed, 209 insertions, 17 deletions
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java
index 0e1b038e6cd..ffe1eb42e3e 100644
--- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java
+++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java
@@ -19,12 +19,14 @@ import java.io.PrintStream;
*/
class FeederParams {
+ enum DumpFormat {JSON, VESPA};
private InputStream stdIn = System.in;
private PrintStream stdErr = System.err;
private PrintStream stdOut = System.out;
private Route route = Route.parse("default");
private String configId = "client";
private OutputStream dumpStream = null;
+ private DumpFormat dumpFormat = DumpFormat.JSON;
private boolean serialTransferEnabled = false;
private int numDispatchThreads = 1;
@@ -64,6 +66,12 @@ class FeederParams {
return this;
}
+ DumpFormat getDumpFormat() { return dumpFormat; }
+ FeederParams setDumpFormat(DumpFormat dumpFormat) {
+ this.dumpFormat = dumpFormat;
+ return this;
+ }
+
String getConfigId() {
return configId;
}
@@ -89,7 +97,7 @@ class FeederParams {
opts.addOption("s", "serial", false, "use serial transfer mode, at most 1 pending operation");
opts.addOption("n", "numthreads", true, "Number of clients for sending messages. Anything, but 1 will bypass sequencing by document id.");
opts.addOption("r", "route", true, "Route for sending messages. default is 'default'....");
- opts.addOption("o", "output", true, "File to write to. Extensions gives format (.xml, .json, .v8) json will be produced if no extension.");
+ opts.addOption("o", "output", true, "File to write to. Extensions gives format (.xml, .json, .vespa) json will be produced if no extension.");
CommandLine cmd = new DefaultParser().parse(opts, args);
serialTransferEnabled = cmd.hasOption('s');
@@ -100,7 +108,11 @@ class FeederParams {
route = Route.parse(cmd.getOptionValue('r').trim());
}
if (cmd.hasOption('o')) {
- dumpStream = new FileOutputStream(new File(cmd.getOptionValue('o').trim()));
+ String fileName = cmd.getOptionValue('o').trim();
+ dumpStream = new FileOutputStream(new File(fileName));
+ if (fileName.endsWith(".vespa")) {
+ dumpFormat = DumpFormat.VESPA;
+ }
}
return this;
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
index b4520c0d9e3..fd91809ea4f 100644
--- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
+++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
@@ -2,14 +2,23 @@
package com.yahoo.vespa.feed.perf;
import com.yahoo.concurrent.ThreadFactoryFactory;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
import com.yahoo.document.DocumentPut;
import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.DocumentUpdate;
import com.yahoo.document.json.JsonFeedReader;
import com.yahoo.document.json.JsonWriter;
+import com.yahoo.document.serialization.DocumentDeserializer;
+import com.yahoo.document.serialization.DocumentDeserializerFactory;
+import com.yahoo.document.serialization.DocumentSerializer;
+import com.yahoo.document.serialization.DocumentSerializerFactory;
+import com.yahoo.document.serialization.DocumentWriter;
import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
+import com.yahoo.io.GrowableByteBuffer;
import com.yahoo.messagebus.Error;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.MessageBusParams;
@@ -23,11 +32,14 @@ import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
import com.yahoo.messagebus.routing.Route;
import com.yahoo.vespaxmlparser.FeedReader;
import com.yahoo.vespaxmlparser.VespaXMLFeedReader;
+import net.jpountz.xxhash.XXHashFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -100,7 +112,7 @@ public class SimpleFeeder implements ReplyHandler {
private static class JsonDestination implements Destination {
private final OutputStream outputStream;
- private final JsonWriter writer;
+ private final DocumentWriter writer;
private final AtomicLong numReplies;
private final AtomicReference<Throwable> failure;
private boolean isFirst = true;
@@ -140,6 +152,109 @@ public class SimpleFeeder implements ReplyHandler {
}
+ static final int NONE = 0;
+ static final int DOCUMENT = 1;
+ static final int UPDATE = 2;
+ static final int REMOVE = 3;
+ private static class VespaV1Destination implements Destination {
+ private final OutputStream outputStream;
+ GrowableByteBuffer buffer = new GrowableByteBuffer(16384);
+ ByteBuffer header = ByteBuffer.allocate(16);
+ private final AtomicLong numReplies;
+ private final AtomicReference<Throwable> failure;
+ VespaV1Destination(OutputStream outputStream, AtomicReference<Throwable> failure, AtomicLong numReplies) {
+ this.outputStream = outputStream;
+ this.numReplies = numReplies;
+ this.failure = failure;
+ try {
+ outputStream.write('V');
+ outputStream.write('1');
+ } catch (IOException e) {
+ failure.set(e);
+ }
+ }
+ public void send(VespaXMLFeedReader.Operation op) {
+ DocumentSerializer writer = DocumentSerializerFactory.createHead(buffer);
+ int type = NONE;
+ if (op.getType() == VespaXMLFeedReader.OperationType.DOCUMENT) {
+ writer.write(op.getDocument());
+ type = DOCUMENT;
+ } else if (op.getType() == VespaXMLFeedReader.OperationType.UPDATE) {
+ writer.write(op.getDocumentUpdate());
+ type = UPDATE;
+ } else if (op.getType() == VespaXMLFeedReader.OperationType.REMOVE) {
+ writer.write(op.getRemove());
+ type = REMOVE;
+ }
+ int sz = buffer.position();
+ long hash = XXHashFactory.fastestJavaInstance().hash64().hash(buffer.array(), 0, sz, 0);
+ try {
+
+ header.putInt(sz);
+ header.putInt(type);
+ header.putLong(hash);
+ outputStream.write(header.array(), 0, header.position());
+ outputStream.write(buffer.array(), 0, buffer.position());
+ header.clear();
+ buffer.clear();
+ } catch (IOException e) {
+ failure.set(e);
+ }
+ numReplies.incrementAndGet();
+ }
+ public void close() throws Exception {
+ outputStream.close();
+ }
+ }
+
+ class VespaV1FeedReader implements FeedReader {
+ private final InputStream in;
+ private final DocumentTypeManager mgr;
+ private final byte[] prefix = new byte[16];
+ VespaV1FeedReader(InputStream in, DocumentTypeManager mgr) throws IOException {
+ this.in = in;
+ this.mgr = mgr;
+ byte [] header = new byte[2];
+ in.read(header);
+ if ((header[0] != 'V') && (header[1] != '1')) {
+ throw new IllegalArgumentException("Invalid Header " + Arrays.toString(header));
+ }
+ }
+ @Override
+ public void read(VespaXMLFeedReader.Operation operation) throws Exception {
+ int read = in.read(prefix);
+ if (read != prefix.length) {
+ operation.setInvalid();
+ return;
+ }
+ ByteBuffer header = ByteBuffer.wrap(prefix);
+ int sz = header.getInt();
+ int type = header.getInt();
+ long hash = header.getLong();
+ byte [] blob = new byte[sz];
+ read = in.read(blob);
+ if (read != blob.length) {
+ throw new IllegalArgumentException("Underflow, failed reading " + blob.length + "bytes. Got " + read);
+ }
+ DocumentDeserializer deser = DocumentDeserializerFactory.createHead(mgr, GrowableByteBuffer.wrap(blob));
+ if (type == DOCUMENT) {
+ operation.setDocument(new Document(deser));
+ } else if (type == UPDATE) {
+ operation.setDocumentUpdate(new DocumentUpdate(deser));
+ } else if (type == REMOVE) {
+ operation.setRemove(new DocumentId(deser));
+ } else {
+ throw new IllegalArgumentException("Unknown operation " + type);
+ }
+ }
+ }
+
+ Destination createDumper(FeederParams params) {
+ if (params.getDumpFormat() == FeederParams.DumpFormat.VESPA) {
+ return new VespaV1Destination(params.getDumpStream(), failure, numReplies);
+ }
+ return new JsonDestination(params.getDumpStream(), failure, numReplies);
+ }
SimpleFeeder(FeederParams params) {
in = params.getStdIn();
out = params.getStdOut();
@@ -148,7 +263,7 @@ public class SimpleFeeder implements ReplyHandler {
session = newSession(mbus, this, params.isSerialTransferEnabled());
docTypeMgr.configure(params.getConfigId());
destination = (params.getDumpStream() != null)
- ? new JsonDestination(params.getDumpStream(), failure, numReplies)
+ ? createDumper(params)
: new MbusDestination(session, params.getRoute(), failure, params.getStdErr());
}
@@ -159,7 +274,7 @@ public class SimpleFeeder implements ReplyHandler {
SourceSession getSourceSession() { return session; }
private FeedReader createFeedReader() throws Exception {
in.mark(8);
- byte [] b = new byte[1];
+ byte [] b = new byte[2];
int numRead = in.read(b);
in.reset();
if (numRead != b.length) {
@@ -167,6 +282,8 @@ public class SimpleFeeder implements ReplyHandler {
}
if (b[0] == '[') {
return new JsonFeedReader(in, docTypeMgr);
+ } else if ((b[0] == 'V') && (b[1] == '1')) {
+ return new VespaV1FeedReader(in, docTypeMgr);
} else {
return new VespaXMLFeedReader(in, docTypeMgr);
}
diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java
index b2800110a39..ab1eb27e416 100644
--- a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java
+++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java
@@ -11,7 +11,6 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.io.PrintStream;
import static org.junit.Assert.assertEquals;
@@ -25,7 +24,9 @@ import static org.junit.Assert.assertTrue;
* @author Simon Thoresen Hult
*/
public class FeederParamsTest {
- static final String TESTFILE = "test.json";
+ static final String TESTFILE_JSON = "test.json";
+ static final String TESTFILE_VESPA = "test.vespa";
+ static final String TESTFILE_UNKNOWN = "test.xyz";
@Test
public void requireThatAccessorsWork() {
@@ -94,10 +95,25 @@ public class FeederParamsTest {
@Test
public void requireThatDumpStreamAreParsed() throws ParseException, IOException {
assertNull(new FeederParams().getDumpStream());
- OutputStream dumpStream = new FeederParams().parseArgs("-o " + TESTFILE).getDumpStream();
- assertNotNull(dumpStream);
- dumpStream.close();
- assertTrue(new File(TESTFILE).delete());
+
+ FeederParams p = new FeederParams().parseArgs("-o " + TESTFILE_JSON);
+ assertNotNull(p.getDumpStream());
+ assertEquals(FeederParams.DumpFormat.JSON, p.getDumpFormat());
+ p.getDumpStream().close();
+
+ p = new FeederParams().parseArgs("-o " + TESTFILE_VESPA);
+ assertNotNull(p.getDumpStream());
+ assertEquals(FeederParams.DumpFormat.VESPA, p.getDumpFormat());
+ p.getDumpStream().close();
+
+ p = new FeederParams().parseArgs("-o " + TESTFILE_UNKNOWN);
+ assertNotNull(p.getDumpStream());
+ assertEquals(FeederParams.DumpFormat.JSON, p.getDumpFormat());
+ p.getDumpStream().close();
+
+ assertTrue(new File(TESTFILE_JSON).delete());
+ assertTrue(new File(TESTFILE_VESPA).delete());
+ assertTrue(new File(TESTFILE_UNKNOWN).delete());
}
}
diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java
index f93657138ca..1c2cac3bcee 100644
--- a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java
+++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java
@@ -11,7 +11,6 @@ import com.yahoo.messagebus.ErrorCode;
import com.yahoo.messagebus.Message;
import com.yahoo.messagebus.MessageHandler;
import com.yahoo.messagebus.Reply;
-import com.yahoo.messagebus.SourceSession;
import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.messagebus.ThrottlePolicy;
import org.junit.Test;
@@ -19,6 +18,7 @@ import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.PrintStream;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
@@ -131,6 +131,46 @@ public class SimpleFeederTest {
}
@Test
+ public void requireThatDualPutXML2VespaFeederWorks() throws Throwable {
+ ByteArrayOutputStream dump = new ByteArrayOutputStream();
+ assertFeed(new FeederParams().setDumpStream(dump).setDumpFormat(FeederParams.DumpFormat.VESPA),
+ "<vespafeed>" +
+ " <document documenttype='simple' documentid='id:simple:simple::0'>" +
+ " <my_str>foo</my_str>" +
+ " </document>" +
+ " <document documenttype='simple' documentid='id:simple:simple::1'>" +
+ " <my_str>bar</my_str>" +
+ " </document>" +
+ " <remove documenttype='simple' documentid='id:simple:simple::2'/>" +
+ "</vespafeed>",
+ new MessageHandler() {
+
+ @Override
+ public void handleMessage(Message msg) {
+ Reply reply = ((DocumentMessage)msg).createReply();
+ reply.swapState(msg);
+ reply.popHandler().handleReply(reply);
+ }
+ },
+ "",
+ "(.+\n)+" +
+ "\\s*\\d+,\\s*3,.+\n");
+ assertEquals(178, dump.size());
+ assertFeed(new ByteArrayInputStream(dump.toByteArray()),
+ new MessageHandler() {
+ @Override
+ public void handleMessage(Message msg) {
+ Reply reply = ((DocumentMessage)msg).createReply();
+ reply.swapState(msg);
+ reply.popHandler().handleReply(reply);
+ }
+ },
+ "",
+ "(.+\n)+" +
+ "\\s*\\d+,\\s*3,.+\n");
+ }
+
+ @Test
public void requireThatJsonFeederWorks() throws Throwable {
assertFeed("[" +
" { \"put\": \"id:simple:simple::0\", \"fields\": { \"my_str\":\"foo\"}}," +
@@ -243,8 +283,13 @@ public class SimpleFeederTest {
private static void assertFeed(String in, MessageHandler validator, String expectedErr, String expectedOut) throws Throwable {
assertFeed(new FeederParams(), in, validator, expectedErr, expectedOut);
}
- private static void assertFeed(FeederParams params, String in, MessageHandler validator, String expectedErr, String expectedOut)
- throws Throwable {
+ private static void assertFeed(InputStream in, MessageHandler validator, String expectedErr, String expectedOut) throws Throwable {
+ assertFeed(new FeederParams(), in, validator, expectedErr, expectedOut);
+ }
+ private static void assertFeed(FeederParams params, String in, MessageHandler validator, String expectedErr, String expectedOut) throws Throwable {
+ assertFeed(params, new ByteArrayInputStream(in.getBytes(StandardCharsets.UTF_8)), validator, expectedErr, expectedOut);
+ }
+ private static void assertFeed(FeederParams params, InputStream in, MessageHandler validator, String expectedErr, String expectedOut) throws Throwable {
TestDriver driver = new TestDriver(params, in, validator);
driver.run();
assertMatches(expectedErr, new String(driver.err.toByteArray(), StandardCharsets.UTF_8));
@@ -265,12 +310,14 @@ public class SimpleFeederTest {
final SimpleFeeder feeder;
final SimpleServer server;
- TestDriver(FeederParams params, String in, MessageHandler validator)
- throws IOException, ListenFailedException {
+ TestDriver(FeederParams params, String in, MessageHandler validator) throws IOException, ListenFailedException {
+ this(params, new ByteArrayInputStream(in.getBytes(StandardCharsets.UTF_8)), validator);
+ }
+ TestDriver(FeederParams params, InputStream in, MessageHandler validator) throws IOException, ListenFailedException {
server = new SimpleServer(CONFIG_DIR, validator);
feeder = new SimpleFeeder(params.setConfigId("dir:" + CONFIG_DIR)
.setStdErr(new PrintStream(err))
- .setStdIn(new ByteArrayInputStream(in.getBytes(StandardCharsets.UTF_8)))
+ .setStdIn(in)
.setStdOut(new PrintStream(out)));
}