summaryrefslogtreecommitdiffstats
path: root/container-search
diff options
context:
space:
mode:
authorOlli Virtanen <ovirtanen@gmail.com>2019-01-02 10:50:43 +0200
committerGitHub <noreply@github.com>2019-01-02 10:50:43 +0200
commite7cfae15a651a1c8e25a5da188707a7b101776cf (patch)
tree3d307abbefda3010340805f953f293444082dc6b /container-search
parent1e90ccea25ec833b516f6c351d8f01fc2ee94a4e (diff)
parenteb21a1f095e91aaea5a5869c3079b954255890a4 (diff)
Merge pull request #7967 from vespa-engine/ollivir/adaptive-lb-scheduling
Adaptive load balancing scheduler, coverage calculation fixes
Diffstat (limited to 'container-search')
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java8
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java5
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java14
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java4
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java39
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java237
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java15
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java132
8 files changed, 325 insertions, 129 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java
index bdab74326ca..58203981039 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java
@@ -114,12 +114,12 @@ public class FS4InvokerFactory {
}
private SearchInvoker createCoverageErrorInvoker(List<Node> nodes, Set<Integer> failed) {
- long activeDocuments = 0;
StringBuilder down = new StringBuilder("Connection failure on nodes with distribution-keys: ");
Integer key = null;
+ int count = 0;
for (Node node : nodes) {
if (failed.contains(node.key())) {
- activeDocuments += node.getActiveDocuments();
+ count++;
if (key == null) {
key = node.key();
} else {
@@ -128,8 +128,8 @@ public class FS4InvokerFactory {
down.append(node.key());
}
}
- Coverage coverage = new Coverage(0, activeDocuments, 0);
- coverage.setNodesTried(1);
+ Coverage coverage = new Coverage(0, 0, 0);
+ coverage.setNodesTried(count);
return new SearchErrorInvoker(ErrorMessage.createBackendCommunicationError(down.toString()), coverage);
}
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java
index eb35886113c..ae0a29aa80b 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java
@@ -13,6 +13,7 @@ import com.yahoo.search.Result;
import com.yahoo.search.dispatch.ResponseMonitor;
import com.yahoo.search.dispatch.SearchInvoker;
import com.yahoo.search.dispatch.searchcluster.Node;
+import com.yahoo.search.result.Coverage;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.searchchain.Execution;
@@ -127,7 +128,9 @@ public class FS4SearchInvoker extends SearchInvoker implements ResponseMonitor<F
private Result errorResult(ErrorMessage errorMessage) {
Result error = new Result(query, errorMessage);
- getErrorCoverage().ifPresent(error::setCoverage);
+ Coverage errorCoverage = new Coverage(0, 0, 0);
+ errorCoverage.setNodesTried(1);
+ error.setCoverage(errorCoverage);
return error;
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java
index 481940a33b7..515d6249fd8 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java
@@ -2,6 +2,7 @@
package com.yahoo.search.dispatch;
import java.io.Closeable;
+import java.util.function.BiConsumer;
/**
* CloseableInvoker is an abstract implementation of {@link Closeable} with an additional hook for
@@ -13,16 +14,23 @@ import java.io.Closeable;
public abstract class CloseableInvoker implements Closeable {
protected abstract void release();
- private Runnable teardown = null;
+ private BiConsumer<Boolean, Long> teardown = null;
+ private boolean success = false;
+ private long startTime = 0;
- public void teardown(Runnable teardown) {
+ public void teardown(BiConsumer<Boolean, Long> teardown) {
this.teardown = teardown;
+ this.startTime = System.currentTimeMillis();
+ }
+
+ protected void setFinalStatus(boolean success) {
+ this.success = success;
}
@Override
public final void close() {
if (teardown != null) {
- teardown.run();
+ teardown.accept(success, System.currentTimeMillis() - startTime);
teardown = null;
}
release();
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 1ca64be7924..baf401a2536 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -144,10 +144,10 @@ public class Dispatcher extends AbstractComponent {
if (invoker.isPresent()) {
query.trace(false, 2, "Dispatching internally to search group ", group.id());
query.getModel().setSearchPath("/" + group.id());
- invoker.get().teardown(() -> loadBalancer.releaseGroup(group));
+ invoker.get().teardown((success, time) -> loadBalancer.releaseGroup(group, success, time));
return invoker;
} else {
- loadBalancer.releaseGroup(group);
+ loadBalancer.releaseGroup(group, false, 0);
if (rejected == null) {
rejected = new HashSet<>();
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
index afd4b4b1807..d695c108533 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
@@ -20,10 +20,10 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
import java.util.logging.Logger;
import static com.yahoo.container.handler.Coverage.DEGRADED_BY_ADAPTIVE_TIMEOUT;
+import static com.yahoo.container.handler.Coverage.DEGRADED_BY_MATCH_PHASE;
import static com.yahoo.container.handler.Coverage.DEGRADED_BY_TIMEOUT;
/**
@@ -53,7 +53,9 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
private long answeredSoonActiveDocs = 0;
private int askedNodes = 0;
private int answeredNodes = 0;
+ private int answeredNodesParticipated = 0;
private boolean timedOut = false;
+ private boolean degradedByMatchPhase = false;
private boolean trimResult = false;
@@ -99,9 +101,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
while (!invokers.isEmpty() && nextTimeout >= 0) {
SearchInvoker invoker = availableForProcessing.poll(nextTimeout, TimeUnit.MILLISECONDS);
if (invoker == null) {
- if (log.isLoggable(Level.FINE)) {
- log.fine("Search timed out with " + askedNodes + " requests made, " + answeredNodes + " responses received");
- }
+ log.fine(() -> "Search timed out with " + askedNodes + " requests made, " + answeredNodes + " responses received");
break;
} else {
invokers.remove(invoker);
@@ -144,7 +144,6 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
message = "Backend communication timeout";
}
result.hits().addError(ErrorMessage.createBackendCommunicationError(message));
- invoker.getErrorCoverage().ifPresent(this::collectCoverage);
timedOut = true;
}
}
@@ -201,36 +200,44 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
answeredDocs += source.getDocs();
answeredActiveDocs += source.getActive();
answeredSoonActiveDocs += source.getSoonActive();
+ answeredNodesParticipated += source.getNodes();
answeredNodes++;
+ degradedByMatchPhase |= source.isDegradedByMatchPhase();
+ timedOut |= source.isDegradedByTimeout();
}
private Coverage createCoverage() {
adjustDegradedCoverage();
- Coverage coverage = new Coverage(answeredDocs, answeredActiveDocs, answeredNodes, 1);
+ Coverage coverage = new Coverage(answeredDocs, answeredActiveDocs, answeredNodesParticipated, 1);
coverage.setNodesTried(askedNodes);
coverage.setSoonActive(answeredSoonActiveDocs);
+ int degradedReason = 0;
if (timedOut) {
- coverage.setDegradedReason(adaptiveTimeoutCalculated ? DEGRADED_BY_ADAPTIVE_TIMEOUT : DEGRADED_BY_TIMEOUT);
+ degradedReason |= (adaptiveTimeoutCalculated ? DEGRADED_BY_ADAPTIVE_TIMEOUT : DEGRADED_BY_TIMEOUT);
}
+ if (degradedByMatchPhase) {
+ degradedReason |= DEGRADED_BY_MATCH_PHASE;
+ }
+ coverage.setDegradedReason(degradedReason);
return coverage;
}
private void adjustDegradedCoverage() {
- if (askedNodes == answeredNodes) {
+ if (askedNodes == answeredNodesParticipated) {
return;
}
- int notAnswered = askedNodes - answeredNodes;
+ int notAnswered = askedNodes - answeredNodesParticipated;
- if (adaptiveTimeoutCalculated) {
- answeredActiveDocs += (notAnswered * answeredActiveDocs / answeredNodes);
- answeredSoonActiveDocs += (notAnswered * answeredSoonActiveDocs / answeredNodes);
+ if (adaptiveTimeoutCalculated && answeredNodesParticipated > 0) {
+ answeredActiveDocs += (notAnswered * answeredActiveDocs / answeredNodesParticipated);
+ answeredSoonActiveDocs += (notAnswered * answeredSoonActiveDocs / answeredNodesParticipated);
} else {
- if (askedNodes > answeredNodes) {
+ if (askedNodes > answeredNodesParticipated) {
int searchableCopies = (int) searchCluster.dispatchConfig().searchableCopies();
int missingNodes = notAnswered - (searchableCopies - 1);
- if (answeredNodes > 0) {
- answeredActiveDocs += (missingNodes * answeredActiveDocs / answeredNodes);
- answeredSoonActiveDocs += (missingNodes * answeredSoonActiveDocs / answeredNodes);
+ if (answeredNodesParticipated > 0) {
+ answeredActiveDocs += (missingNodes * answeredActiveDocs / answeredNodesParticipated);
+ answeredSoonActiveDocs += (missingNodes * answeredSoonActiveDocs / answeredNodesParticipated);
timedOut = true;
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
index df6384cf81c..3c7af447809 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
@@ -5,11 +5,10 @@ import com.yahoo.search.dispatch.searchcluster.Group;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.Random;
import java.util.Set;
-import java.util.logging.Level;
import java.util.logging.Logger;
/**
@@ -23,23 +22,23 @@ public class LoadBalancer {
private static final Logger log = Logger.getLogger(LoadBalancer.class.getName());
- private final List<GroupSchedule> scoreboard;
- private int needle = 0;
+ private static final long DEFAULT_LATENCY_DECAY_RATE = 1000;
+ private static final long MIN_LATENCY_DECAY_RATE = 42;
+ private static final double INITIAL_QUERY_TIME = 0.001;
+ private static final double MIN_QUERY_TIME = 0.001;
+
+ private final List<GroupStatus> scoreboard;
+ private final GroupScheduler scheduler;
public LoadBalancer(SearchCluster searchCluster, boolean roundRobin) {
- if (searchCluster == null) {
- this.scoreboard = null;
- return;
- }
this.scoreboard = new ArrayList<>(searchCluster.groups().size());
-
for (Group group : searchCluster.orderedGroups()) {
- scoreboard.add(new GroupSchedule(group));
+ scoreboard.add(new GroupStatus(group));
}
-
- if(! roundRobin) {
- // TODO - More randomness could be desirable
- Collections.shuffle(scoreboard);
+ if (roundRobin) {
+ this.scheduler = new RoundRobinScheduler(scoreboard);
+ } else {
+ this.scheduler = new AdaptiveScheduler(new Random(), scoreboard);
}
}
@@ -51,11 +50,19 @@ public class LoadBalancer {
* @return The node group to target, or <i>empty</i> if the internal dispatch logic cannot be used
*/
public Optional<Group> takeGroup(Set<Integer> rejectedGroups) {
- if (scoreboard == null) {
- return Optional.empty();
- }
+ synchronized (this) {
+ Optional<GroupStatus> best = scheduler.takeNextGroup(rejectedGroups);
- return allocateNextGroup(rejectedGroups);
+ if (best.isPresent()) {
+ GroupStatus gs = best.get();
+ gs.allocate();
+ Group ret = gs.group;
+ log.fine(() -> "Offering <" + ret + "> for query connection");
+ return Optional.of(ret);
+ } else {
+ return Optional.empty();
+ }
+ }
}
/**
@@ -63,90 +70,188 @@ public class LoadBalancer {
*
* @param group
* previously allocated group
+ * @param success
+ * was the query successful
+ * @param searchTimeMs
+ * query execution time in milliseconds, used for adaptive load balancing
*/
- public void releaseGroup(Group group) {
+ public void releaseGroup(Group group, boolean success, double searchTimeMs) {
synchronized (this) {
- for (GroupSchedule sched : scoreboard) {
+ for (GroupStatus sched : scoreboard) {
if (sched.group.id() == group.id()) {
- sched.adjustScore(-1);
+ sched.release(success, (double) searchTimeMs / 1000.0);
break;
}
}
}
}
- private Optional<Group> allocateNextGroup(Set<Integer> rejectedGroups) {
- synchronized (this) {
- GroupSchedule bestSchedule = null;
+ static class GroupStatus {
+ private final Group group;
+ private int allocations = 0;
+ private long queries = 0;
+ private double averageSearchTime = INITIAL_QUERY_TIME;
+
+ GroupStatus(Group group) {
+ this.group = group;
+ }
+
+ void allocate() {
+ allocations++;
+ }
+
+ void release(boolean success, double searchTime) {
+ allocations--;
+ if (allocations < 0) {
+ log.warning("Double free of query target group detected");
+ allocations = 0;
+ }
+ if (success) {
+ searchTime = Math.max(searchTime, MIN_QUERY_TIME);
+ double decayRate = Math.min(queries + MIN_LATENCY_DECAY_RATE, DEFAULT_LATENCY_DECAY_RATE);
+ averageSearchTime = (searchTime + (decayRate - 1) * averageSearchTime) / decayRate;
+ queries++;
+ }
+ }
+
+ double averageSearchTime() {
+ return averageSearchTime;
+ }
+
+ double averageSearchTimeInverse() {
+ return 1.0 / averageSearchTime;
+ }
+
+ int groupId() {
+ return group.id();
+ }
+
+ void setQueryStatistics(long queries, double averageSearchTime) {
+ this.queries = queries;
+ this.averageSearchTime = averageSearchTime;
+ }
+ }
+
+ private interface GroupScheduler {
+ Optional<GroupStatus> takeNextGroup(Set<Integer> rejectedGroups);
+ }
+
+ private static class RoundRobinScheduler implements GroupScheduler {
+ private int needle = 0;
+ private final List<GroupStatus> scoreboard;
+
+ public RoundRobinScheduler(List<GroupStatus> scoreboard) {
+ this.scoreboard = scoreboard;
+ }
+
+ @Override
+ public Optional<GroupStatus> takeNextGroup(Set<Integer> rejectedGroups) {
+ GroupStatus bestCandidate = null;
int bestIndex = needle;
int index = needle;
for (int i = 0; i < scoreboard.size(); i++) {
- GroupSchedule sched = scoreboard.get(index);
- if (rejectedGroups == null || !rejectedGroups.contains(sched.group.id())) {
- if (sched.isPreferredOver(bestSchedule)) {
- bestSchedule = sched;
+ GroupStatus candidate = scoreboard.get(index);
+ if (rejectedGroups == null || !rejectedGroups.contains(candidate.group.id())) {
+ GroupStatus better = betterGroup(bestCandidate, candidate);
+ if (better == candidate) {
+ bestCandidate = candidate;
bestIndex = index;
}
}
index = nextScoreboardIndex(index);
}
needle = nextScoreboardIndex(bestIndex);
+ return Optional.ofNullable(bestCandidate);
+ }
- Group ret = null;
- if (bestSchedule != null) {
- bestSchedule.adjustScore(1);
- ret = bestSchedule.group;
+ /**
+ * Select the better of the two given GroupStatus objects, biased to the first
+ * parameter. Thus, if all groups have equal coverage sufficiency, the one
+ * currently at the needle will be used. Either parameter can be null, in which
+ * case any non-null will be preferred.
+ *
+ * @param first preferred GroupStatus
+ * @param second potentially better GroupStatus
+ * @return the better of the two
+ */
+ private static GroupStatus betterGroup(GroupStatus first, GroupStatus second) {
+ if (second == null) {
+ return first;
}
- if (log.isLoggable(Level.FINE)) {
- log.fine("Offering <" + ret + "> for query connection");
+ if (first == null) {
+ return second;
}
- return Optional.ofNullable(ret);
+
+ // different coverage
+ if (first.group.hasSufficientCoverage() != second.group.hasSufficientCoverage()) {
+ if (!first.group.hasSufficientCoverage()) {
+ // first doesn't have coverage, second does
+ return second;
+ } else {
+ // second doesn't have coverage, first does
+ return first;
+ }
+ }
+
+ return first;
}
- }
- private int nextScoreboardIndex(int current) {
- int next = current + 1;
- if (next >= scoreboard.size()) {
- next %= scoreboard.size();
+ private int nextScoreboardIndex(int current) {
+ int next = current + 1;
+ if (next >= scoreboard.size()) {
+ next %= scoreboard.size();
+ }
+ return next;
}
- return next;
}
- private static class GroupSchedule {
- private final Group group;
- private int score;
+ static class AdaptiveScheduler implements GroupScheduler {
+ private final Random random;
+ private final List<GroupStatus> scoreboard;
- public GroupSchedule(Group group) {
- this.group = group;
- this.score = 0;
+ public AdaptiveScheduler(Random random, List<GroupStatus> scoreboard) {
+ this.random = random;
+ this.scoreboard = scoreboard;
}
- public boolean isPreferredOver(GroupSchedule other) {
- if (other == null) {
- return true;
+ private Optional<GroupStatus> selectGroup(double needle, boolean requireCoverage, Set<Integer> rejected) {
+ double sum = 0;
+ int n = 0;
+ for (GroupStatus gs : scoreboard) {
+ if (rejected == null || !rejected.contains(gs.group.id())) {
+ if (!requireCoverage || gs.group.hasSufficientCoverage()) {
+ sum += gs.averageSearchTimeInverse();
+ n++;
+ }
+ }
}
-
- // different coverage
- if (this.group.hasSufficientCoverage() != other.group.hasSufficientCoverage()) {
- if (! this.group.hasSufficientCoverage()) {
- // this doesn't have coverage, other does
- return false;
- } else {
- // other doesn't have coverage, this does
- return true;
+ if (n == 0) {
+ return Optional.empty();
+ }
+ double accum = 0;
+ for (GroupStatus gs : scoreboard) {
+ if (rejected == null || !rejected.contains(gs.group.id())) {
+ if (!requireCoverage || gs.group.hasSufficientCoverage()) {
+ accum += gs.averageSearchTimeInverse();
+ if (needle < accum / sum) {
+ return Optional.of(gs);
+ }
+ }
}
}
-
- return this.score < other.score;
+ return Optional.empty(); // should not happen here
}
- public void adjustScore(int amount) {
- this.score += amount;
- if (score < 0) {
- log.warning("Double free of query target group detected");
- score = 0;
+ @Override
+ public Optional<GroupStatus> takeNextGroup(Set<Integer> rejectedGroups) {
+ double needle = random.nextDouble();
+ Optional<GroupStatus> gs = selectGroup(needle, true, rejectedGroups);
+ if (gs.isPresent()) {
+ return gs;
}
+ // fallback - any coverage better than none
+ return selectGroup(needle, false, rejectedGroups);
}
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java
index 677e0ed1dc2..aec59df9a38 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java
@@ -6,7 +6,6 @@ import com.yahoo.prelude.fastsearch.CacheKey;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.searchcluster.Node;
-import com.yahoo.search.result.Coverage;
import com.yahoo.search.searchchain.Execution;
import java.io.IOException;
@@ -33,7 +32,9 @@ public abstract class SearchInvoker extends CloseableInvoker {
*/
public Result search(Query query, QueryPacket queryPacket, CacheKey cacheKey, Execution execution) throws IOException {
sendSearchRequest(query, queryPacket);
- return getSearchResult(cacheKey, execution);
+ Result result = getSearchResult(cacheKey, execution);
+ setFinalStatus(result.hits().getError() == null);
+ return result;
}
protected abstract void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException;
@@ -53,14 +54,4 @@ public abstract class SearchInvoker extends CloseableInvoker {
protected Optional<Integer> distributionKey() {
return node.map(Node::key);
}
-
- protected Optional<Coverage> getErrorCoverage() {
- if (node.isPresent()) {
- Coverage error = new Coverage(0, node.get().getActiveDocuments(), 0);
- error.setNodesTried(1);
- return Optional.of(error);
- } else {
- return Optional.empty();
- }
- }
}
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java
index c056423a9c4..141d87a41ab 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java
@@ -1,15 +1,22 @@
// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;
+import com.yahoo.search.dispatch.LoadBalancer.AdaptiveScheduler;
+import com.yahoo.search.dispatch.LoadBalancer.GroupStatus;
import com.yahoo.search.dispatch.searchcluster.Group;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import junit.framework.AssertionFailedError;
import org.junit.Test;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Optional;
+import java.util.Random;
import static com.yahoo.search.dispatch.MockSearchCluster.createDispatchConfig;
+import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
@@ -20,7 +27,7 @@ import static org.junit.Assert.assertThat;
*/
public class LoadBalancerTest {
@Test
- public void requreThatLoadBalancerServesSingleNodeSetups() {
+ public void requireThatLoadBalancerServesSingleNodeSetups() {
Node n1 = new Node(0, "test-node1", 0, 0);
SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1), null, 1, null);
LoadBalancer lb = new LoadBalancer(cluster, true);
@@ -33,7 +40,7 @@ public class LoadBalancerTest {
}
@Test
- public void requreThatLoadBalancerServesMultiGroupSetups() {
+ public void requireThatLoadBalancerServesMultiGroupSetups() {
Node n1 = new Node(0, "test-node1", 0, 0);
Node n2 = new Node(1, "test-node2", 1, 1);
SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2), null, 1, null);
@@ -47,7 +54,7 @@ public class LoadBalancerTest {
}
@Test
- public void requreThatLoadBalancerServesClusteredGroups() {
+ public void requireThatLoadBalancerServesClusteredGroups() {
Node n1 = new Node(0, "test-node1", 0, 0);
Node n2 = new Node(1, "test-node2", 1, 0);
Node n3 = new Node(0, "test-node3", 0, 1);
@@ -60,7 +67,7 @@ public class LoadBalancerTest {
}
@Test
- public void requreThatLoadBalancerReturnsDifferentGroups() {
+ public void requireThatLoadBalancerReturnsDifferentGroups() {
Node n1 = new Node(0, "test-node1", 0, 0);
Node n2 = new Node(1, "test-node2", 1, 1);
SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2), null, 1, null);
@@ -71,7 +78,7 @@ public class LoadBalancerTest {
Group group = grp.get();
int id1 = group.id();
// release allocation
- lb.releaseGroup(group);
+ lb.releaseGroup(group, true, 1.0);
// get second group
grp = lb.takeGroup(null);
@@ -80,28 +87,103 @@ public class LoadBalancerTest {
}
@Test
- public void requreThatLoadBalancerReturnsGroupWithShortestQueue() {
- Node n1 = new Node(0, "test-node1", 0, 0);
- Node n2 = new Node(1, "test-node2", 1, 1);
- SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2), null, 1, null);
- LoadBalancer lb = new LoadBalancer(cluster, true);
+ public void requireCorrectAverageSearchTimeDecay() {
+ final double SMALL = 0.00001;
+
+ GroupStatus gs = newGroupStatus(1);
+ gs.setQueryStatistics(0, 1.0);
+ updateSearchTime(gs, 1.0);
+ assertThat(gs.averageSearchTime(), equalTo(1.0));
+ updateSearchTime(gs, 2.0);
+ assertThat(gs.averageSearchTime(), closeTo(1.02326, SMALL));
+ updateSearchTime(gs, 2.0);
+ assertThat(gs.averageSearchTime(), closeTo(1.04545, SMALL));
+ updateSearchTime(gs, 0.1);
+ updateSearchTime(gs, 0.1);
+ updateSearchTime(gs, 0.1);
+ updateSearchTime(gs, 0.1);
+ assertThat(gs.averageSearchTime(), closeTo(0.966667, SMALL));
+ for (int i = 0; i < 10000; i++) {
+ updateSearchTime(gs, 1.0);
+ }
+ assertThat(gs.averageSearchTime(), closeTo(1.0, SMALL));
+ updateSearchTime(gs, 0.1);
+ assertThat(gs.averageSearchTime(), closeTo(0.9991, SMALL));
+ for (int i = 0; i < 10000; i++) {
+ updateSearchTime(gs, 0.0);
+ }
+ assertThat(gs.averageSearchTime(), closeTo(0.001045, SMALL));
+ }
- // get first group
- Optional<Group> grp = lb.takeGroup(null);
- Group group = grp.get();
- int id1 = group.id();
+ @Test
+ public void requireEqualDistributionInFlatWeightListWithAdaptiveScheduler() {
+ List<GroupStatus> scoreboard = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ scoreboard.add(newGroupStatus(i));
+ }
+ Random seq = sequence(0.0, 0.1, 0.2, 0.39, 0.4, 0.6, 0.8, 0.99999);
+ AdaptiveScheduler sched = new AdaptiveScheduler(seq, scoreboard);
+
+ assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(0));
+ assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(0));
+ assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(1));
+ assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(1));
+ assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(2));
+ assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(3));
+ assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(4));
+ assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(4));
+ }
- // get second group
- grp = lb.takeGroup(null);
- group = grp.get();
- int id2 = group.id();
- assertThat(id2, not(equalTo(id1)));
- // release second allocation
- lb.releaseGroup(group);
+ @Test
+ public void requireThatAdaptiveSchedulerObeysWeights() {
+ List<GroupStatus> scoreboard = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ GroupStatus gs = newGroupStatus(i);
+ gs.setQueryStatistics(1, 0.1 * (i + 1));
+ scoreboard.add(gs);
+ }
+ Random seq = sequence(0.0, 0.4379, 0.4380, 0.6569, 0.6570, 0.8029, 0.8030, 0.9124, 0.9125);
+ AdaptiveScheduler sched = new AdaptiveScheduler(seq, scoreboard);
+
+ assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(0));
+ assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(0));
+ assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(1));
+ assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(1));
+ assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(2));
+ assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(2));
+ assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(3));
+ assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(3));
+ assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(4));
+ }
- // get third group
- grp = lb.takeGroup(null);
- group = grp.get();
- assertThat(group.id(), equalTo(id2));
+ private static void updateSearchTime(GroupStatus gs, double time) {
+ gs.allocate();
+ gs.release(true, time);
+ }
+
+ private GroupStatus newGroupStatus(int id) {
+ Group dummyGroup = new Group(id, Collections.emptyList()) {
+ @Override
+ public boolean hasSufficientCoverage() {
+ return true;
+ }
+ };
+ return new GroupStatus(dummyGroup);
+ }
+
+ private Random sequence(double... values) {
+ return new Random() {
+ private int index = 0;
+
+ @Override
+ public double nextDouble() {
+ double retv = values[index];
+ index++;
+ if (index >= values.length) {
+ index = 0;
+ }
+ return retv;
+ }
+ };
}
}