diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-06-09 14:03:41 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-06-09 14:07:43 +0200 |
commit | 46c91ff61b279ef89cf41a9b28f921c42c9de464 (patch) | |
tree | 6341635bf64fc2e8333d037fda069ee8b81e6a14 /vespa-feed-client/src | |
parent | c1df455e40f18e7a6e8814fb40d725bab07eb601 (diff) |
Add BenchmarkingCluster to the mix
Diffstat (limited to 'vespa-feed-client/src')
3 files changed, 255 insertions, 1 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java new file mode 100644 index 00000000000..a7d4e79356c --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/BenchmarkingCluster.java @@ -0,0 +1,105 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static java.util.Objects.requireNonNull; + +public class BenchmarkingCluster implements Cluster { + + private final Cluster delegate; + private final ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> { + Thread thread = new Thread(runnable, "cluster-stats-collector"); + thread.setDaemon(true); + return thread; + }); + + private final AtomicLong requests = new AtomicLong(); + private long results = 0; + private long responses = 0; + private final long[] responsesByCode = new long[600]; + private long exceptions = 0; + private long totalLatencyMillis = 0; + private long minLatencyMillis = 0; + private long maxLatencyMillis = 0; + private long bytesSent = 0; + private long bytesReceived = 0; + + public BenchmarkingCluster(Cluster delegate) { + this.delegate = requireNonNull(delegate); + } + + @Override + public void dispatch(SimpleHttpRequest request, CompletableFuture<SimpleHttpResponse> vessel) { + requests.incrementAndGet(); + long startMillis = System.currentTimeMillis(); + delegate.dispatch(request, vessel); + vessel.whenCompleteAsync((response, thrown) -> { + results++; + if (thrown == null) { + responses++; + responsesByCode[response.getCode()]++; + long latency = System.currentTimeMillis() - startMillis; + totalLatencyMillis += latency; + minLatencyMillis = Math.min(minLatencyMillis, latency); + maxLatencyMillis = Math.max(maxLatencyMillis, latency); + bytesSent += request.getBodyBytes() == null ? 0 : request.getBodyBytes().length; + bytesReceived += response.getBodyBytes() == null ? 0 : response.getBodyBytes().length; + } + else + exceptions++; + }, + executor); + } + + @Override + public Stats stats() { + try { + try { + return executor.submit(this::getStats).get(); + } + catch (RejectedExecutionException ignored) { + executor.awaitTermination(10, TimeUnit.SECONDS); + return getStats(); + } + } + catch (InterruptedException | ExecutionException ignored) { + throw new RuntimeException(ignored); + } + } + + private Stats getStats() { + Map<Integer, Long> responses = new HashMap<>(); + for (int code = 0; code < responsesByCode.length; code++) + if (responsesByCode[code] > 0) + responses.put(code, responsesByCode[code]); + + return new Stats(requests.get(), + responses, + exceptions, + requests.get() - results, + totalLatencyMillis / this.responses, + minLatencyMillis, + maxLatencyMillis, + bytesSent, + bytesReceived); + } + + @Override + public void close() { + delegate.close(); + executor.shutdown(); + } + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java index fde230d3ca4..9ffccd740e8 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/Cluster.java @@ -5,10 +5,12 @@ import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; import java.io.Closeable; +import java.util.Collections; +import java.util.Map; import java.util.concurrent.CompletableFuture; /** - * Allows dispatch to a Vespa cluster. {@link #dispatch} should be called by a single thread, i.e., it is not thread-safe. + * Allows dispatch to a Vespa cluster. */ interface Cluster extends Closeable { @@ -18,4 +20,89 @@ interface Cluster extends Closeable { @Override default void close() { } + default Stats stats() { return new Stats(0, Collections.emptyMap(), 0, 0, 0, 0, 0, 0, 0); } + + class Stats { + + private final long requests; + private final Map<Integer, Long> responsesByCode; + private final long inflight; + private final long exceptions; + private final long averageLatencyMillis; + private final long minLatencyMillis; + private final long maxLatencyMillis; + private final long bytesSent; + private final long bytesReceived; + + public Stats(long requests, Map<Integer, Long> responsesByCode, long exceptions, long inflight, + long averageLatencyMillis, long minLatencyMillis, long maxLatencyMillis, + long bytesSent, long bytesReceived) { + this.requests = requests; + this.responsesByCode = responsesByCode; + this.exceptions = exceptions; + this.inflight = inflight; + this.averageLatencyMillis = averageLatencyMillis; + this.minLatencyMillis = minLatencyMillis; + this.maxLatencyMillis = maxLatencyMillis; + this.bytesSent = bytesSent; + this.bytesReceived = bytesReceived; + } + + public long requests() { + return requests; + } + + public long responses() { + return requests - inflight; + } + + public Map<Integer, Long> responsesByCode() { + return responsesByCode; + } + + public long exceptions() { + return exceptions; + } + + public long inflight() { + return inflight; + } + + public long averageLatencyMillis() { + return averageLatencyMillis; + } + + public long minLatencyMillis() { + return minLatencyMillis; + } + + public long maxLatencyMillis() { + return maxLatencyMillis; + } + + public long bytesSent() { + return bytesSent; + } + + public long bytesReceived() { + return bytesReceived; + } + + @Override + public String toString() { + return "Stats{" + + "requests=" + requests + + ", responsesByCode=" + responsesByCode + + ", exceptions=" + exceptions + + ", inflight=" + inflight + + ", averageLatencyMillis=" + averageLatencyMillis + + ", minLatencyMillis=" + minLatencyMillis + + ", maxLatencyMillis=" + maxLatencyMillis + + ", bytesSent=" + bytesSent + + ", bytesReceived=" + bytesReceived + + '}'; + } + + } + } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java new file mode 100644 index 00000000000..cee292a41e0 --- /dev/null +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java @@ -0,0 +1,62 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.core5.http.ContentType; +import org.junit.jupiter.api.Test; + +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertEquals; + +class HttpRequestStrategyTest { + + @Test + void testConcurrency() { + int documents = 1 << 16; + SimpleHttpRequest request = new SimpleHttpRequest("PUT", "/"); + SimpleHttpResponse response = new SimpleHttpResponse(200); + response.setBody("{}".getBytes(UTF_8), ContentType.APPLICATION_JSON); + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + Cluster cluster = new BenchmarkingCluster((__, vessel) -> executor.schedule(() -> vessel.complete(response), 100, TimeUnit.MILLISECONDS)); + + HttpRequestStrategy strategy = new HttpRequestStrategy(FeedClientBuilder.create(URI.create("https://dummy.com:123")) + .setConnectionsPerEndpoint(1 << 12) + .setMaxStreamPerConnection(1 << 4), + cluster); + long startNanos = System.nanoTime(); + for (int i = 0; i < documents; i++) + strategy.enqueue(DocumentId.of("ns", "type", Integer.toString(i)), request); + + strategy.await(); + executor.shutdown(); + cluster.close(); + Cluster.Stats stats = cluster.stats(); + long successes = stats.responsesByCode().get(200); + System.err.println(successes + " successes in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); + System.err.println(stats); + + assertEquals(documents, stats.requests()); + assertEquals(documents, stats.responses()); + assertEquals(documents, stats.responsesByCode().get(200)); + assertEquals(0, stats.inflight()); + assertEquals(0, stats.exceptions()); + assertEquals(0, stats.bytesSent()); + assertEquals(2 * documents, stats.bytesReceived()); + } + + @Test + void test() { + } + +} |