summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-06-09 21:15:58 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-06-10 07:50:42 +0200
commit5be6949b581dc9c4b78c8d270a1868eae2bdda0b (patch)
treecaac02229918df118b83445dab3b578c4fed7f88
parent64324d85537dc7714150032a64ae50a5f3e0e702 (diff)
Use stats aggregated by client
-rw-r--r--vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java80
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java10
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java4
3 files changed, 42 insertions, 52 deletions
diff --git a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
index a6119c5de4f..9d4f3525c32 100644
--- a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
+++ b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
@@ -8,6 +8,7 @@ import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLSession;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.file.Files;
import java.time.Duration;
@@ -57,18 +58,12 @@ public class CliClient {
return 0;
}
try (InputStream in = createFeedInputStream(cliArgs);
- JsonFeeder feeder = createJsonFeeder(cliArgs)) {
+ FeedClient feedClient = createFeedClient(cliArgs);
+ JsonFeeder feeder = createJsonFeeder(feedClient, cliArgs)) {
+ long startNanos = System.nanoTime();
+ feeder.feedMany(in).join();
if (cliArgs.benchmarkModeEnabled()) {
- BenchmarkResultAggregator aggregator = new BenchmarkResultAggregator();
- feeder.feedMany(in, aggregator).join();
- aggregator.printBenchmarkResult();
- } else {
- JsonFeeder.ResultCallback emptyCallback = new JsonFeeder.ResultCallback() {
- @Override public void onNextResult(Result result, Throwable error) {}
- @Override public void onError(Throwable error) {}
- @Override public void onComplete() {}
- };
- feeder.feedMany(in, emptyCallback).join();
+ printBenchmarkResult(System.nanoTime() - startNanos, feedClient.stats(), systemOut);
}
}
return 0;
@@ -94,8 +89,7 @@ public class CliClient {
return builder.build();
}
- private static JsonFeeder createJsonFeeder(CliArguments cliArgs) throws CliArguments.CliArgumentsException, IOException {
- FeedClient feedClient = createFeedClient(cliArgs);
+ private static JsonFeeder createJsonFeeder(FeedClient feedClient, CliArguments cliArgs) throws CliArguments.CliArgumentsException, IOException {
JsonFeeder.Builder builder = JsonFeeder.builder(feedClient);
cliArgs.timeout().ifPresent(builder::withTimeout);
cliArgs.route().ifPresent(builder::withRoute);
@@ -129,42 +123,30 @@ public class CliClient {
@Override public boolean verify(String hostname, SSLSession session) { return true; }
}
- private class BenchmarkResultAggregator implements JsonFeeder.ResultCallback {
-
- private final AtomicInteger okCount = new AtomicInteger();
- private final AtomicInteger errorCount = new AtomicInteger();
- private volatile long endNanoTime;
- private volatile long startNanoTime;
-
- void start() { this.startNanoTime = System.nanoTime(); }
-
- void printBenchmarkResult() throws IOException {
- JsonFactory factory = new JsonFactory();
- Duration duration = Duration.ofNanos(endNanoTime - startNanoTime);
- int okCount = this.okCount.get();
- int errorCount = this.errorCount.get();
- double throughput = (double) okCount / duration.toMillis() * 1000D;
- try (JsonGenerator generator = factory.createGenerator(systemOut).useDefaultPrettyPrinter()) {
- generator.writeStartObject();
- generator.writeNumberField("feeder.runtime", duration.toMillis());
- generator.writeNumberField("feeder.okcount", okCount);
- generator.writeNumberField("feeder.errorcount", errorCount);
- generator.writeNumberField("feeder.throughput", throughput);
- generator.writeEndObject();
- }
- }
-
- @Override
- public void onNextResult(Result result, Throwable error) {
- if (error != null) {
- errorCount.incrementAndGet();
- } else {
- okCount.incrementAndGet();
- }
+ static void printBenchmarkResult(long durationNanos, OperationStats stats, OutputStream systemOut) throws IOException {
+ JsonFactory factory = new JsonFactory();
+ long okCount = stats.successes();
+ long errorCount = stats.requests() - okCount;
+ double throughput = okCount * 1e6D / Math.max(1, durationNanos);
+ try (JsonGenerator generator = factory.createGenerator(systemOut).useDefaultPrettyPrinter()) {
+ generator.writeStartObject();
+ generator.writeNumberField("feeder.runtime", durationNanos / 1_000_000);
+ generator.writeNumberField("feeder.okcount", okCount);
+ generator.writeNumberField("feeder.errorcount", errorCount);
+ generator.writeNumberField("feeder.throughput", throughput);
+ generator.writeNumberField("feeder.minlatency", stats.minLatencyMillis());
+ generator.writeNumberField("feeder.avglatency", stats.averageLatencyMillis());
+ generator.writeNumberField("feeder.maxlatency", stats.maxLatencyMillis());
+ generator.writeNumberField("feeder.bytessent", stats.bytesSent());
+ generator.writeNumberField("feeder.bytesreceived", stats.bytesReceived());
+
+ generator.writeObjectFieldStart("feeder.responsecodes");
+ for (Map.Entry<Integer, Long> entry : stats.responsesByCode().entrySet())
+ generator.writeNumberField(Integer.toString(entry.getKey()), entry.getValue());
+ generator.writeEndObject();
+
+ generator.writeEndObject();
}
-
- @Override public void onError(Throwable error) {}
-
- @Override public void onComplete() { this.endNanoTime = System.nanoTime(); }
}
+
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
index 2a6d2e15747..de32e7abdf5 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonFeeder.java
@@ -58,18 +58,18 @@ public class JsonFeeder implements Closeable {
* @param result Non-null if operation completed successfully
* @param error Non-null if operation failed
*/
- void onNextResult(Result result, Throwable error);
+ default void onNextResult(Result result, Throwable error) { }
/**
* Invoked if an unrecoverable error occurred during feed processing,
* after which no other {@link ResultCallback} methods are invoked.
*/
- void onError(Throwable error);
+ default void onError(Throwable error) { }
/**
* Invoked when all feed operations are either completed successfully or failed.
*/
- void onComplete();
+ default void onComplete() { }
}
public static Builder builder(FeedClient client) { return new Builder(client); }
@@ -103,6 +103,10 @@ public class JsonFeeder implements Closeable {
return feedMany(jsonStream, 1 << 26, resultCallback);
}
+ public CompletableFuture<Void> feedMany(InputStream jsonStream) {
+ return feedMany(jsonStream, new ResultCallback() { });
+ }
+
CompletableFuture<Void> feedMany(InputStream jsonStream, int size, ResultCallback resultCallback) {
RingBufferStream buffer = new RingBufferStream(jsonStream, size);
CompletableFuture<Void> overallResult = new CompletableFuture<>();
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java
index 9305709d873..d36475a51fb 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/OperationStats.java
@@ -42,6 +42,10 @@ public class OperationStats {
return requests - inflight;
}
+ public long successes() {
+ return responsesByCode.getOrDefault(200, 0L);
+ }
+
public Map<Integer, Long> responsesByCode() {
return responsesByCode;
}