diff options
author | Olli Virtanen <olli.virtanen@oath.com> | 2018-12-19 13:34:15 +0200 |
---|---|---|
committer | Olli Virtanen <olli.virtanen@oath.com> | 2018-12-19 13:34:15 +0200 |
commit | 9fede395bcf3fe1f1af3a228046fcd51b7eee197 (patch) | |
tree | ca4de2923b4ff1105c497d8da3be20716d50c3f0 /container-search | |
parent | 26c6781168f21eb58bb86f1c40babd3a38e03dfd (diff) |
Adaptive load balancing scheduler, coverage calculation fixes
Diffstat (limited to 'container-search')
8 files changed, 315 insertions, 115 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java index bdab74326ca..58203981039 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java @@ -114,12 +114,12 @@ public class FS4InvokerFactory { } private SearchInvoker createCoverageErrorInvoker(List<Node> nodes, Set<Integer> failed) { - long activeDocuments = 0; StringBuilder down = new StringBuilder("Connection failure on nodes with distribution-keys: "); Integer key = null; + int count = 0; for (Node node : nodes) { if (failed.contains(node.key())) { - activeDocuments += node.getActiveDocuments(); + count++; if (key == null) { key = node.key(); } else { @@ -128,8 +128,8 @@ public class FS4InvokerFactory { down.append(node.key()); } } - Coverage coverage = new Coverage(0, activeDocuments, 0); - coverage.setNodesTried(1); + Coverage coverage = new Coverage(0, 0, 0); + coverage.setNodesTried(count); return new SearchErrorInvoker(ErrorMessage.createBackendCommunicationError(down.toString()), coverage); } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java index eb35886113c..ae0a29aa80b 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java @@ -13,6 +13,7 @@ import com.yahoo.search.Result; import com.yahoo.search.dispatch.ResponseMonitor; import com.yahoo.search.dispatch.SearchInvoker; import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.result.Coverage; import com.yahoo.search.result.ErrorMessage; import com.yahoo.search.searchchain.Execution; @@ -127,7 +128,9 @@ public class FS4SearchInvoker extends SearchInvoker implements ResponseMonitor<F private Result errorResult(ErrorMessage errorMessage) { Result error = new Result(query, errorMessage); - getErrorCoverage().ifPresent(error::setCoverage); + Coverage errorCoverage = new Coverage(0, 0, 0); + errorCoverage.setNodesTried(1); + error.setCoverage(errorCoverage); return error; } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java index 481940a33b7..515d6249fd8 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java @@ -2,6 +2,7 @@ package com.yahoo.search.dispatch; import java.io.Closeable; +import java.util.function.BiConsumer; /** * CloseableInvoker is an abstract implementation of {@link Closeable} with an additional hook for @@ -13,16 +14,23 @@ import java.io.Closeable; public abstract class CloseableInvoker implements Closeable { protected abstract void release(); - private Runnable teardown = null; + private BiConsumer<Boolean, Long> teardown = null; + private boolean success = false; + private long startTime = 0; - public void teardown(Runnable teardown) { + public void teardown(BiConsumer<Boolean, Long> teardown) { this.teardown = teardown; + this.startTime = System.currentTimeMillis(); + } + + protected void setFinalStatus(boolean success) { + this.success = success; } @Override public final void close() { if (teardown != null) { - teardown.run(); + teardown.accept(success, System.currentTimeMillis() - startTime); teardown = null; } release(); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 1ca64be7924..baf401a2536 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -144,10 +144,10 @@ public class Dispatcher extends AbstractComponent { if (invoker.isPresent()) { query.trace(false, 2, "Dispatching internally to search group ", group.id()); query.getModel().setSearchPath("/" + group.id()); - invoker.get().teardown(() -> loadBalancer.releaseGroup(group)); + invoker.get().teardown((success, time) -> loadBalancer.releaseGroup(group, success, time)); return invoker; } else { - loadBalancer.releaseGroup(group); + loadBalancer.releaseGroup(group, false, 0); if (rejected == null) { rejected = new HashSet<>(); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java index afd4b4b1807..e6ea2035151 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java @@ -24,6 +24,7 @@ import java.util.logging.Level; import java.util.logging.Logger; import static com.yahoo.container.handler.Coverage.DEGRADED_BY_ADAPTIVE_TIMEOUT; +import static com.yahoo.container.handler.Coverage.DEGRADED_BY_MATCH_PHASE; import static com.yahoo.container.handler.Coverage.DEGRADED_BY_TIMEOUT; /** @@ -54,6 +55,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM private int askedNodes = 0; private int answeredNodes = 0; private boolean timedOut = false; + private boolean degradedByMatchPhase = false; private boolean trimResult = false; @@ -144,7 +146,6 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM message = "Backend communication timeout"; } result.hits().addError(ErrorMessage.createBackendCommunicationError(message)); - invoker.getErrorCoverage().ifPresent(this::collectCoverage); timedOut = true; } } @@ -201,7 +202,9 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM answeredDocs += source.getDocs(); answeredActiveDocs += source.getActive(); answeredSoonActiveDocs += source.getSoonActive(); - answeredNodes++; + answeredNodes += source.getNodes(); + degradedByMatchPhase |= source.isDegradedByMatchPhase(); + timedOut |= source.isDegradedByTimeout(); } private Coverage createCoverage() { @@ -209,9 +212,14 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM Coverage coverage = new Coverage(answeredDocs, answeredActiveDocs, answeredNodes, 1); coverage.setNodesTried(askedNodes); coverage.setSoonActive(answeredSoonActiveDocs); + int degradedReason = 0; if (timedOut) { - coverage.setDegradedReason(adaptiveTimeoutCalculated ? DEGRADED_BY_ADAPTIVE_TIMEOUT : DEGRADED_BY_TIMEOUT); + degradedReason |= (adaptiveTimeoutCalculated ? DEGRADED_BY_ADAPTIVE_TIMEOUT : DEGRADED_BY_TIMEOUT); } + if (degradedByMatchPhase) { + degradedReason |= DEGRADED_BY_MATCH_PHASE; + } + coverage.setDegradedReason(degradedReason); return coverage; } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java index df6384cf81c..65a158a42fc 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java @@ -5,9 +5,9 @@ import com.yahoo.search.dispatch.searchcluster.Group; import com.yahoo.search.dispatch.searchcluster.SearchCluster; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.Optional; +import java.util.Random; import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; @@ -23,23 +23,23 @@ public class LoadBalancer { private static final Logger log = Logger.getLogger(LoadBalancer.class.getName()); - private final List<GroupSchedule> scoreboard; - private int needle = 0; + private static final long DEFAULT_LATENCY_DECAY_RATE = 1000; + private static final long MIN_LATENCY_DECAY_RATE = 42; + private static final double INITIAL_QUERY_TIME = 0.001; + private static final double MIN_QUERY_TIME = 0.001; + + private final List<GroupStatus> scoreboard; + private final GroupScheduler scheduler; public LoadBalancer(SearchCluster searchCluster, boolean roundRobin) { - if (searchCluster == null) { - this.scoreboard = null; - return; - } this.scoreboard = new ArrayList<>(searchCluster.groups().size()); - for (Group group : searchCluster.orderedGroups()) { - scoreboard.add(new GroupSchedule(group)); + scoreboard.add(new GroupStatus(group)); } - - if(! roundRobin) { - // TODO - More randomness could be desirable - Collections.shuffle(scoreboard); + if (roundRobin) { + this.scheduler = new RoundRobinScheduler(scoreboard); + } else { + this.scheduler = new AdaptiveScheduler(new Random(), scoreboard); } } @@ -51,11 +51,21 @@ public class LoadBalancer { * @return The node group to target, or <i>empty</i> if the internal dispatch logic cannot be used */ public Optional<Group> takeGroup(Set<Integer> rejectedGroups) { - if (scoreboard == null) { - return Optional.empty(); - } + synchronized (this) { + Optional<GroupStatus> best = scheduler.takeNextGroup(rejectedGroups); - return allocateNextGroup(rejectedGroups); + if (best.isPresent()) { + GroupStatus gs = best.get(); + gs.allocate(); + Group ret = gs.group; + if (log.isLoggable(Level.FINE)) { + log.fine("Offering <" + ret + "> for query connection"); + } + return Optional.of(ret); + } else { + return Optional.empty(); + } + } } /** @@ -63,90 +73,188 @@ public class LoadBalancer { * * @param group * previously allocated group + * @param success + * was the query successful + * @param searchTimeMs + * query execution time in milliseconds, used for adaptive load balancing */ - public void releaseGroup(Group group) { + public void releaseGroup(Group group, boolean success, double searchTimeMs) { synchronized (this) { - for (GroupSchedule sched : scoreboard) { + for (GroupStatus sched : scoreboard) { if (sched.group.id() == group.id()) { - sched.adjustScore(-1); + sched.release(success, (double) searchTimeMs / 1000.0); break; } } } } - private Optional<Group> allocateNextGroup(Set<Integer> rejectedGroups) { - synchronized (this) { - GroupSchedule bestSchedule = null; + static class GroupStatus { + private final Group group; + private int allocations = 0; + private long queries = 0; + private double averageSearchTime = INITIAL_QUERY_TIME; + + GroupStatus(Group group) { + this.group = group; + } + + void allocate() { + allocations++; + } + + void release(boolean success, double searchTime) { + allocations--; + if (allocations < 0) { + log.warning("Double free of query target group detected"); + allocations = 0; + } + if (success) { + searchTime = Math.max(searchTime, MIN_QUERY_TIME); + double decayRate = Math.min(queries + MIN_LATENCY_DECAY_RATE, DEFAULT_LATENCY_DECAY_RATE); + averageSearchTime = (searchTime + (decayRate - 1) * averageSearchTime) / decayRate; + queries++; + } + } + + double averageSearchTime() { + return averageSearchTime; + } + + double averageSearchTimeInverse() { + return 1.0 / averageSearchTime; + } + + int groupId() { + return group.id(); + } + + void setQueryStatistics(long queries, double averageSearchTime) { + this.queries = queries; + this.averageSearchTime = averageSearchTime; + } + } + + private interface GroupScheduler { + Optional<GroupStatus> takeNextGroup(Set<Integer> rejectedGroups); + } + + private static class RoundRobinScheduler implements GroupScheduler { + private int needle = 0; + private final List<GroupStatus> scoreboard; + + public RoundRobinScheduler(List<GroupStatus> scoreboard) { + this.scoreboard = scoreboard; + } + + @Override + public Optional<GroupStatus> takeNextGroup(Set<Integer> rejectedGroups) { + GroupStatus bestCandidate = null; int bestIndex = needle; int index = needle; for (int i = 0; i < scoreboard.size(); i++) { - GroupSchedule sched = scoreboard.get(index); - if (rejectedGroups == null || !rejectedGroups.contains(sched.group.id())) { - if (sched.isPreferredOver(bestSchedule)) { - bestSchedule = sched; + GroupStatus candidate = scoreboard.get(index); + if (rejectedGroups == null || !rejectedGroups.contains(candidate.group.id())) { + GroupStatus better = betterGroup(bestCandidate, candidate); + if (better == candidate) { + bestCandidate = candidate; bestIndex = index; } } index = nextScoreboardIndex(index); } needle = nextScoreboardIndex(bestIndex); + return Optional.ofNullable(bestCandidate); + } - Group ret = null; - if (bestSchedule != null) { - bestSchedule.adjustScore(1); - ret = bestSchedule.group; + /** + * Select the better of the two given GroupStatus objects, biased to the first + * parameter. Thus, if all groups have equal coverage sufficiency, the one + * currently at the needle will be used. Either parameter can be null, in which + * case any non-null will be preferred. + * + * @param first preferred GroupStatus + * @param second potentially better GroupStatus + * @return the better of the two + */ + private static GroupStatus betterGroup(GroupStatus first, GroupStatus second) { + if (second == null) { + return first; } - if (log.isLoggable(Level.FINE)) { - log.fine("Offering <" + ret + "> for query connection"); + if (first == null) { + return second; + } + + // different coverage + if (first.group.hasSufficientCoverage() != second.group.hasSufficientCoverage()) { + if (!first.group.hasSufficientCoverage()) { + // first doesn't have coverage, second does + return second; + } else { + // second doesn't have coverage, first does + return first; + } } - return Optional.ofNullable(ret); + + return first; } - } - private int nextScoreboardIndex(int current) { - int next = current + 1; - if (next >= scoreboard.size()) { - next %= scoreboard.size(); + private int nextScoreboardIndex(int current) { + int next = current + 1; + if (next >= scoreboard.size()) { + next %= scoreboard.size(); + } + return next; } - return next; } - private static class GroupSchedule { - private final Group group; - private int score; + static class AdaptiveScheduler implements GroupScheduler { + private final Random random; + private final List<GroupStatus> scoreboard; - public GroupSchedule(Group group) { - this.group = group; - this.score = 0; + public AdaptiveScheduler(Random random, List<GroupStatus> scoreboard) { + this.random = random; + this.scoreboard = scoreboard; } - public boolean isPreferredOver(GroupSchedule other) { - if (other == null) { - return true; + private Optional<GroupStatus> selectGroup(double needle, boolean requireCoverage, Set<Integer> rejected) { + double sum = 0; + int n = 0; + for (GroupStatus gs : scoreboard) { + if (rejected == null || !rejected.contains(gs.group.id())) { + if (!requireCoverage || gs.group.hasSufficientCoverage()) { + sum += gs.averageSearchTimeInverse(); + n++; + } + } } - - // different coverage - if (this.group.hasSufficientCoverage() != other.group.hasSufficientCoverage()) { - if (! this.group.hasSufficientCoverage()) { - // this doesn't have coverage, other does - return false; - } else { - // other doesn't have coverage, this does - return true; + if (n == 0) { + return Optional.empty(); + } + double accum = 0; + for (GroupStatus gs : scoreboard) { + if (rejected == null || !rejected.contains(gs.group.id())) { + if (!requireCoverage || gs.group.hasSufficientCoverage()) { + accum += gs.averageSearchTimeInverse(); + if (needle < accum / sum) { + return Optional.of(gs); + } + } } } - - return this.score < other.score; + return Optional.empty(); // should not happen here } - public void adjustScore(int amount) { - this.score += amount; - if (score < 0) { - log.warning("Double free of query target group detected"); - score = 0; + @Override + public Optional<GroupStatus> takeNextGroup(Set<Integer> rejectedGroups) { + double needle = random.nextDouble(); + Optional<GroupStatus> gs = selectGroup(needle, true, rejectedGroups); + if (gs.isPresent()) { + return gs; } + // fallback - any coverage better than none + return selectGroup(needle, false, rejectedGroups); } } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java index 677e0ed1dc2..aec59df9a38 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java @@ -6,7 +6,6 @@ import com.yahoo.prelude.fastsearch.CacheKey; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.dispatch.searchcluster.Node; -import com.yahoo.search.result.Coverage; import com.yahoo.search.searchchain.Execution; import java.io.IOException; @@ -33,7 +32,9 @@ public abstract class SearchInvoker extends CloseableInvoker { */ public Result search(Query query, QueryPacket queryPacket, CacheKey cacheKey, Execution execution) throws IOException { sendSearchRequest(query, queryPacket); - return getSearchResult(cacheKey, execution); + Result result = getSearchResult(cacheKey, execution); + setFinalStatus(result.hits().getError() == null); + return result; } protected abstract void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException; @@ -53,14 +54,4 @@ public abstract class SearchInvoker extends CloseableInvoker { protected Optional<Integer> distributionKey() { return node.map(Node::key); } - - protected Optional<Coverage> getErrorCoverage() { - if (node.isPresent()) { - Coverage error = new Coverage(0, node.get().getActiveDocuments(), 0); - error.setNodesTried(1); - return Optional.of(error); - } else { - return Optional.empty(); - } - } } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java index c056423a9c4..141d87a41ab 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java @@ -1,15 +1,22 @@ // Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch; +import com.yahoo.search.dispatch.LoadBalancer.AdaptiveScheduler; +import com.yahoo.search.dispatch.LoadBalancer.GroupStatus; import com.yahoo.search.dispatch.searchcluster.Group; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.SearchCluster; import junit.framework.AssertionFailedError; import org.junit.Test; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Optional; +import java.util.Random; import static com.yahoo.search.dispatch.MockSearchCluster.createDispatchConfig; +import static org.hamcrest.Matchers.closeTo; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; @@ -20,7 +27,7 @@ import static org.junit.Assert.assertThat; */ public class LoadBalancerTest { @Test - public void requreThatLoadBalancerServesSingleNodeSetups() { + public void requireThatLoadBalancerServesSingleNodeSetups() { Node n1 = new Node(0, "test-node1", 0, 0); SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1), null, 1, null); LoadBalancer lb = new LoadBalancer(cluster, true); @@ -33,7 +40,7 @@ public class LoadBalancerTest { } @Test - public void requreThatLoadBalancerServesMultiGroupSetups() { + public void requireThatLoadBalancerServesMultiGroupSetups() { Node n1 = new Node(0, "test-node1", 0, 0); Node n2 = new Node(1, "test-node2", 1, 1); SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2), null, 1, null); @@ -47,7 +54,7 @@ public class LoadBalancerTest { } @Test - public void requreThatLoadBalancerServesClusteredGroups() { + public void requireThatLoadBalancerServesClusteredGroups() { Node n1 = new Node(0, "test-node1", 0, 0); Node n2 = new Node(1, "test-node2", 1, 0); Node n3 = new Node(0, "test-node3", 0, 1); @@ -60,7 +67,7 @@ public class LoadBalancerTest { } @Test - public void requreThatLoadBalancerReturnsDifferentGroups() { + public void requireThatLoadBalancerReturnsDifferentGroups() { Node n1 = new Node(0, "test-node1", 0, 0); Node n2 = new Node(1, "test-node2", 1, 1); SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2), null, 1, null); @@ -71,7 +78,7 @@ public class LoadBalancerTest { Group group = grp.get(); int id1 = group.id(); // release allocation - lb.releaseGroup(group); + lb.releaseGroup(group, true, 1.0); // get second group grp = lb.takeGroup(null); @@ -80,28 +87,103 @@ public class LoadBalancerTest { } @Test - public void requreThatLoadBalancerReturnsGroupWithShortestQueue() { - Node n1 = new Node(0, "test-node1", 0, 0); - Node n2 = new Node(1, "test-node2", 1, 1); - SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2), null, 1, null); - LoadBalancer lb = new LoadBalancer(cluster, true); + public void requireCorrectAverageSearchTimeDecay() { + final double SMALL = 0.00001; + + GroupStatus gs = newGroupStatus(1); + gs.setQueryStatistics(0, 1.0); + updateSearchTime(gs, 1.0); + assertThat(gs.averageSearchTime(), equalTo(1.0)); + updateSearchTime(gs, 2.0); + assertThat(gs.averageSearchTime(), closeTo(1.02326, SMALL)); + updateSearchTime(gs, 2.0); + assertThat(gs.averageSearchTime(), closeTo(1.04545, SMALL)); + updateSearchTime(gs, 0.1); + updateSearchTime(gs, 0.1); + updateSearchTime(gs, 0.1); + updateSearchTime(gs, 0.1); + assertThat(gs.averageSearchTime(), closeTo(0.966667, SMALL)); + for (int i = 0; i < 10000; i++) { + updateSearchTime(gs, 1.0); + } + assertThat(gs.averageSearchTime(), closeTo(1.0, SMALL)); + updateSearchTime(gs, 0.1); + assertThat(gs.averageSearchTime(), closeTo(0.9991, SMALL)); + for (int i = 0; i < 10000; i++) { + updateSearchTime(gs, 0.0); + } + assertThat(gs.averageSearchTime(), closeTo(0.001045, SMALL)); + } - // get first group - Optional<Group> grp = lb.takeGroup(null); - Group group = grp.get(); - int id1 = group.id(); + @Test + public void requireEqualDistributionInFlatWeightListWithAdaptiveScheduler() { + List<GroupStatus> scoreboard = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + scoreboard.add(newGroupStatus(i)); + } + Random seq = sequence(0.0, 0.1, 0.2, 0.39, 0.4, 0.6, 0.8, 0.99999); + AdaptiveScheduler sched = new AdaptiveScheduler(seq, scoreboard); + + assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(0)); + assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(0)); + assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(1)); + assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(1)); + assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(2)); + assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(3)); + assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(4)); + assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(4)); + } - // get second group - grp = lb.takeGroup(null); - group = grp.get(); - int id2 = group.id(); - assertThat(id2, not(equalTo(id1))); - // release second allocation - lb.releaseGroup(group); + @Test + public void requireThatAdaptiveSchedulerObeysWeights() { + List<GroupStatus> scoreboard = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + GroupStatus gs = newGroupStatus(i); + gs.setQueryStatistics(1, 0.1 * (i + 1)); + scoreboard.add(gs); + } + Random seq = sequence(0.0, 0.4379, 0.4380, 0.6569, 0.6570, 0.8029, 0.8030, 0.9124, 0.9125); + AdaptiveScheduler sched = new AdaptiveScheduler(seq, scoreboard); + + assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(0)); + assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(0)); + assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(1)); + assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(1)); + assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(2)); + assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(2)); + assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(3)); + assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(3)); + assertThat(sched.takeNextGroup(null).get().groupId(), equalTo(4)); + } - // get third group - grp = lb.takeGroup(null); - group = grp.get(); - assertThat(group.id(), equalTo(id2)); + private static void updateSearchTime(GroupStatus gs, double time) { + gs.allocate(); + gs.release(true, time); + } + + private GroupStatus newGroupStatus(int id) { + Group dummyGroup = new Group(id, Collections.emptyList()) { + @Override + public boolean hasSufficientCoverage() { + return true; + } + }; + return new GroupStatus(dummyGroup); + } + + private Random sequence(double... values) { + return new Random() { + private int index = 0; + + @Override + public double nextDouble() { + double retv = values[index]; + index++; + if (index >= values.length) { + index = 0; + } + return retv; + } + }; } } |