diff options
10 files changed, 109 insertions, 30 deletions
diff --git a/client/README.md b/client/README.md index f51db87d631..e730614230f 100644 --- a/client/README.md +++ b/client/README.md @@ -39,7 +39,7 @@ This is a [work-in-progress javascript app](js/app) for querying a Vespa applica <!-- ToDo: move this / demote this somehow --> -### vespa_query_dsl +### vespa\_query\_dsl This lib is used for composing Vespa [YQL queries](https://docs.vespa.ai/en/reference/query-language-reference.html). diff --git a/client/go/internal/vespa/document/throttler.go b/client/go/internal/vespa/document/throttler.go index 39900156563..3eb0ccd17f6 100644 --- a/client/go/internal/vespa/document/throttler.go +++ b/client/go/internal/vespa/document/throttler.go @@ -37,8 +37,8 @@ type dynamicThrottler struct { func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler { var ( - minInflight = 16 * int64(connections) - maxInflight = 256 * minInflight // 4096 max streams per connection on the server side + minInflight = 2 * int64(connections) + maxInflight = 256 * minInflight // 512 max streams per connection on the server side ) t := &dynamicThrottler{ minInflight: minInflight, @@ -49,7 +49,7 @@ func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler { start: nowFunc(), now: nowFunc, } - t.targetInflight.Store(8 * minInflight) + t.targetInflight.Store(minInflight) t.targetTimesTen.Store(10 * maxInflight) return t } @@ -57,7 +57,7 @@ func newThrottler(connections int, nowFunc func() time.Time) *dynamicThrottler { func NewThrottler(connections int) Throttler { return newThrottler(connections, time.Now) } func (t *dynamicThrottler) Sent() { - currentInflight := t.targetInflight.Load() + currentInflight := t.TargetInflight() t.sent++ if t.sent*t.sent*t.sent < 100*currentInflight*currentInflight { return @@ -73,8 +73,12 @@ func (t *dynamicThrottler) Sent() { t.throughputs[index] = currentThroughput // Loop over throughput measurements and pick the one which optimises throughput and latency. - choice := float64(currentInflight) + best := float64(currentInflight) maxObjective := float64(-1) + choice := 0 + j := -1 + k := -1 + s := 0.0 for i := len(t.throughputs) - 1; i >= 0; i-- { if t.throughputs[i] == 0 { continue // Skip unknown values @@ -83,10 +87,25 @@ func (t *dynamicThrottler) Sent() { objective := t.throughputs[i] * math.Pow(inflight, throttlerWeight-1) // Optimise throughput (weight), but also latency (1 - weight) if objective > maxObjective { maxObjective = objective - choice = inflight + best = inflight + choice = i } + // Additionally, smooth the throughput values, to reduce the impact of noise, and reduce jumpiness + if j != -1 { + u := t.throughputs[j] + if k != -1 { + t.throughputs[j] = (2*u + t.throughputs[i] + s) / 4 + } + s = u + } + k = j + j = i + } + target := int64((rand.Float64()*0.40+0.84)*best + rand.Float64()*4 - 1) // Random walk, skewed towards increase + // If the best inflight is at the high end of the known, we override the random walk to speed up upwards exploration + if choice == j && choice+1 < len(t.throughputs) { + target = int64(1 + float64(t.minInflight)*math.Pow(256, (float64(choice)+1.5)/float64(len(t.throughputs)))) } - target := int64((rand.Float64()*0.20 + 0.92) * choice) // Random walk, skewed towards increase t.targetInflight.Store(max(t.minInflight, min(t.maxInflight, target))) } diff --git a/client/go/internal/vespa/document/throttler_test.go b/client/go/internal/vespa/document/throttler_test.go index 03f0bc75bdc..b386e0d5105 100644 --- a/client/go/internal/vespa/document/throttler_test.go +++ b/client/go/internal/vespa/document/throttler_test.go @@ -9,14 +9,19 @@ import ( func TestThrottler(t *testing.T) { clock := &manualClock{tick: time.Second} tr := newThrottler(8, clock.now) - for i := 0; i < 100; i++ { + + if got, want := tr.TargetInflight(), int64(16); got != want { + t.Errorf("got TargetInflight() = %d, but want %d", got, want) + } + for i := 0; i < 30; i++ { tr.Sent() + tr.Success() } - if got, want := tr.TargetInflight(), int64(1024); got != want { + if got, want := tr.TargetInflight(), int64(18); got != want { t.Errorf("got TargetInflight() = %d, but want %d", got, want) } - tr.Throttled(5) - if got, want := tr.TargetInflight(), int64(128); got != want { + tr.Throttled(34) + if got, want := tr.TargetInflight(), int64(17); got != want { t.Errorf("got TargetInflight() = %d, but want %d", got, want) } } diff --git a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java index 97f681404e9..1a42b688437 100644 --- a/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java +++ b/messagebus/src/main/java/com/yahoo/messagebus/DynamicThrottlePolicy.java @@ -167,10 +167,10 @@ public class DynamicThrottlePolicy extends StaticThrottlePolicy { } /** - * Determines where on each latency level the attractor sits. 2 is at the very end, and makes this to *boom*. + * Determines where on each latency level the attractor sits. 2 is at the very end, and makes this go *boom*. * 0.2 is at the very start, and makes the algorithm more conservative. Probably fine to stay away from this. */ - // Original javadoc is non-sense, but kept for historical reasons. + // Original javadoc is nonsense, but kept for historical reasons. /* * Sets the lower efficiency threshold at which the algorithm should perform window size back off. Efficiency is * the correlation between throughput and window size. The algorithm will increase the window size until efficiency diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java index 951a1776b6f..567788b8501 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/DynamicThrottler.java @@ -28,12 +28,12 @@ public class DynamicThrottler extends StaticThrottler { public DynamicThrottler(FeedClientBuilderImpl builder) { super(builder); - targetInflight = new AtomicLong(8 * minInflight); + targetInflight = new AtomicLong(minInflight); } @Override public void sent(long __, CompletableFuture<HttpResponse> ___) { - double currentInflight = targetInflight.get(); + double currentInflight = targetInflight(); if (++sent * sent * sent < 1e2 * currentInflight * currentInflight) return; @@ -43,22 +43,36 @@ public class DynamicThrottler extends StaticThrottler { // Use buckets for throughput over inflight, along the log-scale, in [minInflight, maxInflight). int index = (int) (throughputs.length * log(max(1, min(255, currentInflight / minInflight))) - / log(256)); // 4096 (server max streams per connection) / 16 (our min per connection) + / log(256)); // 512 (server max streams per connection) / 2 (our min per connection) throughputs[index] = currentThroughput; // Loop over throughput measurements and pick the one which optimises throughput and latency. - double choice = currentInflight; + double best = currentInflight; double max = -1; - for (int i = throughputs.length; i-- > 0; ) { + int j = -1, k = -1, choice = 0; + double s = 0; + for (int i = 0; i < throughputs.length; i++) { if (throughputs[i] == 0) continue; // Skip unknown values. double inflight = minInflight * pow(256, (i + 0.5) / throughputs.length); double objective = throughputs[i] * pow(inflight, (weight - 1)); // Optimise throughput (weight), but also latency (1 - weight). if (objective > max) { max = objective; - choice = inflight; + best = inflight; + choice = i; } + // Additionally, smooth the throughput values, to reduce the impact of noise, and reduce jumpiness. + if (j != -1) { + double t = throughputs[j]; + if (k != -1) throughputs[j] = (2 * t + throughputs[i] + s) / 4; + s = t; + } + k = j; + j = i; } - long target = (long) ((random() * 0.20 + 0.92) * choice); // Random walk, skewed towards increase. + long target = (long) ((random() * 0.40 + 0.84) * best + random() * 4 - 1); // Random step, skewed towards increase. + // If the best inflight is at the high end of the known, we override the random walk to speed up upwards exploration. + if (choice == j && choice + 1 < throughputs.length) + target = (long) (1 + minInflight * pow(256, (choice + 1.5) / throughputs.length)); targetInflight.set(max(minInflight, min(maxInflight, target))); } diff --git a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java index 9010b0a7ad8..f0ee434e87c 100644 --- a/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java +++ b/vespa-feed-client/src/main/java/ai/vespa/feed/client/impl/StaticThrottler.java @@ -22,7 +22,7 @@ public class StaticThrottler implements Throttler { public StaticThrottler(FeedClientBuilderImpl builder) { minInflight = 2L * builder.connectionsPerEndpoint * builder.endpoints.size(); - maxInflight = 256 * minInflight; // 4096 max streams per connection on the server side. + maxInflight = 256 * minInflight; // 512 max streams per connection on the server side. targetX10 = new AtomicLong(10 * maxInflight); // 10x the actual value to allow for smaller updates. } diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/DynamicThrottlerTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/DynamicThrottlerTest.java new file mode 100644 index 00000000000..7e07fc6e116 --- /dev/null +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/DynamicThrottlerTest.java @@ -0,0 +1,30 @@ +package ai.vespa.feed.client.impl; + +import org.junit.jupiter.api.Test; + +import java.net.URI; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * @author jonmv + */ +class DynamicThrottlerTest { + + @Test + void testThrottler() { + DynamicThrottler throttler = new DynamicThrottler(new FeedClientBuilderImpl(List.of(URI.create("http://localhost:8080")))); + assertEquals(16, throttler.targetInflight()); + + for (int i = 0; i < 30; i++) { + throttler.sent(1, null); + throttler.success(); + } + assertEquals(18, throttler.targetInflight()); + + throttler.throttled(34); + assertEquals(17, throttler.targetInflight()); + } + +} diff --git a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java index 54fab9b859b..b1a04ac9ed4 100644 --- a/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java +++ b/vespa-feed-client/src/test/java/ai/vespa/feed/client/impl/HttpRequestStrategyTest.java @@ -33,6 +33,7 @@ import static ai.vespa.feed.client.FeedClient.CircuitBreaker.State.OPEN; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -105,7 +106,7 @@ class HttpRequestStrategyTest { cluster.expect((__, vessel) -> vessel.completeExceptionally(new RuntimeException("boom"))); ExecutionException expected = assertThrows(ExecutionException.class, () -> strategy.enqueue(id1, request).get()); - assertTrue(expected.getCause() instanceof FeedException); + assertInstanceOf(FeedException.class, expected.getCause()); assertEquals("java.lang.RuntimeException: boom", expected.getCause().getMessage()); assertEquals(1, strategy.stats().requests()); @@ -200,7 +201,7 @@ class HttpRequestStrategyTest { @Override public int retries() { return 1; } }) .setCircuitBreaker(breaker) - .setConnectionsPerEndpoint(1), + .setConnectionsPerEndpoint(3), // Must be >= 0.5x text ops. cluster); DocumentId id1 = DocumentId.of("ns", "type", "1"); diff --git a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java index b483d6977d6..6e07661235e 100644 --- a/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java +++ b/vespaclient-container-plugin/src/main/java/com/yahoo/document/restapi/resource/DocumentV1ApiHandler.java @@ -1397,6 +1397,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { Phaser phaser = new Phaser(2); // Synchronize this thread (dispatch) with the visitor callback thread. AtomicReference<String> error = new AtomicReference<>(); // Set if error occurs during processing of visited documents. callback.onStart(response, fullyApplied); + final AtomicLong locallyReceivedDocCount = new AtomicLong(0); VisitorControlHandler controller = new VisitorControlHandler() { final ScheduledFuture<?> abort = streaming ? visitDispatcher.schedule(this::abort, visitTimeout(request), MILLISECONDS) : null; final AtomicReference<VisitorSession> session = new AtomicReference<>(); @@ -1410,7 +1411,10 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { try (response) { callback.onEnd(response); - response.writeDocumentCount(getVisitorStatistics() == null ? 0 : getVisitorStatistics().getDocumentsVisited()); + // Locally tracked document count is only correct if we have a local data handler. + // Otherwise, we have to report the statistics received transitively from the content nodes. + long statsDocCount = (getVisitorStatistics() != null ? getVisitorStatistics().getDocumentsVisited() : 0); + response.writeDocumentCount(parameters.getLocalDataHandler() != null ? locallyReceivedDocCount.get() : statsDocCount); if (session.get() != null) response.writeTrace(session.get().getTrace()); @@ -1456,6 +1460,7 @@ public class DocumentV1ApiHandler extends AbstractRequestHandler { if (m instanceof PutDocumentMessage put) document = put.getDocumentPut().getDocument(); else if (parameters.visitRemoves() && m instanceof RemoveDocumentMessage remove) removeId = remove.getDocumentId(); else throw new UnsupportedOperationException("Got unsupported message type: " + m.getClass().getName()); + locallyReceivedDocCount.getAndAdd(1); callback.onDocument(response, document, removeId, diff --git a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java index 58cf34712aa..b2c0b1b2ce8 100644 --- a/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java +++ b/vespaclient-container-plugin/src/test/java/com/yahoo/document/restapi/resource/DocumentV1ApiTest.java @@ -290,7 +290,7 @@ public class DocumentV1ApiTest { parameters.getLocalDataHandler().onMessage(new RemoveDocumentMessage(new DocumentId("id:space:music::t-square-truth")), tokens.get(3)); VisitorStatistics statistics = new VisitorStatistics(); statistics.setBucketsVisited(1); - statistics.setDocumentsVisited(3); + statistics.setDocumentsVisited(123); // Ignored in favor of tracking actually emitted entries parameters.getControlHandler().onVisitorStatistics(statistics); parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.TIMEOUT, "timeout is OK"); }); @@ -323,7 +323,7 @@ public class DocumentV1ApiTest { "remove": "id:space:music::t-square-truth" } ], - "documentCount": 3, + "documentCount": 4, "trace": [ { "message": "Tracy Chapman" }, { @@ -441,13 +441,18 @@ public class DocumentV1ApiTest { assertEquals("[Content:cluster=content]", parameters.getRemoteDataHandler()); assertEquals("[document]", parameters.fieldSet()); assertEquals(60_000L, parameters.getSessionTimeoutMs()); + VisitorStatistics statistics = new VisitorStatistics(); + statistics.setBucketsVisited(1); + statistics.setDocumentsVisited(2); + // Visiting with remote data handlers should report the remotely aggregated statistics + parameters.getControlHandler().onVisitorStatistics(statistics); parameters.getControlHandler().onDone(VisitorControlHandler.CompletionCode.SUCCESS, "We made it!"); }); response = driver.sendRequest("http://localhost/document/v1/space/music/docid?destinationCluster=content&selection=true&cluster=content&timeout=60", POST); assertSameJson(""" { "pathId": "/document/v1/space/music/docid", - "documentCount": 0 + "documentCount": 2 }""", response.readAll()); assertEquals(200, response.getStatus()); @@ -488,7 +493,7 @@ public class DocumentV1ApiTest { assertSameJson(""" { "pathId": "/document/v1/space/music/docid", - "documentCount": 0 + "documentCount": 1 }""", response.readAll()); assertEquals(200, response.getStatus()); @@ -542,7 +547,7 @@ public class DocumentV1ApiTest { assertSameJson(""" { "pathId": "/document/v1/space/music/docid", - "documentCount": 0, + "documentCount": 1, "message": "boom" }""", response.readAll()); |