summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorjonmv <venstad@gmail.com>2024-05-13 11:21:39 +0200
committerjonmv <venstad@gmail.com>2024-05-13 11:21:39 +0200
commitf951681e3e1076606da092437a32c92454533e14 (patch)
tree4333d09e872ed11cf2dfd7d5a08ab6ebe219ff34
parent532efd915a6604bd4a48de0de097f15cc24d4594 (diff)
Revert "Merge pull request #31177 from vespa-engine/revert-31158-jonmv/start-lower-when-feeding"
This reverts commit dc1d2982e21098e0dee252f229762960259382ac, reversing changes made to 1073327b4b38213311e992f2c2353a376d3404de.
-rw-r--r--client/go/internal/vespa/document/throttler.go8
-rw-r--r--client/go/internal/vespa/document/throttler_test.go12
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java4
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java4
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java2
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java5
6 files changed, 18 insertions, 17 deletions
diff --git a/client/go/internal/vespa/document/throttler.go b/client/go/internal/vespa/document/throttler.go
index 39900156563..0400ba1a150 100644
--- a/client/go/internal/vespa/document/throttler.go
+++ b/client/go/internal/vespa/document/throttler.go
@@ -37,8 +37,8 @@ type dynamicThrottler struct {
func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler {
var (
- minInflight = 16 * int64(connections)
- maxInflight = 256 * minInflight // 4096 max streams per connection on the server side
+ minInflight = 2 * int64(connections)
+ maxInflight = 256 * minInflight // 512 max streams per connection on the server side
)
t := &dynamicThrottler{
minInflight: minInflight,
@@ -49,7 +49,7 @@ func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler {
start: nowFunc(),
now: nowFunc,
}
- t.targetInflight.Store(8 * minInflight)
+ t.targetInflight.Store(minInflight)
t.targetTimesTen.Store(10 * maxInflight)
return t
}
@@ -57,7 +57,7 @@ func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler {
func NewThrottler(connections int) Throttler { return newThrottler(connections, time.Now) }
func (t *dynamicThrottler) Sent() {
- currentInflight := t.targetInflight.Load()
+ currentInflight := t.TargetInflight()
t.sent++
if t.sent*t.sent*t.sent < 100*currentInflight*currentInflight {
return
diff --git a/client/go/internal/vespa/document/throttler_test.go b/client/go/internal/vespa/document/throttler_test.go
index 03f0bc75bdc..410e518e596 100644
--- a/client/go/internal/vespa/document/throttler_test.go
+++ b/client/go/internal/vespa/document/throttler_test.go
@@ -9,14 +9,14 @@ import (
func TestThrottler(t *testing.T) {
clock := &manualClock{tick: time.Second}
tr := newThrottler(8, clock.now)
- for i := 0; i < 100; i++ {
- tr.Sent()
- }
- if got, want := tr.TargetInflight(), int64(1024); got != want {
+ if got, want := tr.TargetInflight(), int64(16); got != want {
t.Errorf("got TargetInflight() = %d, but want %d", got, want)
}
- tr.Throttled(5)
- if got, want := tr.TargetInflight(), int64(128); got != want {
+ for tr.TargetInflight() < int64(18) {
+ tr.Sent()
+ }
+ tr.Throttled(34)
+ if got, want := tr.TargetInflight(), int64(17); got != want {
t.Errorf("got TargetInflight() = %d, but want %d", got, want)
}
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
index 97f681404e9..1a42b688437 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
@@ -167,10 +167,10 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
}
/**
- * Determines where on each latency level the attractor sits. 2 is at the very end, and makes this to *boom*.
+ * Determines where on each latency level the attractor sits. 2 is at the very end, and makes this go *boom*.
* 0.2 is at the very start, and makes the algorithm more conservative. Probably fine to stay away from this.
*/
- // Original javadoc is non-sense, but kept for historical reasons.
+ // Original javadoc is nonsense, but kept for historical reasons.
/*
* Sets the lower efficiency threshold at which the algorithm should perform window size back off. Efficiency is
* the correlation between throughput and window size. The algorithm will increase the window size until efficiency
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java
index 951a1776b6f..81e972fe45d 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java
@@ -28,12 +28,12 @@ public class DynamicThrottler extends StaticThrottler {
public DynamicThrottler(FeedClientBuilderImpl builder) {
super(builder);
- targetInflight = new AtomicLong(8 * minInflight);
+ targetInflight = new AtomicLong(minInflight);
}
@Override
public void sent(long __, CompletableFuture<HttpResponse> ___) {
- double currentInflight = targetInflight.get();
+ double currentInflight = targetInflight();
if (++sent * sent * sent < 1e2 * currentInflight * currentInflight)
return;
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java
index 9010b0a7ad8..f0ee434e87c 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java
@@ -22,7 +22,7 @@ public class StaticThrottler implements Throttler {
public StaticThrottler(FeedClientBuilderImpl builder) {
minInflight = 2L * builder.connectionsPerEndpoint * builder.endpoints.size();
- maxInflight = 256 * minInflight; // 4096 max streams per connection on the server side.
+ maxInflight = 256 * minInflight; // 512 max streams per connection on the server side.
targetX10 = new AtomicLong(10 * maxInflight); // 10x the actual value to allow for smaller updates.
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
index 54fab9b859b..b1a04ac9ed4 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
@@ -33,6 +33,7 @@ import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -105,7 +106,7 @@ class HttpRequestStrategyTest {
cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom")));
ExecutionException expected = assertThrows(ExecutionException.class,
() -> strategy.enqueue(id1, request).get());
- assertTrue(expected.getCause() instanceof FeedException);
+ assertInstanceOf(FeedException.class, expected.getCause());
assertEquals("java.lang.RuntimeException: boom", expected.getCause().getMessage());
assertEquals(1, strategy.stats().requests());
@@ -200,7 +201,7 @@ class HttpRequestStrategyTest {
@Override public int retries() { return 1; }
})
.setCircuitBreaker(breaker)
- .setConnectionsPerEndpoint(1),
+ .setConnectionsPerEndpoint(3), // Must be >= 0.5x text ops.
cluster);
DocumentId id1 = DocumentId.of("ns", "type", "1");