diff options
author | Jon Bratseth <bratseth@vespa.ai> | 2023-11-29 16:35:56 +0100 |
---|---|---|
committer | Jon Bratseth <bratseth@vespa.ai> | 2023-11-29 16:35:56 +0100 |
commit | d7c3de1fd9600c67fdfe620aa2ee675109ec1481 (patch) | |
tree | 0138d4248fd0586f754853c1332280f33ffca159 /container-search | |
parent | 23f832d370909d3b00cc249ceeb59460d545dd5a (diff) |
Support node-local rates
Diffstat (limited to 'container-search')
3 files changed, 47 insertions, 19 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/searchers/RateLimitingSearcher.java b/container-search/src/main/java/com/yahoo/search/searchers/RateLimitingSearcher.java index 104db4ea731..3a65990a6e4 100755 --- a/container-search/src/main/java/com/yahoo/search/searchers/RateLimitingSearcher.java +++ b/container-search/src/main/java/com/yahoo/search/searchers/RateLimitingSearcher.java @@ -31,9 +31,12 @@ import java.util.concurrent.ThreadLocalRandom; * <li>rate.id - (String) the id of the client from rate limiting perspective * <li>rate.cost - (Double) the cost Double of this query. This is read after executing the query and hence can be set * by downstream searchers inspecting the result to allow differencing the cost of various queries. Default is 1. - * <li>rate.quota - (Double) the cost per second a particular id is allowed to consume in this system. + * <li>rate.quota - (Double) the cost per second a particular id is allowed to consume. By default this is across + * all nodes of the cluster (i.e, this is invariant with cluster size), set the config variable + * localRate to true to make this be rate per node. * <li>rate.idDimension - (String) the name of the rate-id dimension used when logging metrics. * If this is not specified, the metric will be logged without dimensions. + * <li>rate.local - (Boolean) set to true to let the quota be per node instead of across the cluster. * <li>rate.dryRun - (Boolean) emit metrics on rejected requests but don't actually reject them * </ul> * <p> @@ -69,6 +72,8 @@ public class RateLimitingSearcher extends Searcher { /** Shared capacity across all threads. Each thread will ask for more capacity from here when they run out. */ private final AvailableCapacity availableCapacity; + private final boolean localRate; + /** Capacity already allocated to this thread */ private final ThreadLocal<Map<String, Double>> allocatedCapacity = new ThreadLocal<>(); @@ -90,10 +95,14 @@ public class RateLimitingSearcher extends Searcher { } /** For testing - allows injection of a timer to avoid depending on the system clock */ - public RateLimitingSearcher(RateLimitingConfig rateLimitingConfig, ClusterInfoConfig clusterInfoConfig, MetricReceiver metric, Clock clock) { + public RateLimitingSearcher(RateLimitingConfig rateLimitingConfig, + ClusterInfoConfig clusterInfoConfig, + MetricReceiver metric, + Clock clock) { this.capacityIncrement = rateLimitingConfig.capacityIncrement(); this.recheckForCapacityProbability = rateLimitingConfig.recheckForCapacityProbability(); this.availableCapacity = new AvailableCapacity(rateLimitingConfig.maxAvailableCapacity(), clock); + this.localRate = rateLimitingConfig.localRate(); this.nodeCount = clusterInfoConfig.nodeCount(); @@ -109,7 +118,8 @@ public class RateLimitingSearcher extends Searcher { return execution.search(query); } - rate = rate / nodeCount; + if ( ! localRate) + rate = rate / nodeCount; if (allocatedCapacity.get() == null) // new thread allocatedCapacity.set(new HashMap<>()); @@ -122,7 +132,7 @@ public class RateLimitingSearcher extends Searcher { requestCapacity(id, rate); } - if (rate==0 || getAllocatedCapacity(id) <= 0) { // we are still over rate: reject + if (rate == 0 || getAllocatedCapacity(id) <= 0) { // we are still over rate: reject String idDim = query.properties().getString(idDimensionKey, null); if (idDim == null) { overQuotaCounter.add(1); diff --git a/container-search/src/main/resources/configdefinitions/search.config.rate-limiting.def b/container-search/src/main/resources/configdefinitions/search.config.rate-limiting.def index e5d2f5f9ed4..81ef06868ce 100644 --- a/container-search/src/main/resources/configdefinitions/search.config.rate-limiting.def +++ b/container-search/src/main/resources/configdefinitions/search.config.rate-limiting.def @@ -17,3 +17,6 @@ maxAvailableCapacity double default=10000 # A good number may be 1 / (maxAvailableCapacity * average-cost) recheckForCapacityProbability double default=0.001 +# Set to true to interpret the rate.quota given in the query as a node-local value +# instead of a cluster-wide value. +localRate bool default=false diff --git a/container-search/src/test/java/com/yahoo/search/searchers/test/RateLimitingSearcherTestCase.java b/container-search/src/test/java/com/yahoo/search/searchers/test/RateLimitingSearcherTestCase.java index 22b8d8f4d76..1682bd37fb6 100755 --- a/container-search/src/test/java/com/yahoo/search/searchers/test/RateLimitingSearcherTestCase.java +++ b/container-search/src/test/java/com/yahoo/search/searchers/test/RateLimitingSearcherTestCase.java @@ -31,23 +31,10 @@ public class RateLimitingSearcherTestCase { @Test void testRateLimiting() { - RateLimitingConfig.Builder rateLimitingConfig = new RateLimitingConfig.Builder(); - rateLimitingConfig.maxAvailableCapacity(4); - rateLimitingConfig.capacityIncrement(2); - rateLimitingConfig.recheckForCapacityProbability(1.0); - - ClusterInfoConfig.Builder clusterInfoConfig = new ClusterInfoConfig.Builder(); - clusterInfoConfig.clusterId("testCluster"); - clusterInfoConfig.nodeCount(4); - ManualClock clock = new ManualClock(); MetricReceiver.MockReceiver metric = new MetricReceiver.MockReceiver(); - - Chain<Searcher> chain = new Chain<>("test", new RateLimitingSearcher(new RateLimitingConfig(rateLimitingConfig), - new ClusterInfoConfig(clusterInfoConfig), - metric, clock), - new CostSettingSearcher()); - assertEquals(2, tryRequests(chain, "id1"), "'rate' request are available initially"); + var chain = createChain(false, clock, metric); + assertEquals(2, tryRequests(chain, "id1"), "'rate/nodes' request are available initially"); assertTrue(executeWasAllowed(chain, "id1", true), "However, don't reject if we dryRun"); clock.advance(Duration.ofMillis(1500)); // causes 2 new requests to become available assertEquals(2, tryRequests(chain, "id1"), "'rate' new requests became available"); @@ -76,6 +63,34 @@ public class RateLimitingSearcherTestCase { assertEquals(requestsToTry - 2 + requestsToTry - 4, map.get(metric.point("id", "id2")).getCount()); } + @Test + void testLocalRateLimiting() { + ManualClock clock = new ManualClock(); + MetricReceiver.MockReceiver metric = new MetricReceiver.MockReceiver(); + var chain = createChain(true, clock, metric); + + assertEquals(9, tryRequests(chain, "id1"), "'rate' request are available initially"); + } + + private Chain<Searcher> createChain(boolean localRate, ManualClock clock, MetricReceiver.MockReceiver metric) { + RateLimitingConfig.Builder rateLimitingConfig = new RateLimitingConfig.Builder(); + rateLimitingConfig.maxAvailableCapacity(4); + rateLimitingConfig.capacityIncrement(2); + rateLimitingConfig.recheckForCapacityProbability(1.0); + rateLimitingConfig.localRate(localRate); + + ClusterInfoConfig.Builder clusterInfoConfig = new ClusterInfoConfig.Builder(); + clusterInfoConfig.clusterId("testCluster"); + clusterInfoConfig.nodeCount(4); + + + return new Chain<>("test", new RateLimitingSearcher(new RateLimitingConfig(rateLimitingConfig), + new ClusterInfoConfig(clusterInfoConfig), + metric, + clock), + new CostSettingSearcher()); + } + private int requestsToTry = 50; /** |