diff options
Diffstat (limited to 'container-search/src')
5 files changed, 317 insertions, 75 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java index e66c48ddb74..77496114df1 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java @@ -2,6 +2,7 @@ package com.yahoo.search.dispatch; import java.io.Closeable; +import java.time.Duration; import java.util.function.BiConsumer; /** @@ -15,13 +16,13 @@ public abstract class CloseableInvoker implements Closeable { protected abstract void release(); - private BiConsumer<Boolean, Long> teardown = null; + private BiConsumer<Boolean, RequestDuration> teardown = null; private boolean success = false; - private long startTime = 0; + private RequestDuration duration; - public void teardown(BiConsumer<Boolean, Long> teardown) { + public void teardown(BiConsumer<Boolean, RequestDuration> teardown) { this.teardown = teardown; - this.startTime = System.currentTimeMillis(); + this.duration = new RequestDuration(); } protected void setFinalStatus(boolean success) { @@ -31,7 +32,7 @@ public abstract class CloseableInvoker implements Closeable { @Override public final void close() { if (teardown != null) { - teardown.accept(success, System.currentTimeMillis() - startTime); + teardown.accept(success, duration.complete()); teardown = null; } release(); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 68a8e351b34..345c621ae24 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -25,11 +25,11 @@ import com.yahoo.search.query.profile.types.QueryProfileType; import com.yahoo.search.result.ErrorMessage; import com.yahoo.vespa.config.search.DispatchConfig; +import java.time.Duration; import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; /** * A dispatcher communicates with search nodes to perform queries and fill hits. @@ -96,6 +96,15 @@ public class Dispatcher extends AbstractComponent { this(new ClusterMonitor<>(searchCluster, true), searchCluster, dispatchConfig, new RpcInvokerFactory(resourcePool, searchCluster), metric); } + private static LoadBalancer.Policy toLoadBalancerPolicy(DispatchConfig.DistributionPolicy.Enum policy) { + return switch (policy) { + case ROUNDROBIN: yield LoadBalancer.Policy.ROUNDROBIN; + case BEST_OF_RANDOM_2: yield LoadBalancer.Policy.BEST_OF_RANDOM_2; + case ADAPTIVE,LATENCY_AMORTIZED_OVER_REQUESTS: yield LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_REQUESTS; + case LATENCY_AMORTIZED_OVER_TIME: yield LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_TIME; + }; + } + /* Protected for simple mocking in tests. Beware that searchCluster is shutdown on in deconstruct() */ protected Dispatcher(ClusterMonitor<Node> clusterMonitor, SearchCluster searchCluster, @@ -107,8 +116,7 @@ public class Dispatcher extends AbstractComponent { this.searchCluster = searchCluster; this.clusterMonitor = clusterMonitor; - this.loadBalancer = new LoadBalancer(searchCluster, - dispatchConfig.distributionPolicy() == DispatchConfig.DistributionPolicy.ROUNDROBIN); + this.loadBalancer = new LoadBalancer(searchCluster, toLoadBalancerPolicy(dispatchConfig.distributionPolicy())); this.invokerFactory = invokerFactory; this.metric = metric; this.metricContext = metric.createContext(null); @@ -219,7 +227,7 @@ public class Dispatcher extends AbstractComponent { invoker.get().teardown((success, time) -> loadBalancer.releaseGroup(group, success, time)); return invoker.get(); } else { - loadBalancer.releaseGroup(group, false, 0); + loadBalancer.releaseGroup(group, false, RequestDuration.of(Duration.ZERO)); if (rejected == null) { rejected = new HashSet<>(); } @@ -239,7 +247,7 @@ public class Dispatcher extends AbstractComponent { */ private Set<Integer> rejectGroupBlockingFeed(List<Group> groups) { if (groups.size() == 1) return null; - List<Group> groupsRejectingFeed = groups.stream().filter(Group::isBlockingWrites).collect(Collectors.toList()); + List<Group> groupsRejectingFeed = groups.stream().filter(Group::isBlockingWrites).toList(); if (groupsRejectingFeed.size() != 1) return null; Set<Integer> rejected = new HashSet<>(); rejected.add(groupsRejectingFeed.get(0).id()); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java index 4c0bcb38d15..59821827d4e 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java @@ -4,6 +4,7 @@ package com.yahoo.search.dispatch; import com.yahoo.search.dispatch.searchcluster.Group; import com.yahoo.search.dispatch.searchcluster.SearchCluster; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Optional; @@ -14,7 +15,6 @@ import java.util.logging.Logger; /** * LoadBalancer determines which group of content nodes should be accessed next for each search query when the * internal java dispatcher is used. - * * The implementation here is a simplistic least queries in flight + round-robin load balancer * * @author ollivir @@ -25,22 +25,29 @@ public class LoadBalancer { private static final long DEFAULT_LATENCY_DECAY_RATE = 1000; private static final long MIN_LATENCY_DECAY_RATE = 42; - private static final double INITIAL_QUERY_TIME = 0.001; - private static final double MIN_QUERY_TIME = 0.001; + private static final double LATENCY_DECAY_TIME = Duration.ofSeconds(5).toMillis()/1000.0; + private static final Duration INITIAL_QUERY_TIME = Duration.ofMillis(1); + private static final double MIN_QUERY_TIME = Duration.ofMillis(1).toMillis()/1000.0; private final List<GroupStatus> scoreboard; private final GroupScheduler scheduler; - public LoadBalancer(SearchCluster searchCluster, boolean roundRobin) { + public enum Policy { ROUNDROBIN, LATENCY_AMORTIZED_OVER_REQUESTS, LATENCY_AMORTIZED_OVER_TIME, BEST_OF_RANDOM_2} + + public LoadBalancer(SearchCluster searchCluster, Policy policy) { this.scoreboard = new ArrayList<>(searchCluster.groups().size()); for (Group group : searchCluster.orderedGroups()) { scoreboard.add(new GroupStatus(group)); } - if (roundRobin || scoreboard.size() == 1) { - this.scheduler = new RoundRobinScheduler(scoreboard); - } else { - this.scheduler = new AdaptiveScheduler(new Random(), scoreboard); - } + if (scoreboard.size() == 1) + policy = Policy.ROUNDROBIN; + + this.scheduler = switch (policy) { + case ROUNDROBIN: yield new RoundRobinScheduler(scoreboard); + case BEST_OF_RANDOM_2: yield new BestOfRandom2(new Random(), scoreboard); + case LATENCY_AMORTIZED_OVER_REQUESTS: yield new AdaptiveScheduler(AdaptiveScheduler.Type.REQUESTS, new Random(), scoreboard); + case LATENCY_AMORTIZED_OVER_TIME: yield new AdaptiveScheduler(AdaptiveScheduler.Type.TIME, new Random(), scoreboard); + }; } /** @@ -71,13 +78,13 @@ public class LoadBalancer { * * @param group previously allocated group * @param success was the query successful - * @param searchTimeMs query execution time in milliseconds, used for adaptive load balancing + * @param searchTime query execution time, used for adaptive load balancing */ - public void releaseGroup(Group group, boolean success, double searchTimeMs) { + public void releaseGroup(Group group, boolean success, RequestDuration searchTime) { synchronized (this) { for (GroupStatus sched : scoreboard) { if (sched.group.id() == group.id()) { - sched.release(success, searchTimeMs / 1000.0); + sched.release(success, searchTime); break; } } @@ -86,49 +93,51 @@ public class LoadBalancer { static class GroupStatus { + interface Decayer { + void decay(RequestDuration duration); + double averageCost(); + } + + static class NoDecay implements Decayer { + public void decay(RequestDuration duration) {} + public double averageCost() { return MIN_QUERY_TIME; } + } + private final Group group; private int allocations = 0; - private long queries = 0; - private double averageSearchTime = INITIAL_QUERY_TIME; + private Decayer decayer; GroupStatus(Group group) { this.group = group; + this.decayer = new NoDecay(); + } + void setDecayer(Decayer decayer) { + this.decayer = decayer; } void allocate() { allocations++; } - void release(boolean success, double searchTime) { + void release(boolean success, RequestDuration searchTime) { allocations--; if (allocations < 0) { log.warning("Double free of query target group detected"); allocations = 0; } if (success) { - searchTime = Math.max(searchTime, MIN_QUERY_TIME); - double decayRate = Math.min(queries + MIN_LATENCY_DECAY_RATE, DEFAULT_LATENCY_DECAY_RATE); - averageSearchTime = (searchTime + (decayRate - 1) * averageSearchTime) / decayRate; - queries++; + decayer.decay(searchTime); } } - double averageSearchTime() { - return averageSearchTime; - } - - double averageSearchTimeInverse() { - return 1.0 / averageSearchTime; + double weight() { + return 1.0 / decayer.averageCost(); } int groupId() { return group.id(); } - void setQueryStatistics(long queries, double averageSearchTime) { - this.queries = queries; - this.averageSearchTime = averageSearchTime; - } } private interface GroupScheduler { @@ -193,13 +202,61 @@ public class LoadBalancer { } static class AdaptiveScheduler implements GroupScheduler { - + enum Type {TIME, REQUESTS} private final Random random; private final List<GroupStatus> scoreboard; - public AdaptiveScheduler(Random random, List<GroupStatus> scoreboard) { + private static double toDouble(Duration duration) { + return duration.toNanos()/1_000_000_000.0; + } + private static Duration fromDouble(double seconds) { return Duration.ofNanos((long)(seconds*1_000_000_000));} + + static class DecayByRequests implements GroupStatus.Decayer { + private long queries; + private double averageSearchTime; + DecayByRequests() { + this(0, INITIAL_QUERY_TIME); + } + DecayByRequests(long initialQueries, Duration initialSearchTime) { + queries = initialQueries; + averageSearchTime = toDouble(initialSearchTime); + } + public void decay(RequestDuration duration) { + double searchTime = Math.max(toDouble(duration.duration()), MIN_QUERY_TIME); + double decayRate = Math.min(queries + MIN_LATENCY_DECAY_RATE, DEFAULT_LATENCY_DECAY_RATE); + queries++; + averageSearchTime = (searchTime + (decayRate - 1) * averageSearchTime) / decayRate; + } + public double averageCost() { return averageSearchTime; } + Duration averageSearchTime() { return fromDouble(averageSearchTime);} + } + + static class DecayByTime implements GroupStatus.Decayer { + private double averageSearchTime; + private RequestDuration prev; + DecayByTime() { + this(INITIAL_QUERY_TIME, RequestDuration.of(Duration.ZERO)); + } + DecayByTime(Duration initialSearchTime, RequestDuration start) { + averageSearchTime = toDouble(initialSearchTime); + prev = start; + } + public void decay(RequestDuration duration) { + double searchTime = Math.max(toDouble(duration.duration()), MIN_QUERY_TIME); + double sampleWeight = toDouble(duration.difference(prev)); + averageSearchTime = (sampleWeight*searchTime + LATENCY_DECAY_TIME * averageSearchTime) / (LATENCY_DECAY_TIME + sampleWeight); + prev = duration; + } + public double averageCost() { return averageSearchTime; } + Duration averageSearchTime() { return fromDouble(averageSearchTime);} + } + + public AdaptiveScheduler(Type type, Random random, List<GroupStatus> scoreboard) { this.random = random; this.scoreboard = scoreboard; + for (GroupStatus gs : scoreboard) { + gs.setDecayer(type == Type.REQUESTS ? new DecayByRequests() : new DecayByTime()); + } } private Optional<GroupStatus> selectGroup(double needle, boolean requireCoverage, Set<Integer> rejected) { @@ -208,7 +265,7 @@ public class LoadBalancer { for (GroupStatus gs : scoreboard) { if (rejected == null || !rejected.contains(gs.group.id())) { if (!requireCoverage || gs.group.hasSufficientCoverage()) { - sum += gs.averageSearchTimeInverse(); + sum += gs.weight(); n++; } } @@ -220,7 +277,7 @@ public class LoadBalancer { for (GroupStatus gs : scoreboard) { if (rejected == null || !rejected.contains(gs.group.id())) { if (!requireCoverage || gs.group.hasSufficientCoverage()) { - accum += gs.averageSearchTimeInverse(); + accum += gs.weight(); if (needle < accum / sum) { return Optional.of(gs); } @@ -239,4 +296,47 @@ public class LoadBalancer { } } + static class BestOfRandom2 implements GroupScheduler { + private final Random random; + private final List<GroupStatus> scoreboard; + public BestOfRandom2(Random random, List<GroupStatus> scoreboard) { + this.random = random; + this.scoreboard = scoreboard; + } + @Override + public Optional<GroupStatus> takeNextGroup(Set<Integer> rejectedGroups) { + GroupStatus gs = selectBestOf2(rejectedGroups, true); + return (gs != null) + ? Optional.of(gs) + : Optional.ofNullable(selectBestOf2(rejectedGroups, false)); + } + + private GroupStatus selectBestOf2(Set<Integer> rejectedGroups, boolean requireCoverage) { + List<Integer> candidates = new ArrayList<>(scoreboard.size()); + for (int i=0; i < scoreboard.size(); i++) { + GroupStatus gs = scoreboard.get(i); + if (rejectedGroups == null || !rejectedGroups.contains(gs.group.id())) { + if (!requireCoverage || gs.group.hasSufficientCoverage()) { + candidates.add(i); + } + } + } + GroupStatus candA = selectRandom(candidates); + GroupStatus candB = selectRandom(candidates); + if (candA == null) return candB; + if (candB == null) return candA; + if (candB.allocations < candA.allocations) return candB; + return candA; + } + private GroupStatus selectRandom(List<Integer> candidates) { + if ( ! candidates.isEmpty()) { + int index = random.nextInt(candidates.size()); + Integer groupIndex = candidates.remove(index); + return scoreboard.get(groupIndex); + } + return null; + } + + } + } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java b/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java new file mode 100644 index 00000000000..1206277a103 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/RequestDuration.java @@ -0,0 +1,43 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch; + +import java.time.Duration; +import java.time.Instant; + +/** + * Contains start and and time. Exposes a duration, and lets you measure the time difference between 2 requests. + * It does use System.nanoTime to get a steady clock. + * + * @author baldersheim + */ +class RequestDuration { + private final long startTime; + private long endTime; + RequestDuration() { + this(System.nanoTime()); + } + private RequestDuration(long startTime) { + this.startTime = startTime; + } + + RequestDuration complete() { + endTime = System.nanoTime(); + return this; + } + private RequestDuration complete(long duration) { + endTime = startTime + duration; + return this; + } + Duration duration() { + return Duration.ofNanos(endTime - startTime); + } + Duration difference(RequestDuration prev) { + return Duration.ofNanos(Math.abs(endTime - prev.endTime)); + } + static RequestDuration of(Duration duration) { + return new RequestDuration().complete(duration.toNanos()); + } + static RequestDuration of(Instant sinceEpoch, Duration duration) { + return new RequestDuration(sinceEpoch.toEpochMilli()*1_000_000).complete(duration.toNanos()); + } +} diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java index e9ed1c48302..ce3876e59c1 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java @@ -2,6 +2,7 @@ package com.yahoo.search.dispatch; import com.yahoo.search.dispatch.LoadBalancer.AdaptiveScheduler; +import com.yahoo.search.dispatch.LoadBalancer.BestOfRandom2; import com.yahoo.search.dispatch.LoadBalancer.GroupStatus; import com.yahoo.search.dispatch.searchcluster.Group; import com.yahoo.search.dispatch.searchcluster.Node; @@ -9,6 +10,8 @@ import com.yahoo.search.dispatch.searchcluster.SearchCluster; import org.junit.jupiter.api.Test; import org.opentest4j.AssertionFailedError; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -24,15 +27,15 @@ import static org.junit.jupiter.api.Assertions.assertTrue; * @author ollivir */ public class LoadBalancerTest { - + private static final double delta = 0.0000001; @Test void requireThatLoadBalancerServesSingleNodeSetups() { Node n1 = new Node(0, "test-node1", 0); SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1), null, null); - LoadBalancer lb = new LoadBalancer(cluster, true); + LoadBalancer lb = new LoadBalancer(cluster, LoadBalancer.Policy.ROUNDROBIN); Optional<Group> grp = lb.takeGroup(null); - Group group = grp.orElseGet(() -> { + Group group = grp.orElseThrow(() -> { throw new AssertionFailedError("Expected a SearchCluster.Group"); }); assertEquals(1, group.nodes().size()); @@ -43,10 +46,10 @@ public class LoadBalancerTest { Node n1 = new Node(0, "test-node1", 0); Node n2 = new Node(1, "test-node2", 1); SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2), null, null); - LoadBalancer lb = new LoadBalancer(cluster, true); + LoadBalancer lb = new LoadBalancer(cluster, LoadBalancer.Policy.ROUNDROBIN); Optional<Group> grp = lb.takeGroup(null); - Group group = grp.orElseGet(() -> { + Group group = grp.orElseThrow(() -> { throw new AssertionFailedError("Expected a SearchCluster.Group"); }); assertEquals(1, group.nodes().size()); @@ -59,7 +62,7 @@ public class LoadBalancerTest { Node n3 = new Node(0, "test-node3", 1); Node n4 = new Node(1, "test-node4", 1); SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2, n3, n4), null, null); - LoadBalancer lb = new LoadBalancer(cluster, true); + LoadBalancer lb = new LoadBalancer(cluster, LoadBalancer.Policy.ROUNDROBIN); Optional<Group> grp = lb.takeGroup(null); assertTrue(grp.isPresent()); @@ -70,14 +73,14 @@ public class LoadBalancerTest { Node n1 = new Node(0, "test-node1", 0); Node n2 = new Node(1, "test-node2", 1); SearchCluster cluster = new SearchCluster("a", createDispatchConfig(n1, n2), null, null); - LoadBalancer lb = new LoadBalancer(cluster, true); + LoadBalancer lb = new LoadBalancer(cluster, LoadBalancer.Policy.ROUNDROBIN); // get first group Optional<Group> grp = lb.takeGroup(null); Group group = grp.get(); int id1 = group.id(); // release allocation - lb.releaseGroup(group, true, 1.0); + lb.releaseGroup(group, true, RequestDuration.of(Duration.ofMillis(1))); // get second group grp = lb.takeGroup(null); @@ -87,31 +90,30 @@ public class LoadBalancerTest { @Test void requireCorrectAverageSearchTimeDecay() { - final double delta = 0.00001; - + AdaptiveScheduler.DecayByRequests decayer = new AdaptiveScheduler.DecayByRequests(0, Duration.ofSeconds(1)); GroupStatus gs = newGroupStatus(1); - gs.setQueryStatistics(0, 1.0); - updateSearchTime(gs, 1.0); - assertEquals(1.0, gs.averageSearchTime(), delta); - updateSearchTime(gs, 2.0); - assertEquals(1.02326, gs.averageSearchTime(), delta); - updateSearchTime(gs, 2.0); - assertEquals(1.04545, gs.averageSearchTime(), delta); - updateSearchTime(gs, 0.1); - updateSearchTime(gs, 0.1); - updateSearchTime(gs, 0.1); - updateSearchTime(gs, 0.1); - assertEquals(0.966667, gs.averageSearchTime(), delta); + gs.setDecayer(decayer); + updateSearchTime(gs, RequestDuration.of(Duration.ofSeconds(1))); + assertEquals(Duration.ofSeconds(1), decayer.averageSearchTime()); + updateSearchTime(gs, RequestDuration.of(Duration.ofSeconds(2))); + assertEquals(Duration.ofNanos(1023255813), decayer.averageSearchTime()); + updateSearchTime(gs, RequestDuration.of(Duration.ofSeconds(2))); + assertEquals(Duration.ofNanos(1045454545), decayer.averageSearchTime()); + updateSearchTime(gs, RequestDuration.of(Duration.ofMillis(100))); + updateSearchTime(gs, RequestDuration.of(Duration.ofMillis(100))); + updateSearchTime(gs, RequestDuration.of(Duration.ofMillis(100))); + updateSearchTime(gs, RequestDuration.of(Duration.ofMillis(100))); + assertEquals(Duration.ofNanos(966666666), decayer.averageSearchTime()); for (int i = 0; i < 10000; i++) { - updateSearchTime(gs, 1.0); + updateSearchTime(gs, RequestDuration.of(Duration.ofSeconds(1))); } - assertEquals(1.0, gs.averageSearchTime(), delta); - updateSearchTime(gs, 0.1); - assertEquals(0.9991, gs.averageSearchTime(), delta); + assertEquals(Duration.ofNanos(999999812), decayer.averageSearchTime()); + updateSearchTime(gs, RequestDuration.of(Duration.ofMillis(100))); + assertEquals(Duration.ofNanos(999099812), decayer.averageSearchTime()); for (int i = 0; i < 10000; i++) { - updateSearchTime(gs, 0.0); + updateSearchTime(gs, RequestDuration.of(Duration.ZERO)); } - assertEquals(0.001045, gs.averageSearchTime(), delta); + assertEquals(Duration.ofNanos(1045087), decayer.averageSearchTime()); } @Test @@ -121,7 +123,7 @@ public class LoadBalancerTest { scoreboard.add(newGroupStatus(i)); } Random seq = sequence(0.0, 0.1, 0.2, 0.39, 0.4, 0.6, 0.8, 0.99999); - AdaptiveScheduler sched = new AdaptiveScheduler(seq, scoreboard); + AdaptiveScheduler sched = new AdaptiveScheduler(AdaptiveScheduler.Type.REQUESTS, seq, scoreboard); assertEquals(0, sched.takeNextGroup(null).get().groupId()); assertEquals(0, sched.takeNextGroup(null).get().groupId()); @@ -138,11 +140,15 @@ public class LoadBalancerTest { List<GroupStatus> scoreboard = new ArrayList<>(); for (int i = 0; i < 5; i++) { GroupStatus gs = newGroupStatus(i); - gs.setQueryStatistics(1, 0.1 * (i + 1)); scoreboard.add(gs); } Random seq = sequence(0.0, 0.4379, 0.4380, 0.6569, 0.6570, 0.8029, 0.8030, 0.9124, 0.9125); - AdaptiveScheduler sched = new AdaptiveScheduler(seq, scoreboard); + AdaptiveScheduler sched = new AdaptiveScheduler(AdaptiveScheduler.Type.REQUESTS, seq, scoreboard); + int i= 0; + for (GroupStatus gs : scoreboard) { + gs.setDecayer(new AdaptiveScheduler.DecayByRequests(1, Duration.ofMillis((long)(0.1 * (i + 1)*1000.0)))); + i++; + } assertEquals(0, sched.takeNextGroup(null).get().groupId()); assertEquals(0, sched.takeNextGroup(null).get().groupId()); @@ -155,7 +161,87 @@ public class LoadBalancerTest { assertEquals(4, sched.takeNextGroup(null).get().groupId()); } - private static void updateSearchTime(GroupStatus gs, double time) { + private static GroupStatus allocate(GroupStatus gs) { + gs.allocate(); + return gs; + } + @Test + void requireBestOfRandom2Scheduler() { + List<GroupStatus> scoreboard = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + scoreboard.add(newGroupStatus(i)); + } + Random seq = sequence( + 0.1, 0.125, + 0.1, 0.125, + 0.1, 0.125, + 0.1, 0.125, + 0.1, 0.375, + 0.9, 0.125, + 0.9, 0.125, + 0.9, 0.125 + ); + BestOfRandom2 sched = new BestOfRandom2(seq, scoreboard); + + assertEquals(0, allocate(sched.takeNextGroup(null).get()).groupId()); + assertEquals(1, allocate(sched.takeNextGroup(null).get()).groupId()); + assertEquals(0, allocate(sched.takeNextGroup(null).get()).groupId()); + assertEquals(1, allocate(sched.takeNextGroup(null).get()).groupId()); + assertEquals(2, allocate(sched.takeNextGroup(null).get()).groupId()); + assertEquals(4, allocate(sched.takeNextGroup(null).get()).groupId()); + assertEquals(4, allocate(sched.takeNextGroup(null).get()).groupId()); + assertEquals(4, allocate(sched.takeNextGroup(null).get()).groupId()); + assertEquals(0, allocate(sched.takeNextGroup(null).get()).groupId()); + } + + private static int countRequestsToReach90p(Duration timeBetweenSample, Duration searchTime) { + double p90 = 0.9*searchTime.toMillis()/1000.0; + GroupStatus.Decayer decayer = new AdaptiveScheduler.DecayByTime(Duration.ofMillis(1), RequestDuration.of(Instant.EPOCH, Duration.ZERO)); + int requests = 0; + Instant start = Instant.EPOCH; + while (decayer.averageCost() < p90) { + decayer.decay(RequestDuration.of(start, searchTime)); + start = start.plus(timeBetweenSample); + requests++; + } + return requests; + } + + @Test + public void requireDecayByTimeToDependOnlyOnTime() { + GroupStatus.Decayer decayer = new AdaptiveScheduler.DecayByTime(Duration.ofMillis(2), RequestDuration.of(Instant.EPOCH, Duration.ZERO)); + assertEquals(0.002, decayer.averageCost(), delta); + decayer.decay(RequestDuration.of(Instant.ofEpochMilli(1000), Duration.ofMillis(10))); + assertEquals(0.003344426, decayer.averageCost(), delta); + decayer.decay(RequestDuration.of(Instant.ofEpochMilli(2000), Duration.ofMillis(10))); + assertEquals(0.004453688, decayer.averageCost(), delta); + decayer.decay(RequestDuration.of(Instant.ofEpochMilli(3000), Duration.ofMillis(10))); + assertEquals(0.005378073, decayer.averageCost(), delta); + decayer.decay(RequestDuration.of(Instant.ofEpochMilli(3100), Duration.ofMillis(10))); + assertEquals(0.005468700, decayer.averageCost(), delta); + decayer.decay(RequestDuration.of(Instant.ofEpochMilli(3100), Duration.ofMillis(10))); + assertEquals(0.005468700, decayer.averageCost(), delta); + decayer.decay(RequestDuration.of(Instant.ofEpochMilli(3000), Duration.ofMillis(10))); + assertEquals(0.005557549, decayer.averageCost(), delta); + decayer.decay(RequestDuration.of(Instant.ofEpochMilli(5000), Duration.ofMillis(10))); + assertEquals(0.006826820, decayer.averageCost(), delta); + assertEquals(112, countRequestsToReach90p(Duration.ofMillis(100), Duration.ofMillis(10))); + assertEquals(57, countRequestsToReach90p(Duration.ofMillis(200), Duration.ofMillis(10))); + assertEquals(14, countRequestsToReach90p(Duration.ofMillis(1000), Duration.ofMillis(10))); + } + + @Test + public void requireDecayByTimeToNotJumpTooFar() { + AdaptiveScheduler.DecayByTime decayer = new AdaptiveScheduler.DecayByTime(Duration.ofMillis(2), RequestDuration.of(Instant.EPOCH, Duration.ZERO)); + assertEquals(0.002, decayer.averageCost(), delta); + assertEquals(Duration.ofMillis(2), decayer.averageSearchTime()); + decayer.decay(RequestDuration.of(Instant.ofEpochMilli(10000), Duration.ofMillis(10))); + assertEquals(0.007335110, decayer.averageCost(), delta); + assertEquals(Duration.ofNanos(7335109), decayer.averageSearchTime()); + + } + + private static void updateSearchTime(GroupStatus gs, RequestDuration time) { gs.allocate(); gs.release(true, time); } @@ -183,6 +269,10 @@ public class LoadBalancerTest { } return retv; } + @Override + public int nextInt(int bound) { + return (int)(nextDouble() * bound); + } }; } |