summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client
diff options
context:
space:
mode:
authorJon Marius Venstad <venstad@gmail.com>2021-06-21 10:03:48 +0200
committerJon Marius Venstad <venstad@gmail.com>2021-06-21 10:03:48 +0200
commitcae18255a54eadc1d7949dd3564a8cc9a6056c75 (patch)
treed08b9b1accd8e6e17c53bfb4e54ba2f12bb67a96 /vespa-feed-client
parent7f0b51b604fd1e84be5684a8389c9e111fc5ede7 (diff)
Some adjustments and fix int -> float division, try dynamic throttler
Diffstat (limited to 'vespa-feed-client')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java16
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java4
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java4
3 files changed, 12 insertions, 12 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java
index 6d23fed33f8..53e2ece3125 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/DynamicThrottler.java
@@ -23,12 +23,12 @@ public class DynamicThrottler extends StaticThrottler {
private final AtomicLong targetInflight;
private long updateNanos = 0;
private final List<AtomicReference<Double>> latencies = new ArrayList<>();
- private final double weight = 0.7; // Higher weight favours higher (own) throughput, at the cost of (shared) latency.
+ private final double weight = 0.8; // Higher weight favours higher (own) throughput, at the cost of (shared) latency.
public DynamicThrottler(FeedClientBuilder builder) {
super(builder);
- this.targetInflight = new AtomicLong((long) (pow(minInflight, 0.5) * pow(maxInflight, 0.5)));
- for (int i = 0; i < 1024; i++)
+ this.targetInflight = new AtomicLong(128L * builder.connectionsPerEndpoint * builder.endpoints.size());
+ for (int i = 0; i < 128; i++)
latencies.add(new AtomicReference<>(-1.0));
}
@@ -42,11 +42,11 @@ public class DynamicThrottler extends StaticThrottler {
vessel.whenComplete((response, thrown) -> {
// Use buckets for latency measurements, with inflight along a log scale,
// and with minInflight and maxInflight at the ends.
- int index = (int) (latencies.size() * log(max(1, inflight / minInflight))
- / log((double) maxInflight / minInflight));
+ int index = (int) (latencies.size() * log(max(1, (double) inflight / minInflight))
+ / log(256)); // 4096 (server max streams per connection) / 16 (our min per connection)
long nowNanos = System.nanoTime();
long latencyNanos = nowNanos - startNanos;
- double w1 = 0.5; // Update values with some of the new measurement, some of the old.
+ double w1 = 1e-2; // Update values with some of the new measurement, some of the old.
double w2 = response != null && response.code() / 100 == 2 ? 1 - w1 : 1; // Punish non-successes.
latencies.get(index).updateAndGet(latency -> latency < 0 ? latencyNanos : latencyNanos * w1 + latency * w2);
if ( ! update)
@@ -58,14 +58,14 @@ public class DynamicThrottler extends StaticThrottler {
for (int i = latencies.size(); i-- > 0; ) {
double latency = latencies.get(i).get();
if (latency < 0) continue; // Skip unknown values.
- double target = minInflight * pow((double) maxInflight / minInflight, (double) i / latencies.size());
+ double target = minInflight * pow(256, (i + 0.5) / latencies.size());
double objective = pow(target, weight) / latency; // Optimise throughput (weight), but also latency (1 - weight).
if (objective > max) {
max = objective;
choice = target;
}
}
- long target = (long) ((random() * 0.1 + 0.97) * choice); // Random walk, skewed towards increase.
+ long target = (long) ((random() * 0.15 + 0.95) * choice); // Random walk, skewed towards increase.
targetInflight.set(max(minInflight, min(maxInflight, target)));
});
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
index 763357d8fbb..e9cd0baba5b 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/HttpRequestStrategy.java
@@ -63,7 +63,7 @@ class HttpRequestStrategy implements RequestStrategy {
this.cluster = builder.benchmark ? new BenchmarkingCluster(cluster) : cluster;
this.strategy = builder.retryStrategy;
this.breaker = builder.circuitBreaker;
- this.throttler = new StaticThrottler(builder);
+ this.throttler = new DynamicThrottler(builder);
Thread dispatcher = new Thread(this::dispatch, "feed-client-dispatcher");
dispatcher.setDaemon(true);
@@ -99,7 +99,6 @@ class HttpRequestStrategy implements RequestStrategy {
delayedCount.incrementAndGet();
queue.offer(() -> {
cluster.dispatch(request, vessel);
- throttler.sent(inflight.get(), vessel);
});
}
@@ -206,6 +205,7 @@ class HttpRequestStrategy implements RequestStrategy {
if (previous == null) {
acquireSlot();
offer(request, vessel);
+ throttler.sent(inflight.get(), result);
}
else
previous.whenComplete((__, ___) -> offer(request, vessel));
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
index 4ef24713e9a..21ab6889e6e 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/HttpRequestStrategyTest.java
@@ -40,8 +40,8 @@ class HttpRequestStrategyTest {
Cluster cluster = new BenchmarkingCluster((__, vessel) -> executor.schedule(() -> vessel.complete(response), (int) (Math.random() * 2 * 10), TimeUnit.MILLISECONDS));
HttpRequestStrategy strategy = new HttpRequestStrategy(FeedClientBuilder.create(URI.create("https://dummy.com:123"))
- .setConnectionsPerEndpoint(1 << 4)
- .setMaxStreamPerConnection(1 << 20),
+ .setConnectionsPerEndpoint(1 << 10)
+ .setMaxStreamPerConnection(1 << 12),
cluster);
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {