diff options
author | Valerij Fredriksen <freva@users.noreply.github.com> | 2024-05-13 09:44:07 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-13 09:44:07 +0200 |
commit | dc1d2982e21098e0dee252f229762960259382ac (patch) | |
tree | 363a52263e8ce9145f2ed8920bce8c3e95d7943b | |
parent | 1073327b4b38213311e992f2c2353a376d3404de (diff) | |
parent | c02438c42e042090bbe6538e4f5a7f2501fa2e4f (diff) |
Merge pull request #31177 from vespa-engine/revert-31158-jonmv/start-lower-when-feeding
Revert "Reduce min inflight for go feeder, and start lower in both go and java"
6 files changed, 17 insertions, 18 deletions
diff --git a/client/go/internal/vespa/document/throttler.go b/client/go/internal/vespa/document/throttler.go index 0400ba1a150..39900156563 100644 --- a/client/go/internal/vespa/document/throttler.go +++ b/client/go/internal/vespa/document/throttler.go @@ -37,8 +37,8 @@ type dynamicThrottler struct { func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler { var ( - minInflight = 2 * int64(connections) - maxInflight = 256 * minInflight // 512 max streams per connection on the server side + minInflight = 16 * int64(connections) + maxInflight = 256 * minInflight // 4096 max streams per connection on the server side ) t := &dynamicThrottler{ minInflight: minInflight, @@ -49,7 +49,7 @@ func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler { start: nowFunc(), now: nowFunc, } - t.targetInflight.Store(minInflight) + t.targetInflight.Store(8 * minInflight) t.targetTimesTen.Store(10 * maxInflight) return t } @@ -57,7 +57,7 @@ func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler { func NewThrottler(connections int) Throttler { return newThrottler(connections, time.Now) } func (t *dynamicThrottler) Sent() { - currentInflight := t.TargetInflight() + currentInflight := t.targetInflight.Load() t.sent++ if t.sent*t.sent*t.sent < 100*currentInflight*currentInflight { return diff --git a/client/go/internal/vespa/document/throttler_test.go b/client/go/internal/vespa/document/throttler_test.go index 410e518e596..03f0bc75bdc 100644 --- a/client/go/internal/vespa/document/throttler_test.go +++ b/client/go/internal/vespa/document/throttler_test.go @@ -9,14 +9,14 @@ import ( func TestThrottler(t *testing.T) { clock := &manualClock{tick: time.Second} tr := newThrottler(8, clock.now) - if got, want := tr.TargetInflight(), int64(16); got != want { - t.Errorf("got TargetInflight() = %d, but want %d", got, want) - } - for tr.TargetInflight() < int64(18) { + for i := 0; i < 100; i++ { tr.Sent() } - tr.Throttled(34) - if got, want := tr.TargetInflight(), int64(17); got != want { + if got, want := tr.TargetInflight(), int64(1024); got != want { + t.Errorf("got TargetInflight() = %d, but want %d", got, want) + } + tr.Throttled(5) + if got, want := tr.TargetInflight(), int64(128); got != want { t.Errorf("got TargetInflight() = %d, but want %d", got, want) } } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java index 1a42b688437..97f681404e9 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java @@ -167,10 +167,10 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { } /** - * Determines where on each latency level the attractor sits. 2 is at the very end, and makes this go *boom*. + * Determines where on each latency level the attractor sits. 2 is at the very end, and makes this to *boom*. * 0.2 is at the very start, and makes the algorithm more conservative. Probably fine to stay away from this. */ - // Original javadoc is nonsense, but kept for historical reasons. + // Original javadoc is non-sense, but kept for historical reasons. /* * Sets the lower efficiency threshold at which the algorithm should perform window size back off. Efficiency is * the correlation between throughput and window size. The algorithm will increase the window size until efficiency diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java index 81e972fe45d..951a1776b6f 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java @@ -28,12 +28,12 @@ public class DynamicThrottler extends StaticThrottler { public DynamicThrottler(FeedClientBuilderImpl builder) { super(builder); - targetInflight = new AtomicLong(minInflight); + targetInflight = new AtomicLong(8 * minInflight); } @Override public void sent(long __, CompletableFuture<HttpResponse> ___) { - double currentInflight = targetInflight(); + double currentInflight = targetInflight.get(); if (++sent * sent * sent < 1e2 * currentInflight * currentInflight) return; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java index f0ee434e87c..9010b0a7ad8 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java @@ -22,7 +22,7 @@ public class StaticThrottler implements Throttler { public StaticThrottler(FeedClientBuilderImpl builder) { minInflight = 2L * builder.connectionsPerEndpoint * builder.endpoints.size(); - maxInflight = 256 * minInflight; // 512 max streams per connection on the server side. + maxInflight = 256 * minInflight; // 4096 max streams per connection on the server side. targetX10 = new AtomicLong(10 * maxInflight); // 10x the actual value to allow for smaller updates. } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java index b1a04ac9ed4..54fab9b859b 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java @@ -33,7 +33,6 @@ import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -106,7 +105,7 @@ class HttpRequestStrategyTest { cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom"))); ExecutionException expected = assertThrows(ExecutionException.class, () -> strategy.enqueue(id1, request).get()); - assertInstanceOf(FeedException.class, expected.getCause()); + assertTrue(expected.getCause() instanceof FeedException); assertEquals("java.lang.RuntimeException: boom", expected.getCause().getMessage()); assertEquals(1, strategy.stats().requests()); @@ -201,7 +200,7 @@ class HttpRequestStrategyTest { @Override public int retries() { return 1; } }) .setCircuitBreaker(breaker) - .setConnectionsPerEndpoint(3), // Must be >= 0.5x text ops. + .setConnectionsPerEndpoint(1), cluster); DocumentId id1 = DocumentId.of("ns", "type", "1"); |