aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java11
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java18
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java166
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java43
4 files changed, 195 insertions, 43 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java
index e66c48ddb74..77496114df1 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java
@@ -2,6 +2,7 @@
package com.yahoo.search.dispatch;
import java.io.Closeable;
+import java.time.Duration;
import java.util.function.BiConsumer;
/**
@@ -15,13 +16,13 @@ public abstract class CloseableInvoker implements Closeable {
protected abstract void release();
- private BiConsumer<Boolean, Long> teardown = null;
+ private BiConsumer<Boolean, RequestDuration> teardown = null;
private boolean success = false;
- private long startTime = 0;
+ private RequestDuration duration;
- public void teardown(BiConsumer<Boolean, Long> teardown) {
+ public void teardown(BiConsumer<Boolean, RequestDuration> teardown) {
this.teardown = teardown;
- this.startTime = System.currentTimeMillis();
+ this.duration = new RequestDuration();
}
protected void setFinalStatus(boolean success) {
@@ -31,7 +32,7 @@ public abstract class CloseableInvoker implements Closeable {
@Override
public final void close() {
if (teardown != null) {
- teardown.accept(success, System.currentTimeMillis() - startTime);
+ teardown.accept(success, duration.complete());
teardown = null;
}
release();
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 68a8e351b34..345c621ae24 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -25,11 +25,11 @@ import com.yahoo.search.query.profile.types.QueryProfileType;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.vespa.config.search.DispatchConfig;
+import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
-import java.util.stream.Collectors;
/**
* A dispatcher communicates with search nodes to perform queries and fill hits.
@@ -96,6 +96,15 @@ public class Dispatcher extends AbstractComponent {
this(new ClusterMonitor<>(searchCluster, true), searchCluster, dispatchConfig, new RpcInvokerFactory(resourcePool, searchCluster), metric);
}
+ private static LoadBalancer.Policy toLoadBalancerPolicy(DispatchConfig.DistributionPolicy.Enum policy) {
+ return switch (policy) {
+ case ROUNDROBIN: yield LoadBalancer.Policy.ROUNDROBIN;
+ case BEST_OF_RANDOM_2: yield LoadBalancer.Policy.BEST_OF_RANDOM_2;
+ case ADAPTIVE,LATENCY_AMORTIZED_OVER_REQUESTS: yield LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_REQUESTS;
+ case LATENCY_AMORTIZED_OVER_TIME: yield LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_TIME;
+ };
+ }
+
/* Protected for simple mocking in tests. Beware that searchCluster is shutdown on in deconstruct() */
protected Dispatcher(ClusterMonitor<Node> clusterMonitor,
SearchCluster searchCluster,
@@ -107,8 +116,7 @@ public class Dispatcher extends AbstractComponent {
this.searchCluster = searchCluster;
this.clusterMonitor = clusterMonitor;
- this.loadBalancer = new LoadBalancer(searchCluster,
- dispatchConfig.distributionPolicy() == DispatchConfig.DistributionPolicy.ROUNDROBIN);
+ this.loadBalancer = new LoadBalancer(searchCluster, toLoadBalancerPolicy(dispatchConfig.distributionPolicy()));
this.invokerFactory = invokerFactory;
this.metric = metric;
this.metricContext = metric.createContext(null);
@@ -219,7 +227,7 @@ public class Dispatcher extends AbstractComponent {
invoker.get().teardown((success, time) -> loadBalancer.releaseGroup(group, success, time));
return invoker.get();
} else {
- loadBalancer.releaseGroup(group, false, 0);
+ loadBalancer.releaseGroup(group, false, RequestDuration.of(Duration.ZERO));
if (rejected == null) {
rejected = new HashSet<>();
}
@@ -239,7 +247,7 @@ public class Dispatcher extends AbstractComponent {
*/
private Set<Integer> rejectGroupBlockingFeed(List<Group> groups) {
if (groups.size() == 1) return null;
- List<Group> groupsRejectingFeed = groups.stream().filter(Group::isBlockingWrites).collect(Collectors.toList());
+ List<Group> groupsRejectingFeed = groups.stream().filter(Group::isBlockingWrites).toList();
if (groupsRejectingFeed.size() != 1) return null;
Set<Integer> rejected = new HashSet<>();
rejected.add(groupsRejectingFeed.get(0).id());
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
index 4c0bcb38d15..59821827d4e 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
@@ -4,6 +4,7 @@ package com.yahoo.search.dispatch;
import com.yahoo.search.dispatch.searchcluster.Group;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -14,7 +15,6 @@ import java.util.logging.Logger;
/**
* LoadBalancer determines which group of content nodes should be accessed next for each search query when the
* internal java dispatcher is used.
- *
* The implementation here is a simplistic least queries in flight + round-robin load balancer
*
* @author ollivir
@@ -25,22 +25,29 @@ public class LoadBalancer {
private static final long DEFAULT_LATENCY_DECAY_RATE = 1000;
private static final long MIN_LATENCY_DECAY_RATE = 42;
- private static final double INITIAL_QUERY_TIME = 0.001;
- private static final double MIN_QUERY_TIME = 0.001;
+ private static final double LATENCY_DECAY_TIME = Duration.ofSeconds(5).toMillis()/1000.0;
+ private static final Duration INITIAL_QUERY_TIME = Duration.ofMillis(1);
+ private static final double MIN_QUERY_TIME = Duration.ofMillis(1).toMillis()/1000.0;
private final List<GroupStatus> scoreboard;
private final GroupScheduler scheduler;
- public LoadBalancer(SearchCluster searchCluster, boolean roundRobin) {
+ public enum Policy { ROUNDROBIN, LATENCY_AMORTIZED_OVER_REQUESTS, LATENCY_AMORTIZED_OVER_TIME, BEST_OF_RANDOM_2}
+
+ public LoadBalancer(SearchCluster searchCluster, Policy policy) {
this.scoreboard = new ArrayList<>(searchCluster.groups().size());
for (Group group : searchCluster.orderedGroups()) {
scoreboard.add(new GroupStatus(group));
}
- if (roundRobin || scoreboard.size() == 1) {
- this.scheduler = new RoundRobinScheduler(scoreboard);
- } else {
- this.scheduler = new AdaptiveScheduler(new Random(), scoreboard);
- }
+ if (scoreboard.size() == 1)
+ policy = Policy.ROUNDROBIN;
+
+ this.scheduler = switch (policy) {
+ case ROUNDROBIN: yield new RoundRobinScheduler(scoreboard);
+ case BEST_OF_RANDOM_2: yield new BestOfRandom2(new Random(), scoreboard);
+ case LATENCY_AMORTIZED_OVER_REQUESTS: yield new AdaptiveScheduler(AdaptiveScheduler.Type.REQUESTS, new Random(), scoreboard);
+ case LATENCY_AMORTIZED_OVER_TIME: yield new AdaptiveScheduler(AdaptiveScheduler.Type.TIME, new Random(), scoreboard);
+ };
}
/**
@@ -71,13 +78,13 @@ public class LoadBalancer {
*
* @param group previously allocated group
* @param success was the query successful
- * @param searchTimeMs query execution time in milliseconds, used for adaptive load balancing
+ * @param searchTime query execution time, used for adaptive load balancing
*/
- public void releaseGroup(Group group, boolean success, double searchTimeMs) {
+ public void releaseGroup(Group group, boolean success, RequestDuration searchTime) {
synchronized (this) {
for (GroupStatus sched : scoreboard) {
if (sched.group.id() == group.id()) {
- sched.release(success, searchTimeMs / 1000.0);
+ sched.release(success, searchTime);
break;
}
}
@@ -86,49 +93,51 @@ public class LoadBalancer {
static class GroupStatus {
+ interface Decayer {
+ void decay(RequestDuration duration);
+ double averageCost();
+ }
+
+ static class NoDecay implements Decayer {
+ public void decay(RequestDuration duration) {}
+ public double averageCost() { return MIN_QUERY_TIME; }
+ }
+
private final Group group;
private int allocations = 0;
- private long queries = 0;
- private double averageSearchTime = INITIAL_QUERY_TIME;
+ private Decayer decayer;
GroupStatus(Group group) {
this.group = group;
+ this.decayer = new NoDecay();
+ }
+ void setDecayer(Decayer decayer) {
+ this.decayer = decayer;
}
void allocate() {
allocations++;
}
- void release(boolean success, double searchTime) {
+ void release(boolean success, RequestDuration searchTime) {
allocations--;
if (allocations < 0) {
log.warning("Double free of query target group detected");
allocations = 0;
}
if (success) {
- searchTime = Math.max(searchTime, MIN_QUERY_TIME);
- double decayRate = Math.min(queries + MIN_LATENCY_DECAY_RATE, DEFAULT_LATENCY_DECAY_RATE);
- averageSearchTime = (searchTime + (decayRate - 1) * averageSearchTime) / decayRate;
- queries++;
+ decayer.decay(searchTime);
}
}
- double averageSearchTime() {
- return averageSearchTime;
- }
-
- double averageSearchTimeInverse() {
- return 1.0 / averageSearchTime;
+ double weight() {
+ return 1.0 / decayer.averageCost();
}
int groupId() {
return group.id();
}
- void setQueryStatistics(long queries, double averageSearchTime) {
- this.queries = queries;
- this.averageSearchTime = averageSearchTime;
- }
}
private interface GroupScheduler {
@@ -193,13 +202,61 @@ public class LoadBalancer {
}
static class AdaptiveScheduler implements GroupScheduler {
-
+ enum Type {TIME, REQUESTS}
private final Random random;
private final List<GroupStatus> scoreboard;
- public AdaptiveScheduler(Random random, List<GroupStatus> scoreboard) {
+ private static double toDouble(Duration duration) {
+ return duration.toNanos()/1_000_000_000.0;
+ }
+ private static Duration fromDouble(double seconds) { return Duration.ofNanos((long)(seconds*1_000_000_000));}
+
+ static class DecayByRequests implements GroupStatus.Decayer {
+ private long queries;
+ private double averageSearchTime;
+ DecayByRequests() {
+ this(0, INITIAL_QUERY_TIME);
+ }
+ DecayByRequests(long initialQueries, Duration initialSearchTime) {
+ queries = initialQueries;
+ averageSearchTime = toDouble(initialSearchTime);
+ }
+ public void decay(RequestDuration duration) {
+ double searchTime = Math.max(toDouble(duration.duration()), MIN_QUERY_TIME);
+ double decayRate = Math.min(queries + MIN_LATENCY_DECAY_RATE, DEFAULT_LATENCY_DECAY_RATE);
+ queries++;
+ averageSearchTime = (searchTime + (decayRate - 1) * averageSearchTime) / decayRate;
+ }
+ public double averageCost() { return averageSearchTime; }
+ Duration averageSearchTime() { return fromDouble(averageSearchTime);}
+ }
+
+ static class DecayByTime implements GroupStatus.Decayer {
+ private double averageSearchTime;
+ private RequestDuration prev;
+ DecayByTime() {
+ this(INITIAL_QUERY_TIME, RequestDuration.of(Duration.ZERO));
+ }
+ DecayByTime(Duration initialSearchTime, RequestDuration start) {
+ averageSearchTime = toDouble(initialSearchTime);
+ prev = start;
+ }
+ public void decay(RequestDuration duration) {
+ double searchTime = Math.max(toDouble(duration.duration()), MIN_QUERY_TIME);
+ double sampleWeight = toDouble(duration.difference(prev));
+ averageSearchTime = (sampleWeight*searchTime + LATENCY_DECAY_TIME * averageSearchTime) / (LATENCY_DECAY_TIME + sampleWeight);
+ prev = duration;
+ }
+ public double averageCost() { return averageSearchTime; }
+ Duration averageSearchTime() { return fromDouble(averageSearchTime);}
+ }
+
+ public AdaptiveScheduler(Type type, Random random, List<GroupStatus> scoreboard) {
this.random = random;
this.scoreboard = scoreboard;
+ for (GroupStatus gs : scoreboard) {
+ gs.setDecayer(type == Type.REQUESTS ? new DecayByRequests() : new DecayByTime());
+ }
}
private Optional<GroupStatus> selectGroup(double needle, boolean requireCoverage, Set<Integer> rejected) {
@@ -208,7 +265,7 @@ public class LoadBalancer {
for (GroupStatus gs : scoreboard) {
if (rejected == null || !rejected.contains(gs.group.id())) {
if (!requireCoverage || gs.group.hasSufficientCoverage()) {
- sum += gs.averageSearchTimeInverse();
+ sum += gs.weight();
n++;
}
}
@@ -220,7 +277,7 @@ public class LoadBalancer {
for (GroupStatus gs : scoreboard) {
if (rejected == null || !rejected.contains(gs.group.id())) {
if (!requireCoverage || gs.group.hasSufficientCoverage()) {
- accum += gs.averageSearchTimeInverse();
+ accum += gs.weight();
if (needle < accum / sum) {
return Optional.of(gs);
}
@@ -239,4 +296,47 @@ public class LoadBalancer {
}
}
+ static class BestOfRandom2 implements GroupScheduler {
+ private final Random random;
+ private final List<GroupStatus> scoreboard;
+ public BestOfRandom2(Random random, List<GroupStatus> scoreboard) {
+ this.random = random;
+ this.scoreboard = scoreboard;
+ }
+ @Override
+ public Optional<GroupStatus> takeNextGroup(Set<Integer> rejectedGroups) {
+ GroupStatus gs = selectBestOf2(rejectedGroups, true);
+ return (gs != null)
+ ? Optional.of(gs)
+ : Optional.ofNullable(selectBestOf2(rejectedGroups, false));
+ }
+
+ private GroupStatus selectBestOf2(Set<Integer> rejectedGroups, boolean requireCoverage) {
+ List<Integer> candidates = new ArrayList<>(scoreboard.size());
+ for (int i=0; i < scoreboard.size(); i++) {
+ GroupStatus gs = scoreboard.get(i);
+ if (rejectedGroups == null || !rejectedGroups.contains(gs.group.id())) {
+ if (!requireCoverage || gs.group.hasSufficientCoverage()) {
+ candidates.add(i);
+ }
+ }
+ }
+ GroupStatus candA = selectRandom(candidates);
+ GroupStatus candB = selectRandom(candidates);
+ if (candA == null) return candB;
+ if (candB == null) return candA;
+ if (candB.allocations < candA.allocations) return candB;
+ return candA;
+ }
+ private GroupStatus selectRandom(List<Integer> candidates) {
+ if ( ! candidates.isEmpty()) {
+ int index = random.nextInt(candidates.size());
+ Integer groupIndex = candidates.remove(index);
+ return scoreboard.get(groupIndex);
+ }
+ return null;
+ }
+
+ }
+
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java b/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java
new file mode 100644
index 00000000000..1206277a103
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java
@@ -0,0 +1,43 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch;
+
+import java.time.Duration;
+import java.time.Instant;
+
+/**
+ * Contains start and end time. Exposes a duration, and lets you measure the time difference between 2 requests.
+ * It uses System.nanoTime to get a steady clock.
+ *
+ * @author baldersheim
+ */
+class RequestDuration {
+ private final long startTime;
+ private long endTime;
+ RequestDuration() {
+ this(System.nanoTime());
+ }
+ private RequestDuration(long startTime) {
+ this.startTime = startTime;
+ }
+
+ RequestDuration complete() {
+ endTime = System.nanoTime();
+ return this;
+ }
+ private RequestDuration complete(long duration) {
+ endTime = startTime + duration;
+ return this;
+ }
+ Duration duration() {
+ return Duration.ofNanos(endTime - startTime);
+ }
+ Duration difference(RequestDuration prev) {
+ return Duration.ofNanos(Math.abs(endTime - prev.endTime));
+ }
+ static RequestDuration of(Duration duration) {
+ return new RequestDuration().complete(duration.toNanos());
+ }
+ static RequestDuration of(Instant sinceEpoch, Duration duration) {
+ return new RequestDuration(sinceEpoch.toEpochMilli()*1_000_000).complete(duration.toNanos());
+ }
+}