diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2024-04-05 12:07:07 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-04-05 12:07:07 +0200 |
commit | ac578c9e185cb47071a1e86b56ff286d66241386 (patch) | |
tree | d82d24f57f66beaab6dc45c38deda4be856047fb | |
parent | 64da5d03db27aa11b70a6591b2167719bad1d0c0 (diff) | |
parent | 63e4294e0c4f68d2d4f6ddd292e5bc67b741473c (diff) |
Merge pull request #30826 from vespa-engine/balder/nap-insteadof-beautysleep
Balder/nap insteadof beautysleep
6 files changed, 42 insertions, 29 deletions
diff --git a/vespa-feed-client-api/abi-spec.json b/vespa-feed-client-api/abi-spec.json index 907d974ff09..9065edad92a 100644 --- a/vespa-feed-client-api/abi-spec.json +++ b/vespa-feed-client-api/abi-spec.json @@ -308,7 +308,7 @@ "public" ], "methods" : [ - "public void <init>(long, java.util.Map, long, long, long, long, long, long, long)", + "public void <init>(double, long, java.util.Map, long, long, long, long, long, long, long, long)", "public ai.vespa.feed.client.OperationStats since(ai.vespa.feed.client.OperationStats)", "public long requests()", "public long responses()", diff --git a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationStats.java b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationStats.java index 2eb41838560..33d23b114fd 100644 --- a/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationStats.java +++ b/vespa-feed-client-api/src/main/java/ai/vespa/feed/client/OperationStats.java @@ -12,9 +12,11 @@ import java.util.stream.Collectors; */ public class OperationStats { + private final double duration; private final long requests; private final Map<Integer, Long> responsesByCode; private final long inflight; + private final long targetInflight; private final long exceptions; private final long averageLatencyMillis; private final long minLatencyMillis; @@ -22,13 +24,15 @@ public class OperationStats { private final long bytesSent; private final long bytesReceived; - public OperationStats(long requests, Map<Integer, Long> responsesByCode, long exceptions, long inflight, - long averageLatencyMillis, long minLatencyMillis, long maxLatencyMillis, - long bytesSent, long bytesReceived) { + public OperationStats(double duration, long requests, Map<Integer, Long> responsesByCode, long exceptions, + long inflight, long targetInFlight, long averageLatencyMillis, long minLatencyMillis, + long maxLatencyMillis, long bytesSent, long bytesReceived) { + this.duration = duration; this.requests = requests; this.responsesByCode = responsesByCode; this.exceptions = exceptions; this.inflight = inflight; + this.targetInflight = targetInFlight; this.averageLatencyMillis = averageLatencyMillis; this.minLatencyMillis = minLatencyMillis; this.maxLatencyMillis = maxLatencyMillis; @@ -36,14 +40,18 @@ public class OperationStats { this.bytesReceived = bytesReceived; } - /** Returns the difference between this and the initial. Min and max latency are not modified. */ + /** Returns the difference between this and the initial. + * Min and max latency, inflight and targetInflight are not modified. + */ public OperationStats since(OperationStats initial) { - return new OperationStats(requests - initial.requests, + return new OperationStats(duration - initial.duration, + requests - initial.requests, responsesByCode.entrySet().stream() - .collect(Collectors.toMap(entry -> entry.getKey(), + .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue() - initial.responsesByCode.getOrDefault(entry.getKey(), 0L))), exceptions - initial.exceptions, - inflight - initial.inflight, + inflight, + targetInflight, responsesByCode.size() == initial.responsesByCode.size() ? 0 : (averageLatencyMillis * responsesByCode.size() - initial.averageLatencyMillis * initial.responsesByCode.size()) / (responsesByCode.size() - initial.responsesByCode.size()), @@ -123,11 +131,15 @@ public class OperationStats { @Override public String toString() { + Map<Integer, Double> rateByCode = responsesByCode.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()/duration)); return "Stats{" + "requests=" + requests + ", responsesByCode=" + responsesByCode + + ", responseRateByCode=" + rateByCode + ", exceptions=" + exceptions + ", inflight=" + inflight + + ", targetInflight=" + targetInflight + ", averageLatencyMillis=" + averageLatencyMillis + ", minLatencyMillis=" + minLatencyMillis + ", maxLatencyMillis=" + maxLatencyMillis + diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/BenchmarkingCluster.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/BenchmarkingCluster.java index 753bc0240d3..99c891696f5 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/BenchmarkingCluster.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/BenchmarkingCluster.java @@ -26,7 +26,9 @@ public class BenchmarkingCluster implements Cluster { return thread; }); + private final AtomicLong timeOfFirstDispatch = new AtomicLong(0); private final AtomicLong requests = new AtomicLong(); + private final Throttler throttler; private long results = 0; private long responses = 0; private final long[] responsesByCode = new long[600]; @@ -37,14 +39,18 @@ public class BenchmarkingCluster implements Cluster { private long bytesSent = 0; private long bytesReceived = 0; - public BenchmarkingCluster(Cluster delegate) { + public BenchmarkingCluster(Cluster delegate, Throttler throttler) { this.delegate = requireNonNull(delegate); + this.throttler = throttler; } @Override public void dispatch(HttpRequest request, CompletableFuture<HttpResponse> vessel) { requests.incrementAndGet(); long startNanos = System.nanoTime(); + if (timeOfFirstDispatch.get() == 0) { + timeOfFirstDispatch.set(startNanos); + } delegate.dispatch(request, vessel); vessel.whenCompleteAsync((response, thrown) -> { results++; @@ -88,15 +94,13 @@ public class BenchmarkingCluster implements Cluster { if (responsesByCode[code] > 0) responses.put(code, responsesByCode[code]); - return new OperationStats(requests, - responses, - exceptions, - requests - results, + double duration = (System.nanoTime() - timeOfFirstDispatch.get())*1e-9; + return new OperationStats(duration, requests, responses, exceptions, + requests - results, throttler.targetInflight(), this.responses == 0 ? -1 : totalLatencyMillis / this.responses, this.responses == 0 ? -1 : minLatencyMillis, this.responses == 0 ? -1 : maxLatencyMillis, - bytesSent, - bytesReceived); + bytesSent, bytesReceived); } @Override diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java index 6b4372fd11a..0268f1a4394 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/FeedClientBuilderImpl.java @@ -20,7 +20,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.function.Supplier; -import java.util.logging.Logger; import static ai.vespa.feed.client.FeedClientBuilder.Compression.auto; import static java.util.Objects.requireNonNull; @@ -33,8 +32,6 @@ import static java.util.Objects.requireNonNull; */ public class FeedClientBuilderImpl implements FeedClientBuilder { - private static final Logger log = Logger.getLogger(FeedClientBuilderImpl.class.getName()); - static final FeedClient.RetryStrategy defaultRetryStrategy = new FeedClient.RetryStrategy() { }; List<URI> endpoints; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java index a495efd87eb..8f0327a1738 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/HttpRequestStrategy.java @@ -67,10 +67,10 @@ class HttpRequestStrategy implements RequestStrategy { }); HttpRequestStrategy(FeedClientBuilderImpl builder, Cluster cluster) { - this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster) : cluster; + this.throttler = new DynamicThrottler(builder); + this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster, throttler) : cluster; this.strategy = builder.retryStrategy; this.breaker = builder.circuitBreaker; - this.throttler = new DynamicThrottler(builder); Thread dispatcher = new Thread(this::dispatch, "feed-client-dispatcher"); dispatcher.setDaemon(true); @@ -92,7 +92,7 @@ class HttpRequestStrategy implements RequestStrategy { while (breaker.state() != OPEN && ! destroyed.get()) { while ( ! isInExcess() && poll() && breaker.state() == CLOSED); // Sleep when circuit is half-open, nap when queue is empty, or we are throttled. - Thread.sleep(breaker.state() == HALF_OPEN ? 1000 : 10); + Thread.sleep(breaker.state() == HALF_OPEN ? 100 : 1); } } catch (Throwable t) { diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java index c943e3b139f..54fab9b859b 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java @@ -12,8 +12,8 @@ import org.junit.jupiter.api.Test; import java.io.IOException; import java.net.URI; import java.time.Duration; -import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -44,9 +44,9 @@ class HttpRequestStrategyTest { HttpRequest request = new HttpRequest("PUT", "/", null, null, null); HttpResponse response = HttpResponse.of(200, "{}".getBytes(UTF_8)); ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); - Cluster cluster = new BenchmarkingCluster((__, vessel) -> executor.schedule(() -> vessel.complete(response), (int) (Math.random() * 2 * 10), TimeUnit.MILLISECONDS)); + Cluster cluster = (__, vessel) -> executor.schedule(() -> vessel.complete(response), (int) (Math.random() * 2 * 10), TimeUnit.MILLISECONDS); - HttpRequestStrategy strategy = new HttpRequestStrategy( new FeedClientBuilderImpl(Collections.singletonList(URI.create("https://dummy.com:123"))) + HttpRequestStrategy strategy = new HttpRequestStrategy( new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) .setConnectionsPerEndpoint(1 << 10) .setMaxStreamPerConnection(1 << 12), cluster); @@ -66,7 +66,7 @@ class HttpRequestStrategyTest { latch.countDown(); executor.shutdown(); cluster.close(); - OperationStats stats = cluster.stats(); + OperationStats stats = strategy.stats(); long successes = stats.responsesByCode().get(200); System.err.println(successes + " successes in " + (System.nanoTime() - startNanos) * 1e-9 + " seconds"); System.err.println(stats); @@ -86,7 +86,7 @@ class HttpRequestStrategyTest { MockCluster cluster = new MockCluster(); AtomicLong now = new AtomicLong(0); CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); - HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(Collections.singletonList(URI.create("https://dummy.com:123"))) + HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) .setRetryStrategy(new FeedClient.RetryStrategy() { @Override public boolean retry(FeedClient.OperationType type) { return type == FeedClient.OperationType.PUT; } @Override public int retries() { return 1; } @@ -94,7 +94,7 @@ class HttpRequestStrategyTest { .setCircuitBreaker(breaker) .setConnectionsPerEndpoint(1) .setMaxStreamPerConnection(minStreams), - new BenchmarkingCluster(cluster)); + cluster); OperationStats initial = strategy.stats(); DocumentId id1 = DocumentId.of("ns", "type", "1"); @@ -195,13 +195,13 @@ class HttpRequestStrategyTest { MockCluster cluster = new MockCluster(); AtomicLong now = new AtomicLong(0); CircuitBreaker breaker = new GracePeriodCircuitBreaker(now::get, Duration.ofSeconds(1), Duration.ofMinutes(10)); - HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(Collections.singletonList(URI.create("https://dummy.com:123"))) + HttpRequestStrategy strategy = new HttpRequestStrategy(new FeedClientBuilderImpl(List.of(URI.create("https://dummy.com:123"))) .setRetryStrategy(new FeedClient.RetryStrategy() { @Override public int retries() { return 1; } }) .setCircuitBreaker(breaker) .setConnectionsPerEndpoint(1), - new BenchmarkingCluster(cluster)); + cluster); DocumentId id1 = DocumentId.of("ns", "type", "1"); DocumentId id2 = DocumentId.of("ns", "type", "2"); |