From f951681e3e1076606da092437a32c92454533e14 Mon Sep 17 00:00:00 2001 From: jonmv Date: Mon, 13 May 2024 11:21:39 +0200 Subject: Revert "Merge pull request #31177 from vespa-engine/revert-31158-jonmv/start-lower-when-feeding" This reverts commit dc1d2982e21098e0dee252f229762960259382ac, reversing changes made to 1073327b4b38213311e992f2c2353a376d3404de. --- client/go/internal/vespa/document/throttler.go | 8 ++++---- client/go/internal/vespa/document/throttler_test.go | 12 ++++++------ .../java/com/yahoo/messagebus/DynamicThrottlePolicy.java | 4 ++-- .../java/ai/vespa/feed/client/impl/DynamicThrottler.java | 4 ++-- .../main/java/ai/vespa/feed/client/impl/StaticThrottler.java | 2 +- .../ai/vespa/feed/client/impl/HttpRequestStrategyTest.java | 5 +++-- 6 files changed, 18 insertions(+), 17 deletions(-) diff --git a/client/go/internal/vespa/document/throttler.go b/client/go/internal/vespa/document/throttler.go index 39900156563..0400ba1a150 100644 --- a/client/go/internal/vespa/document/throttler.go +++ b/client/go/internal/vespa/document/throttler.go @@ -37,8 +37,8 @@ type dynamicThrottler struct { func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler { var ( - minInflight = 16 * int64(connections) - maxInflight = 256 * minInflight // 4096 max streams per connection on the server side + minInflight = 2 * int64(connections) + maxInflight = 256 * minInflight // 512 max streams per connection on the server side ) t := &dynamicThrottler{ minInflight: minInflight, @@ -49,7 +49,7 @@ func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler { start: nowFunc(), now: nowFunc, } - t.targetInflight.Store(8 * minInflight) + t.targetInflight.Store(minInflight) t.targetTimesTen.Store(10 * maxInflight) return t } @@ -57,7 +57,7 @@ func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler { func NewThrottler(connections int) Throttler { return newThrottler(connections, time.Now) } func (t *dynamicThrottler) Sent() { - currentInflight := t.targetInflight.Load() + currentInflight := t.TargetInflight() t.sent++ if t.sent*t.sent*t.sent < 100*currentInflight*currentInflight { return diff --git a/client/go/internal/vespa/document/throttler_test.go b/client/go/internal/vespa/document/throttler_test.go index 03f0bc75bdc..410e518e596 100644 --- a/client/go/internal/vespa/document/throttler_test.go +++ b/client/go/internal/vespa/document/throttler_test.go @@ -9,14 +9,14 @@ import ( func TestThrottler(t *testing.T) { clock := &manualClock{tick: time.Second} tr := newThrottler(8, clock.now) - for i := 0; i < 100; i++ { - tr.Sent() - } - if got, want := tr.TargetInflight(), int64(1024); got != want { + if got, want := tr.TargetInflight(), int64(16); got != want { t.Errorf("got TargetInflight() = %d, but want %d", got, want) } - tr.Throttled(5) - if got, want := tr.TargetInflight(), int64(128); got != want { + for tr.TargetInflight() < int64(18) { + tr.Sent() + } + tr.Throttled(34) + if got, want := tr.TargetInflight(), int64(17); got != want { t.Errorf("got TargetInflight() = %d, but want %d", got, want) } } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java index 97f681404e9..1a42b688437 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java @@ -167,10 +167,10 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { } /** - * Determines where on each latency level the attractor sits. 2 is at the very end, and makes this to *boom*. + * Determines where on each latency level the attractor sits. 2 is at the very end, and makes this go *boom*. * 0.2 is at the very start, and makes the algorithm more conservative. Probably fine to stay away from this. */ - // Original javadoc is non-sense, but kept for historical reasons. + // Original javadoc is nonsense, but kept for historical reasons. /* * Sets the lower efficiency threshold at which the algorithm should perform window size back off. Efficiency is * the correlation between throughput and window size. The algorithm will increase the window size until efficiency diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java index 951a1776b6f..81e972fe45d 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java @@ -28,12 +28,12 @@ public class DynamicThrottler extends StaticThrottler { public DynamicThrottler(FeedClientBuilderImpl builder) { super(builder); - targetInflight = new AtomicLong(8 * minInflight); + targetInflight = new AtomicLong(minInflight); } @Override public void sent(long __, CompletableFuture ___) { - double currentInflight = targetInflight.get(); + double currentInflight = targetInflight(); if (++sent * sent * sent < 1e2 * currentInflight * currentInflight) return; diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java index 9010b0a7ad8..f0ee434e87c 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java @@ -22,7 +22,7 @@ public class StaticThrottler implements Throttler { public StaticThrottler(FeedClientBuilderImpl builder) { minInflight = 2L * builder.connectionsPerEndpoint * builder.endpoints.size(); - maxInflight = 256 * minInflight; // 4096 max streams per connection on the server side. + maxInflight = 256 * minInflight; // 512 max streams per connection on the server side. targetX10 = new AtomicLong(10 * maxInflight); // 10x the actual value to allow for smaller updates. } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java index 54fab9b859b..b1a04ac9ed4 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java @@ -33,6 +33,7 @@ import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -105,7 +106,7 @@ class HttpRequestStrategyTest { cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom"))); ExecutionException expected = assertThrows(ExecutionException.class, () -> strategy.enqueue(id1, request).get()); - assertTrue(expected.getCause() instanceof FeedException); + assertInstanceOf(FeedException.class, expected.getCause()); assertEquals("java.lang.RuntimeException: boom", expected.getCause().getMessage()); assertEquals(1, strategy.stats().requests()); @@ -200,7 +201,7 @@ class HttpRequestStrategyTest { @Override public int retries() { return 1; } }) .setCircuitBreaker(breaker) - .setConnectionsPerEndpoint(1), + .setConnectionsPerEndpoint(3), // Must be >= 0.5x text ops. cluster); DocumentId id1 = DocumentId.of("ns", "type", "1"); -- cgit v1.2.3