summaryrefslogtreecommitdiffstats
path: root/vespa-feed-client/src
diff options
context:
space:
mode:
Diffstat (limited to 'vespa-feed-client/src')
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java28
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java2
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/DynamicThrottlerTest.java30
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java5
4 files changed, 55 insertions, 10 deletions
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java
index 951a1776b6f..567788b8501 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java
@@ -28,12 +28,12 @@ public class DynamicThrottler extends StaticThrottler {
public DynamicThrottler(FeedClientBuilderImpl builder) {
super(builder);
- targetInflight = new AtomicLong(8 * minInflight);
+ targetInflight = new AtomicLong(minInflight);
}
@Override
public void sent(long __, CompletableFuture<HttpResponse> ___) {
- double currentInflight = targetInflight.get();
+ double currentInflight = targetInflight();
if (++sent * sent * sent < 1e2 * currentInflight * currentInflight)
return;
@@ -43,22 +43,36 @@ public class DynamicThrottler extends StaticThrottler {
// Use buckets for throughput over inflight, along the log-scale, in [minInflight, maxInflight).
int index = (int) (throughputs.length * log(max(1, min(255, currentInflight / minInflight)))
- / log(256)); // 4096 (server max streams per connection) / 16 (our min per connection)
+ / log(256)); // 512 (server max streams per connection) / 2 (our min per connection)
throughputs[index] = currentThroughput;
// Loop over throughput measurements and pick the one which optimises throughput and latency.
- double choice = currentInflight;
+ double best = currentInflight;
double max = -1;
- for (int i = throughputs.length; i-- > 0; ) {
+ int j = -1, k = -1, choice = 0;
+ double s = 0;
+ for (int i = 0; i < throughputs.length; i++) {
if (throughputs[i] == 0) continue; // Skip unknown values.
double inflight = minInflight * pow(256, (i + 0.5) / throughputs.length);
double objective = throughputs[i] * pow(inflight, (weight - 1)); // Optimise throughput (weight), but also latency (1 - weight).
if (objective > max) {
max = objective;
- choice = inflight;
+ best = inflight;
+ choice = i;
}
+ // Additionally, smooth the throughput values, to reduce the impact of noise, and reduce jumpiness.
+ if (j != -1) {
+ double t = throughputs[j];
+ if (k != -1) throughputs[j] = (2 * t + throughputs[i] + s) / 4;
+ s = t;
+ }
+ k = j;
+ j = i;
}
- long target = (long) ((random() * 0.20 + 0.92) * choice); // Random walk, skewed towards increase.
+ long target = (long) ((random() * 0.40 + 0.84) * best + random() * 4 - 1); // Random step, skewed towards increase.
+ // If the best inflight is at the high end of the known, we override the random walk to speed up upwards exploration.
+ if (choice == j && choice + 1 < throughputs.length)
+ target = (long) (1 + minInflight * pow(256, (choice + 1.5) / throughputs.length));
targetInflight.set(max(minInflight, min(maxInflight, target)));
}
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java
index 9010b0a7ad8..f0ee434e87c 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java
@@ -22,7 +22,7 @@ public class StaticThrottler implements Throttler {
public StaticThrottler(FeedClientBuilderImpl builder) {
minInflight = 2L * builder.connectionsPerEndpoint * builder.endpoints.size();
- maxInflight = 256 * minInflight; // 4096 max streams per connection on the server side.
+ maxInflight = 256 * minInflight; // 512 max streams per connection on the server side.
targetX10 = new AtomicLong(10 * maxInflight); // 10x the actual value to allow for smaller updates.
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/DynamicThrottlerTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/DynamicThrottlerTest.java
new file mode 100644
index 00000000000..7e07fc6e116
--- /dev/null
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/DynamicThrottlerTest.java
@@ -0,0 +1,30 @@
+package ai.vespa.feed.client.impl;
+
+import org.junit.jupiter.api.Test;
+
+import java.net.URI;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * @author jonmv
+ */
+class DynamicThrottlerTest {
+
+ @Test
+ void testThrottler() {
+ DynamicThrottler throttler = new DynamicThrottler(new FeedClientBuilderImpl(List.of(URI.create("http://localhost:8080"))));
+ assertEquals(16, throttler.targetInflight());
+
+ for (int i = 0; i < 30; i++) {
+ throttler.sent(1, null);
+ throttler.success();
+ }
+ assertEquals(18, throttler.targetInflight());
+
+ throttler.throttled(34);
+ assertEquals(17, throttler.targetInflight());
+ }
+
+}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
index 54fab9b859b..b1a04ac9ed4 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
@@ -33,6 +33,7 @@ import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -105,7 +106,7 @@ class HttpRequestStrategyTest {
cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom")));
ExecutionException expected = assertThrows(ExecutionException.class,
() -> strategy.enqueue(id1, request).get());
- assertTrue(expected.getCause() instanceof FeedException);
+ assertInstanceOf(FeedException.class, expected.getCause());
assertEquals("java.lang.RuntimeException: boom", expected.getCause().getMessage());
assertEquals(1, strategy.stats().requests());
@@ -200,7 +201,7 @@ class HttpRequestStrategyTest {
@Override public int retries() { return 1; }
})
.setCircuitBreaker(breaker)
- .setConnectionsPerEndpoint(1),
+ .setConnectionsPerEndpoint(3), // Must be >= 0.5x text ops.
cluster);
DocumentId id1 = DocumentId.of("ns", "type", "1");