aboutsummaryrefslogtreecommitdiffstats
path: root/vespa_feed_perf
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-04-28 20:32:43 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2019-04-28 20:32:43 +0200
commit996c51dc08e2ac0361eb1efb09e5ae7e86adb925 (patch)
tree6ccb17817e8c10d0f9a7c6f5228741a9c9b576dd /vespa_feed_perf
parent19c4ff5ed6c259feaf92d54871bc2004f928c805 (diff)
Close the gap to vespafeeder and add support for the test-and-set condition.
Diffstat (limited to 'vespa_feed_perf')
-rw-r--r--vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java30
-rw-r--r--vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java54
-rw-r--r--vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java3
-rw-r--r--vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java20
4 files changed, 65 insertions, 42 deletions
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java
index ffe1eb42e3e..b4549fe495c 100644
--- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java
+++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java
@@ -27,8 +27,9 @@ class FeederParams {
private String configId = "client";
private OutputStream dumpStream = null;
private DumpFormat dumpFormat = DumpFormat.JSON;
- private boolean serialTransferEnabled = false;
+ private boolean benchmarkMode = false;
private int numDispatchThreads = 1;
+ private int maxPending = 0;
InputStream getStdIn() {
return stdIn;
@@ -81,32 +82,46 @@ class FeederParams {
return this;
}
+ FeederParams setMaxPending(int maxPending) {
+ this.maxPending = maxPending;
+ return this;
+ }
+
boolean isSerialTransferEnabled() {
- return serialTransferEnabled;
+ return maxPending == 1;
}
- FeederParams setSerialTransfer(boolean serial) {
- this.serialTransferEnabled = serial;
+ FeederParams setSerialTransfer() {
+ maxPending = 1;
+ numDispatchThreads = 1;
return this;
}
int getNumDispatchThreads() { return numDispatchThreads; }
+ int getMaxPending() { return maxPending; }
+ boolean isBenchmarkMode() { return benchmarkMode; }
FeederParams parseArgs(String... args) throws ParseException, FileNotFoundException {
Options opts = new Options();
- opts.addOption("s", "serial", false, "use serial transfer mode, at most 1 pending operation");
+ opts.addOption("s", "serial", false, "use serial transfer mode, at most 1 pending operation and a single thread");
opts.addOption("n", "numthreads", true, "Number of clients for sending messages. Anything, but 1 will bypass sequencing by document id.");
+ opts.addOption("m", "maxpending", true, "Max number of inflights messages. Default is auto.");
opts.addOption("r", "route", true, "Route for sending messages. default is 'default'....");
+ opts.addOption("b", "mode", true, "Mode for benchmarking.");
opts.addOption("o", "output", true, "File to write to. Extensions gives format (.xml, .json, .vespa) json will be produced if no extension.");
CommandLine cmd = new DefaultParser().parse(opts, args);
- serialTransferEnabled = cmd.hasOption('s');
+
if (cmd.hasOption('n')) {
numDispatchThreads = Integer.valueOf(cmd.getOptionValue('n').trim());
}
+ if (cmd.hasOption('m')) {
+ maxPending = Integer.valueOf(cmd.getOptionValue('m').trim());
+ }
if (cmd.hasOption('r')) {
route = Route.parse(cmd.getOptionValue('r').trim());
}
+ benchmarkMode = cmd.hasOption('b');
if (cmd.hasOption('o')) {
String fileName = cmd.getOptionValue('o').trim();
dumpStream = new FileOutputStream(new File(fileName));
@@ -114,6 +129,9 @@ class FeederParams {
dumpFormat = DumpFormat.VESPA;
}
}
+ if (cmd.hasOption('s')) {
+ setSerialTransfer();
+ }
return this;
}
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
index dbb109aab0a..36e5cc37ea5 100644
--- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
+++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
@@ -7,6 +7,7 @@ import com.yahoo.document.DocumentId;
import com.yahoo.document.DocumentPut;
import com.yahoo.document.DocumentTypeManager;
import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.TestAndSetCondition;
import com.yahoo.document.json.JsonFeedReader;
import com.yahoo.document.json.JsonWriter;
import com.yahoo.document.serialization.DocumentDeserializer;
@@ -30,6 +31,7 @@ import com.yahoo.messagebus.SourceSessionParams;
import com.yahoo.messagebus.StaticThrottlePolicy;
import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
import com.yahoo.messagebus.routing.Route;
+import com.yahoo.vespaxmlparser.ConditionalFeedOperation;
import com.yahoo.vespaxmlparser.FeedReader;
import com.yahoo.vespaxmlparser.FeedOperation;
import com.yahoo.vespaxmlparser.RemoveFeedOperation;
@@ -71,6 +73,7 @@ public class SimpleFeeder implements ReplyHandler {
private long sumLatency = 0;
private final int numThreads;
private final Destination destination;
+ private final boolean benchmarkMode;
public static void main(String[] args) throws Throwable {
new SimpleFeeder(new FeederParams().parseArgs(args)).run().close();
@@ -151,7 +154,6 @@ public class SimpleFeeder implements ReplyHandler {
outputStream.write(']');
outputStream.close();
}
-
}
static private final int NONE = 0;
@@ -176,6 +178,8 @@ public class SimpleFeeder implements ReplyHandler {
}
}
public void send(FeedOperation op) {
+ TestAndSetCondition cond = op.getCondition();
+ buffer.putUtf8String(cond.getSelection());
DocumentSerializer writer = DocumentSerializerFactory.createHead(buffer);
int type = NONE;
if (op.getType() == FeedOperation.Type.DOCUMENT) {
@@ -189,9 +193,8 @@ public class SimpleFeeder implements ReplyHandler {
type = REMOVE;
}
int sz = buffer.position();
- long hash = hash(buffer.array(), 0, sz);
+ long hash = hash(buffer.array(), sz);
try {
-
header.putInt(sz);
header.putInt(type);
header.putLong(hash);
@@ -207,8 +210,8 @@ public class SimpleFeeder implements ReplyHandler {
public void close() throws Exception {
outputStream.close();
}
- static long hash(byte [] buf, int offset, int length) {
- return XXHashFactory.fastestJavaInstance().hash64().hash(buf, offset, length, 0);
+ static long hash(byte [] buf, int length) {
+ return XXHashFactory.fastestJavaInstance().hash64().hash(buf, 0, length, 0);
}
}
@@ -230,10 +233,10 @@ public class SimpleFeeder implements ReplyHandler {
}
}
- class LazyDocumentOperation extends FeedOperation {
+ class LazyDocumentOperation extends ConditionalFeedOperation {
private final DocumentDeserializer deserializer;
- LazyDocumentOperation(DocumentDeserializer deserializer) {
- super(Type.DOCUMENT);
+ LazyDocumentOperation(DocumentDeserializer deserializer, TestAndSetCondition condition) {
+ super(Type.DOCUMENT, condition);
this.deserializer = deserializer;
}
@@ -242,10 +245,10 @@ public class SimpleFeeder implements ReplyHandler {
return new Document(deserializer);
}
}
- class LazyUpdateOperation extends FeedOperation {
+ class LazyUpdateOperation extends ConditionalFeedOperation {
private final DocumentDeserializer deserializer;
- LazyUpdateOperation(DocumentDeserializer deserializer) {
- super(Type.UPDATE);
+ LazyUpdateOperation(DocumentDeserializer deserializer, TestAndSetCondition condition) {
+ super(Type.UPDATE, condition);
this.deserializer = deserializer;
}
@@ -269,17 +272,22 @@ public class SimpleFeeder implements ReplyHandler {
if (read != blob.length) {
throw new IllegalArgumentException("Underflow, failed reading " + blob.length + "bytes. Got " + read);
}
- long computedHash = VespaV1Destination.hash(blob, 0, blob.length);
+ long computedHash = VespaV1Destination.hash(blob, blob.length);
if (computedHash != hash) {
throw new IllegalArgumentException("Hash mismatch, expected " + hash + ", got " + computedHash);
}
- DocumentDeserializer deser = DocumentDeserializerFactory.createHead(mgr, GrowableByteBuffer.wrap(blob));
+ GrowableByteBuffer buf = GrowableByteBuffer.wrap(blob);
+ String condition = buf.getUtf8String();
+ DocumentDeserializer deser = DocumentDeserializerFactory.createHead(mgr, buf);
+ TestAndSetCondition testAndSetCondition = condition.isEmpty()
+ ? TestAndSetCondition.NOT_PRESENT_CONDITION
+ : new TestAndSetCondition(condition);
if (type == DOCUMENT) {
- return new LazyDocumentOperation(deser);
+ return new LazyDocumentOperation(deser, testAndSetCondition);
} else if (type == UPDATE) {
- return new LazyUpdateOperation(deser);
+ return new LazyUpdateOperation(deser, testAndSetCondition);
} else if (type == REMOVE) {
- return new RemoveFeedOperation(new DocumentId(deser));
+ return new RemoveFeedOperation(new DocumentId(deser), testAndSetCondition);
} else {
throw new IllegalArgumentException("Unknown operation " + type);
}
@@ -297,8 +305,9 @@ public class SimpleFeeder implements ReplyHandler {
out = params.getStdOut();
numThreads = params.getNumDispatchThreads();
mbus = newMessageBus(docTypeMgr, params.getConfigId());
- session = newSession(mbus, this, params.isSerialTransferEnabled());
+ session = newSession(mbus, this, params.getMaxPending());
docTypeMgr.configure(params.getConfigId());
+ benchmarkMode = params.isBenchmarkMode();
destination = (params.getDumpStream() != null)
? createDumper(params)
: new MbusDestination(session, params.getRoute(), failure, params.getStdErr());
@@ -404,6 +413,7 @@ public class SimpleFeeder implements ReplyHandler {
minLatency = Math.min(minLatency, latency);
maxLatency = Math.max(maxLatency, latency);
sumLatency += latency;
+ if (benchmarkMode) { return; }
if (now > nextHeader) {
printHeader();
nextHeader += HEADER_INTERVAL;
@@ -415,12 +425,12 @@ public class SimpleFeeder implements ReplyHandler {
}
private void printHeader() {
- out.println("total time, num messages, min latency, avg latency, max latency");
+ out.println("# Time used, num ok, num error, min latency, max latency, average latency");
}
private void printReport() {
out.format("%10d, %12d, %11d, %11d, %11d\n", System.currentTimeMillis() - startTime,
- numReplies.get(), minLatency, sumLatency / numReplies.get(), maxLatency);
+ numReplies.get(), minLatency, maxLatency, sumLatency / numReplies.get());
}
private static String formatErrors(Reply reply) {
@@ -438,11 +448,11 @@ public class SimpleFeeder implements ReplyHandler {
configId);
}
- private static SourceSession newSession(RPCMessageBus mbus, ReplyHandler replyHandler, boolean serial) {
+ private static SourceSession newSession(RPCMessageBus mbus, ReplyHandler replyHandler, int maxPending) {
SourceSessionParams params = new SourceSessionParams();
params.setReplyHandler(replyHandler);
- if (serial) {
- params.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(1));
+ if (maxPending > 0) {
+ params.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(maxPending));
}
return mbus.getMessageBus().createSourceSession(params);
}
diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java
index ab1eb27e416..d44cf41f9ab 100644
--- a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java
+++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java
@@ -47,9 +47,8 @@ public class FeederParamsTest {
params.setConfigId("my_config_id");
assertEquals("my_config_id", params.getConfigId());
- params.setSerialTransfer(false);
assertFalse(params.isSerialTransferEnabled());
- params.setSerialTransfer(true);
+ params.setSerialTransfer();
assertTrue(params.isSerialTransferEnabled());
}
diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java
index 1c2cac3bcee..2de7e831d04 100644
--- a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java
+++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java
@@ -131,18 +131,14 @@ public class SimpleFeederTest {
}
@Test
- public void requireThatDualPutXML2VespaFeederWorks() throws Throwable {
+ public void requireThatJson2VespaFeederWorks() throws Throwable {
ByteArrayOutputStream dump = new ByteArrayOutputStream();
assertFeed(new FeederParams().setDumpStream(dump).setDumpFormat(FeederParams.DumpFormat.VESPA),
- "<vespafeed>" +
- " <document documenttype='simple' documentid='id:simple:simple::0'>" +
- " <my_str>foo</my_str>" +
- " </document>" +
- " <document documenttype='simple' documentid='id:simple:simple::1'>" +
- " <my_str>bar</my_str>" +
- " </document>" +
- " <remove documenttype='simple' documentid='id:simple:simple::2'/>" +
- "</vespafeed>",
+ "[" +
+ " { \"put\": \"id:simple:simple::0\", \"fields\": { \"my_str\":\"foo\"}}," +
+ " { \"update\": \"id:simple:simple::1\", \"fields\": { \"my_str\": { \"assign\":\"bar\"}}}," +
+ " { \"remove\": \"id:simple:simple::2\", \"condition\":\"true\"}" +
+ "]",
new MessageHandler() {
@Override
@@ -155,7 +151,7 @@ public class SimpleFeederTest {
"",
"(.+\n)+" +
"\\s*\\d+,\\s*3,.+\n");
- assertEquals(178, dump.size());
+ assertEquals(187, dump.size());
assertFeed(new ByteArrayInputStream(dump.toByteArray()),
new MessageHandler() {
@Override
@@ -261,7 +257,7 @@ public class SimpleFeederTest {
@Test
public void requireThatSerialTransferModeConfiguresStaticThrottling() throws Exception {
- TestDriver driver = new TestDriver(new FeederParams().setSerialTransfer(true), "", null);
+ TestDriver driver = new TestDriver(new FeederParams().setSerialTransfer(), "", null);
assertEquals(StaticThrottlePolicy.class, getThrottlePolicy(driver).getClass());
assertTrue(driver.close());
}