summaryrefslogtreecommitdiffstats
path: root/vespaclient-java
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-11-29 21:45:44 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2022-11-29 21:45:44 +0100
commit13984de4d6ab9120920c6c2c54aecb1ea9deee08 (patch)
tree1c95e65073717d96d71b944539573c41cbd43e43 /vespaclient-java
parent0bc82ef441d34bce1ae79b797973393c0675184c (diff)
Collapse the vespa_feed_perf into the other feed clients.
Diffstat (limited to 'vespaclient-java')
-rw-r--r--vespaclient-java/CMakeLists.txt1
-rw-r--r--vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java205
-rw-r--r--vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java518
-rwxr-xr-xvespaclient-java/src/main/sh/vespa-feed-perf89
-rw-r--r--vespaclient-java/src/test/java/com/yahoo/vespa/perf/FeederParamsTest.java176
-rw-r--r--vespaclient-java/src/test/java/com/yahoo/vespa/perf/SimpleFeederTest.java332
-rw-r--r--vespaclient-java/src/test/java/com/yahoo/vespa/perf/SimpleServer.java63
-rw-r--r--vespaclient-java/src/test/resources/documentmanager.cfg59
8 files changed, 1443 insertions, 0 deletions
diff --git a/vespaclient-java/CMakeLists.txt b/vespaclient-java/CMakeLists.txt
index 242743a06e7..2af1bbe55cd 100644
--- a/vespaclient-java/CMakeLists.txt
+++ b/vespaclient-java/CMakeLists.txt
@@ -11,3 +11,4 @@ vespa_install_script(src/main/sh/vespa-feeder.sh vespa-feeder bin)
vespa_install_script(src/main/sh/vespa-get.sh vespa-get bin)
vespa_install_script(src/main/sh/vespa-visit.sh vespa-visit bin)
vespa_install_script(src/main/sh/vespa-visit-target.sh vespa-visit-target bin)
+vespa_install_script(src/main/sh/vespa-feed-perf vespa-feed-perf bin)
diff --git a/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java b/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java
new file mode 100644
index 00000000000..0ecc4198e46
--- /dev/null
+++ b/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java
@@ -0,0 +1,205 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.feed.perf;
+
+import com.yahoo.messagebus.routing.Route;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+class FeederParams {
+
+ private static final int BUFFER_SIZE = 0x100000;
+ enum DumpFormat {JSON, VESPA}
+ private PrintStream stdErr = System.err;
+ private PrintStream stdOut = System.out;
+ private Route route = Route.parse("default");
+ private String configId = "client";
+ private OutputStream dumpStream = null;
+ private DumpFormat dumpFormat = DumpFormat.JSON;
+ private boolean benchmarkMode = false;
+ private int numDispatchThreads = 1;
+ private int maxPending = 0;
+ private double timeout = 180.0;
+
+ private double windowSizeBackOff = 0.95;
+ private double windowDecrementFactor = 1.2;
+ private double windowResizeRate = 3;
+ private int windowIncrementSize = 20;
+
+ private int numConnectionsPerTarget = 1;
+ private long numMessagesToSend = Long.MAX_VALUE;
+ private List<InputStream> inputStreams = new ArrayList<>();
+
+ FeederParams() {
+ inputStreams.add(System.in);
+ }
+
+ PrintStream getStdErr() {
+ return stdErr;
+ }
+
+ FeederParams setStdErr(PrintStream stdErr) {
+ this.stdErr = stdErr;
+ return this;
+ }
+
+ PrintStream getStdOut() {
+ return stdOut;
+ }
+
+ FeederParams setStdOut(PrintStream stdOut) {
+ this.stdOut = stdOut;
+ return this;
+ }
+
+ Route getRoute() {
+ return route;
+ }
+ OutputStream getDumpStream() { return dumpStream; }
+ FeederParams setDumpStream(OutputStream dumpStream) {
+ this.dumpStream = dumpStream;
+ return this;
+ }
+
+ DumpFormat getDumpFormat() { return dumpFormat; }
+ FeederParams setDumpFormat(DumpFormat dumpFormat) {
+ this.dumpFormat = dumpFormat;
+ return this;
+ }
+
+ String getConfigId() {
+ return configId;
+ }
+
+ FeederParams setConfigId(String configId) {
+ this.configId = configId;
+ return this;
+ }
+ public double getWindowSizeBackOff() {
+ return windowSizeBackOff;
+ }
+
+ public double getWindowDecrementFactor() {
+ return windowDecrementFactor;
+ }
+
+ public double getWindowResizeRate() {
+ return windowResizeRate;
+ }
+ public double getTimeout() {
+ return timeout;
+ }
+
+ public int getWindowIncrementSize() {
+ return windowIncrementSize;
+ }
+
+ int getNumConnectionsPerTarget() { return numConnectionsPerTarget; }
+
+ long getNumMessagesToSend() { return numMessagesToSend; }
+
+ boolean isSerialTransferEnabled() {
+ return maxPending == 1;
+ }
+
+ FeederParams setSerialTransfer() {
+ maxPending = 1;
+ numDispatchThreads = 1;
+ return this;
+ }
+ List<InputStream> getInputStreams() { return inputStreams; }
+ FeederParams setInputStreams(List<InputStream> inputStreams) {
+ this.inputStreams = inputStreams;
+ return this;
+ }
+
+ int getNumDispatchThreads() { return numDispatchThreads; }
+ int getMaxPending() { return maxPending; }
+ boolean isBenchmarkMode() { return benchmarkMode; }
+
+ FeederParams parseArgs(String... args) throws ParseException, FileNotFoundException {
+ Options opts = new Options();
+ opts.addOption("s", "serial", false, "use serial transfer mode, at most 1 pending operation and a single thread");
+ opts.addOption("n", "numthreads", true, "Number of clients for sending messages. Anything, but 1 will bypass sequencing by document id.");
+ opts.addOption("m", "maxpending", true, "Max number of inflights messages. Default is auto.");
+ opts.addOption("r", "route", true, "Route for sending messages. default is 'default'....");
+ opts.addOption("b", "mode", true, "Mode for benchmarking.");
+ opts.addOption("o", "output", true, "File to write to. Extensions gives format (.xml, .json, .vespa) json will be produced if no extension.");
+ opts.addOption("c", "numconnections", true, "Number of connections per host.");
+ opts.addOption("t", "timeout", true, "Timeout for a message in seconds. default = " + timeout);
+ opts.addOption("l", "nummessages", true, "Number of messages to send (all is default).");
+ opts.addOption("wi", "window_incrementsize", true, "Dynamic window increment step size. default = " + windowIncrementSize);
+ opts.addOption("wd", "window_decrementfactor", true, "Dynamic window decrement step size factor. default = " + windowDecrementFactor);
+ opts.addOption("wb", "window_backoffactor", true, "Dynamic window backoff factor. default = " + windowSizeBackOff);
+ opts.addOption("wr", "window_resizerate", true, "Dynamic window resize rate. default = " + windowResizeRate);
+
+ CommandLine cmd = new DefaultParser().parse(opts, args);
+
+ if (cmd.hasOption('n')) {
+ numDispatchThreads = Integer.valueOf(cmd.getOptionValue('n').trim());
+ }
+ if (cmd.hasOption('m')) {
+ maxPending = Integer.valueOf(cmd.getOptionValue('m').trim());
+ }
+ if (cmd.hasOption('c')) {
+ numConnectionsPerTarget = Integer.valueOf(cmd.getOptionValue('c').trim());
+ }
+ if (cmd.hasOption("wi")) {
+ windowIncrementSize = Integer.valueOf(cmd.getOptionValue("wi").trim());
+ }
+ if (cmd.hasOption("wd")) {
+ windowDecrementFactor = Double.valueOf(cmd.getOptionValue("wd").trim());
+ }
+ if (cmd.hasOption("wb")) {
+ windowSizeBackOff = Double.valueOf(cmd.getOptionValue("wb").trim());
+ }
+ if (cmd.hasOption("wr")) {
+ windowResizeRate = Double.valueOf(cmd.getOptionValue("wr").trim());
+ }
+ if (cmd.hasOption('r')) {
+ route = Route.parse(cmd.getOptionValue('r').trim());
+ }
+ if (cmd.hasOption("t")) {
+ timeout = Double.valueOf(cmd.getOptionValue("t").trim());
+ }
+ benchmarkMode = cmd.hasOption('b');
+ if (cmd.hasOption('o')) {
+ String fileName = cmd.getOptionValue('o').trim();
+ dumpStream = new FileOutputStream(new File(fileName));
+ if (fileName.endsWith(".vespa")) {
+ dumpFormat = DumpFormat.VESPA;
+ }
+ }
+ if (cmd.hasOption('s')) {
+ setSerialTransfer();
+ }
+ if (cmd.hasOption('l')) {
+ numMessagesToSend = Long.valueOf(cmd.getOptionValue('l').trim());
+ }
+
+ if ( !cmd.getArgList().isEmpty()) {
+ inputStreams.clear();
+ for (String fileName : cmd.getArgList()) {
+ inputStreams.add(new BufferedInputStream(new FileInputStream(new File(fileName)), BUFFER_SIZE));
+ }
+ }
+
+ return this;
+ }
+
+}
diff --git a/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
new file mode 100644
index 00000000000..c40e2c21561
--- /dev/null
+++ b/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
@@ -0,0 +1,518 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.feed.perf;
+
+import com.yahoo.concurrent.ThreadFactoryFactory;
+import com.yahoo.config.subscription.ConfigSubscriber;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.DocumentTypeManagerConfigurer;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.TestAndSetCondition;
+import com.yahoo.document.json.JsonFeedReader;
+import com.yahoo.document.json.JsonWriter;
+import com.yahoo.document.serialization.DocumentDeserializer;
+import com.yahoo.document.serialization.DocumentDeserializerFactory;
+import com.yahoo.document.serialization.DocumentSerializer;
+import com.yahoo.document.serialization.DocumentSerializerFactory;
+import com.yahoo.document.serialization.DocumentWriter;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
+import com.yahoo.io.GrowableByteBuffer;
+import com.yahoo.messagebus.DynamicThrottlePolicy;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageBusParams;
+import com.yahoo.messagebus.RPCMessageBus;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.SourceSession;
+import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.StaticThrottlePolicy;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.vespaxmlparser.ConditionalFeedOperation;
+import com.yahoo.vespaxmlparser.FeedReader;
+import com.yahoo.vespaxmlparser.FeedOperation;
+import com.yahoo.vespaxmlparser.RemoveFeedOperation;
+import com.yahoo.vespaxmlparser.VespaXMLFeedReader;
+import net.jpountz.xxhash.XXHashFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Stream;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class SimpleFeeder implements ReplyHandler {
+
+ private final DocumentTypeManager docTypeMgr = new DocumentTypeManager();
+ private final ConfigSubscriber documentTypeConfigSubscriber;
+ private final List<InputStream> inputStreams;
+ private final PrintStream out;
+ private final RPCMessageBus mbus;
+ private final SourceSession session;
+ private final int numThreads;
+ private final long numMessagesToSend;
+ private final Destination destination;
+ private final boolean benchmarkMode;
+ private final static long REPORT_INTERVAL = TimeUnit.SECONDS.toMillis(10);
+ private final long startTime = System.currentTimeMillis();
+ private final AtomicReference<Throwable> failure = new AtomicReference<>(null);
+ private final AtomicLong numReplies = new AtomicLong(0);
+ private long maxLatency = Long.MIN_VALUE;
+ private long minLatency = Long.MAX_VALUE;
+ private long nextReport = startTime + REPORT_INTERVAL;
+ private long sumLatency = 0;
+
+ static class Metrics {
+
+ private final Destination destination;
+ private final FeedReader reader;
+ private final Executor executor;
+ private final long messagesToSend;
+ private final AtomicReference<Throwable> failure;
+
+ Metrics(Destination destination, FeedReader reader, Executor executor, AtomicReference<Throwable> failure, long messagesToSend) {
+ this.destination = destination;
+ this.reader = reader;
+ this.executor = executor;
+ this.messagesToSend = messagesToSend;
+ this.failure = failure;
+ }
+
+ long feed() throws Throwable {
+ long numMessages = 0;
+ while ((failure.get() == null) && (numMessages < messagesToSend)) {
+ FeedOperation op = reader.read();
+ if (op.getType() == FeedOperation.Type.INVALID) {
+ break;
+ }
+ if (executor != null) {
+ executor.execute(() -> sendOperation(op));
+ } else {
+ sendOperation(op);
+ }
+ ++numMessages;
+ }
+ return numMessages;
+ }
+ private void sendOperation(FeedOperation op) {
+ destination.send(op);
+ }
+ }
+
+
+ public static void main(String[] args) throws Throwable {
+ Logger.getLogger("").setLevel(Level.WARNING);
+ new SimpleFeeder(new FeederParams().parseArgs(args)).run().close();
+ }
+
+ private interface Destination {
+ void send(FeedOperation op);
+ void close() throws Exception;
+ }
+
+ private static class MbusDestination implements Destination {
+ private final PrintStream err;
+ private final Route route;
+ private final SourceSession session;
+ private final long timeoutMS;
+ private final AtomicReference<Throwable> failure;
+ MbusDestination(SourceSession session, Route route, double timeoutS, AtomicReference<Throwable> failure, PrintStream err) {
+ this.route = route;
+ this.err = err;
+ this.session = session;
+ this.timeoutMS = (long)(timeoutS * 1000.0);
+ this.failure = failure;
+ }
+ public void send(FeedOperation op) {
+ Message msg = newMessage(op);
+ if (msg == null) {
+ err.println("ignoring operation; " + op.getType());
+ return;
+ }
+ msg.setTimeRemaining(timeoutMS);
+ msg.setContext(System.currentTimeMillis());
+ msg.setRoute(route);
+ try {
+ Error err = session.sendBlocking(msg).getError();
+ if (err != null) {
+ failure.set(new IOException(err.toString()));
+ }
+ } catch (InterruptedException e) {}
+ }
+ public void close() {
+ session.destroy();
+ }
+ }
+
+ private static class JsonDestination implements Destination {
+ private final OutputStream outputStream;
+ private final DocumentWriter writer;
+ private final AtomicLong numReplies;
+ private final AtomicReference<Throwable> failure;
+ private boolean isFirst = true;
+ JsonDestination(OutputStream outputStream, AtomicReference<Throwable> failure, AtomicLong numReplies) {
+ this.outputStream = outputStream;
+ writer = new JsonWriter(outputStream);
+ this.numReplies = numReplies;
+ this.failure = failure;
+ try {
+ outputStream.write('[');
+ outputStream.write('\n');
+ } catch (IOException e) {
+ failure.set(e);
+ }
+ }
+ public void send(FeedOperation op) {
+ if (op.getType() == FeedOperation.Type.DOCUMENT) {
+ if (!isFirst) {
+ try {
+ outputStream.write(',');
+ outputStream.write('\n');
+ } catch (IOException e) {
+ failure.set(e);
+ }
+ } else {
+ isFirst = false;
+ }
+ writer.write(op.getDocument());
+ }
+ numReplies.incrementAndGet();
+ }
+ public void close() throws Exception {
+ outputStream.write('\n');
+ outputStream.write(']');
+ outputStream.close();
+ }
+ }
+
+ static private final int NONE = 0;
+ static private final int DOCUMENT = 1;
+ static private final int UPDATE = 2;
+ static private final int REMOVE = 3;
+ private static class VespaV1Destination implements Destination {
+ private final OutputStream outputStream;
+ GrowableByteBuffer buffer = new GrowableByteBuffer(16384);
+ ByteBuffer header = ByteBuffer.allocate(16);
+ private final AtomicLong numReplies;
+ private final AtomicReference<Throwable> failure;
+ VespaV1Destination(OutputStream outputStream, AtomicReference<Throwable> failure, AtomicLong numReplies) {
+ this.outputStream = outputStream;
+ this.numReplies = numReplies;
+ this.failure = failure;
+ try {
+ outputStream.write('V');
+ outputStream.write('1');
+ } catch (IOException e) {
+ failure.set(e);
+ }
+ }
+ public void send(FeedOperation op) {
+ TestAndSetCondition cond = op.getCondition();
+ buffer.putUtf8String(cond.getSelection());
+ DocumentSerializer writer = DocumentSerializerFactory.createHead(buffer);
+ int type = NONE;
+ if (op.getType() == FeedOperation.Type.DOCUMENT) {
+ writer.write(op.getDocument());
+ type = DOCUMENT;
+ } else if (op.getType() == FeedOperation.Type.UPDATE) {
+ writer.write(op.getDocumentUpdate());
+ type = UPDATE;
+ } else if (op.getType() == FeedOperation.Type.REMOVE) {
+ writer.write(op.getRemove());
+ type = REMOVE;
+ }
+ int sz = buffer.position();
+ long hash = hash(buffer.array(), sz);
+ try {
+ header.putInt(sz);
+ header.putInt(type);
+ header.putLong(hash);
+ outputStream.write(header.array(), 0, header.position());
+ outputStream.write(buffer.array(), 0, buffer.position());
+ header.clear();
+ buffer.clear();
+ } catch (IOException e) {
+ failure.set(e);
+ }
+ numReplies.incrementAndGet();
+ }
+ public void close() throws Exception {
+ outputStream.close();
+ }
+ static long hash(byte [] buf, int length) {
+ return XXHashFactory.fastestJavaInstance().hash64().hash(buf, 0, length, 0);
+ }
+ }
+
+ private static int readExact(InputStream in, byte [] buf) throws IOException {
+ return in.readNBytes(buf, 0, buf.length);
+ }
+
+ static class VespaV1FeedReader implements FeedReader {
+ private final InputStream in;
+ private final DocumentTypeManager mgr;
+ private final byte[] prefix = new byte[16];
+ VespaV1FeedReader(InputStream in, DocumentTypeManager mgr) throws IOException {
+ this.in = in;
+ this.mgr = mgr;
+ byte [] header = new byte[2];
+ int read = readExact(in, header);
+ if ((read != header.length) || (header[0] != 'V') || (header[1] != '1')) {
+ throw new IllegalArgumentException("Invalid Header " + Arrays.toString(header));
+ }
+ }
+
+ static class LazyDocumentOperation extends ConditionalFeedOperation {
+ private final DocumentDeserializer deserializer;
+ LazyDocumentOperation(DocumentDeserializer deserializer, TestAndSetCondition condition) {
+ super(Type.DOCUMENT, condition);
+ this.deserializer = deserializer;
+ }
+
+ @Override
+ public Document getDocument() {
+ return new Document(deserializer);
+ }
+ }
+ static class LazyUpdateOperation extends ConditionalFeedOperation {
+ private final DocumentDeserializer deserializer;
+ LazyUpdateOperation(DocumentDeserializer deserializer, TestAndSetCondition condition) {
+ super(Type.UPDATE, condition);
+ this.deserializer = deserializer;
+ }
+
+ @Override
+ public DocumentUpdate getDocumentUpdate() {
+ return new DocumentUpdate(deserializer);
+ }
+ }
+ @Override
+ public FeedOperation read() throws Exception {
+ int read = readExact(in, prefix);
+ if (read != prefix.length) {
+ return FeedOperation.INVALID;
+ }
+ ByteBuffer header = ByteBuffer.wrap(prefix);
+ int sz = header.getInt();
+ int type = header.getInt();
+ long hash = header.getLong();
+ byte [] blob = new byte[sz];
+ read = readExact(in, blob);
+ if (read != blob.length) {
+ throw new IllegalArgumentException("Underflow, failed reading " + blob.length + "bytes. Got " + read);
+ }
+ long computedHash = VespaV1Destination.hash(blob, blob.length);
+ if (computedHash != hash) {
+ throw new IllegalArgumentException("Hash mismatch, expected " + hash + ", got " + computedHash);
+ }
+ GrowableByteBuffer buf = GrowableByteBuffer.wrap(blob);
+ String condition = buf.getUtf8String();
+ DocumentDeserializer deser = DocumentDeserializerFactory.createHead(mgr, buf);
+ TestAndSetCondition testAndSetCondition = condition.isEmpty()
+ ? TestAndSetCondition.NOT_PRESENT_CONDITION
+ : new TestAndSetCondition(condition);
+ if (type == DOCUMENT) {
+ return new LazyDocumentOperation(deser, testAndSetCondition);
+ } else if (type == UPDATE) {
+ return new LazyUpdateOperation(deser, testAndSetCondition);
+ } else if (type == REMOVE) {
+ return new RemoveFeedOperation(new DocumentId(deser), testAndSetCondition);
+ } else {
+ throw new IllegalArgumentException("Unknown operation " + type);
+ }
+ }
+ }
+
+ private Destination createDumper(FeederParams params) {
+ if (params.getDumpFormat() == FeederParams.DumpFormat.VESPA) {
+ return new VespaV1Destination(params.getDumpStream(), failure, numReplies);
+ }
+ return new JsonDestination(params.getDumpStream(), failure, numReplies);
+ }
+
+
+ @SuppressWarnings("deprecation")
+ SimpleFeeder(FeederParams params) {
+ inputStreams = params.getInputStreams();
+ out = params.getStdOut();
+ numThreads = params.getNumDispatchThreads();
+ numMessagesToSend = params.getNumMessagesToSend();
+ mbus = newMessageBus(docTypeMgr, params);
+ session = newSession(mbus, this, params);
+ documentTypeConfigSubscriber = DocumentTypeManagerConfigurer.configure(docTypeMgr, params.getConfigId());
+ benchmarkMode = params.isBenchmarkMode();
+ destination = (params.getDumpStream() != null)
+ ? createDumper(params)
+ : new MbusDestination(session, params.getRoute(), params.getTimeout(), failure, params.getStdErr());
+ }
+
+ SourceSession getSourceSession() { return session; }
+ private FeedReader createFeedReader(InputStream in) throws Exception {
+ in.mark(8);
+ byte [] b = new byte[2];
+ int numRead = readExact(in, b);
+ in.reset();
+ if (numRead != b.length) {
+ throw new IllegalArgumentException("Need to read " + b.length + " bytes to detect format. Got " + numRead + " bytes.");
+ }
+ if (b[0] == '[') {
+ return new JsonFeedReader(in, docTypeMgr);
+ } else if ((b[0] == 'V') && (b[1] == '1')) {
+ return new VespaV1FeedReader(in, docTypeMgr);
+ } else {
+ return new VespaXMLFeedReader(in, docTypeMgr);
+ }
+ }
+
+
+ static class RetryExecutionHandler implements RejectedExecutionHandler {
+
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ try {
+ executor.getQueue().put(r);
+ } catch (InterruptedException e) {}
+ }
+ }
+
+ SimpleFeeder run() throws Throwable {
+ ExecutorService executor = (numThreads > 1)
+ ? new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(numThreads*100),
+ ThreadFactoryFactory.getDaemonThreadFactory("perf-feeder"),
+ new RetryExecutionHandler())
+ : null;
+ printHeader(out);
+ long numMessagesSent = 0;
+ for (InputStream in : inputStreams) {
+ Metrics m = new Metrics(destination, createFeedReader(in), executor, failure, numMessagesToSend);
+ numMessagesSent += m.feed();
+ }
+ while (failure.get() == null && numReplies.get() < numMessagesSent) {
+ Thread.sleep(100);
+ }
+ if (failure.get() != null) {
+ throw failure.get();
+ }
+ printReport(out);
+ return this;
+ }
+
+ void close() throws Exception {
+ destination.close();
+ mbus.destroy();
+ }
+
+ private static Message newMessage(FeedOperation op) {
+ switch (op.getType()) {
+ case DOCUMENT: {
+ PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(op.getDocument()));
+ message.setCondition(op.getCondition());
+ return message;
+ }
+ case REMOVE: {
+ RemoveDocumentMessage message = new RemoveDocumentMessage(op.getRemove());
+ message.setCondition(op.getCondition());
+ return message;
+ }
+ case UPDATE: {
+ UpdateDocumentMessage message = new UpdateDocumentMessage(op.getDocumentUpdate());
+ message.setCondition(op.getCondition());
+ return message;
+ }
+ default:
+ return null;
+ }
+ }
+
+ private static boolean containsFatalErrors(Stream<Error> errors) {
+ return errors.anyMatch(e -> e.getCode() != DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED);
+ }
+
+ @Override
+ public void handleReply(Reply reply) {
+ if (failure.get() != null) {
+ return;
+ }
+ if (containsFatalErrors(reply.getErrors())) {
+ failure.compareAndSet(null, new IOException(formatErrors(reply)));
+ return;
+ }
+ long now = System.currentTimeMillis();
+ long latency = now - (long) reply.getContext();
+ numReplies.incrementAndGet();
+ accumulateReplies(now, latency);
+ }
+ private synchronized void accumulateReplies(long now, long latency) {
+ minLatency = Math.min(minLatency, latency);
+ maxLatency = Math.max(maxLatency, latency);
+ sumLatency += latency;
+ if (benchmarkMode) { return; }
+ if (now > nextReport) {
+ printReport(out);
+ nextReport += REPORT_INTERVAL;
+ }
+ }
+ private static void printHeader(PrintStream out) {
+ out.println("# Time used, num ok, num error, min latency, max latency, average latency");
+ }
+
+ private synchronized void printReport(PrintStream out) {
+ out.format("%10d, %12d, %11d, %11d, %11d\n", System.currentTimeMillis() - startTime,
+ numReplies.get(), minLatency, maxLatency, sumLatency / Long.max(1, numReplies.get()));
+ }
+
+ private static String formatErrors(Reply reply) {
+ StringBuilder out = new StringBuilder();
+ out.append(reply.getMessage().toString()).append('\n');
+ for (int i = 0, len = reply.getNumErrors(); i < len; ++i) {
+ out.append(reply.getError(i).toString()).append('\n');
+ }
+ return out.toString();
+ }
+
+ private static RPCMessageBus newMessageBus(DocumentTypeManager docTypeMgr, FeederParams params) {
+ return new RPCMessageBus(new MessageBusParams().addProtocol(new DocumentProtocol(docTypeMgr)),
+ new RPCNetworkParams().setSlobrokConfigId(params.getConfigId())
+ .setNumTargetsPerSpec(params.getNumConnectionsPerTarget()),
+ params.getConfigId());
+ }
+
+ private static SourceSession newSession(RPCMessageBus mbus, ReplyHandler replyHandler, FeederParams feederParams ) {
+ SourceSessionParams params = new SourceSessionParams();
+ params.setReplyHandler(replyHandler);
+ if (feederParams.getMaxPending() > 0) {
+ params.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(feederParams.getMaxPending()));
+ } else {
+ DynamicThrottlePolicy throttlePolicy = new DynamicThrottlePolicy()
+ .setWindowSizeIncrement(feederParams.getWindowIncrementSize())
+ .setResizeRate(feederParams.getWindowResizeRate())
+ .setWindowSizeDecrementFactor(feederParams.getWindowDecrementFactor())
+ .setWindowSizeBackOff(feederParams.getWindowSizeBackOff());
+ params.setThrottlePolicy(throttlePolicy);
+ }
+ return mbus.getMessageBus().createSourceSession(params);
+ }
+}
diff --git a/vespaclient-java/src/main/sh/vespa-feed-perf b/vespaclient-java/src/main/sh/vespa-feed-perf
new file mode 100755
index 00000000000..bdee1142a2d
--- /dev/null
+++ b/vespaclient-java/src/main/sh/vespa-feed-perf
@@ -0,0 +1,89 @@
+#!/bin/sh
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+# BEGIN environment bootstrap section
+# Do not edit between here and END as this section should stay identical in all scripts
+
+findpath () {
+ myname=${0}
+ mypath=${myname%/*}
+ myname=${myname##*/}
+ empty_if_start_slash=${mypath%%/*}
+ if [ "${empty_if_start_slash}" ]; then
+ mypath=$(pwd)/${mypath}
+ fi
+ if [ "$mypath" ] && [ -d "$mypath" ]; then
+ return
+ fi
+ mypath=$(pwd)
+ if [ -f "${mypath}/${myname}" ]; then
+ return
+ fi
+ echo "FATAL: Could not figure out the path where $myname lives from $0"
+ exit 1
+}
+
+COMMON_ENV=libexec/vespa/common-env.sh
+
+source_common_env () {
+ if [ "$VESPA_HOME" ] && [ -d "$VESPA_HOME" ]; then
+ export VESPA_HOME
+ common_env=$VESPA_HOME/$COMMON_ENV
+ if [ -f "$common_env" ]; then
+ . $common_env
+ return
+ fi
+ fi
+ return 1
+}
+
+findroot () {
+ source_common_env && return
+ if [ "$VESPA_HOME" ]; then
+ echo "FATAL: bad VESPA_HOME value '$VESPA_HOME'"
+ exit 1
+ fi
+ if [ "$ROOT" ] && [ -d "$ROOT" ]; then
+ VESPA_HOME="$ROOT"
+ source_common_env && return
+ fi
+ findpath
+ while [ "$mypath" ]; do
+ VESPA_HOME=${mypath}
+ source_common_env && return
+ mypath=${mypath%/*}
+ done
+ echo "FATAL: missing VESPA_HOME environment variable"
+ echo "Could not locate $COMMON_ENV anywhere"
+ exit 1
+}
+
+findhost () {
+ if [ "${VESPA_HOSTNAME}" = "" ]; then
+ VESPA_HOSTNAME=$(vespa-detect-hostname || hostname -f || hostname || echo "localhost") || exit 1
+ fi
+ validate="${VESPA_HOME}/bin/vespa-validate-hostname"
+ if [ -f "$validate" ]; then
+ "$validate" "${VESPA_HOSTNAME}" || exit 1
+ fi
+ export VESPA_HOSTNAME
+}
+
+findroot
+findhost
+
+ROOT=${VESPA_HOME%/}
+export ROOT
+
+# END environment bootstrap section
+
+export VESPA_LOG_TARGET=file:/dev/null
+export MALLOC_ARENA_MAX=1 # Does not need fast allocation
+java \
+-server -enableassertions \
+-XX:ThreadStackSize=512 \
+-XX:MaxJavaStackTraceDepth=1000000 \
+-Djava.library.path=${VESPA_HOME}/libexec64/native:${VESPA_HOME}/lib64 \
+-XX:MaxDirectMemorySize=64m -Djava.awt.headless=true \
+-Xms128m -Xmx1024m $(getJavaOptionsIPV46) \
+-cp ${VESPA_HOME}/lib/jars/vespaclient-java-jar-with-dependencies.jar com.yahoo.vespa.feed.perf.SimpleFeeder "$@"
diff --git a/vespaclient-java/src/test/java/com/yahoo/vespa/perf/FeederParamsTest.java b/vespaclient-java/src/test/java/com/yahoo/vespa/perf/FeederParamsTest.java
new file mode 100644
index 00000000000..fbb38083aad
--- /dev/null
+++ b/vespaclient-java/src/test/java/com/yahoo/vespa/perf/FeederParamsTest.java
@@ -0,0 +1,176 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.feed.perf;
+
+import com.yahoo.messagebus.routing.Route;
+import org.apache.commons.cli.ParseException;
+import org.junit.Test;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class FeederParamsTest {
+ private static final String TESTFILE_JSON = "test.json";
+ private static final String TESTFILE_VESPA = "test.vespa";
+ private static final String TESTFILE_UNKNOWN = "test.xyz";
+ private static final double EPSILON = 0.000000000001;
+
+
+ @Test
+ public void requireThatAccessorsWork() {
+ FeederParams params = new FeederParams();
+
+ PrintStream stdErr = new PrintStream(new ByteArrayOutputStream());
+ params.setStdErr(stdErr);
+ assertSame(stdErr, params.getStdErr());
+
+ PrintStream stdOut = new PrintStream(new ByteArrayOutputStream());
+ params.setStdOut(stdOut);
+ assertSame(stdOut, params.getStdOut());
+
+ params.setConfigId("my_config_id");
+ assertEquals("my_config_id", params.getConfigId());
+
+ assertFalse(params.isSerialTransferEnabled());
+ params.setSerialTransfer();
+ assertTrue(params.isSerialTransferEnabled());
+ }
+
+ @Test
+ public void requireThatParamsHaveReasonableDefaults() {
+ FeederParams params = new FeederParams();
+ assertSame(System.in, params.getInputStreams().get(0));
+ assertSame(System.err, params.getStdErr());
+ assertSame(System.out, params.getStdOut());
+ assertEquals(Route.parse("default"), params.getRoute());
+ assertEquals("client", params.getConfigId());
+ assertFalse(params.isSerialTransferEnabled());
+ }
+
+ @Test
+ public void requireThatSerialTransferOptionIsParsed() throws ParseException, FileNotFoundException {
+ assertTrue(new FeederParams().parseArgs("-s").isSerialTransferEnabled());
+ assertTrue(new FeederParams().parseArgs("--serial").isSerialTransferEnabled());
+ assertEquals(1, new FeederParams().parseArgs("-s").getMaxPending());
+ assertEquals(1, new FeederParams().parseArgs("-s").getNumDispatchThreads());
+ }
+
+ @Test
+ public void requireThatArgumentsAreParsedAsRoute() throws ParseException, FileNotFoundException {
+ assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("-r", "foo bar").getRoute());
+ assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("--route","foo bar").getRoute());
+ }
+
+ @Test
+ public void requireThatRouteIsAnOptionalArgument() throws ParseException, FileNotFoundException {
+ assertEquals(Route.parse("default"), new FeederParams().parseArgs().getRoute());
+ assertEquals(Route.parse("default"), new FeederParams().parseArgs("-s").getRoute());
+ }
+
+ @Test
+ public void requireThatNumThreadsAreParsed() throws ParseException, FileNotFoundException {
+ assertEquals(1, new FeederParams().getNumDispatchThreads());
+ assertEquals(17, new FeederParams().parseArgs("-n 17").getNumDispatchThreads());
+ assertEquals(17, new FeederParams().parseArgs("--numthreads", "17").getNumDispatchThreads());
+ }
+ @Test
+ public void requireThatNumConnectionsAreParsed() throws ParseException, FileNotFoundException {
+ assertEquals(1, new FeederParams().getNumConnectionsPerTarget());
+ assertEquals(16, new FeederParams().parseArgs("-c 16").getNumConnectionsPerTarget());
+ assertEquals(17, new FeederParams().parseArgs("--numconnections", "17").getNumConnectionsPerTarget());
+ }
+
+ @Test
+ public void requireThatTimeoutIsParsed() throws ParseException, FileNotFoundException {
+ assertEquals(180.0, new FeederParams().getTimeout(), EPSILON);
+ assertEquals(16.7, new FeederParams().parseArgs("-t 16.7").getTimeout(), EPSILON);
+ assertEquals(1700.9, new FeederParams().parseArgs("--timeout", "1700.9").getTimeout(), EPSILON);
+ }
+
+ @Test
+ public void requireThatNumMessagesToSendAreParsed() throws ParseException, FileNotFoundException {
+ assertEquals(Long.MAX_VALUE, new FeederParams().getNumMessagesToSend());
+ assertEquals(18, new FeederParams().parseArgs("-l 18").getNumMessagesToSend());
+ assertEquals(19, new FeederParams().parseArgs("--nummessages", "19").getNumMessagesToSend());
+ }
+
+ @Test
+ public void requireThatWindowSizeIncrementIsParsed() throws ParseException, FileNotFoundException {
+ assertEquals(20, new FeederParams().getWindowIncrementSize());
+ assertEquals(17, new FeederParams().parseArgs("--window_incrementsize", "17").getWindowIncrementSize());
+ }
+
+ @Test
+ public void requireThatWindowSizeDecrementFactorIsParsed() throws ParseException, FileNotFoundException {
+ assertEquals(1.2, new FeederParams().getWindowDecrementFactor(), EPSILON);
+ assertEquals(1.3, new FeederParams().parseArgs("--window_decrementfactor", "1.3").getWindowDecrementFactor(), EPSILON);
+ }
+
+ @Test
+ public void requireThatWindowResizeRateIsParsed() throws ParseException, FileNotFoundException {
+ assertEquals(3.0, new FeederParams().getWindowResizeRate(), EPSILON);
+ assertEquals(5.5, new FeederParams().parseArgs("--window_resizerate", "5.5").getWindowResizeRate(), EPSILON);
+ }
+
+ @Test
+ public void requireThatWindowBackOffIsParsed() throws ParseException, FileNotFoundException {
+ assertEquals(0.95, new FeederParams().getWindowSizeBackOff(), EPSILON);
+ assertEquals(0.97, new FeederParams().parseArgs("--window_backoff", "0.97").getWindowSizeBackOff(), EPSILON);
+ }
+
+ @Test
+ public void requireThatDumpStreamAreParsed() throws ParseException, IOException {
+ assertNull(new FeederParams().getDumpStream());
+
+ FeederParams p = new FeederParams().parseArgs("-o " + TESTFILE_JSON);
+ assertNotNull(p.getDumpStream());
+ assertEquals(FeederParams.DumpFormat.JSON, p.getDumpFormat());
+ p.getDumpStream().close();
+
+ p = new FeederParams().parseArgs("-o " + TESTFILE_VESPA);
+ assertNotNull(p.getDumpStream());
+ assertEquals(FeederParams.DumpFormat.VESPA, p.getDumpFormat());
+ p.getDumpStream().close();
+
+ p = new FeederParams().parseArgs("-o " + TESTFILE_UNKNOWN);
+ assertNotNull(p.getDumpStream());
+ assertEquals(FeederParams.DumpFormat.JSON, p.getDumpFormat());
+ p.getDumpStream().close();
+
+ assertTrue(new File(TESTFILE_JSON).delete());
+ assertTrue(new File(TESTFILE_VESPA).delete());
+ assertTrue(new File(TESTFILE_UNKNOWN).delete());
+ }
+
+ @Test
+ public void requireThatInputFilesAreAggregated() throws ParseException, IOException {
+ File json = new File(TESTFILE_JSON);
+ File vespa = new File(TESTFILE_VESPA);
+ new FileOutputStream(json).close();
+ new FileOutputStream(vespa).close();
+ FeederParams p = new FeederParams();
+ p.parseArgs("-n", "3", TESTFILE_JSON, TESTFILE_VESPA);
+ assertEquals(3, p.getNumDispatchThreads());
+ assertEquals(2, p.getInputStreams().size());
+ assertTrue(p.getInputStreams().get(0) instanceof BufferedInputStream);
+ assertTrue(p.getInputStreams().get(1) instanceof BufferedInputStream);
+ json.delete();
+ vespa.delete();
+ }
+
+}
diff --git a/vespaclient-java/src/test/java/com/yahoo/vespa/perf/SimpleFeederTest.java b/vespaclient-java/src/test/java/com/yahoo/vespa/perf/SimpleFeederTest.java
new file mode 100644
index 00000000000..5380796086a
--- /dev/null
+++ b/vespaclient-java/src/test/java/com/yahoo/vespa/perf/SimpleFeederTest.java
@@ -0,0 +1,332 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.feed.perf;
+
+import com.yahoo.document.serialization.DeserializationException;
+import com.yahoo.documentapi.messagebus.protocol.DocumentMessage;
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.messagebus.DynamicThrottlePolicy;
+import com.yahoo.messagebus.EmptyReply;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.ErrorCode;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageHandler;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.StaticThrottlePolicy;
+import com.yahoo.messagebus.ThrottlePolicy;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class SimpleFeederTest {
+
+ private static final String CONFIG_DIR = "target/test-classes/";
+
+ @Test
+ public void requireThatXMLFeederWorks() throws Throwable {
+ assertFeed("<vespafeed>" +
+ " <document documenttype='simple' documentid='id:scheme:simple::0'>" +
+ " <my_str>foo</my_str>" +
+ " </document>" +
+ " <update documenttype='simple' documentid='id:scheme:simple::1'>" +
+ " <assign field='my_str'>bar</assign>" +
+ " </update>" +
+ " <remove documenttype='simple' documentid='id:scheme:simple::2'/>" +
+ "</vespafeed>",
+ new MessageHandler() {
+
+ @Override
+ public void handleMessage(Message msg) {
+ Reply reply = ((DocumentMessage)msg).createReply();
+ reply.swapState(msg);
+ reply.popHandler().handleReply(reply);
+ }
+ },
+ "",
+ "(.+\n)+" +
+ "\\s*\\d+,\\s*3,.+\n");
+ }
+
+ @Test
+ public void requireThatXML2JsonFeederWorks() throws Throwable {
+ ByteArrayOutputStream dump = new ByteArrayOutputStream();
+ assertFeed(new FeederParams().setDumpStream(dump),
+ "<vespafeed>" +
+ " <document documenttype='simple' documentid='id:simple:simple::0'>" +
+ " <my_str>foo</my_str>" +
+ " </document>" +
+ " <update documenttype='simple' documentid='id:simple:simple::1'>" +
+ " <assign field='my_str'>bar</assign>" +
+ " </update>" +
+ " <remove documenttype='simple' documentid='id:simple:simple::2'/>" +
+ "</vespafeed>",
+ new MessageHandler() {
+
+ @Override
+ public void handleMessage(Message msg) {
+ Reply reply = ((DocumentMessage)msg).createReply();
+ reply.swapState(msg);
+ reply.popHandler().handleReply(reply);
+ }
+ },
+ "",
+ "(.+\n)+" +
+ "\\s*\\d+,\\s*3,.+\n");
+ assertEquals(58, dump.size());
+ assertEquals("[\n{\"id\":\"id:simple:simple::0\",\"fields\":{\"my_str\":\"foo\"}}\n]", dump.toString());
+ }
+
+ @Test
+ public void requireThatDualPutXML2JsonFeederWorks() throws Throwable {
+ ByteArrayOutputStream dump = new ByteArrayOutputStream();
+ assertFeed(new FeederParams().setDumpStream(dump),
+ "<vespafeed>" +
+ " <document documenttype='simple' documentid='id:simple:simple::0'>" +
+ " <my_str>foo</my_str>" +
+ " </document>" +
+ " <document documenttype='simple' documentid='id:simple:simple::1'>" +
+ " <my_str>bar</my_str>" +
+ " </document>" +
+ " <remove documenttype='simple' documentid='id:simple:simple::2'/>" +
+ "</vespafeed>",
+ new MessageHandler() {
+
+ @Override
+ public void handleMessage(Message msg) {
+ Reply reply = ((DocumentMessage)msg).createReply();
+ reply.swapState(msg);
+ reply.popHandler().handleReply(reply);
+ }
+ },
+ "",
+ "(.+\n)+" +
+ "\\s*\\d+,\\s*3,.+\n");
+ assertEquals(115, dump.size());
+ assertEquals("[\n{\"id\":\"id:simple:simple::0\",\"fields\":{\"my_str\":\"foo\"}},\n {\"id\":\"id:simple:simple::1\",\"fields\":{\"my_str\":\"bar\"}}\n]", dump.toString());
+ assertFeed(dump.toString(),
+ new MessageHandler() {
+ @Override
+ public void handleMessage(Message msg) {
+ Reply reply = ((DocumentMessage)msg).createReply();
+ reply.swapState(msg);
+ reply.popHandler().handleReply(reply);
+ }
+ },
+ "",
+ "(.+\n)+" +
+ "\\s*\\d+,\\s*2,.+\n");
+ }
+
+ @Test
+ public void requireThatJson2VespaFeederWorks() throws Throwable {
+ ByteArrayOutputStream dump = new ByteArrayOutputStream();
+ assertFeed(new FeederParams().setDumpStream(dump).setDumpFormat(FeederParams.DumpFormat.VESPA),
+ "[" +
+ " { \"put\": \"id:simple:simple::0\", \"fields\": { \"my_str\":\"foo\"}}," +
+ " { \"update\": \"id:simple:simple::1\", \"fields\": { \"my_str\": { \"assign\":\"bar\"}}}," +
+ " { \"remove\": \"id:simple:simple::2\", \"condition\":\"true\"}" +
+ "]",
+ new MessageHandler() {
+
+ @Override
+ public void handleMessage(Message msg) {
+ Reply reply = ((DocumentMessage)msg).createReply();
+ reply.swapState(msg);
+ reply.popHandler().handleReply(reply);
+ }
+ },
+ "",
+ "(.+\n)+" +
+ "\\s*\\d+,\\s*3,.+\n");
+ assertEquals(187, dump.size());
+ assertFeed(new ByteArrayInputStream(dump.toByteArray()),
+ new MessageHandler() {
+ @Override
+ public void handleMessage(Message msg) {
+ Reply reply = ((DocumentMessage)msg).createReply();
+ reply.swapState(msg);
+ reply.popHandler().handleReply(reply);
+ }
+ },
+ "",
+ "(.+\n)+" +
+ "\\s*\\d+,\\s*3,.+\n");
+ }
+
+ @Test
+ public void requireThatJsonFeederWorks() throws Throwable {
+ assertFeed("[" +
+ " { \"put\": \"id:simple:simple::0\", \"fields\": { \"my_str\":\"foo\"}}," +
+ " { \"update\": \"id:simple:simple::1\", \"fields\": { \"my_str\": { \"assign\":\"bar\"}}}," +
+ " { \"remove\": \"id:simple:simple::2\"}" +
+ "]",
+ new MessageHandler() {
+
+ @Override
+ public void handleMessage(Message msg) {
+ Reply reply = ((DocumentMessage)msg).createReply();
+ reply.swapState(msg);
+ reply.popHandler().handleReply(reply);
+ }
+ },
+ "",
+ "(.+\n)+" +
+ "\\s*\\d+,\\s*3,.+\n");
+ }
+
+ @Test
+ public void requireThatParseFailuresThrowInMainThread() throws Throwable {
+ TestDriver driver = new TestDriver(new FeederParams(),
+ "<vespafeed>" +
+ " <document documenttype='unknown' documentid='id:scheme:simple::0'/>" +
+ "</vespafeed>",
+ null);
+ try {
+ driver.run();
+ fail();
+ } catch (DeserializationException e) {
+ assertEquals("Field 'id:scheme:simple::0': Must specify an existing document type, not 'unknown' (at line 1, column 83)",
+ e.getMessage());
+ }
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatSyncFailuresThrowInMainThread() throws Throwable {
+ TestDriver driver = new TestDriver(new FeederParams(),
+ "<vespafeed>" +
+ " <document documenttype='simple' documentid='id:scheme:simple::0'/>" +
+ "</vespafeed>",
+ null);
+ driver.feeder.getSourceSession().close();
+ try {
+ driver.run();
+ fail();
+ } catch (IOException e) {
+ assertEquals("[SEND_QUEUE_CLOSED @ localhost]: Source session is closed.", e.getMessage());
+ }
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatAsyncFailuresThrowInMainThread() throws Throwable {
+ TestDriver driver = new TestDriver(new FeederParams(),
+ "<vespafeed><document documenttype='simple' documentid='id:scheme:simple::0'/></vespafeed>",
+ new MessageHandler() {
+
+ @Override
+ public void handleMessage(Message msg) {
+ Reply reply = new EmptyReply();
+ reply.swapState(msg);
+ reply.addError(new Error(ErrorCode.APP_FATAL_ERROR + 6, "foo"));
+ reply.addError(new Error(ErrorCode.APP_FATAL_ERROR + 9, "bar"));
+ reply.popHandler().handleReply(reply);
+ }
+ });
+ try {
+ driver.run();
+ fail();
+ } catch (IOException e) {
+ assertMatches("com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage@.+\n" +
+ "\\[UNKNOWN\\(250006\\) @ .+\\]: foo\n" +
+ "\\[UNKNOWN\\(250009\\) @ .+\\]: bar\n",
+ e.getMessage());
+ }
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatDynamicThrottlingIsDefault() throws Exception {
+ TestDriver driver = new TestDriver(new FeederParams(), "", null);
+ assertEquals(DynamicThrottlePolicy.class, getThrottlePolicy(driver).getClass());
+ assertTrue(driver.close());
+ }
+
+ @Test
+ public void requireThatSerialTransferModeConfiguresStaticThrottling() throws Exception {
+ TestDriver driver = new TestDriver(new FeederParams().setSerialTransfer(), "", null);
+ assertEquals(StaticThrottlePolicy.class, getThrottlePolicy(driver).getClass());
+ assertTrue(driver.close());
+ }
+
+ private static ThrottlePolicy getThrottlePolicy(TestDriver driver) {
+ return (ThrottlePolicy)getField(driver.feeder.getSourceSession(), "throttlePolicy");
+ }
+
+ private static Object getField(Object obj, String fieldName) {
+ try {
+ Field field = obj.getClass().getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return field.get(obj);
+ } catch (IllegalAccessException | NoSuchFieldException e) {
+ throw new AssertionError(e);
+ }
+ }
+
+ private static void assertFeed(String in, MessageHandler validator, String expectedErr, String expectedOut) throws Throwable {
+ assertFeed(new FeederParams(), in, validator, expectedErr, expectedOut);
+ }
+ private static void assertFeed(InputStream in, MessageHandler validator, String expectedErr, String expectedOut) throws Throwable {
+ assertFeed(new FeederParams(), in, validator, expectedErr, expectedOut);
+ }
+ private static void assertFeed(FeederParams params, String in, MessageHandler validator, String expectedErr, String expectedOut) throws Throwable {
+ assertFeed(params, new ByteArrayInputStream(in.getBytes(StandardCharsets.UTF_8)), validator, expectedErr, expectedOut);
+ }
+ private static void assertFeed(FeederParams params, InputStream in, MessageHandler validator, String expectedErr, String expectedOut) throws Throwable {
+ TestDriver driver = new TestDriver(params, in, validator);
+ driver.run();
+ assertMatches(expectedErr, new String(driver.err.toByteArray(), StandardCharsets.UTF_8));
+ assertMatches(expectedOut, new String(driver.out.toByteArray(), StandardCharsets.UTF_8));
+ assertTrue(driver.close());
+ }
+
+ private static void assertMatches(String expected, String actual) {
+ if (!Pattern.matches(expected, actual)) {
+ assertEquals(expected, actual);
+ }
+ }
+
+ private static class TestDriver {
+
+ final ByteArrayOutputStream err = new ByteArrayOutputStream();
+ final ByteArrayOutputStream out = new ByteArrayOutputStream();
+ final SimpleFeeder feeder;
+ final SimpleServer server;
+
+ TestDriver(FeederParams params, String in, MessageHandler validator) throws IOException, ListenFailedException {
+ this(params, new ByteArrayInputStream(in.getBytes(StandardCharsets.UTF_8)), validator);
+ }
+ TestDriver(FeederParams params, InputStream in, MessageHandler validator) throws IOException, ListenFailedException {
+ server = new SimpleServer(CONFIG_DIR, validator);
+ feeder = new SimpleFeeder(params.setConfigId("dir:" + CONFIG_DIR)
+ .setStdErr(new PrintStream(err))
+ .setInputStreams(Arrays.asList(in))
+ .setStdOut(new PrintStream(out)));
+ }
+
+ void run() throws Throwable {
+ feeder.run();
+ }
+
+ boolean close() throws Exception {
+ feeder.close();
+ server.close();
+ return true;
+ }
+ }
+
+}
diff --git a/vespaclient-java/src/test/java/com/yahoo/vespa/perf/SimpleServer.java b/vespaclient-java/src/test/java/com/yahoo/vespa/perf/SimpleServer.java
new file mode 100644
index 00000000000..a458a59f997
--- /dev/null
+++ b/vespaclient-java/src/test/java/com/yahoo/vespa/perf/SimpleServer.java
@@ -0,0 +1,63 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.feed.perf;
+
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.jrt.ListenFailedException;
+import com.yahoo.jrt.slobrok.server.Slobrok;
+import com.yahoo.messagebus.DestinationSession;
+import com.yahoo.messagebus.DestinationSessionParams;
+import com.yahoo.messagebus.MessageBus;
+import com.yahoo.messagebus.MessageBusParams;
+import com.yahoo.messagebus.MessageHandler;
+import com.yahoo.messagebus.network.Identity;
+import com.yahoo.messagebus.network.rpc.RPCNetwork;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class SimpleServer {
+
+ private final DocumentTypeManager documentMgr;
+ private final Slobrok slobrok;
+ private final MessageBus mbus;
+ private final DestinationSession session;
+
+ @SuppressWarnings("deprecation")
+ public SimpleServer(String configDir, MessageHandler msgHandler) throws IOException, ListenFailedException {
+ slobrok = new Slobrok();
+ documentMgr = DocumentTypeManager.fromFile(configDir + "/documentmanager.cfg");
+ mbus = new MessageBus(new RPCNetwork(new RPCNetworkParams()
+ .setSlobrokConfigId(slobrok.configId())
+ .setIdentity(new Identity("server"))),
+ new MessageBusParams().addProtocol(new DocumentProtocol(documentMgr)));
+ session = mbus.createDestinationSession(new DestinationSessionParams().setMessageHandler(msgHandler));
+
+ PrintWriter writer = new PrintWriter(new FileWriter(configDir + "/messagebus.cfg"));
+ writer.println("routingtable[1]\n" +
+ "routingtable[0].protocol \"document\"\n" +
+ "routingtable[0].hop[0]\n" +
+ "routingtable[0].route[1]\n" +
+ "routingtable[0].route[0].name \"default\"\n" +
+ "routingtable[0].route[0].hop[1]\n" +
+ "routingtable[0].route[0].hop[0] \"" + session.getConnectionSpec() + "\"");
+ writer.close();
+
+ writer = new PrintWriter(new FileWriter(configDir + "/slobroks.cfg"));
+ writer.println(slobrok.configId().substring(4));
+ writer.close();
+ }
+
+ @SuppressWarnings("deprecation")
+ public final void close() {
+ session.destroy();
+ mbus.destroy();
+ slobrok.stop();
+ }
+
+}
diff --git a/vespaclient-java/src/test/resources/documentmanager.cfg b/vespaclient-java/src/test/resources/documentmanager.cfg
new file mode 100644
index 00000000000..2bd7a3c6080
--- /dev/null
+++ b/vespaclient-java/src/test/resources/documentmanager.cfg
@@ -0,0 +1,59 @@
+doctype[2]
+doctype[0].name "document"
+doctype[0].idx 10000
+doctype[0].contentstruct 10001
+doctype[0].primitivetype[0].idx 10002
+doctype[0].primitivetype[0].internalid 0
+doctype[0].primitivetype[0].name "int"
+doctype[0].primitivetype[1].idx 10003
+doctype[0].primitivetype[1].internalid 5
+doctype[0].primitivetype[1].name "double"
+doctype[0].primitivetype[2].idx 10004
+doctype[0].primitivetype[2].internalid 2
+doctype[0].primitivetype[2].name "string"
+doctype[0].annotationtype[0].idx 10005
+doctype[0].annotationtype[0].name "proximity_break"
+doctype[0].annotationtype[0].internalid 8
+doctype[0].annotationtype[0].datatype 10003
+doctype[0].annotationtype[1].idx 10006
+doctype[0].annotationtype[1].name "normalized"
+doctype[0].annotationtype[1].internalid 4
+doctype[0].annotationtype[1].datatype 10004
+doctype[0].annotationtype[2].idx 10007
+doctype[0].annotationtype[2].name "reading"
+doctype[0].annotationtype[2].internalid 5
+doctype[0].annotationtype[2].datatype 10004
+doctype[0].annotationtype[3].idx 10008
+doctype[0].annotationtype[3].name "term"
+doctype[0].annotationtype[3].internalid 1
+doctype[0].annotationtype[3].datatype 10004
+doctype[0].annotationtype[4].idx 10009
+doctype[0].annotationtype[4].name "transformed"
+doctype[0].annotationtype[4].internalid 7
+doctype[0].annotationtype[4].datatype 10004
+doctype[0].annotationtype[5].idx 10010
+doctype[0].annotationtype[5].name "canonical"
+doctype[0].annotationtype[5].internalid 3
+doctype[0].annotationtype[5].datatype 10004
+doctype[0].annotationtype[6].idx 10011
+doctype[0].annotationtype[6].name "token_type"
+doctype[0].annotationtype[6].internalid 2
+doctype[0].annotationtype[6].datatype 10002
+doctype[0].annotationtype[7].idx 10012
+doctype[0].annotationtype[7].name "special_token"
+doctype[0].annotationtype[7].internalid 9
+doctype[0].annotationtype[8].idx 10013
+doctype[0].annotationtype[8].name "stem"
+doctype[0].annotationtype[8].internalid 6
+doctype[0].annotationtype[8].datatype 10004
+doctype[0].structtype[0].idx 10001
+doctype[0].structtype[0].name document.header
+doctype[1].name "simple"
+doctype[1].idx 10014
+doctype[1].inherits[0].idx 10000
+doctype[1].contentstruct 10015
+doctype[1].structtype[0].idx 10015
+doctype[1].structtype[0].name simple.header
+doctype[1].structtype[0].field[0].name "my_str"
+doctype[1].structtype[0].field[0].internalid 1874040549
+doctype[1].structtype[0].field[0].type 10004