aboutsummaryrefslogtreecommitdiffstats
path: root/vespaclient-java/src/main/java/com/yahoo
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-11-29 21:45:44 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2022-11-29 21:45:44 +0100
commit13984de4d6ab9120920c6c2c54aecb1ea9deee08 (patch)
tree1c95e65073717d96d71b944539573c41cbd43e43 /vespaclient-java/src/main/java/com/yahoo
parent0bc82ef441d34bce1ae79b797973393c0675184c (diff)
Collapse the vespa_feed_perf into the other feed clients.
Diffstat (limited to 'vespaclient-java/src/main/java/com/yahoo')
-rw-r--r--vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java205
-rw-r--r--vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java518
2 files changed, 723 insertions, 0 deletions
diff --git a/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java b/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java
new file mode 100644
index 00000000000..0ecc4198e46
--- /dev/null
+++ b/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java
@@ -0,0 +1,205 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.feed.perf;
+
+import com.yahoo.messagebus.routing.Route;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+class FeederParams {
+
+ private static final int BUFFER_SIZE = 0x100000;
+ enum DumpFormat {JSON, VESPA}
+ private PrintStream stdErr = System.err;
+ private PrintStream stdOut = System.out;
+ private Route route = Route.parse("default");
+ private String configId = "client";
+ private OutputStream dumpStream = null;
+ private DumpFormat dumpFormat = DumpFormat.JSON;
+ private boolean benchmarkMode = false;
+ private int numDispatchThreads = 1;
+ private int maxPending = 0;
+ private double timeout = 180.0;
+
+ private double windowSizeBackOff = 0.95;
+ private double windowDecrementFactor = 1.2;
+ private double windowResizeRate = 3;
+ private int windowIncrementSize = 20;
+
+ private int numConnectionsPerTarget = 1;
+ private long numMessagesToSend = Long.MAX_VALUE;
+ private List<InputStream> inputStreams = new ArrayList<>();
+
+ FeederParams() {
+ inputStreams.add(System.in);
+ }
+
+ PrintStream getStdErr() {
+ return stdErr;
+ }
+
+ FeederParams setStdErr(PrintStream stdErr) {
+ this.stdErr = stdErr;
+ return this;
+ }
+
+ PrintStream getStdOut() {
+ return stdOut;
+ }
+
+ FeederParams setStdOut(PrintStream stdOut) {
+ this.stdOut = stdOut;
+ return this;
+ }
+
+ Route getRoute() {
+ return route;
+ }
+ OutputStream getDumpStream() { return dumpStream; }
+ FeederParams setDumpStream(OutputStream dumpStream) {
+ this.dumpStream = dumpStream;
+ return this;
+ }
+
+ DumpFormat getDumpFormat() { return dumpFormat; }
+ FeederParams setDumpFormat(DumpFormat dumpFormat) {
+ this.dumpFormat = dumpFormat;
+ return this;
+ }
+
+ String getConfigId() {
+ return configId;
+ }
+
+ FeederParams setConfigId(String configId) {
+ this.configId = configId;
+ return this;
+ }
+ public double getWindowSizeBackOff() {
+ return windowSizeBackOff;
+ }
+
+ public double getWindowDecrementFactor() {
+ return windowDecrementFactor;
+ }
+
+ public double getWindowResizeRate() {
+ return windowResizeRate;
+ }
+ public double getTimeout() {
+ return timeout;
+ }
+
+ public int getWindowIncrementSize() {
+ return windowIncrementSize;
+ }
+
+ int getNumConnectionsPerTarget() { return numConnectionsPerTarget; }
+
+ long getNumMessagesToSend() { return numMessagesToSend; }
+
+ boolean isSerialTransferEnabled() {
+ return maxPending == 1;
+ }
+
+ FeederParams setSerialTransfer() {
+ maxPending = 1;
+ numDispatchThreads = 1;
+ return this;
+ }
+ List<InputStream> getInputStreams() { return inputStreams; }
+ FeederParams setInputStreams(List<InputStream> inputStreams) {
+ this.inputStreams = inputStreams;
+ return this;
+ }
+
+ int getNumDispatchThreads() { return numDispatchThreads; }
+ int getMaxPending() { return maxPending; }
+ boolean isBenchmarkMode() { return benchmarkMode; }
+
+ FeederParams parseArgs(String... args) throws ParseException, FileNotFoundException {
+ Options opts = new Options();
+ opts.addOption("s", "serial", false, "use serial transfer mode, at most 1 pending operation and a single thread");
+ opts.addOption("n", "numthreads", true, "Number of clients for sending messages. Anything, but 1 will bypass sequencing by document id.");
+ opts.addOption("m", "maxpending", true, "Max number of inflights messages. Default is auto.");
+ opts.addOption("r", "route", true, "Route for sending messages. default is 'default'....");
+ opts.addOption("b", "mode", true, "Mode for benchmarking.");
+ opts.addOption("o", "output", true, "File to write to. Extensions gives format (.xml, .json, .vespa) json will be produced if no extension.");
+ opts.addOption("c", "numconnections", true, "Number of connections per host.");
+ opts.addOption("t", "timeout", true, "Timeout for a message in seconds. default = " + timeout);
+ opts.addOption("l", "nummessages", true, "Number of messages to send (all is default).");
+ opts.addOption("wi", "window_incrementsize", true, "Dynamic window increment step size. default = " + windowIncrementSize);
+ opts.addOption("wd", "window_decrementfactor", true, "Dynamic window decrement step size factor. default = " + windowDecrementFactor);
+ opts.addOption("wb", "window_backoffactor", true, "Dynamic window backoff factor. default = " + windowSizeBackOff);
+ opts.addOption("wr", "window_resizerate", true, "Dynamic window resize rate. default = " + windowResizeRate);
+
+ CommandLine cmd = new DefaultParser().parse(opts, args);
+
+ if (cmd.hasOption('n')) {
+ numDispatchThreads = Integer.valueOf(cmd.getOptionValue('n').trim());
+ }
+ if (cmd.hasOption('m')) {
+ maxPending = Integer.valueOf(cmd.getOptionValue('m').trim());
+ }
+ if (cmd.hasOption('c')) {
+ numConnectionsPerTarget = Integer.valueOf(cmd.getOptionValue('c').trim());
+ }
+ if (cmd.hasOption("wi")) {
+ windowIncrementSize = Integer.valueOf(cmd.getOptionValue("wi").trim());
+ }
+ if (cmd.hasOption("wd")) {
+ windowDecrementFactor = Double.valueOf(cmd.getOptionValue("wd").trim());
+ }
+ if (cmd.hasOption("wb")) {
+ windowSizeBackOff = Double.valueOf(cmd.getOptionValue("wb").trim());
+ }
+ if (cmd.hasOption("wr")) {
+ windowResizeRate = Double.valueOf(cmd.getOptionValue("wr").trim());
+ }
+ if (cmd.hasOption('r')) {
+ route = Route.parse(cmd.getOptionValue('r').trim());
+ }
+ if (cmd.hasOption("t")) {
+ timeout = Double.valueOf(cmd.getOptionValue("t").trim());
+ }
+ benchmarkMode = cmd.hasOption('b');
+ if (cmd.hasOption('o')) {
+ String fileName = cmd.getOptionValue('o').trim();
+ dumpStream = new FileOutputStream(new File(fileName));
+ if (fileName.endsWith(".vespa")) {
+ dumpFormat = DumpFormat.VESPA;
+ }
+ }
+ if (cmd.hasOption('s')) {
+ setSerialTransfer();
+ }
+ if (cmd.hasOption('l')) {
+ numMessagesToSend = Long.valueOf(cmd.getOptionValue('l').trim());
+ }
+
+ if ( !cmd.getArgList().isEmpty()) {
+ inputStreams.clear();
+ for (String fileName : cmd.getArgList()) {
+ inputStreams.add(new BufferedInputStream(new FileInputStream(new File(fileName)), BUFFER_SIZE));
+ }
+ }
+
+ return this;
+ }
+
+}
diff --git a/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
new file mode 100644
index 00000000000..c40e2c21561
--- /dev/null
+++ b/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
@@ -0,0 +1,518 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.feed.perf;
+
+import com.yahoo.concurrent.ThreadFactoryFactory;
+import com.yahoo.config.subscription.ConfigSubscriber;
+import com.yahoo.document.Document;
+import com.yahoo.document.DocumentId;
+import com.yahoo.document.DocumentPut;
+import com.yahoo.document.DocumentTypeManager;
+import com.yahoo.document.DocumentTypeManagerConfigurer;
+import com.yahoo.document.DocumentUpdate;
+import com.yahoo.document.TestAndSetCondition;
+import com.yahoo.document.json.JsonFeedReader;
+import com.yahoo.document.json.JsonWriter;
+import com.yahoo.document.serialization.DocumentDeserializer;
+import com.yahoo.document.serialization.DocumentDeserializerFactory;
+import com.yahoo.document.serialization.DocumentSerializer;
+import com.yahoo.document.serialization.DocumentSerializerFactory;
+import com.yahoo.document.serialization.DocumentWriter;
+import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol;
+import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage;
+import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage;
+import com.yahoo.io.GrowableByteBuffer;
+import com.yahoo.messagebus.DynamicThrottlePolicy;
+import com.yahoo.messagebus.Error;
+import com.yahoo.messagebus.Message;
+import com.yahoo.messagebus.MessageBusParams;
+import com.yahoo.messagebus.RPCMessageBus;
+import com.yahoo.messagebus.Reply;
+import com.yahoo.messagebus.ReplyHandler;
+import com.yahoo.messagebus.SourceSession;
+import com.yahoo.messagebus.SourceSessionParams;
+import com.yahoo.messagebus.StaticThrottlePolicy;
+import com.yahoo.messagebus.network.rpc.RPCNetworkParams;
+import com.yahoo.messagebus.routing.Route;
+import com.yahoo.vespaxmlparser.ConditionalFeedOperation;
+import com.yahoo.vespaxmlparser.FeedReader;
+import com.yahoo.vespaxmlparser.FeedOperation;
+import com.yahoo.vespaxmlparser.RemoveFeedOperation;
+import com.yahoo.vespaxmlparser.VespaXMLFeedReader;
+import net.jpountz.xxhash.XXHashFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Stream;
+
+/**
+ * @author Simon Thoresen Hult
+ */
+public class SimpleFeeder implements ReplyHandler {
+
+ private final DocumentTypeManager docTypeMgr = new DocumentTypeManager();
+ private final ConfigSubscriber documentTypeConfigSubscriber;
+ private final List<InputStream> inputStreams;
+ private final PrintStream out;
+ private final RPCMessageBus mbus;
+ private final SourceSession session;
+ private final int numThreads;
+ private final long numMessagesToSend;
+ private final Destination destination;
+ private final boolean benchmarkMode;
+ private final static long REPORT_INTERVAL = TimeUnit.SECONDS.toMillis(10);
+ private final long startTime = System.currentTimeMillis();
+ private final AtomicReference<Throwable> failure = new AtomicReference<>(null);
+ private final AtomicLong numReplies = new AtomicLong(0);
+ private long maxLatency = Long.MIN_VALUE;
+ private long minLatency = Long.MAX_VALUE;
+ private long nextReport = startTime + REPORT_INTERVAL;
+ private long sumLatency = 0;
+
+ static class Metrics {
+
+ private final Destination destination;
+ private final FeedReader reader;
+ private final Executor executor;
+ private final long messagesToSend;
+ private final AtomicReference<Throwable> failure;
+
+ Metrics(Destination destination, FeedReader reader, Executor executor, AtomicReference<Throwable> failure, long messagesToSend) {
+ this.destination = destination;
+ this.reader = reader;
+ this.executor = executor;
+ this.messagesToSend = messagesToSend;
+ this.failure = failure;
+ }
+
+ long feed() throws Throwable {
+ long numMessages = 0;
+ while ((failure.get() == null) && (numMessages < messagesToSend)) {
+ FeedOperation op = reader.read();
+ if (op.getType() == FeedOperation.Type.INVALID) {
+ break;
+ }
+ if (executor != null) {
+ executor.execute(() -> sendOperation(op));
+ } else {
+ sendOperation(op);
+ }
+ ++numMessages;
+ }
+ return numMessages;
+ }
+ private void sendOperation(FeedOperation op) {
+ destination.send(op);
+ }
+ }
+
+
+ public static void main(String[] args) throws Throwable {
+ Logger.getLogger("").setLevel(Level.WARNING);
+ new SimpleFeeder(new FeederParams().parseArgs(args)).run().close();
+ }
+
+ private interface Destination {
+ void send(FeedOperation op);
+ void close() throws Exception;
+ }
+
+ private static class MbusDestination implements Destination {
+ private final PrintStream err;
+ private final Route route;
+ private final SourceSession session;
+ private final long timeoutMS;
+ private final AtomicReference<Throwable> failure;
+ MbusDestination(SourceSession session, Route route, double timeoutS, AtomicReference<Throwable> failure, PrintStream err) {
+ this.route = route;
+ this.err = err;
+ this.session = session;
+ this.timeoutMS = (long)(timeoutS * 1000.0);
+ this.failure = failure;
+ }
+ public void send(FeedOperation op) {
+ Message msg = newMessage(op);
+ if (msg == null) {
+ err.println("ignoring operation; " + op.getType());
+ return;
+ }
+ msg.setTimeRemaining(timeoutMS);
+ msg.setContext(System.currentTimeMillis());
+ msg.setRoute(route);
+ try {
+ Error err = session.sendBlocking(msg).getError();
+ if (err != null) {
+ failure.set(new IOException(err.toString()));
+ }
+ } catch (InterruptedException e) {}
+ }
+ public void close() {
+ session.destroy();
+ }
+ }
+
+ private static class JsonDestination implements Destination {
+ private final OutputStream outputStream;
+ private final DocumentWriter writer;
+ private final AtomicLong numReplies;
+ private final AtomicReference<Throwable> failure;
+ private boolean isFirst = true;
+ JsonDestination(OutputStream outputStream, AtomicReference<Throwable> failure, AtomicLong numReplies) {
+ this.outputStream = outputStream;
+ writer = new JsonWriter(outputStream);
+ this.numReplies = numReplies;
+ this.failure = failure;
+ try {
+ outputStream.write('[');
+ outputStream.write('\n');
+ } catch (IOException e) {
+ failure.set(e);
+ }
+ }
+ public void send(FeedOperation op) {
+ if (op.getType() == FeedOperation.Type.DOCUMENT) {
+ if (!isFirst) {
+ try {
+ outputStream.write(',');
+ outputStream.write('\n');
+ } catch (IOException e) {
+ failure.set(e);
+ }
+ } else {
+ isFirst = false;
+ }
+ writer.write(op.getDocument());
+ }
+ numReplies.incrementAndGet();
+ }
+ public void close() throws Exception {
+ outputStream.write('\n');
+ outputStream.write(']');
+ outputStream.close();
+ }
+ }
+
+ static private final int NONE = 0;
+ static private final int DOCUMENT = 1;
+ static private final int UPDATE = 2;
+ static private final int REMOVE = 3;
+ private static class VespaV1Destination implements Destination {
+ private final OutputStream outputStream;
+ GrowableByteBuffer buffer = new GrowableByteBuffer(16384);
+ ByteBuffer header = ByteBuffer.allocate(16);
+ private final AtomicLong numReplies;
+ private final AtomicReference<Throwable> failure;
+ VespaV1Destination(OutputStream outputStream, AtomicReference<Throwable> failure, AtomicLong numReplies) {
+ this.outputStream = outputStream;
+ this.numReplies = numReplies;
+ this.failure = failure;
+ try {
+ outputStream.write('V');
+ outputStream.write('1');
+ } catch (IOException e) {
+ failure.set(e);
+ }
+ }
+ public void send(FeedOperation op) {
+ TestAndSetCondition cond = op.getCondition();
+ buffer.putUtf8String(cond.getSelection());
+ DocumentSerializer writer = DocumentSerializerFactory.createHead(buffer);
+ int type = NONE;
+ if (op.getType() == FeedOperation.Type.DOCUMENT) {
+ writer.write(op.getDocument());
+ type = DOCUMENT;
+ } else if (op.getType() == FeedOperation.Type.UPDATE) {
+ writer.write(op.getDocumentUpdate());
+ type = UPDATE;
+ } else if (op.getType() == FeedOperation.Type.REMOVE) {
+ writer.write(op.getRemove());
+ type = REMOVE;
+ }
+ int sz = buffer.position();
+ long hash = hash(buffer.array(), sz);
+ try {
+ header.putInt(sz);
+ header.putInt(type);
+ header.putLong(hash);
+ outputStream.write(header.array(), 0, header.position());
+ outputStream.write(buffer.array(), 0, buffer.position());
+ header.clear();
+ buffer.clear();
+ } catch (IOException e) {
+ failure.set(e);
+ }
+ numReplies.incrementAndGet();
+ }
+ public void close() throws Exception {
+ outputStream.close();
+ }
+ static long hash(byte [] buf, int length) {
+ return XXHashFactory.fastestJavaInstance().hash64().hash(buf, 0, length, 0);
+ }
+ }
+
+ private static int readExact(InputStream in, byte [] buf) throws IOException {
+ return in.readNBytes(buf, 0, buf.length);
+ }
+
+ static class VespaV1FeedReader implements FeedReader {
+ private final InputStream in;
+ private final DocumentTypeManager mgr;
+ private final byte[] prefix = new byte[16];
+ VespaV1FeedReader(InputStream in, DocumentTypeManager mgr) throws IOException {
+ this.in = in;
+ this.mgr = mgr;
+ byte [] header = new byte[2];
+ int read = readExact(in, header);
+ if ((read != header.length) || (header[0] != 'V') || (header[1] != '1')) {
+ throw new IllegalArgumentException("Invalid Header " + Arrays.toString(header));
+ }
+ }
+
+ static class LazyDocumentOperation extends ConditionalFeedOperation {
+ private final DocumentDeserializer deserializer;
+ LazyDocumentOperation(DocumentDeserializer deserializer, TestAndSetCondition condition) {
+ super(Type.DOCUMENT, condition);
+ this.deserializer = deserializer;
+ }
+
+ @Override
+ public Document getDocument() {
+ return new Document(deserializer);
+ }
+ }
+ static class LazyUpdateOperation extends ConditionalFeedOperation {
+ private final DocumentDeserializer deserializer;
+ LazyUpdateOperation(DocumentDeserializer deserializer, TestAndSetCondition condition) {
+ super(Type.UPDATE, condition);
+ this.deserializer = deserializer;
+ }
+
+ @Override
+ public DocumentUpdate getDocumentUpdate() {
+ return new DocumentUpdate(deserializer);
+ }
+ }
+ @Override
+ public FeedOperation read() throws Exception {
+ int read = readExact(in, prefix);
+ if (read != prefix.length) {
+ return FeedOperation.INVALID;
+ }
+ ByteBuffer header = ByteBuffer.wrap(prefix);
+ int sz = header.getInt();
+ int type = header.getInt();
+ long hash = header.getLong();
+ byte [] blob = new byte[sz];
+ read = readExact(in, blob);
+ if (read != blob.length) {
+ throw new IllegalArgumentException("Underflow, failed reading " + blob.length + "bytes. Got " + read);
+ }
+ long computedHash = VespaV1Destination.hash(blob, blob.length);
+ if (computedHash != hash) {
+ throw new IllegalArgumentException("Hash mismatch, expected " + hash + ", got " + computedHash);
+ }
+ GrowableByteBuffer buf = GrowableByteBuffer.wrap(blob);
+ String condition = buf.getUtf8String();
+ DocumentDeserializer deser = DocumentDeserializerFactory.createHead(mgr, buf);
+ TestAndSetCondition testAndSetCondition = condition.isEmpty()
+ ? TestAndSetCondition.NOT_PRESENT_CONDITION
+ : new TestAndSetCondition(condition);
+ if (type == DOCUMENT) {
+ return new LazyDocumentOperation(deser, testAndSetCondition);
+ } else if (type == UPDATE) {
+ return new LazyUpdateOperation(deser, testAndSetCondition);
+ } else if (type == REMOVE) {
+ return new RemoveFeedOperation(new DocumentId(deser), testAndSetCondition);
+ } else {
+ throw new IllegalArgumentException("Unknown operation " + type);
+ }
+ }
+ }
+
+ private Destination createDumper(FeederParams params) {
+ if (params.getDumpFormat() == FeederParams.DumpFormat.VESPA) {
+ return new VespaV1Destination(params.getDumpStream(), failure, numReplies);
+ }
+ return new JsonDestination(params.getDumpStream(), failure, numReplies);
+ }
+
+
+ @SuppressWarnings("deprecation")
+ SimpleFeeder(FeederParams params) {
+ inputStreams = params.getInputStreams();
+ out = params.getStdOut();
+ numThreads = params.getNumDispatchThreads();
+ numMessagesToSend = params.getNumMessagesToSend();
+ mbus = newMessageBus(docTypeMgr, params);
+ session = newSession(mbus, this, params);
+ documentTypeConfigSubscriber = DocumentTypeManagerConfigurer.configure(docTypeMgr, params.getConfigId());
+ benchmarkMode = params.isBenchmarkMode();
+ destination = (params.getDumpStream() != null)
+ ? createDumper(params)
+ : new MbusDestination(session, params.getRoute(), params.getTimeout(), failure, params.getStdErr());
+ }
+
+ SourceSession getSourceSession() { return session; }
+ private FeedReader createFeedReader(InputStream in) throws Exception {
+ in.mark(8);
+ byte [] b = new byte[2];
+ int numRead = readExact(in, b);
+ in.reset();
+ if (numRead != b.length) {
+ throw new IllegalArgumentException("Need to read " + b.length + " bytes to detect format. Got " + numRead + " bytes.");
+ }
+ if (b[0] == '[') {
+ return new JsonFeedReader(in, docTypeMgr);
+ } else if ((b[0] == 'V') && (b[1] == '1')) {
+ return new VespaV1FeedReader(in, docTypeMgr);
+ } else {
+ return new VespaXMLFeedReader(in, docTypeMgr);
+ }
+ }
+
+
+ static class RetryExecutionHandler implements RejectedExecutionHandler {
+
+ @Override
+ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
+ try {
+ executor.getQueue().put(r);
+ } catch (InterruptedException e) {}
+ }
+ }
+
+ SimpleFeeder run() throws Throwable {
+ ExecutorService executor = (numThreads > 1)
+ ? new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS,
+ new ArrayBlockingQueue<>(numThreads*100),
+ ThreadFactoryFactory.getDaemonThreadFactory("perf-feeder"),
+ new RetryExecutionHandler())
+ : null;
+ printHeader(out);
+ long numMessagesSent = 0;
+ for (InputStream in : inputStreams) {
+ Metrics m = new Metrics(destination, createFeedReader(in), executor, failure, numMessagesToSend);
+ numMessagesSent += m.feed();
+ }
+ while (failure.get() == null && numReplies.get() < numMessagesSent) {
+ Thread.sleep(100);
+ }
+ if (failure.get() != null) {
+ throw failure.get();
+ }
+ printReport(out);
+ return this;
+ }
+
+ void close() throws Exception {
+ destination.close();
+ mbus.destroy();
+ }
+
+ private static Message newMessage(FeedOperation op) {
+ switch (op.getType()) {
+ case DOCUMENT: {
+ PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(op.getDocument()));
+ message.setCondition(op.getCondition());
+ return message;
+ }
+ case REMOVE: {
+ RemoveDocumentMessage message = new RemoveDocumentMessage(op.getRemove());
+ message.setCondition(op.getCondition());
+ return message;
+ }
+ case UPDATE: {
+ UpdateDocumentMessage message = new UpdateDocumentMessage(op.getDocumentUpdate());
+ message.setCondition(op.getCondition());
+ return message;
+ }
+ default:
+ return null;
+ }
+ }
+
+ private static boolean containsFatalErrors(Stream<Error> errors) {
+ return errors.anyMatch(e -> e.getCode() != DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED);
+ }
+
+ @Override
+ public void handleReply(Reply reply) {
+ if (failure.get() != null) {
+ return;
+ }
+ if (containsFatalErrors(reply.getErrors())) {
+ failure.compareAndSet(null, new IOException(formatErrors(reply)));
+ return;
+ }
+ long now = System.currentTimeMillis();
+ long latency = now - (long) reply.getContext();
+ numReplies.incrementAndGet();
+ accumulateReplies(now, latency);
+ }
+ private synchronized void accumulateReplies(long now, long latency) {
+ minLatency = Math.min(minLatency, latency);
+ maxLatency = Math.max(maxLatency, latency);
+ sumLatency += latency;
+ if (benchmarkMode) { return; }
+ if (now > nextReport) {
+ printReport(out);
+ nextReport += REPORT_INTERVAL;
+ }
+ }
+ private static void printHeader(PrintStream out) {
+ out.println("# Time used, num ok, num error, min latency, max latency, average latency");
+ }
+
+ private synchronized void printReport(PrintStream out) {
+ out.format("%10d, %12d, %11d, %11d, %11d\n", System.currentTimeMillis() - startTime,
+ numReplies.get(), minLatency, maxLatency, sumLatency / Long.max(1, numReplies.get()));
+ }
+
+ private static String formatErrors(Reply reply) {
+ StringBuilder out = new StringBuilder();
+ out.append(reply.getMessage().toString()).append('\n');
+ for (int i = 0, len = reply.getNumErrors(); i < len; ++i) {
+ out.append(reply.getError(i).toString()).append('\n');
+ }
+ return out.toString();
+ }
+
+ private static RPCMessageBus newMessageBus(DocumentTypeManager docTypeMgr, FeederParams params) {
+ return new RPCMessageBus(new MessageBusParams().addProtocol(new DocumentProtocol(docTypeMgr)),
+ new RPCNetworkParams().setSlobrokConfigId(params.getConfigId())
+ .setNumTargetsPerSpec(params.getNumConnectionsPerTarget()),
+ params.getConfigId());
+ }
+
+ private static SourceSession newSession(RPCMessageBus mbus, ReplyHandler replyHandler, FeederParams feederParams ) {
+ SourceSessionParams params = new SourceSessionParams();
+ params.setReplyHandler(replyHandler);
+ if (feederParams.getMaxPending() > 0) {
+ params.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(feederParams.getMaxPending()));
+ } else {
+ DynamicThrottlePolicy throttlePolicy = new DynamicThrottlePolicy()
+ .setWindowSizeIncrement(feederParams.getWindowIncrementSize())
+ .setResizeRate(feederParams.getWindowResizeRate())
+ .setWindowSizeDecrementFactor(feederParams.getWindowDecrementFactor())
+ .setWindowSizeBackOff(feederParams.getWindowSizeBackOff());
+ params.setThrottlePolicy(throttlePolicy);
+ }
+ return mbus.getMessageBus().createSourceSession(params);
+ }
+}