diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-04-25 18:32:57 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-04-25 18:32:57 +0200 |
commit | 249fe76c9437d0f1a033294df98d8d8101baef2c (patch) | |
tree | 95c11718559291fc36d7f6a3ac5e58c435fec508 /vespa_feed_perf | |
parent | a21f0b780458f792dc7e30c134d917fa60d72d02 (diff) | |
parent | e4ee020164e6cb98f8706bae6b7f284ccee03ae3 (diff) |
Merge pull request #9187 from vespa-engine/balder/add-binary-option
Add a binary format too.
Diffstat (limited to 'vespa_feed_perf')
4 files changed, 216 insertions, 17 deletions
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java index 0e1b038e6cd..ffe1eb42e3e 100644 --- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java @@ -19,12 +19,14 @@ import java.io.PrintStream; */ class FeederParams { + enum DumpFormat {JSON, VESPA}; private InputStream stdIn = System.in; private PrintStream stdErr = System.err; private PrintStream stdOut = System.out; private Route route = Route.parse("default"); private String configId = "client"; private OutputStream dumpStream = null; + private DumpFormat dumpFormat = DumpFormat.JSON; private boolean serialTransferEnabled = false; private int numDispatchThreads = 1; @@ -64,6 +66,12 @@ class FeederParams { return this; } + DumpFormat getDumpFormat() { return dumpFormat; } + FeederParams setDumpFormat(DumpFormat dumpFormat) { + this.dumpFormat = dumpFormat; + return this; + } + String getConfigId() { return configId; } @@ -89,7 +97,7 @@ class FeederParams { opts.addOption("s", "serial", false, "use serial transfer mode, at most 1 pending operation"); opts.addOption("n", "numthreads", true, "Number of clients for sending messages. Anything, but 1 will bypass sequencing by document id."); opts.addOption("r", "route", true, "Route for sending messages. default is 'default'...."); - opts.addOption("o", "output", true, "File to write to. Extensions gives format (.xml, .json, .v8) json will be produced if no extension."); + opts.addOption("o", "output", true, "File to write to. Extensions gives format (.xml, .json, .vespa) json will be produced if no extension."); CommandLine cmd = new DefaultParser().parse(opts, args); serialTransferEnabled = cmd.hasOption('s'); @@ -100,7 +108,11 @@ class FeederParams { route = Route.parse(cmd.getOptionValue('r').trim()); } if (cmd.hasOption('o')) { - dumpStream = new FileOutputStream(new File(cmd.getOptionValue('o').trim())); + String fileName = cmd.getOptionValue('o').trim(); + dumpStream = new FileOutputStream(new File(fileName)); + if (fileName.endsWith(".vespa")) { + dumpFormat = DumpFormat.VESPA; + } } return this; diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java index b4520c0d9e3..1fdbd2db9c0 100644 --- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java @@ -2,14 +2,23 @@ package com.yahoo.vespa.feed.perf; import com.yahoo.concurrent.ThreadFactoryFactory; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; import com.yahoo.document.DocumentPut; import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.DocumentUpdate; import com.yahoo.document.json.JsonFeedReader; import com.yahoo.document.json.JsonWriter; +import com.yahoo.document.serialization.DocumentDeserializer; +import com.yahoo.document.serialization.DocumentDeserializerFactory; +import com.yahoo.document.serialization.DocumentSerializer; +import com.yahoo.document.serialization.DocumentSerializerFactory; +import com.yahoo.document.serialization.DocumentWriter; import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; +import com.yahoo.io.GrowableByteBuffer; import com.yahoo.messagebus.Error; import com.yahoo.messagebus.Message; import com.yahoo.messagebus.MessageBusParams; @@ -23,11 +32,14 @@ import com.yahoo.messagebus.network.rpc.RPCNetworkParams; import com.yahoo.messagebus.routing.Route; import com.yahoo.vespaxmlparser.FeedReader; import com.yahoo.vespaxmlparser.VespaXMLFeedReader; +import net.jpountz.xxhash.XXHashFactory; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.PrintStream; +import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.concurrent.ExecutorService; import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -100,7 +112,7 @@ public class SimpleFeeder implements ReplyHandler { private static class JsonDestination implements Destination { private final OutputStream outputStream; - private final JsonWriter writer; + private final DocumentWriter writer; private final AtomicLong numReplies; private final AtomicReference<Throwable> failure; private boolean isFirst = true; @@ -140,6 +152,116 @@ public class SimpleFeeder implements ReplyHandler { } + static final int NONE = 0; + static final int DOCUMENT = 1; + static final int UPDATE = 2; + static final int REMOVE = 3; + private static class VespaV1Destination implements Destination { + private final OutputStream outputStream; + GrowableByteBuffer buffer = new GrowableByteBuffer(16384); + ByteBuffer header = ByteBuffer.allocate(16); + private final AtomicLong numReplies; + private final AtomicReference<Throwable> failure; + VespaV1Destination(OutputStream outputStream, AtomicReference<Throwable> failure, AtomicLong numReplies) { + this.outputStream = outputStream; + this.numReplies = numReplies; + this.failure = failure; + try { + outputStream.write('V'); + outputStream.write('1'); + } catch (IOException e) { + failure.set(e); + } + } + public void send(VespaXMLFeedReader.Operation op) { + DocumentSerializer writer = DocumentSerializerFactory.createHead(buffer); + int type = NONE; + if (op.getType() == VespaXMLFeedReader.OperationType.DOCUMENT) { + writer.write(op.getDocument()); + type = DOCUMENT; + } else if (op.getType() == VespaXMLFeedReader.OperationType.UPDATE) { + writer.write(op.getDocumentUpdate()); + type = UPDATE; + } else if (op.getType() == VespaXMLFeedReader.OperationType.REMOVE) { + writer.write(op.getRemove()); + type = REMOVE; + } + int sz = buffer.position(); + long hash = hash(buffer.array(), 0, sz); + try { + + header.putInt(sz); + header.putInt(type); + header.putLong(hash); + outputStream.write(header.array(), 0, header.position()); + outputStream.write(buffer.array(), 0, buffer.position()); + header.clear(); + buffer.clear(); + } catch (IOException e) { + failure.set(e); + } + numReplies.incrementAndGet(); + } + public void close() throws Exception { + outputStream.close(); + } + static long hash(byte [] buf, int offset, int length) { + return XXHashFactory.fastestJavaInstance().hash64().hash(buf, offset, length, 0); + } + } + + static class VespaV1FeedReader implements FeedReader { + private final InputStream in; + private final DocumentTypeManager mgr; + private final byte[] prefix = new byte[16]; + VespaV1FeedReader(InputStream in, DocumentTypeManager mgr) throws IOException { + this.in = in; + this.mgr = mgr; + byte [] header = new byte[2]; + in.read(header); + if ((header[0] != 'V') && (header[1] != '1')) { + throw new IllegalArgumentException("Invalid Header " + Arrays.toString(header)); + } + } + @Override + public void read(VespaXMLFeedReader.Operation operation) throws Exception { + int read = in.read(prefix); + if (read != prefix.length) { + operation.setInvalid(); + return; + } + ByteBuffer header = ByteBuffer.wrap(prefix); + int sz = header.getInt(); + int type = header.getInt(); + long hash = header.getLong(); + byte [] blob = new byte[sz]; + read = in.read(blob); + if (read != blob.length) { + throw new IllegalArgumentException("Underflow, failed reading " + blob.length + "bytes. Got " + read); + } + long computedHash = VespaV1Destination.hash(blob, 0, blob.length); + if (computedHash != hash) { + throw new IllegalArgumentException("Hash mismatch, expected " + hash + ", got " + computedHash); + } + DocumentDeserializer deser = DocumentDeserializerFactory.createHead(mgr, GrowableByteBuffer.wrap(blob)); + if (type == DOCUMENT) { + operation.setDocument(new Document(deser)); + } else if (type == UPDATE) { + operation.setDocumentUpdate(new DocumentUpdate(deser)); + } else if (type == REMOVE) { + operation.setRemove(new DocumentId(deser)); + } else { + throw new IllegalArgumentException("Unknown operation " + type); + } + } + } + + Destination createDumper(FeederParams params) { + if (params.getDumpFormat() == FeederParams.DumpFormat.VESPA) { + return new VespaV1Destination(params.getDumpStream(), failure, numReplies); + } + return new JsonDestination(params.getDumpStream(), failure, numReplies); + } SimpleFeeder(FeederParams params) { in = params.getStdIn(); out = params.getStdOut(); @@ -148,7 +270,7 @@ public class SimpleFeeder implements ReplyHandler { session = newSession(mbus, this, params.isSerialTransferEnabled()); docTypeMgr.configure(params.getConfigId()); destination = (params.getDumpStream() != null) - ? new JsonDestination(params.getDumpStream(), failure, numReplies) + ? createDumper(params) : new MbusDestination(session, params.getRoute(), failure, params.getStdErr()); } @@ -159,7 +281,7 @@ public class SimpleFeeder implements ReplyHandler { SourceSession getSourceSession() { return session; } private FeedReader createFeedReader() throws Exception { in.mark(8); - byte [] b = new byte[1]; + byte [] b = new byte[2]; int numRead = in.read(b); in.reset(); if (numRead != b.length) { @@ -167,6 +289,8 @@ public class SimpleFeeder implements ReplyHandler { } if (b[0] == '[') { return new JsonFeedReader(in, docTypeMgr); + } else if ((b[0] == 'V') && (b[1] == '1')) { + return new VespaV1FeedReader(in, docTypeMgr); } else { return new VespaXMLFeedReader(in, docTypeMgr); } diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java index b2800110a39..ab1eb27e416 100644 --- a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java +++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java @@ -11,7 +11,6 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.io.PrintStream; import static org.junit.Assert.assertEquals; @@ -25,7 +24,9 @@ import static org.junit.Assert.assertTrue; * @author Simon Thoresen Hult */ public class FeederParamsTest { - static final String TESTFILE = "test.json"; + static final String TESTFILE_JSON = "test.json"; + static final String TESTFILE_VESPA = "test.vespa"; + static final String TESTFILE_UNKNOWN = "test.xyz"; @Test public void requireThatAccessorsWork() { @@ -94,10 +95,25 @@ public class FeederParamsTest { @Test public void requireThatDumpStreamAreParsed() throws ParseException, IOException { assertNull(new FeederParams().getDumpStream()); - OutputStream dumpStream = new FeederParams().parseArgs("-o " + TESTFILE).getDumpStream(); - assertNotNull(dumpStream); - dumpStream.close(); - assertTrue(new File(TESTFILE).delete()); + + FeederParams p = new FeederParams().parseArgs("-o " + TESTFILE_JSON); + assertNotNull(p.getDumpStream()); + assertEquals(FeederParams.DumpFormat.JSON, p.getDumpFormat()); + p.getDumpStream().close(); + + p = new FeederParams().parseArgs("-o " + TESTFILE_VESPA); + assertNotNull(p.getDumpStream()); + assertEquals(FeederParams.DumpFormat.VESPA, p.getDumpFormat()); + p.getDumpStream().close(); + + p = new FeederParams().parseArgs("-o " + TESTFILE_UNKNOWN); + assertNotNull(p.getDumpStream()); + assertEquals(FeederParams.DumpFormat.JSON, p.getDumpFormat()); + p.getDumpStream().close(); + + assertTrue(new File(TESTFILE_JSON).delete()); + assertTrue(new File(TESTFILE_VESPA).delete()); + assertTrue(new File(TESTFILE_UNKNOWN).delete()); } } diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java index f93657138ca..1c2cac3bcee 100644 --- a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java +++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java @@ -11,7 +11,6 @@ import com.yahoo.messagebus.ErrorCode; import com.yahoo.messagebus.Message; import com.yahoo.messagebus.MessageHandler; import com.yahoo.messagebus.Reply; -import com.yahoo.messagebus.SourceSession; import com.yahoo.messagebus.StaticThrottlePolicy; import com.yahoo.messagebus.ThrottlePolicy; import org.junit.Test; @@ -19,6 +18,7 @@ import org.junit.Test; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.io.PrintStream; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; @@ -131,6 +131,46 @@ public class SimpleFeederTest { } @Test + public void requireThatDualPutXML2VespaFeederWorks() throws Throwable { + ByteArrayOutputStream dump = new ByteArrayOutputStream(); + assertFeed(new FeederParams().setDumpStream(dump).setDumpFormat(FeederParams.DumpFormat.VESPA), + "<vespafeed>" + + " <document documenttype='simple' documentid='id:simple:simple::0'>" + + " <my_str>foo</my_str>" + + " </document>" + + " <document documenttype='simple' documentid='id:simple:simple::1'>" + + " <my_str>bar</my_str>" + + " </document>" + + " <remove documenttype='simple' documentid='id:simple:simple::2'/>" + + "</vespafeed>", + new MessageHandler() { + + @Override + public void handleMessage(Message msg) { + Reply reply = ((DocumentMessage)msg).createReply(); + reply.swapState(msg); + reply.popHandler().handleReply(reply); + } + }, + "", + "(.+\n)+" + + "\\s*\\d+,\\s*3,.+\n"); + assertEquals(178, dump.size()); + assertFeed(new ByteArrayInputStream(dump.toByteArray()), + new MessageHandler() { + @Override + public void handleMessage(Message msg) { + Reply reply = ((DocumentMessage)msg).createReply(); + reply.swapState(msg); + reply.popHandler().handleReply(reply); + } + }, + "", + "(.+\n)+" + + "\\s*\\d+,\\s*3,.+\n"); + } + + @Test public void requireThatJsonFeederWorks() throws Throwable { assertFeed("[" + " { \"put\": \"id:simple:simple::0\", \"fields\": { \"my_str\":\"foo\"}}," + @@ -243,8 +283,13 @@ public class SimpleFeederTest { private static void assertFeed(String in, MessageHandler validator, String expectedErr, String expectedOut) throws Throwable { assertFeed(new FeederParams(), in, validator, expectedErr, expectedOut); } - private static void assertFeed(FeederParams params, String in, MessageHandler validator, String expectedErr, String expectedOut) - throws Throwable { + private static void assertFeed(InputStream in, MessageHandler validator, String expectedErr, String expectedOut) throws Throwable { + assertFeed(new FeederParams(), in, validator, expectedErr, expectedOut); + } + private static void assertFeed(FeederParams params, String in, MessageHandler validator, String expectedErr, String expectedOut) throws Throwable { + assertFeed(params, new ByteArrayInputStream(in.getBytes(StandardCharsets.UTF_8)), validator, expectedErr, expectedOut); + } + private static void assertFeed(FeederParams params, InputStream in, MessageHandler validator, String expectedErr, String expectedOut) throws Throwable { TestDriver driver = new TestDriver(params, in, validator); driver.run(); assertMatches(expectedErr, new String(driver.err.toByteArray(), StandardCharsets.UTF_8)); @@ -265,12 +310,14 @@ public class SimpleFeederTest { final SimpleFeeder feeder; final SimpleServer server; - TestDriver(FeederParams params, String in, MessageHandler validator) - throws IOException, ListenFailedException { + TestDriver(FeederParams params, String in, MessageHandler validator) throws IOException, ListenFailedException { + this(params, new ByteArrayInputStream(in.getBytes(StandardCharsets.UTF_8)), validator); + } + TestDriver(FeederParams params, InputStream in, MessageHandler validator) throws IOException, ListenFailedException { server = new SimpleServer(CONFIG_DIR, validator); feeder = new SimpleFeeder(params.setConfigId("dir:" + CONFIG_DIR) .setStdErr(new PrintStream(err)) - .setStdIn(new ByteArrayInputStream(in.getBytes(StandardCharsets.UTF_8))) + .setStdIn(in) .setStdOut(new PrintStream(out))); } |