diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /vespa_feed_perf |
Publish
Diffstat (limited to 'vespa_feed_perf')
10 files changed, 832 insertions, 0 deletions
diff --git a/vespa_feed_perf/.gitignore b/vespa_feed_perf/.gitignore new file mode 100644 index 00000000000..12251442258 --- /dev/null +++ b/vespa_feed_perf/.gitignore @@ -0,0 +1,2 @@ +/target +/pom.xml.build diff --git a/vespa_feed_perf/OWNERS b/vespa_feed_perf/OWNERS new file mode 100644 index 00000000000..0e39145d8c3 --- /dev/null +++ b/vespa_feed_perf/OWNERS @@ -0,0 +1 @@ +dybdahl diff --git a/vespa_feed_perf/pom.xml b/vespa_feed_perf/pom.xml new file mode 100644 index 00000000000..c78d18f055a --- /dev/null +++ b/vespa_feed_perf/pom.xml @@ -0,0 +1,89 @@ +<?xml version="1.0"?> +<!-- Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 + http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <!-- This module is used in 4 tests: + interpolated_lookup.rb, match_phase_degradation_test.rb reportcoverage.rb slow_query.rb + TODO: Remove usage and remove this module --> + + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>com.yahoo.vespa</groupId> + <artifactId>parent</artifactId> + <version>6-SNAPSHOT</version> + <relativePath>../parent/pom.xml</relativePath> + </parent> + <artifactId>vespa_feed_perf</artifactId> + <version>6-SNAPSHOT</version> + <packaging>jar</packaging> + <name>${project.artifactId}</name> + <dependencies> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>component</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>documentapi</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>com.yahoo.vespa</groupId> + <artifactId>messagebus</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + <build> + <resources> + <resource> + <directory>src/test/resources</directory> + <filtering>false</filtering> + </resource> + </resources> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <archive> + <manifest> + <mainClass>com.yahoo.vespa.feed.perf.SimpleFeeder</mainClass> + </manifest> + </archive> + <descriptorRefs> + <descriptorRef>jar-with-dependencies</descriptorRef> + </descriptorRefs> + </configuration> + <executions> + <execution> + <id>make-assembly</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java new file mode 100644 index 00000000000..ed73ba72f4b --- /dev/null +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java @@ -0,0 +1,99 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.feed.perf; + +import com.yahoo.messagebus.routing.Route; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; + +import java.io.InputStream; +import java.io.PrintStream; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> + */ +public class FeederParams { + + private InputStream stdIn = System.in; + private PrintStream stdErr = System.err; + private PrintStream stdOut = System.out; + private Route route = Route.parse("default"); + private String configId = "client"; + private boolean serialTransferEnabled = false; + + public InputStream getStdIn() { + return stdIn; + } + + public FeederParams setStdIn(InputStream stdIn) { + this.stdIn = stdIn; + return this; + } + + public PrintStream getStdErr() { + return stdErr; + } + + public FeederParams setStdErr(PrintStream stdErr) { + this.stdErr = stdErr; + return this; + } + + public PrintStream getStdOut() { + return stdOut; + } + + public FeederParams setStdOut(PrintStream stdOut) { + this.stdOut = stdOut; + return this; + } + + public Route getRoute() { + return route; + } + + public FeederParams setRoute(Route route) { + this.route = new Route(route); + return this; + } + + public String getConfigId() { + return configId; + } + + public FeederParams setConfigId(String configId) { + this.configId = configId; + return this; + } + + public boolean isSerialTransferEnabled() { + return serialTransferEnabled; + } + + public FeederParams setSerialTransfer(boolean serial) { + this.serialTransferEnabled = serial; + return this; + } + + public FeederParams parseArgs(String... args) throws ParseException { + Options opts = new Options(); + opts.addOption("s", "serial", false, "use serial transfer mode, at most 1 pending operation"); + + CommandLine cmd = new DefaultParser().parse(opts, args); + serialTransferEnabled = cmd.hasOption("s"); + route = newRoute(cmd.getArgs()); + return this; + } + + private static Route newRoute(String... args) { + if (args.length == 0) { + return Route.parse("default"); + } + StringBuilder out = new StringBuilder(); + for (String arg : args) { + out.append(arg).append(' '); + } + return Route.parse(out.toString()); + } +} diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java new file mode 100644 index 00000000000..e5f21506d59 --- /dev/null +++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java @@ -0,0 +1,179 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.feed.perf; + +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.documentapi.messagebus.protocol.*; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageBusParams; +import com.yahoo.messagebus.RPCMessageBus; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.SourceSession; +import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.StaticThrottlePolicy; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.vespaxmlparser.VespaXMLFeedReader; + +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.util.concurrent.TimeUnit; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> + */ +public class SimpleFeeder implements ReplyHandler { + + private final static long REPORT_INTERVAL = TimeUnit.SECONDS.toMillis(10); + private final static long HEADER_INTERVAL = REPORT_INTERVAL * 24; + private final DocumentTypeManager docTypeMgr = new DocumentTypeManager(); + private final InputStream in; + private final PrintStream out; + private final PrintStream err; + private final RPCMessageBus mbus; + private final Route route; + private final SourceSession session; + private final long startTime = System.currentTimeMillis(); + private volatile Throwable failure; + private volatile long numReplies = 0; + private long maxLatency = Long.MIN_VALUE; + private long minLatency = Long.MAX_VALUE; + private long nextHeader = startTime + HEADER_INTERVAL; + private long nextReport = startTime + REPORT_INTERVAL; + private long numMessages = 0; + private long sumLatency = 0; + + public static void main(String[] args) throws Throwable { + new SimpleFeeder(new FeederParams().parseArgs(args)).run().close(); + } + + public SimpleFeeder(FeederParams params) { + this.in = params.getStdIn(); + this.out = params.getStdOut(); + this.err = params.getStdErr(); + this.route = params.getRoute(); + this.mbus = newMessageBus(docTypeMgr, params.getConfigId()); + this.session = newSession(mbus, this, params.isSerialTransferEnabled()); + this.docTypeMgr.configure(params.getConfigId()); + } + + public SimpleFeeder run() throws Throwable { + VespaXMLFeedReader reader = new VespaXMLFeedReader(in, docTypeMgr); + VespaXMLFeedReader.Operation op = new VespaXMLFeedReader.Operation(); + printHeader(); + while (failure == null) { + reader.read(op); + if (op.getType() == VespaXMLFeedReader.OperationType.INVALID) { + break; + } + Message msg = newMessage(op); + if (msg == null) { + err.println("ignoring operation; " + op.getType()); + continue; // ignore + } + msg.setContext(System.currentTimeMillis()); + msg.setRoute(route); + Error err = session.sendBlocking(msg).getError(); + if (err != null) { + throw new IOException(err.toString()); + } + ++numMessages; + } + while (failure == null && numReplies < numMessages) { + Thread.sleep(100); + } + if (failure != null) { + throw failure; + } + printReport(); + return this; + } + + public void close() { + session.destroy(); + mbus.destroy(); + } + + private Message newMessage(VespaXMLFeedReader.Operation op) { + switch (op.getType()) { + case DOCUMENT: { + PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(op.getDocument())); + message.setCondition(op.getCondition()); + return message; + } + case REMOVE: { + RemoveDocumentMessage message = new RemoveDocumentMessage(op.getRemove()); + message.setCondition(op.getCondition()); + return message; + } + case UPDATE: { + UpdateDocumentMessage message = new UpdateDocumentMessage(op.getDocumentUpdate()); + message.setCondition(op.getCondition()); + return message; + } + default: + return null; + } + } + + @Override + public void handleReply(Reply reply) { + if (failure != null) { + return; + } + if (reply.hasErrors()) { + failure = new IOException(formatErrors(reply)); + return; + } + long now = System.currentTimeMillis(); + long latency = now - (long)reply.getContext(); + minLatency = Math.min(minLatency, latency); + maxLatency = Math.max(maxLatency, latency); + sumLatency += latency; + ++numReplies; + if (now > nextHeader) { + printHeader(); + nextHeader += HEADER_INTERVAL; + } + if (now > nextReport) { + printReport(); + nextReport += REPORT_INTERVAL; + } + } + + private void printHeader() { + out.println("total time, num messages, min latency, avg latency, max latency"); + } + + private void printReport() { + out.format("%10d, %12d, %11d, %11d, %11d\n", System.currentTimeMillis() - startTime, + numReplies, minLatency, sumLatency / numReplies, maxLatency); + } + + private static String formatErrors(Reply reply) { + StringBuilder out = new StringBuilder(); + out.append(reply.getMessage().toString()).append('\n'); + for (int i = 0, len = reply.getNumErrors(); i < len; ++i) { + out.append(reply.getError(i).toString()).append('\n'); + } + return out.toString(); + } + + private static RPCMessageBus newMessageBus(DocumentTypeManager docTypeMgr, String configId) { + return new RPCMessageBus(new MessageBusParams().addProtocol(new DocumentProtocol(docTypeMgr)), + new RPCNetworkParams().setSlobrokConfigId(configId), + configId); + } + + private static SourceSession newSession(RPCMessageBus mbus, ReplyHandler replyHandler, boolean serial) { + SourceSessionParams params = new SourceSessionParams(); + params.setReplyHandler(replyHandler); + if (serial) { + params.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(1)); + } + return mbus.getMessageBus().createSourceSession(params); + } +} diff --git a/vespa_feed_perf/src/main/sh/vespa-feed-perf b/vespa_feed_perf/src/main/sh/vespa-feed-perf new file mode 100755 index 00000000000..208e0e8fe0d --- /dev/null +++ b/vespa_feed_perf/src/main/sh/vespa-feed-perf @@ -0,0 +1,63 @@ +#!/bin/sh +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +# BEGIN environment bootstrap section +# Do not edit between here and END as this section should stay identical in all scripts + +findpath () { + myname=${0} + mypath=${myname%/*} + myname=${myname##*/} + if [ "$mypath" ] && [ -d "$mypath" ]; then + return + fi + mypath=$(pwd) + if [ -f "${mypath}/${myname}" ]; then + return + fi + echo "FATAL: Could not figure out the path where $myname lives from $0" + exit 1 +} + +COMMON_ENV=libexec/vespa/common-env.sh + +source_common_env () { + if [ "$VESPA_HOME" ] && [ -d "$VESPA_HOME" ]; then + # ensure it ends with "/" : + VESPA_HOME=${VESPA_HOME%/}/ + export VESPA_HOME + common_env=$VESPA_HOME/$COMMON_ENV + if [ -f "$common_env" ]; then + . $common_env + return + fi + fi + return 1 +} + +findroot () { + source_common_env && return + if [ "$VESPA_HOME" ]; then + echo "FATAL: bad VESPA_HOME value '$VESPA_HOME'" + exit 1 + fi + if [ "$ROOT" ] && [ -d "$ROOT" ]; then + VESPA_HOME="$ROOT" + source_common_env && return + fi + findpath + while [ "$mypath" ]; do + VESPA_HOME=${mypath} + source_common_env && return + mypath=${mypath%/*} + done + echo "FATAL: missing VESPA_HOME environment variable" + echo "Could not locate $COMMON_ENV anywhere" + exit 1 +} + +findroot + +# END environment bootstrap section + +exec java -jar $VESPA_HOME/lib/jars/vespa_feed_perf-jar-with-dependencies.jar "$@" diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java new file mode 100644 index 00000000000..dec31d9e1d9 --- /dev/null +++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java @@ -0,0 +1,88 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.feed.perf; + +import com.yahoo.messagebus.routing.Route; +import org.apache.commons.cli.ParseException; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.InputStream; +import java.io.PrintStream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> + */ +public class FeederParamsTest { + + @Test + public void requireThatAccessorsWork() { + FeederParams params = new FeederParams(); + + InputStream stdIn = Mockito.mock(InputStream.class); + params.setStdIn(stdIn); + assertSame(stdIn, params.getStdIn()); + + PrintStream stdErr = Mockito.mock(PrintStream.class); + params.setStdErr(stdErr); + assertSame(stdErr, params.getStdErr()); + + PrintStream stdOut = Mockito.mock(PrintStream.class); + params.setStdOut(stdOut); + assertSame(stdOut, params.getStdOut()); + + Route route = Route.parse("my_route"); + params.setRoute(route); + assertEquals(route, params.getRoute()); + assertNotSame(route, params.getRoute()); + + params.setConfigId("my_config_id"); + assertEquals("my_config_id", params.getConfigId()); + + params.setSerialTransfer(false); + assertFalse(params.isSerialTransferEnabled()); + params.setSerialTransfer(true); + assertTrue(params.isSerialTransferEnabled()); + } + + @Test + public void requireThatParamsHaveReasonableDefaults() { + FeederParams params = new FeederParams(); + assertSame(System.in, params.getStdIn()); + assertSame(System.err, params.getStdErr()); + assertSame(System.out, params.getStdOut()); + assertEquals(Route.parse("default"), params.getRoute()); + assertEquals("client", params.getConfigId()); + assertFalse(params.isSerialTransferEnabled()); + } + + @Test + public void requireThatSerialTransferOptionIsParsed() throws ParseException { + assertTrue(new FeederParams().parseArgs("-s").isSerialTransferEnabled()); + assertTrue(new FeederParams().parseArgs("foo", "-s").isSerialTransferEnabled()); + assertTrue(new FeederParams().parseArgs("-s", "foo").isSerialTransferEnabled()); + assertTrue(new FeederParams().parseArgs("--serial").isSerialTransferEnabled()); + assertTrue(new FeederParams().parseArgs("foo", "--serial").isSerialTransferEnabled()); + assertTrue(new FeederParams().parseArgs("--serial", "foo").isSerialTransferEnabled()); + } + + @Test + public void requireThatArgumentsAreParsedAsRoute() throws ParseException { + assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("foo", "bar").getRoute()); + assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("-s", "foo", "bar").getRoute()); + assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("foo", "-s", "bar").getRoute()); + assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("foo", "bar", "-s").getRoute()); + } + + @Test + public void requireThatRouteIsAnOptionalArgument() throws ParseException { + assertEquals(Route.parse("default"), new FeederParams().parseArgs().getRoute()); + assertEquals(Route.parse("default"), new FeederParams().parseArgs("-s").getRoute()); + } + +} diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java new file mode 100644 index 00000000000..809277670ad --- /dev/null +++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java @@ -0,0 +1,198 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.feed.perf; + +import com.yahoo.document.serialization.DeserializationException; +import com.yahoo.documentapi.messagebus.protocol.DocumentMessage; +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.messagebus.DynamicThrottlePolicy; +import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageHandler; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.SourceSession; +import com.yahoo.messagebus.StaticThrottlePolicy; +import com.yahoo.messagebus.ThrottlePolicy; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; +import java.util.regex.Pattern; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> + */ +public class SimpleFeederTest { + + private static final String CONFIG_DIR = "target/test-classes/"; + + @Test + public void requireThatFeederWorks() throws Throwable { + assertFeed("<vespafeed>" + + " <document documenttype='simple' documentid='doc:scheme:0'>" + + " <my_str>foo</my_str>" + + " </document>" + + " <update documenttype='simple' documentid='doc:scheme:1'>" + + " <assign field='my_str'>bar</assign>" + + " </update>" + + " <remove documenttype='simple' documentid='doc:scheme:2'/>" + + "</vespafeed>", + new MessageHandler() { + + @Override + public void handleMessage(Message msg) { + Reply reply = ((DocumentMessage)msg).createReply(); + reply.swapState(msg); + reply.popHandler().handleReply(reply); + } + }, + "", + "(.+\n)+" + + "\\s*\\d+,\\s*3,.+\n"); + } + + @Test + public void requireThatParseFailuresThrowInMainThread() throws Throwable { + TestDriver driver = new TestDriver(new FeederParams(), + "<vespafeed>" + + " <document documenttype='unknown' documentid='doc:scheme:0'/>" + + "</vespafeed>", + null); + try { + driver.run(); + fail(); + } catch (DeserializationException e) { + assertEquals("Field 'doc:scheme:0': Must specify an existing document type, not 'unknown' (at line 1, column 76)", + e.getMessage()); + } + assertTrue(driver.close()); + } + + @Test + public void requireThatSyncFailuresThrowInMainThread() throws Throwable { + TestDriver driver = new TestDriver(new FeederParams(), + "<vespafeed>" + + " <document documenttype='simple' documentid='doc:scheme:0'/>" + + "</vespafeed>", + null); + getSourceSession(driver).close(); + try { + driver.run(); + fail(); + } catch (IOException e) { + assertEquals("[SEND_QUEUE_CLOSED @ localhost]: Source session is closed.", e.getMessage()); + } + assertTrue(driver.close()); + } + + @Test + public void requireThatAsyncFailuresThrowInMainThread() throws Throwable { + TestDriver driver = new TestDriver(new FeederParams(), + "<vespafeed><document documenttype='simple' documentid='doc:scheme:0'/></vespafeed>", + new MessageHandler() { + + @Override + public void handleMessage(Message msg) { + Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(ErrorCode.APP_FATAL_ERROR + 6, "foo")); + reply.addError(new Error(ErrorCode.APP_FATAL_ERROR + 9, "bar")); + reply.popHandler().handleReply(reply); + } + }); + try { + driver.run(); + fail(); + } catch (IOException e) { + assertMatches("com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage@.+\n" + + "\\[UNKNOWN\\(250006\\) @ .+\\]: foo\n" + + "\\[UNKNOWN\\(250009\\) @ .+\\]: bar\n", + e.getMessage()); + } + assertTrue(driver.close()); + } + + @Test + public void requireThatDynamicThrottlingIsDefault() throws Exception { + TestDriver driver = new TestDriver(new FeederParams(), "", null); + assertEquals(DynamicThrottlePolicy.class, getThrottlePolicy(driver).getClass()); + assertTrue(driver.close()); + } + + @Test + public void requireThatSerialTransferModeConfiguresStaticThrottling() throws Exception { + TestDriver driver = new TestDriver(new FeederParams().setSerialTransfer(true), "", null); + assertEquals(StaticThrottlePolicy.class, getThrottlePolicy(driver).getClass()); + assertTrue(driver.close()); + } + + private static SourceSession getSourceSession(TestDriver driver) { + return (SourceSession)getField(driver.feeder, "session"); + } + + private static ThrottlePolicy getThrottlePolicy(TestDriver driver) { + return (ThrottlePolicy)getField(getSourceSession(driver), "throttlePolicy"); + } + + private static Object getField(Object obj, String fieldName) { + try { + Field field = obj.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + return field.get(obj); + } catch (IllegalAccessException | NoSuchFieldException e) { + throw new AssertionError(e); + } + } + + private static void assertFeed(String in, MessageHandler validator, String expectedErr, String expectedOut) + throws Throwable { + TestDriver driver = new TestDriver(new FeederParams(), in, validator); + driver.run(); + assertMatches(expectedErr, new String(driver.err.toByteArray(), StandardCharsets.UTF_8)); + assertMatches(expectedOut, new String(driver.out.toByteArray(), StandardCharsets.UTF_8)); + assertTrue(driver.close()); + } + + private static void assertMatches(String expected, String actual) { + if (!Pattern.matches(expected, actual)) { + assertEquals(expected, actual); + } + } + + private static class TestDriver { + + final ByteArrayOutputStream err = new ByteArrayOutputStream(); + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final SimpleFeeder feeder; + final SimpleServer server; + + public TestDriver(FeederParams params, String in, MessageHandler validator) + throws IOException, ListenFailedException { + server = new SimpleServer(CONFIG_DIR, validator); + feeder = new SimpleFeeder(params.setConfigId("dir:" + CONFIG_DIR) + .setStdErr(new PrintStream(err)) + .setStdIn(new ByteArrayInputStream(in.getBytes(StandardCharsets.UTF_8))) + .setStdOut(new PrintStream(out))); + } + + void run() throws Throwable { + feeder.run(); + } + + boolean close() { + feeder.close(); + server.close(); + return true; + } + } + +} diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleServer.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleServer.java new file mode 100644 index 00000000000..5df725865dd --- /dev/null +++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleServer.java @@ -0,0 +1,63 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.feed.perf; + +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.DestinationSession; +import com.yahoo.messagebus.DestinationSessionParams; +import com.yahoo.messagebus.MessageBus; +import com.yahoo.messagebus.MessageBusParams; +import com.yahoo.messagebus.MessageHandler; +import com.yahoo.messagebus.network.Identity; +import com.yahoo.messagebus.network.rpc.RPCNetwork; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; + +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; + +/** + * @author <a href="mailto:simon@yahoo-inc.com">Simon Thoresen Hult</a> + */ +public class SimpleServer { + + private final DocumentTypeManager documentMgr; + private final Slobrok slobrok; + private final MessageBus mbus; + private final DestinationSession session; + + public SimpleServer(String configDir, MessageHandler msgHandler) throws IOException, ListenFailedException { + slobrok = new Slobrok(); + documentMgr = new DocumentTypeManager(); + documentMgr.configure("dir:" + configDir); + mbus = new MessageBus(new RPCNetwork(new RPCNetworkParams() + .setSlobrokConfigId(slobrok.configId()) + .setIdentity(new Identity("server"))), + new MessageBusParams().addProtocol(new DocumentProtocol(documentMgr))); + session = mbus.createDestinationSession(new DestinationSessionParams().setMessageHandler(msgHandler)); + + PrintWriter writer = new PrintWriter(new FileWriter(configDir + "/messagebus.cfg")); + writer.println("routingtable[1]\n" + + "routingtable[0].protocol \"document\"\n" + + "routingtable[0].hop[0]\n" + + "routingtable[0].route[1]\n" + + "routingtable[0].route[0].name \"default\"\n" + + "routingtable[0].route[0].hop[1]\n" + + "routingtable[0].route[0].hop[0] \"" + session.getConnectionSpec() + "\""); + writer.close(); + + writer = new PrintWriter(new FileWriter(configDir + "/slobroks.cfg")); + writer.println(slobrok.configId().substring(4)); + writer.close(); + } + + public final void close() { + session.destroy(); + mbus.destroy(); + documentMgr.shutdown(); + slobrok.stop(); + } + +} diff --git a/vespa_feed_perf/src/test/resources/documentmanager.cfg b/vespa_feed_perf/src/test/resources/documentmanager.cfg new file mode 100644 index 00000000000..ebb6e767bef --- /dev/null +++ b/vespa_feed_perf/src/test/resources/documentmanager.cfg @@ -0,0 +1,50 @@ +enablecompression false +annotationtype[0] +datatype[3] +datatype[0].id -1101456630 +datatype[0].annotationreftype[0] +datatype[0].arraytype[0] +datatype[0].documenttype[0] +datatype[0].maptype[0] +datatype[0].structtype[1] +datatype[0].structtype[0].compresslevel 0 +datatype[0].structtype[0].compressminsize 800 +datatype[0].structtype[0].compressthreshold 95 +datatype[0].structtype[0].compresstype NONE +datatype[0].structtype[0].name "simple.body" +datatype[0].structtype[0].version 0 +datatype[0].structtype[0].field[0] +datatype[0].structtype[0].inherits[0] +datatype[0].weightedsettype[0] +datatype[1].id -459619403 +datatype[1].annotationreftype[0] +datatype[1].arraytype[0] +datatype[1].documenttype[0] +datatype[1].maptype[0] +datatype[1].structtype[1] +datatype[1].structtype[0].compresslevel 0 +datatype[1].structtype[0].compressminsize 800 +datatype[1].structtype[0].compressthreshold 95 +datatype[1].structtype[0].compresstype NONE +datatype[1].structtype[0].name "simple.header" +datatype[1].structtype[0].version 0 +datatype[1].structtype[0].field[1] +datatype[1].structtype[0].field[0].datatype 2 +datatype[1].structtype[0].field[0].name "my_str" +datatype[1].structtype[0].field[0].id[0] +datatype[1].structtype[0].inherits[0] +datatype[1].weightedsettype[0] +datatype[2].id -1668955062 +datatype[2].annotationreftype[0] +datatype[2].arraytype[0] +datatype[2].documenttype[1] +datatype[2].documenttype[0].bodystruct -1101456630 +datatype[2].documenttype[0].headerstruct -459619403 +datatype[2].documenttype[0].name "simple" +datatype[2].documenttype[0].version 0 +datatype[2].documenttype[0].inherits[1] +datatype[2].documenttype[0].inherits[0].name "document" +datatype[2].documenttype[0].inherits[0].version 0 +datatype[2].maptype[0] +datatype[2].structtype[0] +datatype[2].weightedsettype[0] |