summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client-cli
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2021-06-09 14:04:51 +0200
committerBjørn Christian Seime <bjorncs@verizonmedia.com>2021-06-09 14:04:51 +0200
commitdaa0ee6329d4a87c1824b01113198c22e566e157 (patch)
tree10cf6b9abc32918e13db61023812a8c4a49c25f5 /vespa-feed-client-cli
parentc3eb4cce689e5657d66761d820a9f3dfc41d4a6a (diff)
Update CliClient to use new JsonFeeder inteface
Diffstat (limited to 'vespa-feed-client-cli')
-rw-r--r--vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java64
1 files changed, 50 insertions, 14 deletions
diff --git a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
index 40c6ac56022..a6119c5de4f 100644
--- a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
+++ b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
@@ -10,9 +10,11 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.nio.file.Files;
+import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Main method for CLI interface
@@ -57,9 +59,16 @@ public class CliClient {
try (InputStream in = createFeedInputStream(cliArgs);
JsonFeeder feeder = createJsonFeeder(cliArgs)) {
if (cliArgs.benchmarkModeEnabled()) {
- printBenchmarkResult(feeder.benchmark(in));
+ BenchmarkResultAggregator aggregator = new BenchmarkResultAggregator();
+ feeder.feedMany(in, aggregator).join();
+ aggregator.printBenchmarkResult();
} else {
- feeder.feedMany(in);
+ JsonFeeder.ResultCallback emptyCallback = new JsonFeeder.ResultCallback() {
+ @Override public void onNextResult(Result result, Throwable error) {}
+ @Override public void onError(Throwable error) {}
+ @Override public void onComplete() {}
+ };
+ feeder.feedMany(in, emptyCallback).join();
}
}
return 0;
@@ -98,18 +107,6 @@ public class CliClient {
return cliArgs.readFeedFromStandardInput() ? systemIn : Files.newInputStream(cliArgs.inputFile().get());
}
- private void printBenchmarkResult(JsonFeeder.BenchmarkResult result) throws IOException {
- JsonFactory factory = new JsonFactory();
- try (JsonGenerator generator = factory.createGenerator(systemOut).useDefaultPrettyPrinter()) {
- generator.writeStartObject();
- generator.writeNumberField("feeder.runtime", result.duration.toMillis());
- generator.writeNumberField("feeder.okcount", result.okCount);
- generator.writeNumberField("feeder.errorcount", result.errorCount);
- generator.writeNumberField("feeder.throughput", result.throughput);
- generator.writeEndObject();
- }
- }
-
private int handleException(boolean verbose, Exception e) { return handleException(verbose, e.getMessage(), e); }
private int handleException(boolean verbose, String message, Exception exception) {
@@ -131,4 +128,43 @@ public class CliClient {
static final AcceptAllHostnameVerifier INSTANCE = new AcceptAllHostnameVerifier();
@Override public boolean verify(String hostname, SSLSession session) { return true; }
}
+
+ private class BenchmarkResultAggregator implements JsonFeeder.ResultCallback {
+
+ private final AtomicInteger okCount = new AtomicInteger();
+ private final AtomicInteger errorCount = new AtomicInteger();
+ private volatile long endNanoTime;
+ private volatile long startNanoTime;
+
+ void start() { this.startNanoTime = System.nanoTime(); }
+
+ void printBenchmarkResult() throws IOException {
+ JsonFactory factory = new JsonFactory();
+ Duration duration = Duration.ofNanos(endNanoTime - startNanoTime);
+ int okCount = this.okCount.get();
+ int errorCount = this.errorCount.get();
+ double throughput = (double) okCount / duration.toMillis() * 1000D;
+ try (JsonGenerator generator = factory.createGenerator(systemOut).useDefaultPrettyPrinter()) {
+ generator.writeStartObject();
+ generator.writeNumberField("feeder.runtime", duration.toMillis());
+ generator.writeNumberField("feeder.okcount", okCount);
+ generator.writeNumberField("feeder.errorcount", errorCount);
+ generator.writeNumberField("feeder.throughput", throughput);
+ generator.writeEndObject();
+ }
+ }
+
+ @Override
+ public void onNextResult(Result result, Throwable error) {
+ if (error != null) {
+ errorCount.incrementAndGet();
+ } else {
+ okCount.incrementAndGet();
+ }
+ }
+
+ @Override public void onError(Throwable error) {}
+
+ @Override public void onComplete() { this.endNanoTime = System.nanoTime(); }
+ }
}