diff options
Diffstat (limited to 'vespa_feed_perf/src')
4 files changed, 65 insertions, 42 deletions
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java index ffe1eb42e3e..b4549fe495c 100644 --- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java @@ -27,8 +27,9 @@ class FeederParams { private String configId = "client"; private OutputStream dumpStream = null; private DumpFormat dumpFormat = DumpFormat.JSON; - private boolean serialTransferEnabled = false; + private boolean benchmarkMode = false; private int numDispatchThreads = 1; + private int maxPending = 0; InputStream getStdIn() { return stdIn; @@ -81,32 +82,46 @@ class FeederParams { return this; } + FeederParams setMaxPending(int maxPending) { + this.maxPending = maxPending; + return this; + } + boolean isSerialTransferEnabled() { - return serialTransferEnabled; + return maxPending == 1; } - FeederParams setSerialTransfer(boolean serial) { - this.serialTransferEnabled = serial; + FeederParams setSerialTransfer() { + maxPending = 1; + numDispatchThreads = 1; return this; } int getNumDispatchThreads() { return numDispatchThreads; } + int getMaxPending() { return maxPending; } + boolean isBenchmarkMode() { return benchmarkMode; } FeederParams parseArgs(String... args) throws ParseException, FileNotFoundException { Options opts = new Options(); - opts.addOption("s", "serial", false, "use serial transfer mode, at most 1 pending operation"); + opts.addOption("s", "serial", false, "use serial transfer mode, at most 1 pending operation and a single thread"); opts.addOption("n", "numthreads", true, "Number of clients for sending messages. Anything, but 1 will bypass sequencing by document id."); + opts.addOption("m", "maxpending", true, "Max number of inflights messages. Default is auto."); opts.addOption("r", "route", true, "Route for sending messages. default is 'default'...."); + opts.addOption("b", "mode", true, "Mode for benchmarking."); opts.addOption("o", "output", true, "File to write to. Extensions gives format (.xml, .json, .vespa) json will be produced if no extension."); CommandLine cmd = new DefaultParser().parse(opts, args); - serialTransferEnabled = cmd.hasOption('s'); + if (cmd.hasOption('n')) { numDispatchThreads = Integer.valueOf(cmd.getOptionValue('n').trim()); } + if (cmd.hasOption('m')) { + maxPending = Integer.valueOf(cmd.getOptionValue('m').trim()); + } if (cmd.hasOption('r')) { route = Route.parse(cmd.getOptionValue('r').trim()); } + benchmarkMode = cmd.hasOption('b'); if (cmd.hasOption('o')) { String fileName = cmd.getOptionValue('o').trim(); dumpStream = new FileOutputStream(new File(fileName)); @@ -114,6 +129,9 @@ class FeederParams { dumpFormat = DumpFormat.VESPA; } } + if (cmd.hasOption('s')) { + setSerialTransfer(); + } return this; } diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java index dbb109aab0a..36e5cc37ea5 100644 --- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java @@ -7,6 +7,7 @@ import com.yahoo.document.DocumentId; import com.yahoo.document.DocumentPut; import com.yahoo.document.DocumentTypeManager; import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.TestAndSetCondition; import com.yahoo.document.json.JsonFeedReader; import com.yahoo.document.json.JsonWriter; import com.yahoo.document.serialization.DocumentDeserializer; @@ -30,6 +31,7 @@ import com.yahoo.messagebus.SourceSessionParams; import com.yahoo.messagebus.StaticThrottlePolicy; import com.yahoo.messagebus.network.rpc.RPCNetworkParams; import com.yahoo.messagebus.routing.Route; +import com.yahoo.vespaxmlparser.ConditionalFeedOperation; import com.yahoo.vespaxmlparser.FeedReader; import com.yahoo.vespaxmlparser.FeedOperation; import com.yahoo.vespaxmlparser.RemoveFeedOperation; @@ -71,6 +73,7 @@ public class SimpleFeeder implements ReplyHandler { private long sumLatency = 0; private final int numThreads; private final Destination destination; + private final boolean benchmarkMode; public static void main(String[] args) throws Throwable { new SimpleFeeder(new FeederParams().parseArgs(args)).run().close(); @@ -151,7 +154,6 @@ public class SimpleFeeder implements ReplyHandler { outputStream.write(']'); outputStream.close(); } - } static private final int NONE = 0; @@ -176,6 +178,8 @@ public class SimpleFeeder implements ReplyHandler { } } public void send(FeedOperation op) { + TestAndSetCondition cond = op.getCondition(); + buffer.putUtf8String(cond.getSelection()); DocumentSerializer writer = DocumentSerializerFactory.createHead(buffer); int type = NONE; if (op.getType() == FeedOperation.Type.DOCUMENT) { @@ -189,9 +193,8 @@ public class SimpleFeeder implements ReplyHandler { type = REMOVE; } int sz = buffer.position(); - long hash = hash(buffer.array(), 0, sz); + long hash = hash(buffer.array(), sz); try { - header.putInt(sz); header.putInt(type); header.putLong(hash); @@ -207,8 +210,8 @@ public class SimpleFeeder implements ReplyHandler { public void close() throws Exception { outputStream.close(); } - static long hash(byte [] buf, int offset, int length) { - return XXHashFactory.fastestJavaInstance().hash64().hash(buf, offset, length, 0); + static long hash(byte [] buf, int length) { + return XXHashFactory.fastestJavaInstance().hash64().hash(buf, 0, length, 0); } } @@ -230,10 +233,10 @@ public class SimpleFeeder implements ReplyHandler { } } - class LazyDocumentOperation extends FeedOperation { + class LazyDocumentOperation extends ConditionalFeedOperation { private final DocumentDeserializer deserializer; - LazyDocumentOperation(DocumentDeserializer deserializer) { - super(Type.DOCUMENT); + LazyDocumentOperation(DocumentDeserializer deserializer, TestAndSetCondition condition) { + super(Type.DOCUMENT, condition); this.deserializer = deserializer; } @@ -242,10 +245,10 @@ public class SimpleFeeder implements ReplyHandler { return new Document(deserializer); } } - class LazyUpdateOperation extends FeedOperation { + class LazyUpdateOperation extends ConditionalFeedOperation { private final DocumentDeserializer deserializer; - LazyUpdateOperation(DocumentDeserializer deserializer) { - super(Type.UPDATE); + LazyUpdateOperation(DocumentDeserializer deserializer, TestAndSetCondition condition) { + super(Type.UPDATE, condition); this.deserializer = deserializer; } @@ -269,17 +272,22 @@ public class SimpleFeeder implements ReplyHandler { if (read != blob.length) { throw new IllegalArgumentException("Underflow, failed reading " + blob.length + "bytes. Got " + read); } - long computedHash = VespaV1Destination.hash(blob, 0, blob.length); + long computedHash = VespaV1Destination.hash(blob, blob.length); if (computedHash != hash) { throw new IllegalArgumentException("Hash mismatch, expected " + hash + ", got " + computedHash); } - DocumentDeserializer deser = DocumentDeserializerFactory.createHead(mgr, GrowableByteBuffer.wrap(blob)); + GrowableByteBuffer buf = GrowableByteBuffer.wrap(blob); + String condition = buf.getUtf8String(); + DocumentDeserializer deser = DocumentDeserializerFactory.createHead(mgr, buf); + TestAndSetCondition testAndSetCondition = condition.isEmpty() + ? TestAndSetCondition.NOT_PRESENT_CONDITION + : new TestAndSetCondition(condition); if (type == DOCUMENT) { - return new LazyDocumentOperation(deser); + return new LazyDocumentOperation(deser, testAndSetCondition); } else if (type == UPDATE) { - return new LazyUpdateOperation(deser); + return new LazyUpdateOperation(deser, testAndSetCondition); } else if (type == REMOVE) { - return new RemoveFeedOperation(new DocumentId(deser)); + return new RemoveFeedOperation(new DocumentId(deser), testAndSetCondition); } else { throw new IllegalArgumentException("Unknown operation " + type); } @@ -297,8 +305,9 @@ public class SimpleFeeder implements ReplyHandler { out = params.getStdOut(); numThreads = params.getNumDispatchThreads(); mbus = newMessageBus(docTypeMgr, params.getConfigId()); - session = newSession(mbus, this, params.isSerialTransferEnabled()); + session = newSession(mbus, this, params.getMaxPending()); docTypeMgr.configure(params.getConfigId()); + benchmarkMode = params.isBenchmarkMode(); destination = (params.getDumpStream() != null) ? createDumper(params) : new MbusDestination(session, params.getRoute(), failure, params.getStdErr()); @@ -404,6 +413,7 @@ public class SimpleFeeder implements ReplyHandler { minLatency = Math.min(minLatency, latency); maxLatency = Math.max(maxLatency, latency); sumLatency += latency; + if (benchmarkMode) { return; } if (now > nextHeader) { printHeader(); nextHeader += HEADER_INTERVAL; @@ -415,12 +425,12 @@ public class SimpleFeeder implements ReplyHandler { } private void printHeader() { - out.println("total time, num messages, min latency, avg latency, max latency"); + out.println("# Time used, num ok, num error, min latency, max latency, average latency"); } private void printReport() { out.format("%10d, %12d, %11d, %11d, %11d\n", System.currentTimeMillis() - startTime, - numReplies.get(), minLatency, sumLatency / numReplies.get(), maxLatency); + numReplies.get(), minLatency, maxLatency, sumLatency / numReplies.get()); } private static String formatErrors(Reply reply) { @@ -438,11 +448,11 @@ public class SimpleFeeder implements ReplyHandler { configId); } - private static SourceSession newSession(RPCMessageBus mbus, ReplyHandler replyHandler, boolean serial) { + private static SourceSession newSession(RPCMessageBus mbus, ReplyHandler replyHandler, int maxPending) { SourceSessionParams params = new SourceSessionParams(); params.setReplyHandler(replyHandler); - if (serial) { - params.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(1)); + if (maxPending > 0) { + params.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(maxPending)); } return mbus.getMessageBus().createSourceSession(params); } diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java index ab1eb27e416..d44cf41f9ab 100644 --- a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java +++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java @@ -47,9 +47,8 @@ public class FeederParamsTest { params.setConfigId("my_config_id"); assertEquals("my_config_id", params.getConfigId()); - params.setSerialTransfer(false); assertFalse(params.isSerialTransferEnabled()); - params.setSerialTransfer(true); + params.setSerialTransfer(); assertTrue(params.isSerialTransferEnabled()); } diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java index 1c2cac3bcee..2de7e831d04 100644 --- a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java +++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java @@ -131,18 +131,14 @@ public class SimpleFeederTest { } @Test - public void requireThatDualPutXML2VespaFeederWorks() throws Throwable { + public void requireThatJson2VespaFeederWorks() throws Throwable { ByteArrayOutputStream dump = new ByteArrayOutputStream(); assertFeed(new FeederParams().setDumpStream(dump).setDumpFormat(FeederParams.DumpFormat.VESPA), - "<vespafeed>" + - " <document documenttype='simple' documentid='id:simple:simple::0'>" + - " <my_str>foo</my_str>" + - " </document>" + - " <document documenttype='simple' documentid='id:simple:simple::1'>" + - " <my_str>bar</my_str>" + - " </document>" + - " <remove documenttype='simple' documentid='id:simple:simple::2'/>" + - "</vespafeed>", + "[" + + " { \"put\": \"id:simple:simple::0\", \"fields\": { \"my_str\":\"foo\"}}," + + " { \"update\": \"id:simple:simple::1\", \"fields\": { \"my_str\": { \"assign\":\"bar\"}}}," + + " { \"remove\": \"id:simple:simple::2\", \"condition\":\"true\"}" + + "]", new MessageHandler() { @Override @@ -155,7 +151,7 @@ public class SimpleFeederTest { "", "(.+\n)+" + "\\s*\\d+,\\s*3,.+\n"); - assertEquals(178, dump.size()); + assertEquals(187, dump.size()); assertFeed(new ByteArrayInputStream(dump.toByteArray()), new MessageHandler() { @Override @@ -261,7 +257,7 @@ public class SimpleFeederTest { @Test public void requireThatSerialTransferModeConfiguresStaticThrottling() throws Exception { - TestDriver driver = new TestDriver(new FeederParams().setSerialTransfer(true), "", null); + TestDriver driver = new TestDriver(new FeederParams().setSerialTransfer(), "", null); assertEquals(StaticThrottlePolicy.class, getThrottlePolicy(driver).getClass()); assertTrue(driver.close()); } |