From 13984de4d6ab9120920c6c2c54aecb1ea9deee08 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 29 Nov 2022 21:45:44 +0100 Subject: Collapse the vespa_feed_perf into the other feed clients. --- .../com/yahoo/vespa/feed/perf/FeederParams.java | 205 ++++++++ .../com/yahoo/vespa/feed/perf/SimpleFeeder.java | 518 +++++++++++++++++++++ 2 files changed, 723 insertions(+) create mode 100644 vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java create mode 100644 vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java (limited to 'vespaclient-java/src/main/java/com/yahoo') diff --git a/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java b/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java new file mode 100644 index 00000000000..0ecc4198e46 --- /dev/null +++ b/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java @@ -0,0 +1,205 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.feed.perf; + +import com.yahoo.messagebus.routing.Route; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.List; + +/** + * @author Simon Thoresen Hult + */ +class FeederParams { + + private static final int BUFFER_SIZE = 0x100000; + enum DumpFormat {JSON, VESPA} + private PrintStream stdErr = System.err; + private PrintStream stdOut = System.out; + private Route route = Route.parse("default"); + private String configId = "client"; + private OutputStream dumpStream = null; + private DumpFormat dumpFormat = DumpFormat.JSON; + private boolean benchmarkMode = false; + private int numDispatchThreads = 1; + private int maxPending = 0; + private double timeout = 180.0; + + private double windowSizeBackOff = 0.95; + private double windowDecrementFactor = 1.2; + private double windowResizeRate = 3; + private int windowIncrementSize = 20; + + private int numConnectionsPerTarget = 1; + private long numMessagesToSend = Long.MAX_VALUE; + private List inputStreams = new ArrayList<>(); + + FeederParams() { + inputStreams.add(System.in); + } + + PrintStream getStdErr() { + return stdErr; + } + + FeederParams setStdErr(PrintStream stdErr) { + this.stdErr = stdErr; + return this; + } + + PrintStream getStdOut() { + return stdOut; + } + + FeederParams setStdOut(PrintStream stdOut) { + this.stdOut = stdOut; + return this; + } + + Route getRoute() { + return route; + } + OutputStream getDumpStream() { return dumpStream; } + FeederParams setDumpStream(OutputStream dumpStream) { + this.dumpStream = dumpStream; + return this; + } + + DumpFormat getDumpFormat() { return dumpFormat; } + FeederParams setDumpFormat(DumpFormat dumpFormat) { + this.dumpFormat = dumpFormat; + return this; + } + + String getConfigId() { + return configId; + } + + FeederParams setConfigId(String configId) { + this.configId = configId; + return this; + } + public double getWindowSizeBackOff() { + return windowSizeBackOff; + } + + public double getWindowDecrementFactor() { + return windowDecrementFactor; + } + + public double getWindowResizeRate() { + return windowResizeRate; + } + public double getTimeout() { + return timeout; + } + + public int getWindowIncrementSize() { + return windowIncrementSize; + } + + int getNumConnectionsPerTarget() { return numConnectionsPerTarget; } + + long getNumMessagesToSend() { return numMessagesToSend; } + + boolean isSerialTransferEnabled() { + return maxPending == 1; + } + + FeederParams setSerialTransfer() { + maxPending = 1; + numDispatchThreads = 1; + return this; + } + List getInputStreams() { return inputStreams; } + FeederParams setInputStreams(List inputStreams) { + this.inputStreams = inputStreams; + return this; + } + + int getNumDispatchThreads() { return numDispatchThreads; } + int getMaxPending() { return maxPending; } + boolean isBenchmarkMode() { return benchmarkMode; } + + FeederParams parseArgs(String... args) throws ParseException, FileNotFoundException { + Options opts = new Options(); + opts.addOption("s", "serial", false, "use serial transfer mode, at most 1 pending operation and a single thread"); + opts.addOption("n", "numthreads", true, "Number of clients for sending messages. Anything, but 1 will bypass sequencing by document id."); + opts.addOption("m", "maxpending", true, "Max number of inflights messages. Default is auto."); + opts.addOption("r", "route", true, "Route for sending messages. default is 'default'...."); + opts.addOption("b", "mode", true, "Mode for benchmarking."); + opts.addOption("o", "output", true, "File to write to. Extensions gives format (.xml, .json, .vespa) json will be produced if no extension."); + opts.addOption("c", "numconnections", true, "Number of connections per host."); + opts.addOption("t", "timeout", true, "Timeout for a message in seconds. default = " + timeout); + opts.addOption("l", "nummessages", true, "Number of messages to send (all is default)."); + opts.addOption("wi", "window_incrementsize", true, "Dynamic window increment step size. default = " + windowIncrementSize); + opts.addOption("wd", "window_decrementfactor", true, "Dynamic window decrement step size factor. default = " + windowDecrementFactor); + opts.addOption("wb", "window_backoffactor", true, "Dynamic window backoff factor. default = " + windowSizeBackOff); + opts.addOption("wr", "window_resizerate", true, "Dynamic window resize rate. default = " + windowResizeRate); + + CommandLine cmd = new DefaultParser().parse(opts, args); + + if (cmd.hasOption('n')) { + numDispatchThreads = Integer.valueOf(cmd.getOptionValue('n').trim()); + } + if (cmd.hasOption('m')) { + maxPending = Integer.valueOf(cmd.getOptionValue('m').trim()); + } + if (cmd.hasOption('c')) { + numConnectionsPerTarget = Integer.valueOf(cmd.getOptionValue('c').trim()); + } + if (cmd.hasOption("wi")) { + windowIncrementSize = Integer.valueOf(cmd.getOptionValue("wi").trim()); + } + if (cmd.hasOption("wd")) { + windowDecrementFactor = Double.valueOf(cmd.getOptionValue("wd").trim()); + } + if (cmd.hasOption("wb")) { + windowSizeBackOff = Double.valueOf(cmd.getOptionValue("wb").trim()); + } + if (cmd.hasOption("wr")) { + windowResizeRate = Double.valueOf(cmd.getOptionValue("wr").trim()); + } + if (cmd.hasOption('r')) { + route = Route.parse(cmd.getOptionValue('r').trim()); + } + if (cmd.hasOption("t")) { + timeout = Double.valueOf(cmd.getOptionValue("t").trim()); + } + benchmarkMode = cmd.hasOption('b'); + if (cmd.hasOption('o')) { + String fileName = cmd.getOptionValue('o').trim(); + dumpStream = new FileOutputStream(new File(fileName)); + if (fileName.endsWith(".vespa")) { + dumpFormat = DumpFormat.VESPA; + } + } + if (cmd.hasOption('s')) { + setSerialTransfer(); + } + if (cmd.hasOption('l')) { + numMessagesToSend = Long.valueOf(cmd.getOptionValue('l').trim()); + } + + if ( !cmd.getArgList().isEmpty()) { + inputStreams.clear(); + for (String fileName : cmd.getArgList()) { + inputStreams.add(new BufferedInputStream(new FileInputStream(new File(fileName)), BUFFER_SIZE)); + } + } + + return this; + } + +} diff --git a/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java new file mode 100644 index 00000000000..c40e2c21561 --- /dev/null +++ b/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java @@ -0,0 +1,518 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.feed.perf; + +import com.yahoo.concurrent.ThreadFactoryFactory; +import com.yahoo.config.subscription.ConfigSubscriber; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.DocumentTypeManagerConfigurer; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.TestAndSetCondition; +import com.yahoo.document.json.JsonFeedReader; +import com.yahoo.document.json.JsonWriter; +import com.yahoo.document.serialization.DocumentDeserializer; +import com.yahoo.document.serialization.DocumentDeserializerFactory; +import com.yahoo.document.serialization.DocumentSerializer; +import com.yahoo.document.serialization.DocumentSerializerFactory; +import com.yahoo.document.serialization.DocumentWriter; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; +import com.yahoo.io.GrowableByteBuffer; +import com.yahoo.messagebus.DynamicThrottlePolicy; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageBusParams; +import com.yahoo.messagebus.RPCMessageBus; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.SourceSession; +import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.StaticThrottlePolicy; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.vespaxmlparser.ConditionalFeedOperation; +import com.yahoo.vespaxmlparser.FeedReader; +import com.yahoo.vespaxmlparser.FeedOperation; +import com.yahoo.vespaxmlparser.RemoveFeedOperation; +import com.yahoo.vespaxmlparser.VespaXMLFeedReader; +import net.jpountz.xxhash.XXHashFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Stream; + +/** + * @author Simon Thoresen Hult + */ +public class SimpleFeeder implements ReplyHandler { + + private final DocumentTypeManager docTypeMgr = new DocumentTypeManager(); + private final ConfigSubscriber documentTypeConfigSubscriber; + private final List inputStreams; + private final PrintStream out; + private final RPCMessageBus mbus; + private final SourceSession session; + private final int numThreads; + private final long numMessagesToSend; + private final Destination destination; + private final boolean benchmarkMode; + private final static long REPORT_INTERVAL = TimeUnit.SECONDS.toMillis(10); + private final long startTime = System.currentTimeMillis(); + private final AtomicReference failure = new AtomicReference<>(null); + private final AtomicLong numReplies = new AtomicLong(0); + private long maxLatency = Long.MIN_VALUE; + private long minLatency = Long.MAX_VALUE; + private long nextReport = startTime + REPORT_INTERVAL; + private long sumLatency = 0; + + static class Metrics { + + private final Destination destination; + private final FeedReader reader; + private final Executor executor; + private final long messagesToSend; + private final AtomicReference failure; + + Metrics(Destination destination, FeedReader reader, Executor executor, AtomicReference failure, long messagesToSend) { + this.destination = destination; + this.reader = reader; + this.executor = executor; + this.messagesToSend = messagesToSend; + this.failure = failure; + } + + long feed() throws Throwable { + long numMessages = 0; + while ((failure.get() == null) && (numMessages < messagesToSend)) { + FeedOperation op = reader.read(); + if (op.getType() == FeedOperation.Type.INVALID) { + break; + } + if (executor != null) { + executor.execute(() -> sendOperation(op)); + } else { + sendOperation(op); + } + ++numMessages; + } + return numMessages; + } + private void sendOperation(FeedOperation op) { + destination.send(op); + } + } + + + public static void main(String[] args) throws Throwable { + Logger.getLogger("").setLevel(Level.WARNING); + new SimpleFeeder(new FeederParams().parseArgs(args)).run().close(); + } + + private interface Destination { + void send(FeedOperation op); + void close() throws Exception; + } + + private static class MbusDestination implements Destination { + private final PrintStream err; + private final Route route; + private final SourceSession session; + private final long timeoutMS; + private final AtomicReference failure; + MbusDestination(SourceSession session, Route route, double timeoutS, AtomicReference failure, PrintStream err) { + this.route = route; + this.err = err; + this.session = session; + this.timeoutMS = (long)(timeoutS * 1000.0); + this.failure = failure; + } + public void send(FeedOperation op) { + Message msg = newMessage(op); + if (msg == null) { + err.println("ignoring operation; " + op.getType()); + return; + } + msg.setTimeRemaining(timeoutMS); + msg.setContext(System.currentTimeMillis()); + msg.setRoute(route); + try { + Error err = session.sendBlocking(msg).getError(); + if (err != null) { + failure.set(new IOException(err.toString())); + } + } catch (InterruptedException e) {} + } + public void close() { + session.destroy(); + } + } + + private static class JsonDestination implements Destination { + private final OutputStream outputStream; + private final DocumentWriter writer; + private final AtomicLong numReplies; + private final AtomicReference failure; + private boolean isFirst = true; + JsonDestination(OutputStream outputStream, AtomicReference failure, AtomicLong numReplies) { + this.outputStream = outputStream; + writer = new JsonWriter(outputStream); + this.numReplies = numReplies; + this.failure = failure; + try { + outputStream.write('['); + outputStream.write('\n'); + } catch (IOException e) { + failure.set(e); + } + } + public void send(FeedOperation op) { + if (op.getType() == FeedOperation.Type.DOCUMENT) { + if (!isFirst) { + try { + outputStream.write(','); + outputStream.write('\n'); + } catch (IOException e) { + failure.set(e); + } + } else { + isFirst = false; + } + writer.write(op.getDocument()); + } + numReplies.incrementAndGet(); + } + public void close() throws Exception { + outputStream.write('\n'); + outputStream.write(']'); + outputStream.close(); + } + } + + static private final int NONE = 0; + static private final int DOCUMENT = 1; + static private final int UPDATE = 2; + static private final int REMOVE = 3; + private static class VespaV1Destination implements Destination { + private final OutputStream outputStream; + GrowableByteBuffer buffer = new GrowableByteBuffer(16384); + ByteBuffer header = ByteBuffer.allocate(16); + private final AtomicLong numReplies; + private final AtomicReference failure; + VespaV1Destination(OutputStream outputStream, AtomicReference failure, AtomicLong numReplies) { + this.outputStream = outputStream; + this.numReplies = numReplies; + this.failure = failure; + try { + outputStream.write('V'); + outputStream.write('1'); + } catch (IOException e) { + failure.set(e); + } + } + public void send(FeedOperation op) { + TestAndSetCondition cond = op.getCondition(); + buffer.putUtf8String(cond.getSelection()); + DocumentSerializer writer = DocumentSerializerFactory.createHead(buffer); + int type = NONE; + if (op.getType() == FeedOperation.Type.DOCUMENT) { + writer.write(op.getDocument()); + type = DOCUMENT; + } else if (op.getType() == FeedOperation.Type.UPDATE) { + writer.write(op.getDocumentUpdate()); + type = UPDATE; + } else if (op.getType() == FeedOperation.Type.REMOVE) { + writer.write(op.getRemove()); + type = REMOVE; + } + int sz = buffer.position(); + long hash = hash(buffer.array(), sz); + try { + header.putInt(sz); + header.putInt(type); + header.putLong(hash); + outputStream.write(header.array(), 0, header.position()); + outputStream.write(buffer.array(), 0, buffer.position()); + header.clear(); + buffer.clear(); + } catch (IOException e) { + failure.set(e); + } + numReplies.incrementAndGet(); + } + public void close() throws Exception { + outputStream.close(); + } + static long hash(byte [] buf, int length) { + return XXHashFactory.fastestJavaInstance().hash64().hash(buf, 0, length, 0); + } + } + + private static int readExact(InputStream in, byte [] buf) throws IOException { + return in.readNBytes(buf, 0, buf.length); + } + + static class VespaV1FeedReader implements FeedReader { + private final InputStream in; + private final DocumentTypeManager mgr; + private final byte[] prefix = new byte[16]; + VespaV1FeedReader(InputStream in, DocumentTypeManager mgr) throws IOException { + this.in = in; + this.mgr = mgr; + byte [] header = new byte[2]; + int read = readExact(in, header); + if ((read != header.length) || (header[0] != 'V') || (header[1] != '1')) { + throw new IllegalArgumentException("Invalid Header " + Arrays.toString(header)); + } + } + + static class LazyDocumentOperation extends ConditionalFeedOperation { + private final DocumentDeserializer deserializer; + LazyDocumentOperation(DocumentDeserializer deserializer, TestAndSetCondition condition) { + super(Type.DOCUMENT, condition); + this.deserializer = deserializer; + } + + @Override + public Document getDocument() { + return new Document(deserializer); + } + } + static class LazyUpdateOperation extends ConditionalFeedOperation { + private final DocumentDeserializer deserializer; + LazyUpdateOperation(DocumentDeserializer deserializer, TestAndSetCondition condition) { + super(Type.UPDATE, condition); + this.deserializer = deserializer; + } + + @Override + public DocumentUpdate getDocumentUpdate() { + return new DocumentUpdate(deserializer); + } + } + @Override + public FeedOperation read() throws Exception { + int read = readExact(in, prefix); + if (read != prefix.length) { + return FeedOperation.INVALID; + } + ByteBuffer header = ByteBuffer.wrap(prefix); + int sz = header.getInt(); + int type = header.getInt(); + long hash = header.getLong(); + byte [] blob = new byte[sz]; + read = readExact(in, blob); + if (read != blob.length) { + throw new IllegalArgumentException("Underflow, failed reading " + blob.length + "bytes. Got " + read); + } + long computedHash = VespaV1Destination.hash(blob, blob.length); + if (computedHash != hash) { + throw new IllegalArgumentException("Hash mismatch, expected " + hash + ", got " + computedHash); + } + GrowableByteBuffer buf = GrowableByteBuffer.wrap(blob); + String condition = buf.getUtf8String(); + DocumentDeserializer deser = DocumentDeserializerFactory.createHead(mgr, buf); + TestAndSetCondition testAndSetCondition = condition.isEmpty() + ? TestAndSetCondition.NOT_PRESENT_CONDITION + : new TestAndSetCondition(condition); + if (type == DOCUMENT) { + return new LazyDocumentOperation(deser, testAndSetCondition); + } else if (type == UPDATE) { + return new LazyUpdateOperation(deser, testAndSetCondition); + } else if (type == REMOVE) { + return new RemoveFeedOperation(new DocumentId(deser), testAndSetCondition); + } else { + throw new IllegalArgumentException("Unknown operation " + type); + } + } + } + + private Destination createDumper(FeederParams params) { + if (params.getDumpFormat() == FeederParams.DumpFormat.VESPA) { + return new VespaV1Destination(params.getDumpStream(), failure, numReplies); + } + return new JsonDestination(params.getDumpStream(), failure, numReplies); + } + + + @SuppressWarnings("deprecation") + SimpleFeeder(FeederParams params) { + inputStreams = params.getInputStreams(); + out = params.getStdOut(); + numThreads = params.getNumDispatchThreads(); + numMessagesToSend = params.getNumMessagesToSend(); + mbus = newMessageBus(docTypeMgr, params); + session = newSession(mbus, this, params); + documentTypeConfigSubscriber = DocumentTypeManagerConfigurer.configure(docTypeMgr, params.getConfigId()); + benchmarkMode = params.isBenchmarkMode(); + destination = (params.getDumpStream() != null) + ? createDumper(params) + : new MbusDestination(session, params.getRoute(), params.getTimeout(), failure, params.getStdErr()); + } + + SourceSession getSourceSession() { return session; } + private FeedReader createFeedReader(InputStream in) throws Exception { + in.mark(8); + byte [] b = new byte[2]; + int numRead = readExact(in, b); + in.reset(); + if (numRead != b.length) { + throw new IllegalArgumentException("Need to read " + b.length + " bytes to detect format. Got " + numRead + " bytes."); + } + if (b[0] == '[') { + return new JsonFeedReader(in, docTypeMgr); + } else if ((b[0] == 'V') && (b[1] == '1')) { + return new VespaV1FeedReader(in, docTypeMgr); + } else { + return new VespaXMLFeedReader(in, docTypeMgr); + } + } + + + static class RetryExecutionHandler implements RejectedExecutionHandler { + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + executor.getQueue().put(r); + } catch (InterruptedException e) {} + } + } + + SimpleFeeder run() throws Throwable { + ExecutorService executor = (numThreads > 1) + ? new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(numThreads*100), + ThreadFactoryFactory.getDaemonThreadFactory("perf-feeder"), + new RetryExecutionHandler()) + : null; + printHeader(out); + long numMessagesSent = 0; + for (InputStream in : inputStreams) { + Metrics m = new Metrics(destination, createFeedReader(in), executor, failure, numMessagesToSend); + numMessagesSent += m.feed(); + } + while (failure.get() == null && numReplies.get() < numMessagesSent) { + Thread.sleep(100); + } + if (failure.get() != null) { + throw failure.get(); + } + printReport(out); + return this; + } + + void close() throws Exception { + destination.close(); + mbus.destroy(); + } + + private static Message newMessage(FeedOperation op) { + switch (op.getType()) { + case DOCUMENT: { + PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(op.getDocument())); + message.setCondition(op.getCondition()); + return message; + } + case REMOVE: { + RemoveDocumentMessage message = new RemoveDocumentMessage(op.getRemove()); + message.setCondition(op.getCondition()); + return message; + } + case UPDATE: { + UpdateDocumentMessage message = new UpdateDocumentMessage(op.getDocumentUpdate()); + message.setCondition(op.getCondition()); + return message; + } + default: + return null; + } + } + + private static boolean containsFatalErrors(Stream errors) { + return errors.anyMatch(e -> e.getCode() != DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED); + } + + @Override + public void handleReply(Reply reply) { + if (failure.get() != null) { + return; + } + if (containsFatalErrors(reply.getErrors())) { + failure.compareAndSet(null, new IOException(formatErrors(reply))); + return; + } + long now = System.currentTimeMillis(); + long latency = now - (long) reply.getContext(); + numReplies.incrementAndGet(); + accumulateReplies(now, latency); + } + private synchronized void accumulateReplies(long now, long latency) { + minLatency = Math.min(minLatency, latency); + maxLatency = Math.max(maxLatency, latency); + sumLatency += latency; + if (benchmarkMode) { return; } + if (now > nextReport) { + printReport(out); + nextReport += REPORT_INTERVAL; + } + } + private static void printHeader(PrintStream out) { + out.println("# Time used, num ok, num error, min latency, max latency, average latency"); + } + + private synchronized void printReport(PrintStream out) { + out.format("%10d, %12d, %11d, %11d, %11d\n", System.currentTimeMillis() - startTime, + numReplies.get(), minLatency, maxLatency, sumLatency / Long.max(1, numReplies.get())); + } + + private static String formatErrors(Reply reply) { + StringBuilder out = new StringBuilder(); + out.append(reply.getMessage().toString()).append('\n'); + for (int i = 0, len = reply.getNumErrors(); i < len; ++i) { + out.append(reply.getError(i).toString()).append('\n'); + } + return out.toString(); + } + + private static RPCMessageBus newMessageBus(DocumentTypeManager docTypeMgr, FeederParams params) { + return new RPCMessageBus(new MessageBusParams().addProtocol(new DocumentProtocol(docTypeMgr)), + new RPCNetworkParams().setSlobrokConfigId(params.getConfigId()) + .setNumTargetsPerSpec(params.getNumConnectionsPerTarget()), + params.getConfigId()); + } + + private static SourceSession newSession(RPCMessageBus mbus, ReplyHandler replyHandler, FeederParams feederParams ) { + SourceSessionParams params = new SourceSessionParams(); + params.setReplyHandler(replyHandler); + if (feederParams.getMaxPending() > 0) { + params.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(feederParams.getMaxPending())); + } else { + DynamicThrottlePolicy throttlePolicy = new DynamicThrottlePolicy() + .setWindowSizeIncrement(feederParams.getWindowIncrementSize()) + .setResizeRate(feederParams.getWindowResizeRate()) + .setWindowSizeDecrementFactor(feederParams.getWindowDecrementFactor()) + .setWindowSizeBackOff(feederParams.getWindowSizeBackOff()); + params.setThrottlePolicy(throttlePolicy); + } + return mbus.getMessageBus().createSourceSession(params); + } +} -- cgit v1.2.3