summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-06-18 16:00:38 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-06-21 09:29:13 +0200
commit5d17f472e3d4bae2afdba93a8e30f79999d292b2 (patch)
tree2a23074c69e25d7befbe3dc132ab57494c3666f1 /vespa-feed-client
parent7e20b3b9e65d80cf2240e2afc6c8946db298f7b8 (diff)
Add Throttler interface
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java88
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java18
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java62
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java45
4 files changed, 172 insertions, 41 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java
new file mode 100644
index 00000000000..44f5d8b537f
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java
@@ -0,0 +1,88 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.lang.Math.log;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import static java.lang.Math.pow;
+import static java.lang.Math.random;
+
+/**
+ * Samples latency as a function of inflight requests, and regularly adjusts to the optimal value.
+ *
+ * @author jonmv
+ */
+class DynamicThrottler extends StaticThrottler {
+
+ private final AtomicLong targetInflight;
+ private long updateNanos = 0;
+ private final List<AtomicReference<Double>> latencies = new ArrayList<>();
+ private final double weight = 0.5; // Higher weight favours higher (exclusive) throughput, at the cost of (shared) latency.
+
+ public DynamicThrottler(FeedClientBuilder builder) {
+ super(builder);
+ this.targetInflight = new AtomicLong((long) (pow(minInflight, 0.5) * pow(maxInflight, 0.5)));
+ for (int i = 0; i < 1024; i++)
+ latencies.add(new AtomicReference<>(-1.0));
+ }
+
+ @Override
+ public void sent(long inflight, CompletableFuture<HttpResponse> vessel) {
+ long startNanos = System.nanoTime();
+ if (updateNanos == 0) updateNanos = System.nanoTime();
+ boolean update = startNanos - updateNanos >= 1e8; // Ship ten updates per second.
+ if (update) updateNanos = startNanos;
+
+ vessel.whenComplete((response, thrown) -> {
+ // Use buckets for latency measurements, with inflight along a log scale,
+ // and with minInflight and maxInflight at the ends.
+ int index = (int) (latencies.size() * log(max(1, inflight / minInflight))
+ / log((double) maxInflight / minInflight));
+ long nowNanos = System.nanoTime();
+ long latencyNanos = nowNanos - startNanos;
+ double w1 = 0.5; // Update values with some of the new measurement, some of the old.
+ double w2 = response != null && response.code() / 100 == 2 ? 1 - w1 : 1; // Punish non-successes.
+ latencies.get(index).updateAndGet(latency -> latency < 0 ? latencyNanos : latencyNanos * w1 + latency * w2);
+ if ( ! update)
+ return;
+
+ // Loop over latency measurements and pick the one which optimises throughput and latency.
+ double choice = -1;
+ double max = -1;
+ for (int i = latencies.size(); i-- > 0; ) {
+ double latency = latencies.get(i).get();
+ if (latency < 0) continue; // Skip unknown values.
+ double target = minInflight * pow((double) maxInflight / minInflight, (double) i / latencies.size());
+ double objective = pow(target, weight) / latency; // Optimise throughput (weight), but also latency (1 - weight).
+ if (objective > max) {
+ max = objective;
+ choice = target;
+ }
+ }
+ long target = (long) ((random() * 0.1 + 0.97) * choice); // Random walk, skewed towards increase.
+ targetInflight.set(max(minInflight, min(maxInflight, target)));
+ });
+ }
+
+ @Override
+ public void success() {
+ super.success();
+ }
+
+ @Override
+ public void throttled(long inflight) {
+ super.throttled(inflight);
+ }
+
+ @Override
+ public long targetInflight() {
+ return min(super.targetInflight(), targetInflight.get());
+ }
+
+}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java
index 952edfb5464..f39b56ad50f 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/FeedClient.java
@@ -92,4 +92,22 @@ public interface FeedClient extends Closeable {
}
+
+ /** Determines the number of requests to have inflight at any point. */
+ interface Throttler {
+
+ /** A request was just sent with {@code vessel}, with {@code inflight} total in flight. */
+ void sent(long inflight, CompletableFuture<HttpResponse> vessel);
+
+ /** A successful response was obtained. */
+ void success();
+
+ /** A throttle signal was obtained from the server. */
+ void throttled(long inflight);
+
+ /** The target inflight operations right now. */
+ long targetInflight();
+
+ }
+
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
index 1bd69d1e1f6..32032cda54b 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
@@ -21,8 +21,6 @@ import java.util.logging.Logger;
import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.CLOSED;
import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.HALF_OPEN;
import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN;
-import static java.lang.Math.max;
-import static java.lang.Math.min;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.WARNING;
@@ -46,14 +44,11 @@ class HttpRequestStrategy implements RequestStrategy {
private final Map<DocumentId, CompletableFuture<?>> inflightById = new ConcurrentHashMap<>();
private final RetryStrategy strategy;
private final CircuitBreaker breaker;
+ final FeedClient.Throttler throttler;
private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
- private final long maxInflight;
- private final long minInflight;
- private final AtomicLong targetInflightX10; // 10x target, so we can increment one every tenth success.
private final AtomicLong inflight = new AtomicLong(0);
private final AtomicBoolean destroyed = new AtomicBoolean(false);
private final AtomicLong delayedCount = new AtomicLong(0);
- private final AtomicLong retries = new AtomicLong(0);
private final ExecutorService resultExecutor = Executors.newSingleThreadExecutor(runnable -> {
Thread thread = new Thread(runnable, "feed-client-result-executor");
thread.setDaemon(true);
@@ -68,9 +63,7 @@ class HttpRequestStrategy implements RequestStrategy {
this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster) : cluster;
this.strategy = builder.retryStrategy;
this.breaker = builder.circuitBreaker;
- this.maxInflight = builder.connectionsPerEndpoint * (long) builder.maxStreamsPerConnection;
- this.minInflight = builder.connectionsPerEndpoint * (long) min(16, builder.maxStreamsPerConnection);
- this.targetInflightX10 = new AtomicLong(10 * (long) (Math.sqrt(minInflight) * Math.sqrt(maxInflight)));
+ this.throttler = new DynamicThrottler(builder);
Thread dispatcher = new Thread(this::dispatch, "feed-client-dispatcher");
dispatcher.setDaemon(true);
@@ -102,9 +95,12 @@ class HttpRequestStrategy implements RequestStrategy {
destroy();
}
- private void offer(Runnable task) {
+ private void offer(HttpRequest request, CompletableFuture<HttpResponse> vessel) {
delayedCount.incrementAndGet();
- queue.offer(task);
+ queue.offer(() -> {
+ cluster.dispatch(request, vessel);
+ throttler.sent(inflight.get(), vessel);
+ });
}
private boolean poll() {
@@ -115,8 +111,9 @@ class HttpRequestStrategy implements RequestStrategy {
return true;
}
+
private boolean isInExcess() {
- return inflight.get() - delayedCount.get() > targetInflight();
+ return inflight.get() - delayedCount.get() > throttler.targetInflight();
}
private boolean retry(HttpRequest request, int attempt) {
@@ -147,23 +144,11 @@ class HttpRequestStrategy implements RequestStrategy {
return false;
}
- private void incrementTargetInflight() {
- targetInflightX10.incrementAndGet();
- }
-
- private void decreaseTargetInflight() {
- targetInflightX10.set(max((inflight.get() - delayedCount.get()) * 9, minInflight * 10));
- }
-
- private long targetInflight() {
- return min(targetInflightX10.get() / 10, maxInflight);
- }
-
/** Retries throttled requests (429, 503), adjusting the target inflight count, and server errors (500, 502). */
private boolean retry(HttpRequest request, HttpResponse response, int attempt) {
if (response.code() / 100 == 2) {
breaker.success();
- incrementTargetInflight();
+ throttler.success();
return false;
}
@@ -171,7 +156,7 @@ class HttpRequestStrategy implements RequestStrategy {
") on attempt " + attempt + " at " + request);
if (response.code() == 429 || response.code() == 503) { // Throttling; reduce target inflight.
- decreaseTargetInflight();
+ throttler.throttled((inflight.get() - delayedCount.get()));
return true;
}
@@ -184,7 +169,7 @@ class HttpRequestStrategy implements RequestStrategy {
private void acquireSlot() {
try {
- while (inflight.get() >= targetInflight())
+ while (inflight.get() >= throttler.targetInflight())
Thread.sleep(1);
inflight.incrementAndGet();
@@ -220,27 +205,23 @@ class HttpRequestStrategy implements RequestStrategy {
if (previous == null) {
acquireSlot();
- offer(() -> cluster.dispatch(request, vessel));
+ offer(request, vessel);
}
else
- previous.whenComplete((__, ___) -> offer(() -> cluster.dispatch(request, vessel)));
+ previous.whenComplete((__, ___) -> offer(request, vessel));
handleAttempt(vessel, request, result, 1);
- result.whenComplete((__, ___) -> {
+ return result.handle((response, error) -> {
if (inflightById.compute(documentId, (____, current) -> current == result ? null : current) == null)
releaseSlot();
- });
- result.handle((response, error) -> {
if (error != null) {
- if (error instanceof FeedException) throw (FeedException)error;
+ if (error instanceof FeedException) throw (FeedException) error;
throw new FeedException(documentId, error);
}
return response;
});
-
- return result;
}
/** Handles the result of one attempt at the given operation, retrying if necessary. */
@@ -249,10 +230,9 @@ class HttpRequestStrategy implements RequestStrategy {
// Retry the operation if it failed with a transient error ...
if (thrown != null ? retry(request, thrown, attempt)
: retry(request, response, attempt)) {
- retries.incrementAndGet();
CircuitBreaker.State state = breaker.state();
CompletableFuture<HttpResponse> retry = new CompletableFuture<>();
- offer(() -> cluster.dispatch(request, retry));
+ offer(request, retry);
handleAttempt(retry, request, result, attempt + (state == HALF_OPEN ? 0 : 1));
}
// ... or accept the outcome and mark the operation as complete.
@@ -266,11 +246,11 @@ class HttpRequestStrategy implements RequestStrategy {
@Override
public void destroy() {
- if ( ! destroyed.getAndSet(true))
+ if ( ! destroyed.getAndSet(true)) {
inflightById.values().forEach(result -> result.cancel(true));
-
- cluster.close();
- resultExecutor.shutdown();
+ cluster.close();
+ resultExecutor.shutdown();
+ }
}
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java
new file mode 100644
index 00000000000..4e0c4fe90f0
--- /dev/null
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/StaticThrottler.java
@@ -0,0 +1,45 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package ai.vespa.feed.client;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+
+/**
+ * Reduces max throughput whenever throttled; increases it slowly whenever successful responses are obtained.
+ *
+ * @author jonmv
+ */
+public class StaticThrottler implements FeedClient.Throttler {
+
+ protected final long maxInflight;
+ protected final long minInflight;
+ private final AtomicLong targetX10;
+
+ public StaticThrottler(FeedClientBuilder builder) {
+ this.maxInflight = builder.connectionsPerEndpoint * (long) builder.maxStreamsPerConnection;
+ this.minInflight = builder.connectionsPerEndpoint * (long) min(16, builder.maxStreamsPerConnection);
+ this.targetX10 = new AtomicLong(10 * maxInflight); // 10x the actual value to allow for smaller updates.
+ }
+
+ @Override
+ public void sent(long inflight, CompletableFuture<HttpResponse> vessel) { }
+
+ @Override
+ public void success() {
+ targetX10.incrementAndGet();
+ }
+
+ @Override
+ public void throttled(long inflight) {
+ targetX10.set(max(inflight * 5, minInflight * 10));
+ }
+
+ @Override
+ public long targetInflight() {
+ return min(maxInflight, targetX10.get() / 10);
+ }
+
+}