diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-11-29 21:45:44 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-11-29 21:45:44 +0100 |
commit | 13984de4d6ab9120920c6c2c54aecb1ea9deee08 (patch) | |
tree | 1c95e65073717d96d71b944539573c41cbd43e43 /vespaclient-java/src | |
parent | 0bc82ef441d34bce1ae79b797973393c0675184c (diff) |
Collapse the vespa_feed_perf into the other feed clients.
Diffstat (limited to 'vespaclient-java/src')
7 files changed, 1442 insertions, 0 deletions
diff --git a/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java b/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java new file mode 100644 index 00000000000..0ecc4198e46 --- /dev/null +++ b/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java @@ -0,0 +1,205 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.feed.perf; + +import com.yahoo.messagebus.routing.Route; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.List; + +/** + * @author Simon Thoresen Hult + */ +class FeederParams { + + private static final int BUFFER_SIZE = 0x100000; + enum DumpFormat {JSON, VESPA} + private PrintStream stdErr = System.err; + private PrintStream stdOut = System.out; + private Route route = Route.parse("default"); + private String configId = "client"; + private OutputStream dumpStream = null; + private DumpFormat dumpFormat = DumpFormat.JSON; + private boolean benchmarkMode = false; + private int numDispatchThreads = 1; + private int maxPending = 0; + private double timeout = 180.0; + + private double windowSizeBackOff = 0.95; + private double windowDecrementFactor = 1.2; + private double windowResizeRate = 3; + private int windowIncrementSize = 20; + + private int numConnectionsPerTarget = 1; + private long numMessagesToSend = Long.MAX_VALUE; + private List<InputStream> inputStreams = new ArrayList<>(); + + FeederParams() { + inputStreams.add(System.in); + } + + PrintStream getStdErr() { + return stdErr; + } + + FeederParams setStdErr(PrintStream stdErr) { + this.stdErr = stdErr; + return this; + } + + PrintStream getStdOut() { + return stdOut; + } + + FeederParams setStdOut(PrintStream stdOut) { + this.stdOut = stdOut; + return this; + } + + Route getRoute() { + return route; + } + OutputStream getDumpStream() { return dumpStream; } + FeederParams setDumpStream(OutputStream dumpStream) { + this.dumpStream = dumpStream; + return this; + } + + DumpFormat getDumpFormat() { return dumpFormat; } + FeederParams setDumpFormat(DumpFormat dumpFormat) { + this.dumpFormat = dumpFormat; + return this; + } + + String getConfigId() { + return configId; + } + + FeederParams setConfigId(String configId) { + this.configId = configId; + return this; + } + public double getWindowSizeBackOff() { + return windowSizeBackOff; + } + + public double getWindowDecrementFactor() { + return windowDecrementFactor; + } + + public double getWindowResizeRate() { + return windowResizeRate; + } + public double getTimeout() { + return timeout; + } + + public int getWindowIncrementSize() { + return windowIncrementSize; + } + + int getNumConnectionsPerTarget() { return numConnectionsPerTarget; } + + long getNumMessagesToSend() { return numMessagesToSend; } + + boolean isSerialTransferEnabled() { + return maxPending == 1; + } + + FeederParams setSerialTransfer() { + maxPending = 1; + numDispatchThreads = 1; + return this; + } + List<InputStream> getInputStreams() { return inputStreams; } + FeederParams setInputStreams(List<InputStream> inputStreams) { + this.inputStreams = inputStreams; + return this; + } + + int getNumDispatchThreads() { return numDispatchThreads; } + int getMaxPending() { return maxPending; } + boolean isBenchmarkMode() { return benchmarkMode; } + + FeederParams parseArgs(String... args) throws ParseException, FileNotFoundException { + Options opts = new Options(); + opts.addOption("s", "serial", false, "use serial transfer mode, at most 1 pending operation and a single thread"); + opts.addOption("n", "numthreads", true, "Number of clients for sending messages. Anything, but 1 will bypass sequencing by document id."); + opts.addOption("m", "maxpending", true, "Max number of inflights messages. Default is auto."); + opts.addOption("r", "route", true, "Route for sending messages. default is 'default'...."); + opts.addOption("b", "mode", true, "Mode for benchmarking."); + opts.addOption("o", "output", true, "File to write to. Extensions gives format (.xml, .json, .vespa) json will be produced if no extension."); + opts.addOption("c", "numconnections", true, "Number of connections per host."); + opts.addOption("t", "timeout", true, "Timeout for a message in seconds. default = " + timeout); + opts.addOption("l", "nummessages", true, "Number of messages to send (all is default)."); + opts.addOption("wi", "window_incrementsize", true, "Dynamic window increment step size. default = " + windowIncrementSize); + opts.addOption("wd", "window_decrementfactor", true, "Dynamic window decrement step size factor. default = " + windowDecrementFactor); + opts.addOption("wb", "window_backoffactor", true, "Dynamic window backoff factor. default = " + windowSizeBackOff); + opts.addOption("wr", "window_resizerate", true, "Dynamic window resize rate. default = " + windowResizeRate); + + CommandLine cmd = new DefaultParser().parse(opts, args); + + if (cmd.hasOption('n')) { + numDispatchThreads = Integer.valueOf(cmd.getOptionValue('n').trim()); + } + if (cmd.hasOption('m')) { + maxPending = Integer.valueOf(cmd.getOptionValue('m').trim()); + } + if (cmd.hasOption('c')) { + numConnectionsPerTarget = Integer.valueOf(cmd.getOptionValue('c').trim()); + } + if (cmd.hasOption("wi")) { + windowIncrementSize = Integer.valueOf(cmd.getOptionValue("wi").trim()); + } + if (cmd.hasOption("wd")) { + windowDecrementFactor = Double.valueOf(cmd.getOptionValue("wd").trim()); + } + if (cmd.hasOption("wb")) { + windowSizeBackOff = Double.valueOf(cmd.getOptionValue("wb").trim()); + } + if (cmd.hasOption("wr")) { + windowResizeRate = Double.valueOf(cmd.getOptionValue("wr").trim()); + } + if (cmd.hasOption('r')) { + route = Route.parse(cmd.getOptionValue('r').trim()); + } + if (cmd.hasOption("t")) { + timeout = Double.valueOf(cmd.getOptionValue("t").trim()); + } + benchmarkMode = cmd.hasOption('b'); + if (cmd.hasOption('o')) { + String fileName = cmd.getOptionValue('o').trim(); + dumpStream = new FileOutputStream(new File(fileName)); + if (fileName.endsWith(".vespa")) { + dumpFormat = DumpFormat.VESPA; + } + } + if (cmd.hasOption('s')) { + setSerialTransfer(); + } + if (cmd.hasOption('l')) { + numMessagesToSend = Long.valueOf(cmd.getOptionValue('l').trim()); + } + + if ( !cmd.getArgList().isEmpty()) { + inputStreams.clear(); + for (String fileName : cmd.getArgList()) { + inputStreams.add(new BufferedInputStream(new FileInputStream(new File(fileName)), BUFFER_SIZE)); + } + } + + return this; + } + +} diff --git a/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java new file mode 100644 index 00000000000..c40e2c21561 --- /dev/null +++ b/vespaclient-java/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java @@ -0,0 +1,518 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.feed.perf; + +import com.yahoo.concurrent.ThreadFactoryFactory; +import com.yahoo.config.subscription.ConfigSubscriber; +import com.yahoo.document.Document; +import com.yahoo.document.DocumentId; +import com.yahoo.document.DocumentPut; +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.document.DocumentTypeManagerConfigurer; +import com.yahoo.document.DocumentUpdate; +import com.yahoo.document.TestAndSetCondition; +import com.yahoo.document.json.JsonFeedReader; +import com.yahoo.document.json.JsonWriter; +import com.yahoo.document.serialization.DocumentDeserializer; +import com.yahoo.document.serialization.DocumentDeserializerFactory; +import com.yahoo.document.serialization.DocumentSerializer; +import com.yahoo.document.serialization.DocumentSerializerFactory; +import com.yahoo.document.serialization.DocumentWriter; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.RemoveDocumentMessage; +import com.yahoo.documentapi.messagebus.protocol.UpdateDocumentMessage; +import com.yahoo.io.GrowableByteBuffer; +import com.yahoo.messagebus.DynamicThrottlePolicy; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageBusParams; +import com.yahoo.messagebus.RPCMessageBus; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.ReplyHandler; +import com.yahoo.messagebus.SourceSession; +import com.yahoo.messagebus.SourceSessionParams; +import com.yahoo.messagebus.StaticThrottlePolicy; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; +import com.yahoo.messagebus.routing.Route; +import com.yahoo.vespaxmlparser.ConditionalFeedOperation; +import com.yahoo.vespaxmlparser.FeedReader; +import com.yahoo.vespaxmlparser.FeedOperation; +import com.yahoo.vespaxmlparser.RemoveFeedOperation; +import com.yahoo.vespaxmlparser.VespaXMLFeedReader; +import net.jpountz.xxhash.XXHashFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Stream; + +/** + * @author Simon Thoresen Hult + */ +public class SimpleFeeder implements ReplyHandler { + + private final DocumentTypeManager docTypeMgr = new DocumentTypeManager(); + private final ConfigSubscriber documentTypeConfigSubscriber; + private final List<InputStream> inputStreams; + private final PrintStream out; + private final RPCMessageBus mbus; + private final SourceSession session; + private final int numThreads; + private final long numMessagesToSend; + private final Destination destination; + private final boolean benchmarkMode; + private final static long REPORT_INTERVAL = TimeUnit.SECONDS.toMillis(10); + private final long startTime = System.currentTimeMillis(); + private final AtomicReference<Throwable> failure = new AtomicReference<>(null); + private final AtomicLong numReplies = new AtomicLong(0); + private long maxLatency = Long.MIN_VALUE; + private long minLatency = Long.MAX_VALUE; + private long nextReport = startTime + REPORT_INTERVAL; + private long sumLatency = 0; + + static class Metrics { + + private final Destination destination; + private final FeedReader reader; + private final Executor executor; + private final long messagesToSend; + private final AtomicReference<Throwable> failure; + + Metrics(Destination destination, FeedReader reader, Executor executor, AtomicReference<Throwable> failure, long messagesToSend) { + this.destination = destination; + this.reader = reader; + this.executor = executor; + this.messagesToSend = messagesToSend; + this.failure = failure; + } + + long feed() throws Throwable { + long numMessages = 0; + while ((failure.get() == null) && (numMessages < messagesToSend)) { + FeedOperation op = reader.read(); + if (op.getType() == FeedOperation.Type.INVALID) { + break; + } + if (executor != null) { + executor.execute(() -> sendOperation(op)); + } else { + sendOperation(op); + } + ++numMessages; + } + return numMessages; + } + private void sendOperation(FeedOperation op) { + destination.send(op); + } + } + + + public static void main(String[] args) throws Throwable { + Logger.getLogger("").setLevel(Level.WARNING); + new SimpleFeeder(new FeederParams().parseArgs(args)).run().close(); + } + + private interface Destination { + void send(FeedOperation op); + void close() throws Exception; + } + + private static class MbusDestination implements Destination { + private final PrintStream err; + private final Route route; + private final SourceSession session; + private final long timeoutMS; + private final AtomicReference<Throwable> failure; + MbusDestination(SourceSession session, Route route, double timeoutS, AtomicReference<Throwable> failure, PrintStream err) { + this.route = route; + this.err = err; + this.session = session; + this.timeoutMS = (long)(timeoutS * 1000.0); + this.failure = failure; + } + public void send(FeedOperation op) { + Message msg = newMessage(op); + if (msg == null) { + err.println("ignoring operation; " + op.getType()); + return; + } + msg.setTimeRemaining(timeoutMS); + msg.setContext(System.currentTimeMillis()); + msg.setRoute(route); + try { + Error err = session.sendBlocking(msg).getError(); + if (err != null) { + failure.set(new IOException(err.toString())); + } + } catch (InterruptedException e) {} + } + public void close() { + session.destroy(); + } + } + + private static class JsonDestination implements Destination { + private final OutputStream outputStream; + private final DocumentWriter writer; + private final AtomicLong numReplies; + private final AtomicReference<Throwable> failure; + private boolean isFirst = true; + JsonDestination(OutputStream outputStream, AtomicReference<Throwable> failure, AtomicLong numReplies) { + this.outputStream = outputStream; + writer = new JsonWriter(outputStream); + this.numReplies = numReplies; + this.failure = failure; + try { + outputStream.write('['); + outputStream.write('\n'); + } catch (IOException e) { + failure.set(e); + } + } + public void send(FeedOperation op) { + if (op.getType() == FeedOperation.Type.DOCUMENT) { + if (!isFirst) { + try { + outputStream.write(','); + outputStream.write('\n'); + } catch (IOException e) { + failure.set(e); + } + } else { + isFirst = false; + } + writer.write(op.getDocument()); + } + numReplies.incrementAndGet(); + } + public void close() throws Exception { + outputStream.write('\n'); + outputStream.write(']'); + outputStream.close(); + } + } + + static private final int NONE = 0; + static private final int DOCUMENT = 1; + static private final int UPDATE = 2; + static private final int REMOVE = 3; + private static class VespaV1Destination implements Destination { + private final OutputStream outputStream; + GrowableByteBuffer buffer = new GrowableByteBuffer(16384); + ByteBuffer header = ByteBuffer.allocate(16); + private final AtomicLong numReplies; + private final AtomicReference<Throwable> failure; + VespaV1Destination(OutputStream outputStream, AtomicReference<Throwable> failure, AtomicLong numReplies) { + this.outputStream = outputStream; + this.numReplies = numReplies; + this.failure = failure; + try { + outputStream.write('V'); + outputStream.write('1'); + } catch (IOException e) { + failure.set(e); + } + } + public void send(FeedOperation op) { + TestAndSetCondition cond = op.getCondition(); + buffer.putUtf8String(cond.getSelection()); + DocumentSerializer writer = DocumentSerializerFactory.createHead(buffer); + int type = NONE; + if (op.getType() == FeedOperation.Type.DOCUMENT) { + writer.write(op.getDocument()); + type = DOCUMENT; + } else if (op.getType() == FeedOperation.Type.UPDATE) { + writer.write(op.getDocumentUpdate()); + type = UPDATE; + } else if (op.getType() == FeedOperation.Type.REMOVE) { + writer.write(op.getRemove()); + type = REMOVE; + } + int sz = buffer.position(); + long hash = hash(buffer.array(), sz); + try { + header.putInt(sz); + header.putInt(type); + header.putLong(hash); + outputStream.write(header.array(), 0, header.position()); + outputStream.write(buffer.array(), 0, buffer.position()); + header.clear(); + buffer.clear(); + } catch (IOException e) { + failure.set(e); + } + numReplies.incrementAndGet(); + } + public void close() throws Exception { + outputStream.close(); + } + static long hash(byte [] buf, int length) { + return XXHashFactory.fastestJavaInstance().hash64().hash(buf, 0, length, 0); + } + } + + private static int readExact(InputStream in, byte [] buf) throws IOException { + return in.readNBytes(buf, 0, buf.length); + } + + static class VespaV1FeedReader implements FeedReader { + private final InputStream in; + private final DocumentTypeManager mgr; + private final byte[] prefix = new byte[16]; + VespaV1FeedReader(InputStream in, DocumentTypeManager mgr) throws IOException { + this.in = in; + this.mgr = mgr; + byte [] header = new byte[2]; + int read = readExact(in, header); + if ((read != header.length) || (header[0] != 'V') || (header[1] != '1')) { + throw new IllegalArgumentException("Invalid Header " + Arrays.toString(header)); + } + } + + static class LazyDocumentOperation extends ConditionalFeedOperation { + private final DocumentDeserializer deserializer; + LazyDocumentOperation(DocumentDeserializer deserializer, TestAndSetCondition condition) { + super(Type.DOCUMENT, condition); + this.deserializer = deserializer; + } + + @Override + public Document getDocument() { + return new Document(deserializer); + } + } + static class LazyUpdateOperation extends ConditionalFeedOperation { + private final DocumentDeserializer deserializer; + LazyUpdateOperation(DocumentDeserializer deserializer, TestAndSetCondition condition) { + super(Type.UPDATE, condition); + this.deserializer = deserializer; + } + + @Override + public DocumentUpdate getDocumentUpdate() { + return new DocumentUpdate(deserializer); + } + } + @Override + public FeedOperation read() throws Exception { + int read = readExact(in, prefix); + if (read != prefix.length) { + return FeedOperation.INVALID; + } + ByteBuffer header = ByteBuffer.wrap(prefix); + int sz = header.getInt(); + int type = header.getInt(); + long hash = header.getLong(); + byte [] blob = new byte[sz]; + read = readExact(in, blob); + if (read != blob.length) { + throw new IllegalArgumentException("Underflow, failed reading " + blob.length + "bytes. Got " + read); + } + long computedHash = VespaV1Destination.hash(blob, blob.length); + if (computedHash != hash) { + throw new IllegalArgumentException("Hash mismatch, expected " + hash + ", got " + computedHash); + } + GrowableByteBuffer buf = GrowableByteBuffer.wrap(blob); + String condition = buf.getUtf8String(); + DocumentDeserializer deser = DocumentDeserializerFactory.createHead(mgr, buf); + TestAndSetCondition testAndSetCondition = condition.isEmpty() + ? TestAndSetCondition.NOT_PRESENT_CONDITION + : new TestAndSetCondition(condition); + if (type == DOCUMENT) { + return new LazyDocumentOperation(deser, testAndSetCondition); + } else if (type == UPDATE) { + return new LazyUpdateOperation(deser, testAndSetCondition); + } else if (type == REMOVE) { + return new RemoveFeedOperation(new DocumentId(deser), testAndSetCondition); + } else { + throw new IllegalArgumentException("Unknown operation " + type); + } + } + } + + private Destination createDumper(FeederParams params) { + if (params.getDumpFormat() == FeederParams.DumpFormat.VESPA) { + return new VespaV1Destination(params.getDumpStream(), failure, numReplies); + } + return new JsonDestination(params.getDumpStream(), failure, numReplies); + } + + + @SuppressWarnings("deprecation") + SimpleFeeder(FeederParams params) { + inputStreams = params.getInputStreams(); + out = params.getStdOut(); + numThreads = params.getNumDispatchThreads(); + numMessagesToSend = params.getNumMessagesToSend(); + mbus = newMessageBus(docTypeMgr, params); + session = newSession(mbus, this, params); + documentTypeConfigSubscriber = DocumentTypeManagerConfigurer.configure(docTypeMgr, params.getConfigId()); + benchmarkMode = params.isBenchmarkMode(); + destination = (params.getDumpStream() != null) + ? createDumper(params) + : new MbusDestination(session, params.getRoute(), params.getTimeout(), failure, params.getStdErr()); + } + + SourceSession getSourceSession() { return session; } + private FeedReader createFeedReader(InputStream in) throws Exception { + in.mark(8); + byte [] b = new byte[2]; + int numRead = readExact(in, b); + in.reset(); + if (numRead != b.length) { + throw new IllegalArgumentException("Need to read " + b.length + " bytes to detect format. Got " + numRead + " bytes."); + } + if (b[0] == '[') { + return new JsonFeedReader(in, docTypeMgr); + } else if ((b[0] == 'V') && (b[1] == '1')) { + return new VespaV1FeedReader(in, docTypeMgr); + } else { + return new VespaXMLFeedReader(in, docTypeMgr); + } + } + + + static class RetryExecutionHandler implements RejectedExecutionHandler { + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + try { + executor.getQueue().put(r); + } catch (InterruptedException e) {} + } + } + + SimpleFeeder run() throws Throwable { + ExecutorService executor = (numThreads > 1) + ? new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS, + new ArrayBlockingQueue<>(numThreads*100), + ThreadFactoryFactory.getDaemonThreadFactory("perf-feeder"), + new RetryExecutionHandler()) + : null; + printHeader(out); + long numMessagesSent = 0; + for (InputStream in : inputStreams) { + Metrics m = new Metrics(destination, createFeedReader(in), executor, failure, numMessagesToSend); + numMessagesSent += m.feed(); + } + while (failure.get() == null && numReplies.get() < numMessagesSent) { + Thread.sleep(100); + } + if (failure.get() != null) { + throw failure.get(); + } + printReport(out); + return this; + } + + void close() throws Exception { + destination.close(); + mbus.destroy(); + } + + private static Message newMessage(FeedOperation op) { + switch (op.getType()) { + case DOCUMENT: { + PutDocumentMessage message = new PutDocumentMessage(new DocumentPut(op.getDocument())); + message.setCondition(op.getCondition()); + return message; + } + case REMOVE: { + RemoveDocumentMessage message = new RemoveDocumentMessage(op.getRemove()); + message.setCondition(op.getCondition()); + return message; + } + case UPDATE: { + UpdateDocumentMessage message = new UpdateDocumentMessage(op.getDocumentUpdate()); + message.setCondition(op.getCondition()); + return message; + } + default: + return null; + } + } + + private static boolean containsFatalErrors(Stream<Error> errors) { + return errors.anyMatch(e -> e.getCode() != DocumentProtocol.ERROR_TEST_AND_SET_CONDITION_FAILED); + } + + @Override + public void handleReply(Reply reply) { + if (failure.get() != null) { + return; + } + if (containsFatalErrors(reply.getErrors())) { + failure.compareAndSet(null, new IOException(formatErrors(reply))); + return; + } + long now = System.currentTimeMillis(); + long latency = now - (long) reply.getContext(); + numReplies.incrementAndGet(); + accumulateReplies(now, latency); + } + private synchronized void accumulateReplies(long now, long latency) { + minLatency = Math.min(minLatency, latency); + maxLatency = Math.max(maxLatency, latency); + sumLatency += latency; + if (benchmarkMode) { return; } + if (now > nextReport) { + printReport(out); + nextReport += REPORT_INTERVAL; + } + } + private static void printHeader(PrintStream out) { + out.println("# Time used, num ok, num error, min latency, max latency, average latency"); + } + + private synchronized void printReport(PrintStream out) { + out.format("%10d, %12d, %11d, %11d, %11d\n", System.currentTimeMillis() - startTime, + numReplies.get(), minLatency, maxLatency, sumLatency / Long.max(1, numReplies.get())); + } + + private static String formatErrors(Reply reply) { + StringBuilder out = new StringBuilder(); + out.append(reply.getMessage().toString()).append('\n'); + for (int i = 0, len = reply.getNumErrors(); i < len; ++i) { + out.append(reply.getError(i).toString()).append('\n'); + } + return out.toString(); + } + + private static RPCMessageBus newMessageBus(DocumentTypeManager docTypeMgr, FeederParams params) { + return new RPCMessageBus(new MessageBusParams().addProtocol(new DocumentProtocol(docTypeMgr)), + new RPCNetworkParams().setSlobrokConfigId(params.getConfigId()) + .setNumTargetsPerSpec(params.getNumConnectionsPerTarget()), + params.getConfigId()); + } + + private static SourceSession newSession(RPCMessageBus mbus, ReplyHandler replyHandler, FeederParams feederParams ) { + SourceSessionParams params = new SourceSessionParams(); + params.setReplyHandler(replyHandler); + if (feederParams.getMaxPending() > 0) { + params.setThrottlePolicy(new StaticThrottlePolicy().setMaxPendingCount(feederParams.getMaxPending())); + } else { + DynamicThrottlePolicy throttlePolicy = new DynamicThrottlePolicy() + .setWindowSizeIncrement(feederParams.getWindowIncrementSize()) + .setResizeRate(feederParams.getWindowResizeRate()) + .setWindowSizeDecrementFactor(feederParams.getWindowDecrementFactor()) + .setWindowSizeBackOff(feederParams.getWindowSizeBackOff()); + params.setThrottlePolicy(throttlePolicy); + } + return mbus.getMessageBus().createSourceSession(params); + } +} diff --git a/vespaclient-java/src/main/sh/vespa-feed-perf b/vespaclient-java/src/main/sh/vespa-feed-perf new file mode 100755 index 00000000000..bdee1142a2d --- /dev/null +++ b/vespaclient-java/src/main/sh/vespa-feed-perf @@ -0,0 +1,89 @@ +#!/bin/sh +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +# BEGIN environment bootstrap section +# Do not edit between here and END as this section should stay identical in all scripts + +findpath () { + myname=${0} + mypath=${myname%/*} + myname=${myname##*/} + empty_if_start_slash=${mypath%%/*} + if [ "${empty_if_start_slash}" ]; then + mypath=$(pwd)/${mypath} + fi + if [ "$mypath" ] && [ -d "$mypath" ]; then + return + fi + mypath=$(pwd) + if [ -f "${mypath}/${myname}" ]; then + return + fi + echo "FATAL: Could not figure out the path where $myname lives from $0" + exit 1 +} + +COMMON_ENV=libexec/vespa/common-env.sh + +source_common_env () { + if [ "$VESPA_HOME" ] && [ -d "$VESPA_HOME" ]; then + export VESPA_HOME + common_env=$VESPA_HOME/$COMMON_ENV + if [ -f "$common_env" ]; then + . $common_env + return + fi + fi + return 1 +} + +findroot () { + source_common_env && return + if [ "$VESPA_HOME" ]; then + echo "FATAL: bad VESPA_HOME value '$VESPA_HOME'" + exit 1 + fi + if [ "$ROOT" ] && [ -d "$ROOT" ]; then + VESPA_HOME="$ROOT" + source_common_env && return + fi + findpath + while [ "$mypath" ]; do + VESPA_HOME=${mypath} + source_common_env && return + mypath=${mypath%/*} + done + echo "FATAL: missing VESPA_HOME environment variable" + echo "Could not locate $COMMON_ENV anywhere" + exit 1 +} + +findhost () { + if [ "${VESPA_HOSTNAME}" = "" ]; then + VESPA_HOSTNAME=$(vespa-detect-hostname || hostname -f || hostname || echo "localhost") || exit 1 + fi + validate="${VESPA_HOME}/bin/vespa-validate-hostname" + if [ -f "$validate" ]; then + "$validate" "${VESPA_HOSTNAME}" || exit 1 + fi + export VESPA_HOSTNAME +} + +findroot +findhost + +ROOT=${VESPA_HOME%/} +export ROOT + +# END environment bootstrap section + +export VESPA_LOG_TARGET=file:/dev/null +export MALLOC_ARENA_MAX=1 # Does not need fast allocation +java \ +-server -enableassertions \ +-XX:ThreadStackSize=512 \ +-XX:MaxJavaStackTraceDepth=1000000 \ +-Djava.library.path=${VESPA_HOME}/libexec64/native:${VESPA_HOME}/lib64 \ +-XX:MaxDirectMemorySize=64m -Djava.awt.headless=true \ +-Xms128m -Xmx1024m $(getJavaOptionsIPV46) \ +-cp ${VESPA_HOME}/lib/jars/vespaclient-java-jar-with-dependencies.jar com.yahoo.vespa.feed.perf.SimpleFeeder "$@" diff --git a/vespaclient-java/src/test/java/com/yahoo/vespa/perf/FeederParamsTest.java b/vespaclient-java/src/test/java/com/yahoo/vespa/perf/FeederParamsTest.java new file mode 100644 index 00000000000..fbb38083aad --- /dev/null +++ b/vespaclient-java/src/test/java/com/yahoo/vespa/perf/FeederParamsTest.java @@ -0,0 +1,176 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.feed.perf; + +import com.yahoo.messagebus.routing.Route; +import org.apache.commons.cli.ParseException; +import org.junit.Test; + +import java.io.BufferedInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +/** + * @author Simon Thoresen Hult + */ +public class FeederParamsTest { + private static final String TESTFILE_JSON = "test.json"; + private static final String TESTFILE_VESPA = "test.vespa"; + private static final String TESTFILE_UNKNOWN = "test.xyz"; + private static final double EPSILON = 0.000000000001; + + + @Test + public void requireThatAccessorsWork() { + FeederParams params = new FeederParams(); + + PrintStream stdErr = new PrintStream(new ByteArrayOutputStream()); + params.setStdErr(stdErr); + assertSame(stdErr, params.getStdErr()); + + PrintStream stdOut = new PrintStream(new ByteArrayOutputStream()); + params.setStdOut(stdOut); + assertSame(stdOut, params.getStdOut()); + + params.setConfigId("my_config_id"); + assertEquals("my_config_id", params.getConfigId()); + + assertFalse(params.isSerialTransferEnabled()); + params.setSerialTransfer(); + assertTrue(params.isSerialTransferEnabled()); + } + + @Test + public void requireThatParamsHaveReasonableDefaults() { + FeederParams params = new FeederParams(); + assertSame(System.in, params.getInputStreams().get(0)); + assertSame(System.err, params.getStdErr()); + assertSame(System.out, params.getStdOut()); + assertEquals(Route.parse("default"), params.getRoute()); + assertEquals("client", params.getConfigId()); + assertFalse(params.isSerialTransferEnabled()); + } + + @Test + public void requireThatSerialTransferOptionIsParsed() throws ParseException, FileNotFoundException { + assertTrue(new FeederParams().parseArgs("-s").isSerialTransferEnabled()); + assertTrue(new FeederParams().parseArgs("--serial").isSerialTransferEnabled()); + assertEquals(1, new FeederParams().parseArgs("-s").getMaxPending()); + assertEquals(1, new FeederParams().parseArgs("-s").getNumDispatchThreads()); + } + + @Test + public void requireThatArgumentsAreParsedAsRoute() throws ParseException, FileNotFoundException { + assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("-r", "foo bar").getRoute()); + assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("--route","foo bar").getRoute()); + } + + @Test + public void requireThatRouteIsAnOptionalArgument() throws ParseException, FileNotFoundException { + assertEquals(Route.parse("default"), new FeederParams().parseArgs().getRoute()); + assertEquals(Route.parse("default"), new FeederParams().parseArgs("-s").getRoute()); + } + + @Test + public void requireThatNumThreadsAreParsed() throws ParseException, FileNotFoundException { + assertEquals(1, new FeederParams().getNumDispatchThreads()); + assertEquals(17, new FeederParams().parseArgs("-n 17").getNumDispatchThreads()); + assertEquals(17, new FeederParams().parseArgs("--numthreads", "17").getNumDispatchThreads()); + } + @Test + public void requireThatNumConnectionsAreParsed() throws ParseException, FileNotFoundException { + assertEquals(1, new FeederParams().getNumConnectionsPerTarget()); + assertEquals(16, new FeederParams().parseArgs("-c 16").getNumConnectionsPerTarget()); + assertEquals(17, new FeederParams().parseArgs("--numconnections", "17").getNumConnectionsPerTarget()); + } + + @Test + public void requireThatTimeoutIsParsed() throws ParseException, FileNotFoundException { + assertEquals(180.0, new FeederParams().getTimeout(), EPSILON); + assertEquals(16.7, new FeederParams().parseArgs("-t 16.7").getTimeout(), EPSILON); + assertEquals(1700.9, new FeederParams().parseArgs("--timeout", "1700.9").getTimeout(), EPSILON); + } + + @Test + public void requireThatNumMessagesToSendAreParsed() throws ParseException, FileNotFoundException { + assertEquals(Long.MAX_VALUE, new FeederParams().getNumMessagesToSend()); + assertEquals(18, new FeederParams().parseArgs("-l 18").getNumMessagesToSend()); + assertEquals(19, new FeederParams().parseArgs("--nummessages", "19").getNumMessagesToSend()); + } + + @Test + public void requireThatWindowSizeIncrementIsParsed() throws ParseException, FileNotFoundException { + assertEquals(20, new FeederParams().getWindowIncrementSize()); + assertEquals(17, new FeederParams().parseArgs("--window_incrementsize", "17").getWindowIncrementSize()); + } + + @Test + public void requireThatWindowSizeDecrementFactorIsParsed() throws ParseException, FileNotFoundException { + assertEquals(1.2, new FeederParams().getWindowDecrementFactor(), EPSILON); + assertEquals(1.3, new FeederParams().parseArgs("--window_decrementfactor", "1.3").getWindowDecrementFactor(), EPSILON); + } + + @Test + public void requireThatWindowResizeRateIsParsed() throws ParseException, FileNotFoundException { + assertEquals(3.0, new FeederParams().getWindowResizeRate(), EPSILON); + assertEquals(5.5, new FeederParams().parseArgs("--window_resizerate", "5.5").getWindowResizeRate(), EPSILON); + } + + @Test + public void requireThatWindowBackOffIsParsed() throws ParseException, FileNotFoundException { + assertEquals(0.95, new FeederParams().getWindowSizeBackOff(), EPSILON); + assertEquals(0.97, new FeederParams().parseArgs("--window_backoff", "0.97").getWindowSizeBackOff(), EPSILON); + } + + @Test + public void requireThatDumpStreamAreParsed() throws ParseException, IOException { + assertNull(new FeederParams().getDumpStream()); + + FeederParams p = new FeederParams().parseArgs("-o " + TESTFILE_JSON); + assertNotNull(p.getDumpStream()); + assertEquals(FeederParams.DumpFormat.JSON, p.getDumpFormat()); + p.getDumpStream().close(); + + p = new FeederParams().parseArgs("-o " + TESTFILE_VESPA); + assertNotNull(p.getDumpStream()); + assertEquals(FeederParams.DumpFormat.VESPA, p.getDumpFormat()); + p.getDumpStream().close(); + + p = new FeederParams().parseArgs("-o " + TESTFILE_UNKNOWN); + assertNotNull(p.getDumpStream()); + assertEquals(FeederParams.DumpFormat.JSON, p.getDumpFormat()); + p.getDumpStream().close(); + + assertTrue(new File(TESTFILE_JSON).delete()); + assertTrue(new File(TESTFILE_VESPA).delete()); + assertTrue(new File(TESTFILE_UNKNOWN).delete()); + } + + @Test + public void requireThatInputFilesAreAggregated() throws ParseException, IOException { + File json = new File(TESTFILE_JSON); + File vespa = new File(TESTFILE_VESPA); + new FileOutputStream(json).close(); + new FileOutputStream(vespa).close(); + FeederParams p = new FeederParams(); + p.parseArgs("-n", "3", TESTFILE_JSON, TESTFILE_VESPA); + assertEquals(3, p.getNumDispatchThreads()); + assertEquals(2, p.getInputStreams().size()); + assertTrue(p.getInputStreams().get(0) instanceof BufferedInputStream); + assertTrue(p.getInputStreams().get(1) instanceof BufferedInputStream); + json.delete(); + vespa.delete(); + } + +} diff --git a/vespaclient-java/src/test/java/com/yahoo/vespa/perf/SimpleFeederTest.java b/vespaclient-java/src/test/java/com/yahoo/vespa/perf/SimpleFeederTest.java new file mode 100644 index 00000000000..5380796086a --- /dev/null +++ b/vespaclient-java/src/test/java/com/yahoo/vespa/perf/SimpleFeederTest.java @@ -0,0 +1,332 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.feed.perf; + +import com.yahoo.document.serialization.DeserializationException; +import com.yahoo.documentapi.messagebus.protocol.DocumentMessage; +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.messagebus.DynamicThrottlePolicy; +import com.yahoo.messagebus.EmptyReply; +import com.yahoo.messagebus.Error; +import com.yahoo.messagebus.ErrorCode; +import com.yahoo.messagebus.Message; +import com.yahoo.messagebus.MessageHandler; +import com.yahoo.messagebus.Reply; +import com.yahoo.messagebus.StaticThrottlePolicy; +import com.yahoo.messagebus.ThrottlePolicy; +import org.junit.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.lang.reflect.Field; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.regex.Pattern; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * @author Simon Thoresen Hult + */ +public class SimpleFeederTest { + + private static final String CONFIG_DIR = "target/test-classes/"; + + @Test + public void requireThatXMLFeederWorks() throws Throwable { + assertFeed("<vespafeed>" + + " <document documenttype='simple' documentid='id:scheme:simple::0'>" + + " <my_str>foo</my_str>" + + " </document>" + + " <update documenttype='simple' documentid='id:scheme:simple::1'>" + + " <assign field='my_str'>bar</assign>" + + " </update>" + + " <remove documenttype='simple' documentid='id:scheme:simple::2'/>" + + "</vespafeed>", + new MessageHandler() { + + @Override + public void handleMessage(Message msg) { + Reply reply = ((DocumentMessage)msg).createReply(); + reply.swapState(msg); + reply.popHandler().handleReply(reply); + } + }, + "", + "(.+\n)+" + + "\\s*\\d+,\\s*3,.+\n"); + } + + @Test + public void requireThatXML2JsonFeederWorks() throws Throwable { + ByteArrayOutputStream dump = new ByteArrayOutputStream(); + assertFeed(new FeederParams().setDumpStream(dump), + "<vespafeed>" + + " <document documenttype='simple' documentid='id:simple:simple::0'>" + + " <my_str>foo</my_str>" + + " </document>" + + " <update documenttype='simple' documentid='id:simple:simple::1'>" + + " <assign field='my_str'>bar</assign>" + + " </update>" + + " <remove documenttype='simple' documentid='id:simple:simple::2'/>" + + "</vespafeed>", + new MessageHandler() { + + @Override + public void handleMessage(Message msg) { + Reply reply = ((DocumentMessage)msg).createReply(); + reply.swapState(msg); + reply.popHandler().handleReply(reply); + } + }, + "", + "(.+\n)+" + + "\\s*\\d+,\\s*3,.+\n"); + assertEquals(58, dump.size()); + assertEquals("[\n{\"id\":\"id:simple:simple::0\",\"fields\":{\"my_str\":\"foo\"}}\n]", dump.toString()); + } + + @Test + public void requireThatDualPutXML2JsonFeederWorks() throws Throwable { + ByteArrayOutputStream dump = new ByteArrayOutputStream(); + assertFeed(new FeederParams().setDumpStream(dump), + "<vespafeed>" + + " <document documenttype='simple' documentid='id:simple:simple::0'>" + + " <my_str>foo</my_str>" + + " </document>" + + " <document documenttype='simple' documentid='id:simple:simple::1'>" + + " <my_str>bar</my_str>" + + " </document>" + + " <remove documenttype='simple' documentid='id:simple:simple::2'/>" + + "</vespafeed>", + new MessageHandler() { + + @Override + public void handleMessage(Message msg) { + Reply reply = ((DocumentMessage)msg).createReply(); + reply.swapState(msg); + reply.popHandler().handleReply(reply); + } + }, + "", + "(.+\n)+" + + "\\s*\\d+,\\s*3,.+\n"); + assertEquals(115, dump.size()); + assertEquals("[\n{\"id\":\"id:simple:simple::0\",\"fields\":{\"my_str\":\"foo\"}},\n {\"id\":\"id:simple:simple::1\",\"fields\":{\"my_str\":\"bar\"}}\n]", dump.toString()); + assertFeed(dump.toString(), + new MessageHandler() { + @Override + public void handleMessage(Message msg) { + Reply reply = ((DocumentMessage)msg).createReply(); + reply.swapState(msg); + reply.popHandler().handleReply(reply); + } + }, + "", + "(.+\n)+" + + "\\s*\\d+,\\s*2,.+\n"); + } + + @Test + public void requireThatJson2VespaFeederWorks() throws Throwable { + ByteArrayOutputStream dump = new ByteArrayOutputStream(); + assertFeed(new FeederParams().setDumpStream(dump).setDumpFormat(FeederParams.DumpFormat.VESPA), + "[" + + " { \"put\": \"id:simple:simple::0\", \"fields\": { \"my_str\":\"foo\"}}," + + " { \"update\": \"id:simple:simple::1\", \"fields\": { \"my_str\": { \"assign\":\"bar\"}}}," + + " { \"remove\": \"id:simple:simple::2\", \"condition\":\"true\"}" + + "]", + new MessageHandler() { + + @Override + public void handleMessage(Message msg) { + Reply reply = ((DocumentMessage)msg).createReply(); + reply.swapState(msg); + reply.popHandler().handleReply(reply); + } + }, + "", + "(.+\n)+" + + "\\s*\\d+,\\s*3,.+\n"); + assertEquals(187, dump.size()); + assertFeed(new ByteArrayInputStream(dump.toByteArray()), + new MessageHandler() { + @Override + public void handleMessage(Message msg) { + Reply reply = ((DocumentMessage)msg).createReply(); + reply.swapState(msg); + reply.popHandler().handleReply(reply); + } + }, + "", + "(.+\n)+" + + "\\s*\\d+,\\s*3,.+\n"); + } + + @Test + public void requireThatJsonFeederWorks() throws Throwable { + assertFeed("[" + + " { \"put\": \"id:simple:simple::0\", \"fields\": { \"my_str\":\"foo\"}}," + + " { \"update\": \"id:simple:simple::1\", \"fields\": { \"my_str\": { \"assign\":\"bar\"}}}," + + " { \"remove\": \"id:simple:simple::2\"}" + + "]", + new MessageHandler() { + + @Override + public void handleMessage(Message msg) { + Reply reply = ((DocumentMessage)msg).createReply(); + reply.swapState(msg); + reply.popHandler().handleReply(reply); + } + }, + "", + "(.+\n)+" + + "\\s*\\d+,\\s*3,.+\n"); + } + + @Test + public void requireThatParseFailuresThrowInMainThread() throws Throwable { + TestDriver driver = new TestDriver(new FeederParams(), + "<vespafeed>" + + " <document documenttype='unknown' documentid='id:scheme:simple::0'/>" + + "</vespafeed>", + null); + try { + driver.run(); + fail(); + } catch (DeserializationException e) { + assertEquals("Field 'id:scheme:simple::0': Must specify an existing document type, not 'unknown' (at line 1, column 83)", + e.getMessage()); + } + assertTrue(driver.close()); + } + + @Test + public void requireThatSyncFailuresThrowInMainThread() throws Throwable { + TestDriver driver = new TestDriver(new FeederParams(), + "<vespafeed>" + + " <document documenttype='simple' documentid='id:scheme:simple::0'/>" + + "</vespafeed>", + null); + driver.feeder.getSourceSession().close(); + try { + driver.run(); + fail(); + } catch (IOException e) { + assertEquals("[SEND_QUEUE_CLOSED @ localhost]: Source session is closed.", e.getMessage()); + } + assertTrue(driver.close()); + } + + @Test + public void requireThatAsyncFailuresThrowInMainThread() throws Throwable { + TestDriver driver = new TestDriver(new FeederParams(), + "<vespafeed><document documenttype='simple' documentid='id:scheme:simple::0'/></vespafeed>", + new MessageHandler() { + + @Override + public void handleMessage(Message msg) { + Reply reply = new EmptyReply(); + reply.swapState(msg); + reply.addError(new Error(ErrorCode.APP_FATAL_ERROR + 6, "foo")); + reply.addError(new Error(ErrorCode.APP_FATAL_ERROR + 9, "bar")); + reply.popHandler().handleReply(reply); + } + }); + try { + driver.run(); + fail(); + } catch (IOException e) { + assertMatches("com.yahoo.documentapi.messagebus.protocol.PutDocumentMessage@.+\n" + + "\\[UNKNOWN\\(250006\\) @ .+\\]: foo\n" + + "\\[UNKNOWN\\(250009\\) @ .+\\]: bar\n", + e.getMessage()); + } + assertTrue(driver.close()); + } + + @Test + public void requireThatDynamicThrottlingIsDefault() throws Exception { + TestDriver driver = new TestDriver(new FeederParams(), "", null); + assertEquals(DynamicThrottlePolicy.class, getThrottlePolicy(driver).getClass()); + assertTrue(driver.close()); + } + + @Test + public void requireThatSerialTransferModeConfiguresStaticThrottling() throws Exception { + TestDriver driver = new TestDriver(new FeederParams().setSerialTransfer(), "", null); + assertEquals(StaticThrottlePolicy.class, getThrottlePolicy(driver).getClass()); + assertTrue(driver.close()); + } + + private static ThrottlePolicy getThrottlePolicy(TestDriver driver) { + return (ThrottlePolicy)getField(driver.feeder.getSourceSession(), "throttlePolicy"); + } + + private static Object getField(Object obj, String fieldName) { + try { + Field field = obj.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + return field.get(obj); + } catch (IllegalAccessException | NoSuchFieldException e) { + throw new AssertionError(e); + } + } + + private static void assertFeed(String in, MessageHandler validator, String expectedErr, String expectedOut) throws Throwable { + assertFeed(new FeederParams(), in, validator, expectedErr, expectedOut); + } + private static void assertFeed(InputStream in, MessageHandler validator, String expectedErr, String expectedOut) throws Throwable { + assertFeed(new FeederParams(), in, validator, expectedErr, expectedOut); + } + private static void assertFeed(FeederParams params, String in, MessageHandler validator, String expectedErr, String expectedOut) throws Throwable { + assertFeed(params, new ByteArrayInputStream(in.getBytes(StandardCharsets.UTF_8)), validator, expectedErr, expectedOut); + } + private static void assertFeed(FeederParams params, InputStream in, MessageHandler validator, String expectedErr, String expectedOut) throws Throwable { + TestDriver driver = new TestDriver(params, in, validator); + driver.run(); + assertMatches(expectedErr, new String(driver.err.toByteArray(), StandardCharsets.UTF_8)); + assertMatches(expectedOut, new String(driver.out.toByteArray(), StandardCharsets.UTF_8)); + assertTrue(driver.close()); + } + + private static void assertMatches(String expected, String actual) { + if (!Pattern.matches(expected, actual)) { + assertEquals(expected, actual); + } + } + + private static class TestDriver { + + final ByteArrayOutputStream err = new ByteArrayOutputStream(); + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final SimpleFeeder feeder; + final SimpleServer server; + + TestDriver(FeederParams params, String in, MessageHandler validator) throws IOException, ListenFailedException { + this(params, new ByteArrayInputStream(in.getBytes(StandardCharsets.UTF_8)), validator); + } + TestDriver(FeederParams params, InputStream in, MessageHandler validator) throws IOException, ListenFailedException { + server = new SimpleServer(CONFIG_DIR, validator); + feeder = new SimpleFeeder(params.setConfigId("dir:" + CONFIG_DIR) + .setStdErr(new PrintStream(err)) + .setInputStreams(Arrays.asList(in)) + .setStdOut(new PrintStream(out))); + } + + void run() throws Throwable { + feeder.run(); + } + + boolean close() throws Exception { + feeder.close(); + server.close(); + return true; + } + } + +} diff --git a/vespaclient-java/src/test/java/com/yahoo/vespa/perf/SimpleServer.java b/vespaclient-java/src/test/java/com/yahoo/vespa/perf/SimpleServer.java new file mode 100644 index 00000000000..a458a59f997 --- /dev/null +++ b/vespaclient-java/src/test/java/com/yahoo/vespa/perf/SimpleServer.java @@ -0,0 +1,63 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.feed.perf; + +import com.yahoo.document.DocumentTypeManager; +import com.yahoo.documentapi.messagebus.protocol.DocumentProtocol; +import com.yahoo.jrt.ListenFailedException; +import com.yahoo.jrt.slobrok.server.Slobrok; +import com.yahoo.messagebus.DestinationSession; +import com.yahoo.messagebus.DestinationSessionParams; +import com.yahoo.messagebus.MessageBus; +import com.yahoo.messagebus.MessageBusParams; +import com.yahoo.messagebus.MessageHandler; +import com.yahoo.messagebus.network.Identity; +import com.yahoo.messagebus.network.rpc.RPCNetwork; +import com.yahoo.messagebus.network.rpc.RPCNetworkParams; + +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; + +/** + * @author Simon Thoresen Hult + */ +public class SimpleServer { + + private final DocumentTypeManager documentMgr; + private final Slobrok slobrok; + private final MessageBus mbus; + private final DestinationSession session; + + @SuppressWarnings("deprecation") + public SimpleServer(String configDir, MessageHandler msgHandler) throws IOException, ListenFailedException { + slobrok = new Slobrok(); + documentMgr = DocumentTypeManager.fromFile(configDir + "/documentmanager.cfg"); + mbus = new MessageBus(new RPCNetwork(new RPCNetworkParams() + .setSlobrokConfigId(slobrok.configId()) + .setIdentity(new Identity("server"))), + new MessageBusParams().addProtocol(new DocumentProtocol(documentMgr))); + session = mbus.createDestinationSession(new DestinationSessionParams().setMessageHandler(msgHandler)); + + PrintWriter writer = new PrintWriter(new FileWriter(configDir + "/messagebus.cfg")); + writer.println("routingtable[1]\n" + + "routingtable[0].protocol \"document\"\n" + + "routingtable[0].hop[0]\n" + + "routingtable[0].route[1]\n" + + "routingtable[0].route[0].name \"default\"\n" + + "routingtable[0].route[0].hop[1]\n" + + "routingtable[0].route[0].hop[0] \"" + session.getConnectionSpec() + "\""); + writer.close(); + + writer = new PrintWriter(new FileWriter(configDir + "/slobroks.cfg")); + writer.println(slobrok.configId().substring(4)); + writer.close(); + } + + @SuppressWarnings("deprecation") + public final void close() { + session.destroy(); + mbus.destroy(); + slobrok.stop(); + } + +} diff --git a/vespaclient-java/src/test/resources/documentmanager.cfg b/vespaclient-java/src/test/resources/documentmanager.cfg new file mode 100644 index 00000000000..2bd7a3c6080 --- /dev/null +++ b/vespaclient-java/src/test/resources/documentmanager.cfg @@ -0,0 +1,59 @@ +doctype[2] +doctype[0].name "document" +doctype[0].idx 10000 +doctype[0].contentstruct 10001 +doctype[0].primitivetype[0].idx 10002 +doctype[0].primitivetype[0].internalid 0 +doctype[0].primitivetype[0].name "int" +doctype[0].primitivetype[1].idx 10003 +doctype[0].primitivetype[1].internalid 5 +doctype[0].primitivetype[1].name "double" +doctype[0].primitivetype[2].idx 10004 +doctype[0].primitivetype[2].internalid 2 +doctype[0].primitivetype[2].name "string" +doctype[0].annotationtype[0].idx 10005 +doctype[0].annotationtype[0].name "proximity_break" +doctype[0].annotationtype[0].internalid 8 +doctype[0].annotationtype[0].datatype 10003 +doctype[0].annotationtype[1].idx 10006 +doctype[0].annotationtype[1].name "normalized" +doctype[0].annotationtype[1].internalid 4 +doctype[0].annotationtype[1].datatype 10004 +doctype[0].annotationtype[2].idx 10007 +doctype[0].annotationtype[2].name "reading" +doctype[0].annotationtype[2].internalid 5 +doctype[0].annotationtype[2].datatype 10004 +doctype[0].annotationtype[3].idx 10008 +doctype[0].annotationtype[3].name "term" +doctype[0].annotationtype[3].internalid 1 +doctype[0].annotationtype[3].datatype 10004 +doctype[0].annotationtype[4].idx 10009 +doctype[0].annotationtype[4].name "transformed" +doctype[0].annotationtype[4].internalid 7 +doctype[0].annotationtype[4].datatype 10004 +doctype[0].annotationtype[5].idx 10010 +doctype[0].annotationtype[5].name "canonical" +doctype[0].annotationtype[5].internalid 3 +doctype[0].annotationtype[5].datatype 10004 +doctype[0].annotationtype[6].idx 10011 +doctype[0].annotationtype[6].name "token_type" +doctype[0].annotationtype[6].internalid 2 +doctype[0].annotationtype[6].datatype 10002 +doctype[0].annotationtype[7].idx 10012 +doctype[0].annotationtype[7].name "special_token" +doctype[0].annotationtype[7].internalid 9 +doctype[0].annotationtype[8].idx 10013 +doctype[0].annotationtype[8].name "stem" +doctype[0].annotationtype[8].internalid 6 +doctype[0].annotationtype[8].datatype 10004 +doctype[0].structtype[0].idx 10001 +doctype[0].structtype[0].name document.header +doctype[1].name "simple" +doctype[1].idx 10014 +doctype[1].inherits[0].idx 10000 +doctype[1].contentstruct 10015 +doctype[1].structtype[0].idx 10015 +doctype[1].structtype[0].name simple.header +doctype[1].structtype[0].field[0].name "my_str" +doctype[1].structtype[0].field[0].internalid 1874040549 +doctype[1].structtype[0].field[0].type 10004 |