aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorValerij Fredriksen <freva@users.noreply.github.com>2024-05-13 09:44:07 +0200
committerGitHub <noreply@github.com>2024-05-13 09:44:07 +0200
commitdc1d2982e21098e0dee252f229762960259382ac (patch)
tree363a52263e8ce9145f2ed8920bce8c3e95d7943b
parent1073327b4b38213311e992f2c2353a376d3404de (diff)
parentc02438c42e042090bbe6538e4f5a7f2501fa2e4f (diff)
Merge pull request #31177 from vespa-engine/revert-31158-jonmv/start-lower-when-feeding
Revert "Reduce min inflight for go feeder, and start lower in both go and java"
-rw-r--r--client/go/internal/vespa/document/throttler.go8
-rw-r--r--client/go/internal/vespa/document/throttler_test.go12
-rw-r--r--messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java4
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java4
-rw-r--r--vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java2
-rw-r--r--vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java5
6 files changed, 17 insertions, 18 deletions
diff --git a/client/go/internal/vespa/document/throttler.go b/client/go/internal/vespa/document/throttler.go
index 0400ba1a150..39900156563 100644
--- a/client/go/internal/vespa/document/throttler.go
+++ b/client/go/internal/vespa/document/throttler.go
@@ -37,8 +37,8 @@ type dynamicThrottler struct {
func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler {
var (
- minInflight = 2 * int64(connections)
- maxInflight = 256 * minInflight // 512 max streams per connection on the server side
+ minInflight = 16 * int64(connections)
+ maxInflight = 256 * minInflight // 4096 max streams per connection on the server side
)
t := &dynamicThrottler{
minInflight: minInflight,
@@ -49,7 +49,7 @@ func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler {
start: nowFunc(),
now: nowFunc,
}
- t.targetInflight.Store(minInflight)
+ t.targetInflight.Store(8 * minInflight)
t.targetTimesTen.Store(10 * maxInflight)
return t
}
@@ -57,7 +57,7 @@ func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler {
func NewThrottler(connections int) Throttler { return newThrottler(connections, time.Now) }
func (t *dynamicThrottler) Sent() {
- currentInflight := t.TargetInflight()
+ currentInflight := t.targetInflight.Load()
t.sent++
if t.sent*t.sent*t.sent < 100*currentInflight*currentInflight {
return
diff --git a/client/go/internal/vespa/document/throttler_test.go b/client/go/internal/vespa/document/throttler_test.go
index 410e518e596..03f0bc75bdc 100644
--- a/client/go/internal/vespa/document/throttler_test.go
+++ b/client/go/internal/vespa/document/throttler_test.go
@@ -9,14 +9,14 @@ import (
func TestThrottler(t *testing.T) {
clock := &manualClock{tick: time.Second}
tr := newThrottler(8, clock.now)
- if got, want := tr.TargetInflight(), int64(16); got != want {
- t.Errorf("got TargetInflight() = %d, but want %d", got, want)
- }
- for tr.TargetInflight() < int64(18) {
+ for i := 0; i < 100; i++ {
tr.Sent()
}
- tr.Throttled(34)
- if got, want := tr.TargetInflight(), int64(17); got != want {
+ if got, want := tr.TargetInflight(), int64(1024); got != want {
+ t.Errorf("got TargetInflight() = %d, but want %d", got, want)
+ }
+ tr.Throttled(5)
+ if got, want := tr.TargetInflight(), int64(128); got != want {
t.Errorf("got TargetInflight() = %d, but want %d", got, want)
}
}
diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
index 1a42b688437..97f681404e9 100644
--- a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
+++ b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java
@@ -167,10 +167,10 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy {
}
/**
- * Determines where on each latency level the attractor sits. 2 is at the very end, and makes this go *boom*.
+ * Determines where on each latency level the attractor sits. 2 is at the very end, and makes this to *boom*.
* 0.2 is at the very start, and makes the algorithm more conservative. Probably fine to stay away from this.
*/
- // Original javadoc is nonsense, but kept for historical reasons.
+ // Original javadoc is non-sense, but kept for historical reasons.
/*
* Sets the lower efficiency threshold at which the algorithm should perform window size back off. Efficiency is
* the correlation between throughput and window size. The algorithm will increase the window size until efficiency
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java
index 81e972fe45d..951a1776b6f 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java
@@ -28,12 +28,12 @@ public class DynamicThrottler extends StaticThrottler {
public DynamicThrottler(FeedClientBuilderImpl builder) {
super(builder);
- targetInflight = new AtomicLong(minInflight);
+ targetInflight = new AtomicLong(8 * minInflight);
}
@Override
public void sent(long __, CompletableFuture<HttpResponse> ___) {
- double currentInflight = targetInflight();
+ double currentInflight = targetInflight.get();
if (++sent * sent * sent < 1e2 * currentInflight * currentInflight)
return;
diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java
index f0ee434e87c..9010b0a7ad8 100644
--- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java
+++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java
@@ -22,7 +22,7 @@ public class StaticThrottler implements Throttler {
public StaticThrottler(FeedClientBuilderImpl builder) {
minInflight = 2L * builder.connectionsPerEndpoint * builder.endpoints.size();
- maxInflight = 256 * minInflight; // 512 max streams per connection on the server side.
+ maxInflight = 256 * minInflight; // 4096 max streams per connection on the server side.
targetX10 = new AtomicLong(10 * maxInflight); // 10x the actual value to allow for smaller updates.
}
diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
index b1a04ac9ed4..54fab9b859b 100644
--- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
+++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java
@@ -33,7 +33,6 @@ import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -106,7 +105,7 @@ class HttpRequestStrategyTest {
cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom")));
ExecutionException expected = assertThrows(ExecutionException.class,
() -> strategy.enqueue(id1, request).get());
- assertInstanceOf(FeedException.class, expected.getCause());
+ assertTrue(expected.getCause() instanceof FeedException);
assertEquals("java.lang.RuntimeException: boom", expected.getCause().getMessage());
assertEquals(1, strategy.stats().requests());
@@ -201,7 +200,7 @@ class HttpRequestStrategyTest {
@Override public int retries() { return 1; }
})
.setCircuitBreaker(breaker)
- .setConnectionsPerEndpoint(3), // Must be >= 0.5x text ops.
+ .setConnectionsPerEndpoint(1),
cluster);
DocumentId id1 = DocumentId.of("ns", "type", "1");