aboutsummaryrefslogtreecommitdiffstats
path: root/vespa_feed_perf
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-04-29 07:51:54 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2019-04-29 07:51:54 +0200
commit1f2395a64721868956a11184e73d962aaa394f86 (patch)
tree738a0d14f8cec0548da31a49439d48f79ce58134 /vespa_feed_perf
parent7ebc3fe27ab60427ca9b0842730cd9732ada34b5 (diff)
Add support for files, not only stdin
Diffstat (limited to 'vespa_feed_perf')
-rw-r--r--vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java31
-rw-r--r--vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java98
-rw-r--r--vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java40
-rw-r--r--vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java3
4 files changed, 104 insertions, 68 deletions
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java
index b4549fe495c..1fcd5d72a00 100644
--- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java
+++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/FeederParams.java
@@ -8,11 +8,14 @@ import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
/**
* @author Simon Thoresen Hult
@@ -20,7 +23,6 @@ import java.io.PrintStream;
class FeederParams {
enum DumpFormat {JSON, VESPA};
- private InputStream stdIn = System.in;
private PrintStream stdErr = System.err;
private PrintStream stdOut = System.out;
private Route route = Route.parse("default");
@@ -30,14 +32,10 @@ class FeederParams {
private boolean benchmarkMode = false;
private int numDispatchThreads = 1;
private int maxPending = 0;
+ private List<InputStream> inputStreams = new ArrayList<>();
- InputStream getStdIn() {
- return stdIn;
- }
-
- FeederParams setStdIn(InputStream stdIn) {
- this.stdIn = stdIn;
- return this;
+ FeederParams() {
+ inputStreams.add(System.in);
}
PrintStream getStdErr() {
@@ -82,11 +80,6 @@ class FeederParams {
return this;
}
- FeederParams setMaxPending(int maxPending) {
- this.maxPending = maxPending;
- return this;
- }
-
boolean isSerialTransferEnabled() {
return maxPending == 1;
}
@@ -96,6 +89,11 @@ class FeederParams {
numDispatchThreads = 1;
return this;
}
+ List<InputStream> getInputStreams() { return inputStreams; }
+ FeederParams setInputStreams(List<InputStream> inputStreams) {
+ this.inputStreams = inputStreams;
+ return this;
+ }
int getNumDispatchThreads() { return numDispatchThreads; }
int getMaxPending() { return maxPending; }
@@ -133,6 +131,13 @@ class FeederParams {
setSerialTransfer();
}
+ if ( !cmd.getArgList().isEmpty()) {
+ inputStreams.clear();
+ for (String fileName : cmd.getArgList()) {
+ inputStreams.add(new FileInputStream(new File(fileName)));
+ }
+ }
+
return this;
}
diff --git a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
index 36e5cc37ea5..70bb2c78ce7 100644
--- a/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
+++ b/vespa_feed_perf/src/main/java/com/yahoo/vespa/feed/perf/SimpleFeeder.java
@@ -44,6 +44,8 @@ import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -56,24 +58,59 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class SimpleFeeder implements ReplyHandler {
- private final static long REPORT_INTERVAL = TimeUnit.SECONDS.toMillis(10);
- private final static long HEADER_INTERVAL = REPORT_INTERVAL * 24;
+
private final DocumentTypeManager docTypeMgr = new DocumentTypeManager();
- private final InputStream in;
+ private final List<InputStream> inputStreams;
private final PrintStream out;
private final RPCMessageBus mbus;
private final SourceSession session;
+ private final int numThreads;
+ private final Destination destination;
+ private final boolean benchmarkMode;
+ private final static long REPORT_INTERVAL = TimeUnit.SECONDS.toMillis(10);
private final long startTime = System.currentTimeMillis();
private final AtomicReference<Throwable> failure = new AtomicReference<>(null);
private final AtomicLong numReplies = new AtomicLong(0);
private long maxLatency = Long.MIN_VALUE;
private long minLatency = Long.MAX_VALUE;
- private long nextHeader = startTime + HEADER_INTERVAL;
private long nextReport = startTime + REPORT_INTERVAL;
private long sumLatency = 0;
- private final int numThreads;
- private final Destination destination;
- private final boolean benchmarkMode;
+
+ static class Metrics {
+
+ private final Destination destination;
+ private final FeedReader reader;
+ private final Executor executor;
+ AtomicReference<Throwable> failure;
+
+ Metrics(Destination destination, FeedReader reader, Executor executor, AtomicReference<Throwable> failure) {
+ this.destination = destination;
+ this.reader = reader;
+ this.executor = executor;
+ this.failure = failure;
+ }
+
+ long feed() throws Throwable {
+ long numMessages = 0;
+ while (failure.get() == null) {
+ FeedOperation op = reader.read();
+ if (op.getType() == FeedOperation.Type.INVALID) {
+ break;
+ }
+ if (executor != null) {
+ executor.execute(() -> sendOperation(op));
+ } else {
+ sendOperation(op);
+ }
+ ++numMessages;
+ }
+ return numMessages;
+ }
+ private void sendOperation(FeedOperation op) {
+ destination.send(op);
+ }
+ }
+
public static void main(String[] args) throws Throwable {
new SimpleFeeder(new FeederParams().parseArgs(args)).run().close();
@@ -301,7 +338,7 @@ public class SimpleFeeder implements ReplyHandler {
return new JsonDestination(params.getDumpStream(), failure, numReplies);
}
SimpleFeeder(FeederParams params) {
- in = params.getStdIn();
+ inputStreams = params.getInputStreams();
out = params.getStdOut();
numThreads = params.getNumDispatchThreads();
mbus = newMessageBus(docTypeMgr, params.getConfigId());
@@ -313,12 +350,8 @@ public class SimpleFeeder implements ReplyHandler {
: new MbusDestination(session, params.getRoute(), failure, params.getStdErr());
}
- private void sendOperation(FeedOperation op) {
- destination.send(op);
- }
-
SourceSession getSourceSession() { return session; }
- private FeedReader createFeedReader() throws Exception {
+ private FeedReader createFeedReader(InputStream in) throws Exception {
in.mark(8);
byte [] b = new byte[2];
int numRead = readExact(in, b);
@@ -335,6 +368,8 @@ public class SimpleFeeder implements ReplyHandler {
}
}
+
+
SimpleFeeder run() throws Throwable {
ExecutorService executor = (numThreads > 1)
? new ThreadPoolExecutor(numThreads, numThreads, 0L, TimeUnit.SECONDS,
@@ -342,29 +377,19 @@ public class SimpleFeeder implements ReplyHandler {
ThreadFactoryFactory.getDaemonThreadFactory("perf-feeder"),
new ThreadPoolExecutor.CallerRunsPolicy())
: null;
- FeedReader reader = createFeedReader();
-
- printHeader();
- long numMessages = 0;
- while (failure.get() == null) {
- FeedOperation op = reader.read();
- if (op.getType() == FeedOperation.Type.INVALID) {
- break;
- }
- if (executor != null) {
- executor.execute(() -> sendOperation(op));
- } else {
- sendOperation(op);
- }
- ++numMessages;
+ printHeader(out);
+ long numMessagesSent = 0;
+ for (InputStream in : inputStreams) {
+ Metrics m = new Metrics(destination, createFeedReader(in), executor, failure);
+ numMessagesSent += m.feed();
}
- while (failure.get() == null && numReplies.get() < numMessages) {
+ while (failure.get() == null && numReplies.get() < numMessagesSent) {
Thread.sleep(100);
}
+ printReport(out);
if (failure.get() != null) {
throw failure.get();
}
- printReport();
return this;
}
@@ -414,23 +439,18 @@ public class SimpleFeeder implements ReplyHandler {
maxLatency = Math.max(maxLatency, latency);
sumLatency += latency;
if (benchmarkMode) { return; }
- if (now > nextHeader) {
- printHeader();
- nextHeader += HEADER_INTERVAL;
- }
if (now > nextReport) {
- printReport();
+ printReport(out);
nextReport += REPORT_INTERVAL;
}
}
-
- private void printHeader() {
+ private static void printHeader(PrintStream out) {
out.println("# Time used, num ok, num error, min latency, max latency, average latency");
}
- private void printReport() {
+ private void printReport(PrintStream out) {
out.format("%10d, %12d, %11d, %11d, %11d\n", System.currentTimeMillis() - startTime,
- numReplies.get(), minLatency, maxLatency, sumLatency / numReplies.get());
+ numReplies.get(), minLatency, maxLatency, sumLatency / numReplies.get());
}
private static String formatErrors(Reply reply) {
diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java
index d44cf41f9ab..5cc5d0bc018 100644
--- a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java
+++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/FeederParamsTest.java
@@ -5,12 +5,12 @@ import com.yahoo.messagebus.routing.Route;
import org.apache.commons.cli.ParseException;
import org.junit.Test;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.FileInputStream;
import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.io.PrintStream;
import static org.junit.Assert.assertEquals;
@@ -24,18 +24,14 @@ import static org.junit.Assert.assertTrue;
* @author Simon Thoresen Hult
*/
public class FeederParamsTest {
- static final String TESTFILE_JSON = "test.json";
- static final String TESTFILE_VESPA = "test.vespa";
- static final String TESTFILE_UNKNOWN = "test.xyz";
+ private static final String TESTFILE_JSON = "test.json";
+ private static final String TESTFILE_VESPA = "test.vespa";
+ private static final String TESTFILE_UNKNOWN = "test.xyz";
@Test
public void requireThatAccessorsWork() {
FeederParams params = new FeederParams();
- InputStream stdIn = new ByteArrayInputStream(new byte[1]);
- params.setStdIn(stdIn);
- assertSame(stdIn, params.getStdIn());
-
PrintStream stdErr = new PrintStream(new ByteArrayOutputStream());
params.setStdErr(stdErr);
assertSame(stdErr, params.getStdErr());
@@ -55,7 +51,7 @@ public class FeederParamsTest {
@Test
public void requireThatParamsHaveReasonableDefaults() {
FeederParams params = new FeederParams();
- assertSame(System.in, params.getStdIn());
+ assertSame(System.in, params.getInputStreams().get(0));
assertSame(System.err, params.getStdErr());
assertSame(System.out, params.getStdOut());
assertEquals(Route.parse("default"), params.getRoute());
@@ -66,16 +62,14 @@ public class FeederParamsTest {
@Test
public void requireThatSerialTransferOptionIsParsed() throws ParseException, FileNotFoundException {
assertTrue(new FeederParams().parseArgs("-s").isSerialTransferEnabled());
- assertTrue(new FeederParams().parseArgs("foo", "-s").isSerialTransferEnabled());
- assertTrue(new FeederParams().parseArgs("-s", "foo").isSerialTransferEnabled());
assertTrue(new FeederParams().parseArgs("--serial").isSerialTransferEnabled());
- assertTrue(new FeederParams().parseArgs("foo", "--serial").isSerialTransferEnabled());
- assertTrue(new FeederParams().parseArgs("--serial", "foo").isSerialTransferEnabled());
+ assertEquals(1, new FeederParams().parseArgs("-s").getMaxPending());
+ assertEquals(1, new FeederParams().parseArgs("-s").getNumDispatchThreads());
}
@Test
public void requireThatArgumentsAreParsedAsRoute() throws ParseException, FileNotFoundException {
- assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("-r foo bar").getRoute());
+ assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("-r", "foo bar").getRoute());
assertEquals(Route.parse("foo bar"), new FeederParams().parseArgs("--route","foo bar").getRoute());
}
@@ -115,4 +109,20 @@ public class FeederParamsTest {
assertTrue(new File(TESTFILE_UNKNOWN).delete());
}
+ @Test
+ public void requireThatInputFilesAreAggregated() throws ParseException, IOException {
+ File json = new File(TESTFILE_JSON);
+ File vespa = new File(TESTFILE_VESPA);
+ new FileOutputStream(json).close();
+ new FileOutputStream(vespa).close();
+ FeederParams p = new FeederParams();
+ p.parseArgs("-n", "3", TESTFILE_JSON, TESTFILE_VESPA);
+ assertEquals(3, p.getNumDispatchThreads());
+ assertEquals(2, p.getInputStreams().size());
+ assertTrue(p.getInputStreams().get(0) instanceof FileInputStream);
+ assertTrue(p.getInputStreams().get(1) instanceof FileInputStream);
+ json.delete();
+ vespa.delete();
+ }
+
}
diff --git a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java
index 2de7e831d04..8af4dd5dac9 100644
--- a/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java
+++ b/vespa_feed_perf/src/test/java/com/yahoo/vespa/feed/perf/SimpleFeederTest.java
@@ -22,6 +22,7 @@ import java.io.InputStream;
import java.io.PrintStream;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.regex.Pattern;
import static org.junit.Assert.assertEquals;
@@ -313,7 +314,7 @@ public class SimpleFeederTest {
server = new SimpleServer(CONFIG_DIR, validator);
feeder = new SimpleFeeder(params.setConfigId("dir:" + CONFIG_DIR)
.setStdErr(new PrintStream(err))
- .setStdIn(in)
+ .setInputStreams(Arrays.asList(in))
.setStdOut(new PrintStream(out)));
}