summaryrefslogtreecommitdiffstats
path: root/vespa_feed_perf
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
commit72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /vespa_feed_perf
Publish
Diffstat (limited to 'vespa_feed_perf')
-rw-r--r--vespa_feed_perf/.gitignore2
-rw-r--r--vespa_feed_perf/OWNERS1
-rw-r--r--vespa_feed_perf/pom.xml89
-rw-r--r--vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java99
-rw-r--r--vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java179
-rwxr-xr-xvespa_feed_perf/src/main/sh/vespa-feed-perf63
-rw-r--r--vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java88
-rw-r--r--vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java198
-rw-r--r--vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleServer.java63
-rw-r--r--vespa_feed_perf/src/test/resources/documentmanager.cfg50
10 files changed, 832 insertions, 0 deletions
diff --git a/vespa_feed_perf/.gitignore b/vespa_feed_perf/.gitignore
new file mode 100644
index 00000000000..12251442258
--- /dev/null
+++ b/vespa_feed_perf/.gitignore
@@ -0,0 +1,2 @@
+/target
+/pom.xml.build
diff --git a/vespa_feed_perf/OWNERS b/vespa_feed_perf/OWNERS
new file mode 100644
index 00000000000..0e39145d8c3
--- /dev/null
+++ b/vespa_feed_perf/OWNERS
@@ -0,0 +1 @@
+dybdahl
diff --git a/vespa_feed_perf/pom.xml b/vespa_feed_perf/pom.xml
new file mode 100644
index 00000000000..c78d18f055a
--- /dev/null
+++ b/vespa_feed_perf/pom.xml
@@ -0,0 +1,89 @@
+<?xml version="1.0"?>
+<!-- Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
+ http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <!-- This module is used in 4 tests:
+ interpolated_lookup.rb, match_phase_degradation_test.rb reportcoverage.rb slow_query.rb
+ TODO: Remove usage and remove this module -->
+
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>parent</artifactId>
+ <version>6-SNAPSHOT</version>
+ <relativePath>../parent/pom.xml</relativePath>
+ </parent>
+ <artifactId>vespa_feed_perf</artifactId>
+ <version>6-SNAPSHOT</version>
+ <packaging>jar</packaging>
+ <name>${project.artifactId}</name>
+ <dependencies>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>component</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>documentapi</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.yahoo.vespa</groupId>
+ <artifactId>messagebus</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ <build>
+ <resources>
+ <resource>
+ <directory>src/test/resources</directory>
+ <filtering>false</filtering>
+ </resource>
+ </resources>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <archive>
+ <manifest>
+ <mainClass>com.yahoo.vespa.feed.perf.SimpleFeeder</mainClass>
+ </manifest>
+ </archive>
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ </configuration>
+ <executions>
+ <execution>
+ <id>make-assembly</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java
new file mode 100644
index 00000000000..ed73ba72f4b
--- /dev/null
+++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java
@@ -0,0 +1,99 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.feed.perf;
+
+import com.yahoo.messagebus.routing.Route;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+import java.io.InputStream;
+import java.io.PrintStream;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
+ */
+public class FeederParams {
+
+ private InputStream stdIn = System.in;
+ private PrintStream stdErr = System.err;
+ private PrintStream stdOut = System.out;
+ private Route route = Route.parse("default");
+ private String configId = "client";
+ private boolean serialTransferEnabled = false;
+
+ public InputStream getStdIn() {
+ return stdIn;
+ }
+
+ public FeederParams setStdIn(InputStream stdIn) {
+ this.stdIn = stdIn;
+ return this;
+ }
+
+ public PrintStream getStdErr() {
+ return stdErr;
+ }
+
+ public FeederParams setStdErr(PrintStream stdErr) {
+ this.stdErr = stdErr;
+ return this;
+ }
+
+ public PrintStream getStdOut() {
+ return stdOut;
+ }
+
+ public FeederParams setStdOut(PrintStream stdOut) {
+ this.stdOut = stdOut;
+ return this;
+ }
+
+ public Route getRoute() {
+ return route;
+ }
+
+ public FeederParams setRoute(Route route) {
+ this.route = new Route(route);
+ return this;
+ }
+
+ public String getConfigId() {
+ return configId;
+ }
+
+ public FeederParams setConfigId(String configId) {
+ this.configId = configId;
+ return this;
+ }
+
+ public boolean isSerialTransferEnabled() {
+ return serialTransferEnabled;
+ }
+
+ public FeederParams setSerialTransfer(boolean serial) {
+ this.serialTransferEnabled = serial;
+ return this;
+ }
+
+ public FeederParams parseArgs(String... args) throws ParseException {
+ Options opts = new Options();
+ opts.addOption("s", "serial", false, "use serial transfer mode, at most 1 pending operation");
+
+ CommandLine cmd = new DefaultParser().parse(opts, args);
+ serialTransferEnabled = cmd.hasOption("s");
+ route = newRoute(cmd.getArgs());
+ return this;
+ }
+
+ private static Route newRoute(String... args) {
+ if (args.length == 0) {
+ return Route.parse("default");
+ }
+ StringBuilder out = new StringBuilder();
+ for (String arg : args) {
+ out.append(arg).append(' ');
+ }
+ return Route.parse(out.toString());
+ }
+}
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
new file mode 100644
index 00000000000..e5f21506d59
--- /dev/null
+++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
@@ -0,0 +1,179 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.feed.perf;
+
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.documentapi.messagebus.protocol.*;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageBusParams;
+import com.yahoo.messagebus.RPCMessageBus;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.SourceSession;
+import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.StaticThrottlePolicy;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.vespaxmlparser.VespaXMLFeedReader;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
+ */
+public class SimpleFeeder implements ReplyHandler {
+
+ private final static long REPORT_INTERVAL = TimeUnit.SECONDS.toMillis(10);
+ private final static long HEADER_INTERVAL = REPORT_INTERVAL * 24;
+ private final DocumentTypeManager docTypeMgr = new DocumentTypeManager();
+ private final InputStream in;
+ private final PrintStream out;
+ private final PrintStream err;
+ private final RPCMessageBus mbus;
+ private final Route route;
+ private final SourceSession session;
+ private final long startTime = System.currentTimeMillis();
+ private volatile Throwable failure;
+ private volatile long numReplies = 0;
+ private long maxLatency = Long.MIN_VALUE;
+ private long minLatency = Long.MAX_VALUE;
+ private long nextHeader = startTime + HEADER_INTERVAL;
+ private long nextReport = startTime + REPORT_INTERVAL;
+ private long numMessages = 0;
+ private long sumLatency = 0;
+
+ public static void main(String[] args) throws Throwable {
+ new SimpleFeeder(new FeederParams().parseArgs(args)).run().close();
+ }
+
+ public SimpleFeeder(FeederParams params) {
+ this.in = params.getStdIn();
+ this.out = params.getStdOut();
+ this.err = params.getStdErr();
+ this.route = params.getRoute();
+ this.mbus = newMessageBus(docTypeMgr, params.getConfigId());
+ this.session = newSession(mbus, this, params.isSerialTransferEnabled());
+ this.docTypeMgr.configure(params.getConfigId());
+ }
+
+ public SimpleFeeder run() throws Throwable {
+ VespaXMLFeedReader reader = new VespaXMLFeedReader(in, docTypeMgr);
+ VespaXMLFeedReader.Operation op = new VespaXMLFeedReader.Operation();
+ printHeader();
+ while (failure == null) {
+ reader.read(op);
+ if (op.getType() == VespaXMLFeedReader.OperationType.INVALID) {
+ break;
+ }
+ Message msg = newMessage(op);
+ if (msg == null) {
+ err.println("ignoring operation; " + op.getType());
+ continue; // ignore
+ }
+ msg.setContext(System.currentTimeMillis());
+ msg.setRoute(route);
+ Error err = session.sendBlocking(msg).getError();
+ if (err != null) {
+ throw new IOException(err.toString());
+ }
+ ++numMessages;
+ }
+ while (failure == null && numReplies < numMessages) {
+ Thread.sleep(100);
+ }
+ if (failure != null) {
+ throw failure;
+ }
+ printReport();
+ return this;
+ }
+
+ public void close() {
+ session.destroy();
+ mbus.destroy();
+ }
+
+ private Message newMessage(VespaXMLFeedReader.Operation op) {
+ switch (op.getType()) {
+ case DOCUMENT: {
+ PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(op.getDocument()));
+ message.setCondition(op.getCondition());
+ return message;
+ }
+ case REMOVE: {
+ RemoveDocumentMessage message = new RemoveDocumentMessage(op.getRemove());
+ message.setCondition(op.getCondition());
+ return message;
+ }
+ case UPDATE: {
+ UpdateDocumentMessage message = new UpdateDocumentMessage(op.getDocumentUpdate());
+ message.setCondition(op.getCondition());
+ return message;
+ }
+ default:
+ return null;
+ }
+ }
+
+ @Override
+ public void handleReply(Reply reply) {
+ if (failure != null) {
+ return;
+ }
+ if (reply.hasErrors()) {
+ failure = new IOException(formatErrors(reply));
+ return;
+ }
+ long now = System.currentTimeMillis();
+ long latency = now - (long)reply.getContext();
+ minLatency = Math.min(minLatency, latency);
+ maxLatency = Math.max(maxLatency, latency);
+ sumLatency += latency;
+ ++numReplies;
+ if (now > nextHeader) {
+ printHeader();
+ nextHeader += HEADER_INTERVAL;
+ }
+ if (now > nextReport) {
+ printReport();
+ nextReport += REPORT_INTERVAL;
+ }
+ }
+
+ private void printHeader() {
+ out.println("total time, num messages, min latency, avg latency, max latency");
+ }
+
+ private void printReport() {
+ out.format("%10d, %12d, %11d, %11d, %11d\n", System.currentTimeMillis() - startTime,
+ numReplies, minLatency, sumLatency / numReplies, maxLatency);
+ }
+
+ private static String formatErrors(Reply reply) {
+ StringBuilder out = new StringBuilder();
+ out.append(reply.getMessage().toString()).append('\n');
+ for (int i = 0, len = reply.getNumErrors(); i < len; ++i) {
+ out.append(reply.getError(i).toString()).append('\n');
+ }
+ return out.toString();
+ }
+
+ private static RPCMessageBus newMessageBus(DocumentTypeManager docTypeMgr, String configId) {
+ return new RPCMessageBus(new MessageBusParams().addProtocol(new DocumentProtocol(docTypeMgr)),
+ new RPCNetworkParams().setSlobrokConfigId(configId),
+ configId);
+ }
+
+ private static SourceSession newSession(RPCMessageBus mbus, ReplyHandler replyHandler, boolean serial) {
+ SourceSessionParams params = new SourceSessionParams();
+ params.setReplyHandler(replyHandler);
+ if (serial) {
+ params.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(1));
+ }
+ return mbus.getMessageBus().createSourceSession(params);
+ }
+}
diff --git a/vespa_feed_perf/src/main/sh/vespa-feed-perf b/vespa_feed_perf/src/main/sh/vespa-feed-perf
new file mode 100755
index 00000000000..208e0e8fe0d
--- /dev/null
+++ b/vespa_feed_perf/src/main/sh/vespa-feed-perf
@@ -0,0 +1,63 @@
+#!/bin/sh
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+# BEGIN environment bootstrap section
+# Do not edit between here and END as this section should stay identical in all scripts
+
+findpath () {
+ myname=${0}
+ mypath=${myname%/*}
+ myname=${myname##*/}
+ if [ "$mypath" ] && [ -d "$mypath" ]; then
+ return
+ fi
+ mypath=$(pwd)
+ if [ -f "${mypath}/${myname}" ]; then
+ return
+ fi
+ echo "FATAL: Could not figure out the path where $myname lives from $0"
+ exit 1
+}
+
+COMMON_ENV=libexec/vespa/common-env.sh
+
+source_common_env () {
+ if [ "$VESPA_HOME" ] && [ -d "$VESPA_HOME" ]; then
+ # ensure it ends with "/" :
+ VESPA_HOME=${VESPA_HOME%/}/
+ export VESPA_HOME
+ common_env=$VESPA_HOME/$COMMON_ENV
+ if [ -f "$common_env" ]; then
+ . $common_env
+ return
+ fi
+ fi
+ return 1
+}
+
+findroot () {
+ source_common_env && return
+ if [ "$VESPA_HOME" ]; then
+ echo "FATAL: bad VESPA_HOME value '$VESPA_HOME'"
+ exit 1
+ fi
+ if [ "$ROOT" ] && [ -d "$ROOT" ]; then
+ VESPA_HOME="$ROOT"
+ source_common_env && return
+ fi
+ findpath
+ while [ "$mypath" ]; do
+ VESPA_HOME=${mypath}
+ source_common_env && return
+ mypath=${mypath%/*}
+ done
+ echo "FATAL: missing VESPA_HOME environment variable"
+ echo "Could not locate $COMMON_ENV anywhere"
+ exit 1
+}
+
+findroot
+
+# END environment bootstrap section
+
+exec java -jar $VESPA_HOME/lib/jars/vespa_feed_perf-jar-with-dependencies.jar "$@"
diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java
new file mode 100644
index 00000000000..dec31d9e1d9
--- /dev/null
+++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java
@@ -0,0 +1,88 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.feed.perf;
+
+import com.yahoo.messagebus.routing.Route;
+import org.apache.commons.cli.ParseException;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.InputStream;
+import java.io.PrintStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
+ */
+public class FeederParamsTest {
+
+ @Test
+ public void requireThatAccessorsWork() {
+ FeederParams params = new FeederParams();
+
+ InputStream stdIn = Mockito.mock(InputStream.class);
+ params.setStdIn(stdIn);
+ assertSame(stdIn, params.getStdIn());
+
+ PrintStream stdErr = Mockito.mock(PrintStream.class);
+ params.setStdErr(stdErr);
+ assertSame(stdErr, params.getStdErr());
+
+ PrintStream stdOut = Mockito.mock(PrintStream.class);
+ params.setStdOut(stdOut);
+ assertSame(stdOut, params.getStdOut());
+
+ Route route = Route.parse("my_route");
+ params.setRoute(route);
+ assertEquals(route, params.getRoute());
+ assertNotSame(route, params.getRoute());
+
+ params.setConfigId("my_config_id");
+ assertEquals("my_config_id", params.getConfigId());
+
+ params.setSerialTransfer(false);
+ assertFalse(params.isSerialTransferEnabled());
+ params.setSerialTransfer(true);
+ assertTrue(params.isSerialTransferEnabled());
+ }
+
+ @Test
+ public void requireThatParamsHaveReasonableDefaults() {
+ FeederParams params = new FeederParams();
+ assertSame(System.in, params.getStdIn());
+ assertSame(System.err, params.getStdErr());
+ assertSame(System.out, params.getStdOut());
+ assertEquals(Route.parse("default"), params.getRoute());
+ assertEquals("client", params.getConfigId());
+ assertFalse(params.isSerialTransferEnabled());
+ }
+
+ @Test
+ public void requireThatSerialTransferOptionIsParsed() throws ParseException {
+ assertTrue(new FeederParams().parseArgs("-s").isSerialTransferEnabled());
+ assertTrue(new FeederParams().parseArgs("foo", "-s").isSerialTransferEnabled());
+ assertTrue(new FeederParams().parseArgs("-s", "foo").isSerialTransferEnabled());
+ assertTrue(new FeederParams().parseArgs("--serial").isSerialTransferEnabled());
+ assertTrue(new FeederParams().parseArgs("foo", "--serial").isSerialTransferEnabled());
+ assertTrue(new FeederParams().parseArgs("--serial", "foo").isSerialTransferEnabled());
+ }
+
+ @Test
+ public void requireThatArgumentsAreParsedAsRoute() throws ParseException {
+ assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("foo", "bar").getRoute());
+ assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("-s", "foo", "bar").getRoute());
+ assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("foo", "-s", "bar").getRoute());
+ assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("foo", "bar", "-s").getRoute());
+ }
+
+ @Test
+ public void requireThatRouteIsAnOptionalArgument() throws ParseException {
+ assertEquals(Route.parse("default"), new FeederParams().parseArgs().getRoute());
+ assertEquals(Route.parse("default"), new FeederParams().parseArgs("-s").getRoute());
+ }
+
+}
diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java
new file mode 100644
index 00000000000..809277670ad
--- /dev/null
+++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java
@@ -0,0 +1,198 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.feed.perf;
+
+import com.yahoo.document.serialization.DeserializationException;
+import com.yahoo.documentapi.messagebus.protocol.DocumentMessage;
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.messagebus.DynamicThrottlePolicy;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageHandler;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.SourceSession;
+import com.yahoo.messagebus.StaticThrottlePolicy;
+import com.yahoo.messagebus.ThrottlePolicy;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
+ */
+public class SimpleFeederTest {
+
+ private static final String CONFIG_DIR = "target/test-classes/";
+
+ @Test
+ public void requireThatFeederWorks() throws Throwable {
+ assertFeed("<vespafeed>" +
+ " <document documenttype='simple' documentid='doc:scheme:0'>" +
+ " <my_str>foo</my_str>" +
+ " </document>" +
+ " <update documenttype='simple' documentid='doc:scheme:1'>" +
+ " <assign field='my_str'>bar</assign>" +
+ " </update>" +
+ " <remove documenttype='simple' documentid='doc:scheme:2'/>" +
+ "</vespafeed>",
+ new MessageHandler() {
+
+ @Override
+ public void handleMessage(Message msg) {
+ Reply reply = ((DocumentMessage)msg).createReply();
+ reply.swapState(msg);
+ reply.popHandler().handleReply(reply);
+ }
+ },
+ "",
+ "(.+\n)+" +
+ "\\s*\\d+,\\s*3,.+\n");
+ }
+
+ @Test
+ public void requireThatParseFailuresThrowInMainThread() throws Throwable {
+ TestDriver driver = new TestDriver(new FeederParams(),
+ "<vespafeed>" +
+ " <document documenttype='unknown' documentid='doc:scheme:0'/>" +
+ "</vespafeed>",
+ null);
+ try {
+ driver.run();
+ fail();
+ } catch (DeserializationException e) {
+ assertEquals("Field 'doc:scheme:0': Must specify an existing document type, not 'unknown' (at line 1, column 76)",
+ e.getMessage());
+ }
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatSyncFailuresThrowInMainThread() throws Throwable {
+ TestDriver driver = new TestDriver(new FeederParams(),
+ "<vespafeed>" +
+ " <document documenttype='simple' documentid='doc:scheme:0'/>" +
+ "</vespafeed>",
+ null);
+ getSourceSession(driver).close();
+ try {
+ driver.run();
+ fail();
+ } catch (IOException e) {
+ assertEquals("[SEND_QUEUE_CLOSED @ localhost]: Source session is closed.", e.getMessage());
+ }
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatAsyncFailuresThrowInMainThread() throws Throwable {
+ TestDriver driver = new TestDriver(new FeederParams(),
+ "<vespafeed><document documenttype='simple' documentid='doc:scheme:0'/></vespafeed>",
+ new MessageHandler() {
+
+ @Override
+ public void handleMessage(Message msg) {
+ Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(ErrorCode.APP_FATAL_ERROR + 6, "foo"));
+ reply.addError(new Error(ErrorCode.APP_FATAL_ERROR + 9, "bar"));
+ reply.popHandler().handleReply(reply);
+ }
+ });
+ try {
+ driver.run();
+ fail();
+ } catch (IOException e) {
+ assertMatches("com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage@.+\n" +
+ "\\[UNKNOWN\\(250006\\) @ .+\\]: foo\n" +
+ "\\[UNKNOWN\\(250009\\) @ .+\\]: bar\n",
+ e.getMessage());
+ }
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatDynamicThrottlingIsDefault() throws Exception {
+ TestDriver driver = new TestDriver(new FeederParams(), "", null);
+ assertEquals(DynamicThrottlePolicy.class, getThrottlePolicy(driver).getClass());
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatSerialTransferModeConfiguresStaticThrottling() throws Exception {
+ TestDriver driver = new TestDriver(new FeederParams().setSerialTransfer(true), "", null);
+ assertEquals(StaticThrottlePolicy.class, getThrottlePolicy(driver).getClass());
+ assertTrue(driver.close());
+ }
+
+ private static SourceSession getSourceSession(TestDriver driver) {
+ return (SourceSession)getField(driver.feeder, "session");
+ }
+
+ private static ThrottlePolicy getThrottlePolicy(TestDriver driver) {
+ return (ThrottlePolicy)getField(getSourceSession(driver), "throttlePolicy");
+ }
+
+ private static Object getField(Object obj, String fieldName) {
+ try {
+ Field field = obj.getClass().getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return field.get(obj);
+ } catch (IllegalAccessException | NoSuchFieldException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ private static void assertFeed(String in, MessageHandler validator, String expectedErr, String expectedOut)
+ throws Throwable {
+ TestDriver driver = new TestDriver(new FeederParams(), in, validator);
+ driver.run();
+ assertMatches(expectedErr, new String(driver.err.toByteArray(), StandardCharsets.UTF_8));
+ assertMatches(expectedOut, new String(driver.out.toByteArray(), StandardCharsets.UTF_8));
+ assertTrue(driver.close());
+ }
+
+ private static void assertMatches(String expected, String actual) {
+ if (!Pattern.matches(expected, actual)) {
+ assertEquals(expected, actual);
+ }
+ }
+
+ private static class TestDriver {
+
+ final ByteArrayOutputStream err = new ByteArrayOutputStream();
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ final SimpleFeeder feeder;
+ final SimpleServer server;
+
+ public TestDriver(FeederParams params, String in, MessageHandler validator)
+ throws IOException, ListenFailedException {
+ server = new SimpleServer(CONFIG_DIR, validator);
+ feeder = new SimpleFeeder(params.setConfigId("dir:" + CONFIG_DIR)
+ .setStdErr(new PrintStream(err))
+ .setStdIn(new ByteArrayInputStream(in.getBytes(StandardCharsets.UTF_8)))
+ .setStdOut(new PrintStream(out)));
+ }
+
+ void run() throws Throwable {
+ feeder.run();
+ }
+
+ boolean close() {
+ feeder.close();
+ server.close();
+ return true;
+ }
+ }
+
+}
diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleServer.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleServer.java
new file mode 100644
index 00000000000..5df725865dd
--- /dev/null
+++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleServer.java
@@ -0,0 +1,63 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.feed.perf;
+
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.DestinationSession;
+import com.yahoo.messagebus.DestinationSessionParams;
+import com.yahoo.messagebus.MessageBus;
+import com.yahoo.messagebus.MessageBusParams;
+import com.yahoo.messagebus.MessageHandler;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.RPCNetwork;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+/**
+ * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a>
+ */
+public class SimpleServer {
+
+ private final DocumentTypeManager documentMgr;
+ private final Slobrok slobrok;
+ private final MessageBus mbus;
+ private final DestinationSession session;
+
+ public SimpleServer(String configDir, MessageHandler msgHandler) throws IOException, ListenFailedException {
+ slobrok = new Slobrok();
+ documentMgr = new DocumentTypeManager();
+ documentMgr.configure("dir:" + configDir);
+ mbus = new MessageBus(new RPCNetwork(new RPCNetworkParams()
+ .setSlobrokConfigId(slobrok.configId())
+ .setIdentity(new Identity("server"))),
+ new MessageBusParams().addProtocol(new DocumentProtocol(documentMgr)));
+ session = mbus.createDestinationSession(new DestinationSessionParams().setMessageHandler(msgHandler));
+
+ PrintWriter writer = new PrintWriter(new FileWriter(configDir + "/messagebus.cfg"));
+ writer.println("routingtable[1]\n" +
+ "routingtable[0].protocol \"document\"\n" +
+ "routingtable[0].hop[0]\n" +
+ "routingtable[0].route[1]\n" +
+ "routingtable[0].route[0].name \"default\"\n" +
+ "routingtable[0].route[0].hop[1]\n" +
+ "routingtable[0].route[0].hop[0] \"" + session.getConnectionSpec() + "\"");
+ writer.close();
+
+ writer = new PrintWriter(new FileWriter(configDir + "/slobroks.cfg"));
+ writer.println(slobrok.configId().substring(4));
+ writer.close();
+ }
+
+ public final void close() {
+ session.destroy();
+ mbus.destroy();
+ documentMgr.shutdown();
+ slobrok.stop();
+ }
+
+}
diff --git a/vespa_feed_perf/src/test/resources/documentmanager.cfg b/vespa_feed_perf/src/test/resources/documentmanager.cfg
new file mode 100644
index 00000000000..ebb6e767bef
--- /dev/null
+++ b/vespa_feed_perf/src/test/resources/documentmanager.cfg
@@ -0,0 +1,50 @@
+enablecompression false
+annotationtype[0]
+datatype[3]
+datatype[0].id -1101456630
+datatype[0].annotationreftype[0]
+datatype[0].arraytype[0]
+datatype[0].documenttype[0]
+datatype[0].maptype[0]
+datatype[0].structtype[1]
+datatype[0].structtype[0].compresslevel 0
+datatype[0].structtype[0].compressminsize 800
+datatype[0].structtype[0].compressthreshold 95
+datatype[0].structtype[0].compresstype NONE
+datatype[0].structtype[0].name "simple.body"
+datatype[0].structtype[0].version 0
+datatype[0].structtype[0].field[0]
+datatype[0].structtype[0].inherits[0]
+datatype[0].weightedsettype[0]
+datatype[1].id -459619403
+datatype[1].annotationreftype[0]
+datatype[1].arraytype[0]
+datatype[1].documenttype[0]
+datatype[1].maptype[0]
+datatype[1].structtype[1]
+datatype[1].structtype[0].compresslevel 0
+datatype[1].structtype[0].compressminsize 800
+datatype[1].structtype[0].compressthreshold 95
+datatype[1].structtype[0].compresstype NONE
+datatype[1].structtype[0].name "simple.header"
+datatype[1].structtype[0].version 0
+datatype[1].structtype[0].field[1]
+datatype[1].structtype[0].field[0].datatype 2
+datatype[1].structtype[0].field[0].name "my_str"
+datatype[1].structtype[0].field[0].id[0]
+datatype[1].structtype[0].inherits[0]
+datatype[1].weightedsettype[0]
+datatype[2].id -1668955062
+datatype[2].annotationreftype[0]
+datatype[2].arraytype[0]
+datatype[2].documenttype[1]
+datatype[2].documenttype[0].bodystruct -1101456630
+datatype[2].documenttype[0].headerstruct -459619403
+datatype[2].documenttype[0].name "simple"
+datatype[2].documenttype[0].version 0
+datatype[2].documenttype[0].inherits[1]
+datatype[2].documenttype[0].inherits[0].name "document"
+datatype[2].documenttype[0].inherits[0].version 0
+datatype[2].maptype[0]
+datatype[2].structtype[0]
+datatype[2].weightedsettype[0]