diff options
author | Jon Marius Venstad <venstad@gmail.com> | 2021-06-18 16:00:38 +0200 |
---|---|---|
committer | Jon Marius Venstad <venstad@gmail.com> | 2021-06-21 09:29:13 +0200 |
commit | 5d17f472e3d4bae2afdba93a8e30f79999d292b2 (patch) | |
tree | 2a23074c69e25d7befbe3dc132ab57494c3666f1 /vespa-feed-client | |
parent | 7e20b3b9e65d80cf2240e2afc6c8946db298f7b8 (diff) |
Add Throttler interface
Diffstat (limited to 'vespa-feed-client')
4 files changed, 172 insertions, 41 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java new file mode 100644 index 00000000000..44f5d8b537f --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java @@ -0,0 +1,88 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import static java.lang.Math.log; +import static java.lang.Math.max; +import static java.lang.Math.min; +import static java.lang.Math.pow; +import static java.lang.Math.random; + +/** + * Samples latency as a function of inflight requests, and regularly adjusts to the optimal value. + * + * @author jonmv + */ +class DynamicThrottler extends StaticThrottler { + + private final AtomicLong targetInflight; + private long updateNanos = 0; + private final List<AtomicReference<Double>> latencies = new ArrayList<>(); + private final double weight = 0.5; // Higher weight favours higher (exclusive) throughput, at the cost of (shared) latency. + + public DynamicThrottler(FeedClientBuilder builder) { + super(builder); + this.targetInflight = new AtomicLong((long) (pow(minInflight, 0.5) * pow(maxInflight, 0.5))); + for (int i = 0; i < 1024; i++) + latencies.add(new AtomicReference<>(-1.0)); + } + + @Override + public void sent(long inflight, CompletableFuture<HttpResponse> vessel) { + long startNanos = System.nanoTime(); + if (updateNanos == 0) updateNanos = System.nanoTime(); + boolean update = startNanos - updateNanos >= 1e8; // Ship ten updates per second. + if (update) updateNanos = startNanos; + + vessel.whenComplete((response, thrown) -> { + // Use buckets for latency measurements, with inflight along a log scale, + // and with minInflight and maxInflight at the ends. + int index = (int) (latencies.size() * log(max(1, inflight / minInflight)) + / log((double) maxInflight / minInflight)); + long nowNanos = System.nanoTime(); + long latencyNanos = nowNanos - startNanos; + double w1 = 0.5; // Update values with some of the new measurement, some of the old. + double w2 = response != null && response.code() / 100 == 2 ? 1 - w1 : 1; // Punish non-successes. + latencies.get(index).updateAndGet(latency -> latency < 0 ? latencyNanos : latencyNanos * w1 + latency * w2); + if ( ! update) + return; + + // Loop over latency measurements and pick the one which optimises throughput and latency. + double choice = -1; + double max = -1; + for (int i = latencies.size(); i-- > 0; ) { + double latency = latencies.get(i).get(); + if (latency < 0) continue; // Skip unknown values. + double target = minInflight * pow((double) maxInflight / minInflight, (double) i / latencies.size()); + double objective = pow(target, weight) / latency; // Optimise throughput (weight), but also latency (1 - weight). + if (objective > max) { + max = objective; + choice = target; + } + } + long target = (long) ((random() * 0.1 + 0.97) * choice); // Random walk, skewed towards increase. + targetInflight.set(max(minInflight, min(maxInflight, target))); + }); + } + + @Override + public void success() { + super.success(); + } + + @Override + public void throttled(long inflight) { + super.throttled(inflight); + } + + @Override + public long targetInflight() { + return min(super.targetInflight(), targetInflight.get()); + } + +} diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java index 952edfb5464..f39b56ad50f 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java @@ -92,4 +92,22 @@ public interface FeedClient extends Closeable { } + + /** Determines the number of requests to have inflight at any point. */ + interface Throttler { + + /** A request was just sent with {@code vessel}, with {@code inflight} total in flight. */ + void sent(long inflight, CompletableFuture<HttpResponse> vessel); + + /** A successful response was obtained. */ + void success(); + + /** A throttle signal was obtained from the server. */ + void throttled(long inflight); + + /** The target inflight operations right now. */ + long targetInflight(); + + } + } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java index 1bd69d1e1f6..32032cda54b 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java @@ -21,8 +21,6 @@ import java.util.logging.Logger; import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.CLOSED; import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN; import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN; -import static java.lang.Math.max; -import static java.lang.Math.min; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.logging.Level.FINE; import static java.util.logging.Level.WARNING; @@ -46,14 +44,11 @@ class HttpRequestStrategy implements RequestStrategy { private final Map<DocumentId, CompletableFuture<?>> inflightById = new ConcurrentHashMap<>(); private final RetryStrategy strategy; private final CircuitBreaker breaker; + final FeedClient.Throttler throttler; private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>(); - private final long maxInflight; - private final long minInflight; - private final AtomicLong targetInflightX10; // 10x target, so we can increment one every tenth success. private final AtomicLong inflight = new AtomicLong(0); private final AtomicBoolean destroyed = new AtomicBoolean(false); private final AtomicLong delayedCount = new AtomicLong(0); - private final AtomicLong retries = new AtomicLong(0); private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(runnable -> { Thread thread = new Thread(runnable, "feed-client-result-executor"); thread.setDaemon(true); @@ -68,9 +63,7 @@ class HttpRequestStrategy implements RequestStrategy { this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster) : cluster; this.strategy = builder.retryStrategy; this.breaker = builder.circuitBreaker; - this.maxInflight = builder.connectionsPerEndpoint * (long) builder.maxStreamsPerConnection; - this.minInflight = builder.connectionsPerEndpoint * (long) min(16, builder.maxStreamsPerConnection); - this.targetInflightX10 = new AtomicLong(10 * (long) (Math.sqrt(minInflight) * Math.sqrt(maxInflight))); + this.throttler = new DynamicThrottler(builder); Thread dispatcher = new Thread(this::dispatch, "feed-client-dispatcher"); dispatcher.setDaemon(true); @@ -102,9 +95,12 @@ class HttpRequestStrategy implements RequestStrategy { destroy(); } - private void offer(Runnable task) { + private void offer(HttpRequest request, CompletableFuture<HttpResponse> vessel) { delayedCount.incrementAndGet(); - queue.offer(task); + queue.offer(() -> { + cluster.dispatch(request, vessel); + throttler.sent(inflight.get(), vessel); + }); } private boolean poll() { @@ -115,8 +111,9 @@ class HttpRequestStrategy implements RequestStrategy { return true; } + private boolean isInExcess() { - return inflight.get() - delayedCount.get() > targetInflight(); + return inflight.get() - delayedCount.get() > throttler.targetInflight(); } private boolean retry(HttpRequest request, int attempt) { @@ -147,23 +144,11 @@ class HttpRequestStrategy implements RequestStrategy { return false; } - private void incrementTargetInflight() { - targetInflightX10.incrementAndGet(); - } - - private void decreaseTargetInflight() { - targetInflightX10.set(max((inflight.get() - delayedCount.get()) * 9, minInflight * 10)); - } - - private long targetInflight() { - return min(targetInflightX10.get() / 10, maxInflight); - } - /** Retries throttled requests (429, 503), adjusting the target inflight count, and server errors (500, 502). */ private boolean retry(HttpRequest request, HttpResponse response, int attempt) { if (response.code() / 100 == 2) { breaker.success(); - incrementTargetInflight(); + throttler.success(); return false; } @@ -171,7 +156,7 @@ class HttpRequestStrategy implements RequestStrategy { ") on attempt " + attempt + " at " + request); if (response.code() == 429 || response.code() == 503) { // Throttling; reduce target inflight. - decreaseTargetInflight(); + throttler.throttled((inflight.get() - delayedCount.get())); return true; } @@ -184,7 +169,7 @@ class HttpRequestStrategy implements RequestStrategy { private void acquireSlot() { try { - while (inflight.get() >= targetInflight()) + while (inflight.get() >= throttler.targetInflight()) Thread.sleep(1); inflight.incrementAndGet(); @@ -220,27 +205,23 @@ class HttpRequestStrategy implements RequestStrategy { if (previous == null) { acquireSlot(); - offer(() -> cluster.dispatch(request, vessel)); + offer(request, vessel); } else - previous.whenComplete((__, ___) -> offer(() -> cluster.dispatch(request, vessel))); + previous.whenComplete((__, ___) -> offer(request, vessel)); handleAttempt(vessel, request, result, 1); - result.whenComplete((__, ___) -> { + return result.handle((response, error) -> { if (inflightById.compute(documentId, (____, current) -> current == result ? null : current) == null) releaseSlot(); - }); - result.handle((response, error) -> { if (error != null) { - if (error instanceof FeedException) throw (FeedException)error; + if (error instanceof FeedException) throw (FeedException) error; throw new FeedException(documentId, error); } return response; }); - - return result; } /** Handles the result of one attempt at the given operation, retrying if necessary. */ @@ -249,10 +230,9 @@ class HttpRequestStrategy implements RequestStrategy { // Retry the operation if it failed with a transient error ... if (thrown != null ? retry(request, thrown, attempt) : retry(request, response, attempt)) { - retries.incrementAndGet(); CircuitBreaker.State state = breaker.state(); CompletableFuture<HttpResponse> retry = new CompletableFuture<>(); - offer(() -> cluster.dispatch(request, retry)); + offer(request, retry); handleAttempt(retry, request, result, attempt + (state == HALF_OPEN ? 0 : 1)); } // ... or accept the outcome and mark the operation as complete. @@ -266,11 +246,11 @@ class HttpRequestStrategy implements RequestStrategy { @Override public void destroy() { - if ( ! destroyed.getAndSet(true)) + if ( ! destroyed.getAndSet(true)) { inflightById.values().forEach(result -> result.cancel(true)); - - cluster.close(); - resultExecutor.shutdown(); + cluster.close(); + resultExecutor.shutdown(); + } } } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java new file mode 100644 index 00000000000..4e0c4fe90f0 --- /dev/null +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java @@ -0,0 +1,45 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package ai.vespa.feed.client; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicLong; + +import static java.lang.Math.max; +import static java.lang.Math.min; + +/** + * Reduces max throughput whenever throttled; increases it slowly whenever successful responses are obtained. + * + * @author jonmv + */ +public class StaticThrottler implements FeedClient.Throttler { + + protected final long maxInflight; + protected final long minInflight; + private final AtomicLong targetX10; + + public StaticThrottler(FeedClientBuilder builder) { + this.maxInflight = builder.connectionsPerEndpoint * (long) builder.maxStreamsPerConnection; + this.minInflight = builder.connectionsPerEndpoint * (long) min(16, builder.maxStreamsPerConnection); + this.targetX10 = new AtomicLong(10 * maxInflight); // 10x the actual value to allow for smaller updates. + } + + @Override + public void sent(long inflight, CompletableFuture<HttpResponse> vessel) { } + + @Override + public void success() { + targetX10.incrementAndGet(); + } + + @Override + public void throttled(long inflight) { + targetX10.set(max(inflight * 5, minInflight * 10)); + } + + @Override + public long targetInflight() { + return min(maxInflight, targetX10.get() / 10); + } + +} |