diff options
author | Olli Virtanen <olli.virtanen@oath.com> | 2018-09-06 13:04:12 +0200 |
---|---|---|
committer | Olli Virtanen <olli.virtanen@oath.com> | 2018-09-06 13:04:12 +0200 |
commit | 1d1fa442fa5039c0a17e3b2e900ad7adc76e673b (patch) | |
tree | f716ac55c7dd422031840bbce41c71a61328cd10 /container-search | |
parent | c1fdecf3cb26f1a3aef2caf290916a4f533c6c58 (diff) |
Java dispatch support for multiple groups of single nodes
Diffstat (limited to 'container-search')
5 files changed, 184 insertions, 22 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java index 0f067f33b79..336efcdfbc3 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java @@ -222,9 +222,9 @@ public class FastSearcher extends VespaBackEndSearcher { */ private CloseableChannel getChannel(Query query) { if (query.properties().getBoolean(dispatchInternal, false)) { - Optional<CloseableChannel> directDispatchChannel = dispatcher.getDispatchBackend(query); - if(directDispatchChannel.isPresent()) { - return directDispatchChannel.get(); + Optional<CloseableChannel> dispatchedChannel = dispatcher.getDispatchedChannel(query); + if (dispatchedChannel.isPresent()) { + return dispatchedChannel.get(); } } if (!query.properties().getBoolean(dispatchDirect, true)) diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/DispatchedChannel.java b/container-search/src/main/java/com/yahoo/search/dispatch/DispatchedChannel.java new file mode 100644 index 00000000000..85926845647 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/DispatchedChannel.java @@ -0,0 +1,32 @@ +package com.yahoo.search.dispatch; + +import com.yahoo.prelude.fastsearch.FS4ResourcePool; +import com.yahoo.search.dispatch.SearchCluster.Group; +import com.yahoo.search.dispatch.SearchCluster.Node; + +import java.util.Optional; + +public class DispatchedChannel extends CloseableChannel { + private final SearchCluster.Group group; + private final LoadBalancer loadBalancer; + private boolean groupAllocated = true; + + public DispatchedChannel(FS4ResourcePool fs4ResourcePool, LoadBalancer loadBalancer, Group group, Node node) { + super(fs4ResourcePool.getBackend(node.hostname(), node.fs4port(), Optional.of(node.key()))); + + this.loadBalancer = loadBalancer; + this.group = group; + } + + public DispatchedChannel(FS4ResourcePool fs4ResourcePool, LoadBalancer loadBalancer, Group group) { + this(fs4ResourcePool, loadBalancer, group, group.nodes().iterator().next()); + } + + public void close() { + if (groupAllocated) { + groupAllocated = false; + loadBalancer.releaseGroup(group); + } + super.close(); + } +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index d0f03dde3dd..f3677fd8d8d 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -11,7 +11,6 @@ import com.yahoo.container.protect.Error; import com.yahoo.prelude.fastsearch.DocumentDatabase; import com.yahoo.slime.ArrayTraverser; import com.yahoo.data.access.slime.SlimeAdapter; -import com.yahoo.fs4.mplex.Backend; import com.yahoo.prelude.fastsearch.FS4ResourcePool; import com.yahoo.prelude.fastsearch.FastHit; import com.yahoo.prelude.fastsearch.TimeoutException; @@ -284,19 +283,17 @@ public class Dispatcher extends AbstractComponent { } - public Optional<CloseableChannel> getDispatchBackend(Query query) { + public Optional<CloseableChannel> getDispatchedChannel(Query query) { Optional<SearchCluster.Group> groupInCluster = loadBalancer.getGroupForQuery(query); return groupInCluster.flatMap(group -> { if(group.nodes().size() == 1) { - return Optional.of(group.nodes().get(0)); + query.trace(false, 2, "Dispatching directly (anywhere) to ", group); + return Optional.of(new DispatchedChannel(fs4ResourcePool, loadBalancer, group)); } else { + loadBalancer.releaseGroup(group); return Optional.empty(); } - }).map(node -> { - query.trace(false, 2, "Dispatching directly (anywhere) to ", node); - Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port(), Optional.of(node.key())); - return new CloseableChannel(backend); }); } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java index 8e90eae0eb3..652ee134a87 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java @@ -3,25 +3,108 @@ package com.yahoo.search.dispatch; import com.yahoo.search.Query; import com.yahoo.search.dispatch.SearchCluster.Group; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Optional; +import java.util.logging.Level; +import java.util.logging.Logger; public class LoadBalancer { + // The implementation here is a simplistic least queries in flight + round-robin load balancer + // TODO: consider the options in com.yahoo.vespa.model.content.TuningDispatch - private final SearchCluster searchCluster; + private final static Logger log = Logger.getLogger(LoadBalancer.class.getName()); + + private final boolean isInternallyDispatchable; + private final List<GroupSchedule> scoreboard; + private int needle = 0; public LoadBalancer(SearchCluster searchCluster) { - this.searchCluster = searchCluster; + if (searchCluster == null) { + this.isInternallyDispatchable = false; + this.scoreboard = null; + return; + } + this.isInternallyDispatchable = (searchCluster.groupSize() == 1); + this.scoreboard = new ArrayList<>(searchCluster.groups().size()); + + for (Group group : searchCluster.groups().values()) { + scoreboard.add(new GroupSchedule(group)); + } + Collections.shuffle(scoreboard); } public Optional<Group> getGroupForQuery(Query query) { - if (searchCluster.groups().size() == 1) { - for(Group group: searchCluster.groups().values()) { - // since the number of groups is 1, this will run only once - if(group.nodes().size() == 1) { - return Optional.of(group); + if (!isInternallyDispatchable) { + return Optional.empty(); + } + + return allocateNextGroup(); + } + + public void releaseGroup(Group group) { + synchronized (this) { + for (GroupSchedule sched : scoreboard) { + if (sched.group.id() == group.id()) { + sched.adjustScore(-1); + break; + } + } + } + } + + private Optional<Group> allocateNextGroup() { + synchronized (this) { + GroupSchedule bestSchedule = null; + + int index = needle; + for (int i = 0; i < scoreboard.size(); i++) { + GroupSchedule sched = scoreboard.get(index); + index++; + if (index >= scoreboard.size()) { + index = 0; } + if (sched.group.hasSufficientCoverage() && (bestSchedule == null || sched.compareTo(bestSchedule) < 0)) { + bestSchedule = sched; + } + } + needle++; + if (needle >= scoreboard.size()) { + needle = 0; + } + Group ret = null; + if (bestSchedule != null) { + bestSchedule.adjustScore(1); + ret = bestSchedule.group; + } + if (log.isLoggable(Level.FINE)) { + log.fine("Offering <" + ret + "> for query connection"); + } + return Optional.ofNullable(ret); + } + } + + public static class GroupSchedule implements Comparable<GroupSchedule> { + private final Group group; + private int score; + + public GroupSchedule(Group group) { + this.group = group; + this.score = 0; + } + + @Override + public int compareTo(GroupSchedule that) { + return this.score - that.score; + } + + public void adjustScore(int amount) { + this.score += amount; + if (score < 0) { + log.warning("Double free of query target group detected"); + score = 0; } } - return Optional.empty(); } } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java index 448a8d0e894..443ab52d8cc 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java @@ -4,13 +4,14 @@ package com.yahoo.search.dispatch; import com.yahoo.search.dispatch.SearchCluster.Group; import com.yahoo.search.dispatch.SearchCluster.Node; import junit.framework.AssertionFailedError; -import org.hamcrest.Matchers; import org.junit.Test; import java.util.Arrays; import java.util.Optional; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; public class LoadBalancerTest { @@ -24,18 +25,21 @@ public class LoadBalancerTest { Group group = grp.orElseGet(() -> { throw new AssertionFailedError("Expected a SearchCluster.Group"); }); - assertThat(group.nodes().size(), Matchers.equalTo(1)); + assertThat(group.nodes().size(), equalTo(1)); } @Test - public void requreThatLoadBalancerIgnoresMultiGroupSetups() { + public void requreThatLoadBalancerServesMultiGroupSetups() { Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0); Node n2 = new SearchCluster.Node(1, "test-node2", 1, 1); SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1, n2), null, 1, null); LoadBalancer lb = new LoadBalancer(cluster); Optional<Group> grp = lb.getGroupForQuery(null); - assertThat(grp.isPresent(), is(false)); + Group group = grp.orElseGet(() -> { + throw new AssertionFailedError("Expected a SearchCluster.Group"); + }); + assertThat(group.nodes().size(), equalTo(1)); } @Test @@ -61,4 +65,50 @@ public class LoadBalancerTest { Optional<Group> grp = lb.getGroupForQuery(null); assertThat(grp.isPresent(), is(false)); } + + @Test + public void requreThatLoadBalancerReturnsDifferentGroups() { + Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0); + Node n2 = new SearchCluster.Node(1, "test-node2", 1, 1); + SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1, n2), null, 1, null); + LoadBalancer lb = new LoadBalancer(cluster); + + // get first group + Optional<Group> grp = lb.getGroupForQuery(null); + Group group = grp.get(); + int id1 = group.id(); + // release allocation + lb.releaseGroup(group); + + // get second group + grp = lb.getGroupForQuery(null); + group = grp.get(); + assertThat(group.id(), not(equalTo(id1))); + } + + @Test + public void requreThatLoadBalancerReturnsGroupWithShortestQueue() { + Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0); + Node n2 = new SearchCluster.Node(1, "test-node2", 1, 1); + SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1, n2), null, 1, null); + LoadBalancer lb = new LoadBalancer(cluster); + + // get first group + Optional<Group> grp = lb.getGroupForQuery(null); + Group group = grp.get(); + int id1 = group.id(); + + // get second group + grp = lb.getGroupForQuery(null); + group = grp.get(); + int id2 = group.id(); + assertThat(id2, not(equalTo(id1))); + // release second allocation + lb.releaseGroup(group); + + // get third group + grp = lb.getGroupForQuery(null); + group = grp.get(); + assertThat(group.id(), equalTo(id2)); + } } |