summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBjørn Christian Seime <bjorncs@verizonmedia.com>2021-05-27 16:37:53 +0200
committerBjørn Christian Seime <bjorncs@verizonmedia.com>2021-05-27 16:37:53 +0200
commitcf82ff60e85efb24517551d0450d6647bb0aa651 (patch)
treed1b147ac5e7f751ec1361e9be82c9e29e9acb3e0
parenta6130bd49cf86df4a81aacaa068583d41723956a (diff)
Add benchmark mode to JsonStreamFeeder + CliClient wiring
-rw-r--r--vespa-feed-client-cli/pom.xml5
-rw-r--r--vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java29
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java40
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java2
4 files changed, 66 insertions, 10 deletions
diff --git a/vespa-feed-client-cli/pom.xml b/vespa-feed-client-cli/pom.xml
index 62ff5c149ec..28e1ab01dae 100644
--- a/vespa-feed-client-cli/pom.xml
+++ b/vespa-feed-client-cli/pom.xml
@@ -31,6 +31,11 @@
<artifactId>commons-cli</artifactId>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <scope>compile</scope>
+ </dependency>
<!-- test scope -->
<dependency>
diff --git a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
index d20a9bed286..4e0640bc891 100644
--- a/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
+++ b/vespa-feed-client-cli/src/main/java/ai/vespa/feed/client/CliClient.java
@@ -1,6 +1,9 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.feed.client;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLSession;
import java.io.BufferedInputStream;
@@ -47,17 +50,25 @@ public class CliClient {
systemOut.println(Vespa.VERSION);
return 0;
}
- try (InputStream in = createFeedInputStream(cliArgs); JsonStreamFeeder feeder = createJsonFeeder(cliArgs)) {
- feeder.feed(in);
+ try (InputStream in = createFeedInputStream(cliArgs);
+ JsonStreamFeeder feeder = createJsonFeeder(cliArgs)) {
+ if (cliArgs.benchmarkModeEnabled()) {
+ printBenchmarkResult(feeder.benchmark(in));
+ } else {
+ feeder.feed(in);
+ }
}
return 0;
} catch (CliArguments.CliArgumentsException | IOException e) {
boolean verbose = cliArgs != null && cliArgs.verboseSpecified();
return handleException(verbose, e);
+ } catch (Exception e) {
+ boolean verbose = cliArgs != null && cliArgs.verboseSpecified();
+ return handleException(verbose, "Unknown failure: " + e.getMessage(), e);
}
}
- private static FeedClient createFeedClient(CliArguments cliArgs) throws CliArguments.CliArgumentsException, IOException {
+ private static FeedClient createFeedClient(CliArguments cliArgs) throws CliArguments.CliArgumentsException {
FeedClientBuilder builder = FeedClientBuilder.create(cliArgs.endpoint());
cliArgs.connections().ifPresent(builder::setMaxConnections);
cliArgs.maxStreamsPerConnection().ifPresent(builder::setMaxConnections);
@@ -84,6 +95,18 @@ public class CliClient {
cliArgs.readFeedFromStandardInput() ? systemIn : Files.newInputStream(cliArgs.inputFile().get()));
}
+ private void printBenchmarkResult(JsonStreamFeeder.BenchmarkResult result) throws IOException {
+ JsonFactory factory = new JsonFactory();
+ try (JsonGenerator generator = factory.createGenerator(systemOut).useDefaultPrettyPrinter()) {
+ generator.writeStartObject();
+ generator.writeNumberField("feeder.runtime", result.duration.toMillis());
+ generator.writeNumberField("feeder.okcount", result.okCount);
+ generator.writeNumberField("feeder.errorcount", result.errorCount);
+ generator.writeNumberField("feeder.throughput", result.throughput);
+ generator.writeEndObject();
+ }
+ }
+
private int handleException(boolean verbose, Exception e) { return handleException(verbose, e.getMessage(), e); }
private int handleException(boolean verbose, String message, Exception exception) {
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java
index 72eb96c2356..17162f19d3f 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/JsonStreamFeeder.java
@@ -12,7 +12,9 @@ import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.time.Duration;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static ai.vespa.feed.client.FeedClient.OperationType.put;
@@ -67,24 +69,36 @@ public class JsonStreamFeeder implements Closeable {
* Note that {@code "id"} is an alias for the document put operation.
*/
public void feed(InputStream jsonStream) throws IOException {
- feed(jsonStream, 1 << 26);
+ feed(jsonStream, 1 << 26, false);
}
- void feed(InputStream jsonStream, int size) throws IOException {
+ BenchmarkResult benchmark(InputStream jsonStream) throws IOException {
+ return feed(jsonStream, 1 << 26, true).get();
+ }
+
+ Optional<BenchmarkResult> feed(InputStream jsonStream, int size, boolean benchmark) throws IOException {
RingBufferStream buffer = new RingBufferStream(jsonStream, size);
buffer.expect(JsonToken.START_ARRAY);
+ AtomicInteger okCount = new AtomicInteger();
+ AtomicInteger failedCount = new AtomicInteger();
+ long startTime = System.nanoTime();
CompletableFuture<Result> result;
AtomicReference<Throwable> thrown = new AtomicReference<>();
while ((result = buffer.next()) != null) {
result.whenComplete((r, t) -> {
- if (t != null)
- thrown.set(t);
- else
- ; // Aggregate stats.
+ if (t != null) {
+ failedCount.incrementAndGet();
+ if (!benchmark) thrown.set(t);
+ } else
+ okCount.incrementAndGet();
});
if (thrown.get() != null)
sneakyThrow(thrown.get());
}
+ if (!benchmark) return Optional.empty();
+ Duration duration = Duration.ofNanos(System.nanoTime() - startTime);
+ double throughPut = (double)okCount.get() / duration.toMillis() * 1000D;
+ return Optional.of(new BenchmarkResult(okCount.get(), failedCount.get(), duration, throughPut));
}
@SuppressWarnings("unchecked")
@@ -333,4 +347,18 @@ public class JsonStreamFeeder implements Closeable {
}
+ static class BenchmarkResult {
+ final int okCount;
+ final int errorCount;
+ final Duration duration;
+ final double throughput;
+
+ BenchmarkResult(int okCount, int errorCount, Duration duration, double throughput) {
+ this.okCount = okCount;
+ this.errorCount = errorCount;
+ this.duration = duration;
+ this.throughput = throughput;
+ }
+ }
+
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java
index 25f64e3c98a..8ef8ae57f5e 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/JsonStreamFeederTest.java
@@ -60,7 +60,7 @@ class JsonStreamFeederTest {
public void close() throws IOException {
}
- }).build().feed(in, 1 << 7); // TODO: hangs on 1 << 6.
+ }).build().feed(in, 1 << 7, false); // TODO: hangs on 1 << 6.
assertEquals(docs + 1, ids.size());
}