aboutsummaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-06-09 14:03:41 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-06-09 14:07:43 +0200
commit46c91ff61b279ef89cf41a9b28f921c42c9de464 (patch)
tree6341635bf64fc2e8333d037fda069ee8b81e6a14 /vespa-feed-client
parentc1df455e40f18e7a6e8814fb40d725bab07eb601 (diff)
Add BenchmarkingCluster to the mix
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java105
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java89
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java62
3 files changed, 255 insertions, 1 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java
new file mode 100644
index 00000000000..a7d4e79356c
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java
@@ -0,0 +1,105 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.util.Objects.requireNonNull;
+
+public class BenchmarkingCluster implements Cluster {
+
+ private final Cluster delegate;
+ private final ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
+ Thread thread = new Thread(runnable, "cluster-stats-collector");
+ thread.setDaemon(true);
+ return thread;
+ });
+
+ private final AtomicLong requests = new AtomicLong();
+ private long results = 0;
+ private long responses = 0;
+ private final long[] responsesByCode = new long[600];
+ private long exceptions = 0;
+ private long totalLatencyMillis = 0;
+ private long minLatencyMillis = 0;
+ private long maxLatencyMillis = 0;
+ private long bytesSent = 0;
+ private long bytesReceived = 0;
+
+ public BenchmarkingCluster(Cluster delegate) {
+ this.delegate = requireNonNull(delegate);
+ }
+
+ @Override
+ public void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) {
+ requests.incrementAndGet();
+ long startMillis = System.currentTimeMillis();
+ delegate.dispatch(request, vessel);
+ vessel.whenCompleteAsync((response, thrown) -> {
+ results++;
+ if (thrown == null) {
+ responses++;
+ responsesByCode[response.getCode()]++;
+ long latency = System.currentTimeMillis() - startMillis;
+ totalLatencyMillis += latency;
+ minLatencyMillis = Math.min(minLatencyMillis, latency);
+ maxLatencyMillis = Math.max(maxLatencyMillis, latency);
+ bytesSent += request.getBodyBytes() == null ? 0 : request.getBodyBytes().length;
+ bytesReceived += response.getBodyBytes() == null ? 0 : response.getBodyBytes().length;
+ }
+ else
+ exceptions++;
+ },
+ executor);
+ }
+
+ @Override
+ public Stats stats() {
+ try {
+ try {
+ return executor.submit(this::getStats).get();
+ }
+ catch (RejectedExecutionException ignored) {
+ executor.awaitTermination(10, TimeUnit.SECONDS);
+ return getStats();
+ }
+ }
+ catch (InterruptedException | ExecutionException ignored) {
+ throw new RuntimeException(ignored);
+ }
+ }
+
+ private Stats getStats() {
+ Map<Integer, Long> responses = new HashMap<>();
+ for (int code = 0; code < responsesByCode.length; code++)
+ if (responsesByCode[code] > 0)
+ responses.put(code, responsesByCode[code]);
+
+ return new Stats(requests.get(),
+ responses,
+ exceptions,
+ requests.get() - results,
+ totalLatencyMillis / this.responses,
+ minLatencyMillis,
+ maxLatencyMillis,
+ bytesSent,
+ bytesReceived);
+ }
+
+ @Override
+ public void close() {
+ delegate.close();
+ executor.shutdown();
+ }
+
+}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java
index fde230d3ca4..9ffccd740e8 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java
@@ -5,10 +5,12 @@ import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
import java.io.Closeable;
+import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
- * Allows dispatch to a Vespa cluster. {@link #dispatch} should be called by a single thread, i.e., it is not thread-safe.
+ * Allows dispatch to a Vespa cluster.
*/
interface Cluster extends Closeable {
@@ -18,4 +20,89 @@ interface Cluster extends Closeable {
@Override
default void close() { }
+ default Stats stats() { return new Stats(0, Collections.emptyMap(), 0, 0, 0, 0, 0, 0, 0); }
+
+ class Stats {
+
+ private final long requests;
+ private final Map<Integer, Long> responsesByCode;
+ private final long inflight;
+ private final long exceptions;
+ private final long averageLatencyMillis;
+ private final long minLatencyMillis;
+ private final long maxLatencyMillis;
+ private final long bytesSent;
+ private final long bytesReceived;
+
+ public Stats(long requests, Map<Integer, Long> responsesByCode, long exceptions, long inflight,
+ long averageLatencyMillis, long minLatencyMillis, long maxLatencyMillis,
+ long bytesSent, long bytesReceived) {
+ this.requests = requests;
+ this.responsesByCode = responsesByCode;
+ this.exceptions = exceptions;
+ this.inflight = inflight;
+ this.averageLatencyMillis = averageLatencyMillis;
+ this.minLatencyMillis = minLatencyMillis;
+ this.maxLatencyMillis = maxLatencyMillis;
+ this.bytesSent = bytesSent;
+ this.bytesReceived = bytesReceived;
+ }
+
+ public long requests() {
+ return requests;
+ }
+
+ public long responses() {
+ return requests - inflight;
+ }
+
+ public Map<Integer, Long> responsesByCode() {
+ return responsesByCode;
+ }
+
+ public long exceptions() {
+ return exceptions;
+ }
+
+ public long inflight() {
+ return inflight;
+ }
+
+ public long averageLatencyMillis() {
+ return averageLatencyMillis;
+ }
+
+ public long minLatencyMillis() {
+ return minLatencyMillis;
+ }
+
+ public long maxLatencyMillis() {
+ return maxLatencyMillis;
+ }
+
+ public long bytesSent() {
+ return bytesSent;
+ }
+
+ public long bytesReceived() {
+ return bytesReceived;
+ }
+
+ @Override
+ public String toString() {
+ return "Stats{" +
+ "requests=" + requests +
+ ", responsesByCode=" + responsesByCode +
+ ", exceptions=" + exceptions +
+ ", inflight=" + inflight +
+ ", averageLatencyMillis=" + averageLatencyMillis +
+ ", minLatencyMillis=" + minLatencyMillis +
+ ", maxLatencyMillis=" + maxLatencyMillis +
+ ", bytesSent=" + bytesSent +
+ ", bytesReceived=" + bytesReceived +
+ '}';
+ }
+
+ }
+
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
new file mode 100644
index 00000000000..cee292a41e0
--- /dev/null
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
@@ -0,0 +1,62 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
+import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
+import org.apache.hc.core5.http.ContentType;
+import org.junit.jupiter.api.Test;
+
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+class HttpRequestStrategyTest {
+
+ @Test
+ void testConcurrency() {
+ int documents = 1 << 16;
+ SimpleHttpRequest request = new SimpleHttpRequest("PUT", "/");
+ SimpleHttpResponse response = new SimpleHttpResponse(200);
+ response.setBody("{}".getBytes(UTF_8), ContentType.APPLICATION_JSON);
+ ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+ Cluster cluster = new BenchmarkingCluster((__, vessel) -> executor.schedule(() -> vessel.complete(response), 100, TimeUnit.MILLISECONDS));
+
+ HttpRequestStrategy strategy = new HttpRequestStrategy(FeedClientBuilder.create(URI.create("https://dummy.com:123"))
+ .setConnectionsPerEndpoint(1 << 12)
+ .setMaxStreamPerConnection(1 << 4),
+ cluster);
+ long startNanos = System.nanoTime();
+ for (int i = 0; i < documents; i++)
+ strategy.enqueue(DocumentId.of("ns", "type", Integer.toString(i)), request);
+
+ strategy.await();
+ executor.shutdown();
+ cluster.close();
+ Cluster.Stats stats = cluster.stats();
+ long successes = stats.responsesByCode().get(200);
+ System.err.println(successes + " successes in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds");
+ System.err.println(stats);
+
+ assertEquals(documents, stats.requests());
+ assertEquals(documents, stats.responses());
+ assertEquals(documents, stats.responsesByCode().get(200));
+ assertEquals(0, stats.inflight());
+ assertEquals(0, stats.exceptions());
+ assertEquals(0, stats.bytesSent());
+ assertEquals(2 * documents, stats.bytesReceived());
+ }
+
+ @Test
+ void test() {
+ }
+
+}