summaryrefslogtreecommitdiffstats
path: root/container-search
diff options
context:
space:
mode:
authorOlli Virtanen <olli.virtanen@oath.com>2018-09-06 13:04:12 +0200
committerOlli Virtanen <olli.virtanen@oath.com>2018-09-06 13:04:12 +0200
commit1d1fa442fa5039c0a17e3b2e900ad7adc76e673b (patch)
treef716ac55c7dd422031840bbce41c71a61328cd10 /container-search
parentc1fdecf3cb26f1a3aef2caf290916a4f533c6c58 (diff)
Java dispatch support for multiple groups of single nodes
Diffstat (limited to 'container-search')
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java6
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/DispatchedChannel.java32
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java11
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java99
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java58
5 files changed, 184 insertions, 22 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
index 0f067f33b79..336efcdfbc3 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
@@ -222,9 +222,9 @@ public class FastSearcher extends VespaBackEndSearcher {
*/
private CloseableChannel getChannel(Query query) {
if (query.properties().getBoolean(dispatchInternal, false)) {
- Optional<CloseableChannel> directDispatchChannel = dispatcher.getDispatchBackend(query);
- if(directDispatchChannel.isPresent()) {
- return directDispatchChannel.get();
+ Optional<CloseableChannel> dispatchedChannel = dispatcher.getDispatchedChannel(query);
+ if (dispatchedChannel.isPresent()) {
+ return dispatchedChannel.get();
}
}
if (!query.properties().getBoolean(dispatchDirect, true))
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/DispatchedChannel.java b/container-search/src/main/java/com/yahoo/search/dispatch/DispatchedChannel.java
new file mode 100644
index 00000000000..85926845647
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/DispatchedChannel.java
@@ -0,0 +1,32 @@
+package com.yahoo.search.dispatch;
+
+import com.yahoo.prelude.fastsearch.FS4ResourcePool;
+import com.yahoo.search.dispatch.SearchCluster.Group;
+import com.yahoo.search.dispatch.SearchCluster.Node;
+
+import java.util.Optional;
+
+public class DispatchedChannel extends CloseableChannel {
+ private final SearchCluster.Group group;
+ private final LoadBalancer loadBalancer;
+ private boolean groupAllocated = true;
+
+ public DispatchedChannel(FS4ResourcePool fs4ResourcePool, LoadBalancer loadBalancer, Group group, Node node) {
+ super(fs4ResourcePool.getBackend(node.hostname(), node.fs4port(), Optional.of(node.key())));
+
+ this.loadBalancer = loadBalancer;
+ this.group = group;
+ }
+
+ public DispatchedChannel(FS4ResourcePool fs4ResourcePool, LoadBalancer loadBalancer, Group group) {
+ this(fs4ResourcePool, loadBalancer, group, group.nodes().iterator().next());
+ }
+
+ public void close() {
+ if (groupAllocated) {
+ groupAllocated = false;
+ loadBalancer.releaseGroup(group);
+ }
+ super.close();
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index d0f03dde3dd..f3677fd8d8d 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -11,7 +11,6 @@ import com.yahoo.container.protect.Error;
import com.yahoo.prelude.fastsearch.DocumentDatabase;
import com.yahoo.slime.ArrayTraverser;
import com.yahoo.data.access.slime.SlimeAdapter;
-import com.yahoo.fs4.mplex.Backend;
import com.yahoo.prelude.fastsearch.FS4ResourcePool;
import com.yahoo.prelude.fastsearch.FastHit;
import com.yahoo.prelude.fastsearch.TimeoutException;
@@ -284,19 +283,17 @@ public class Dispatcher extends AbstractComponent {
}
- public Optional<CloseableChannel> getDispatchBackend(Query query) {
+ public Optional<CloseableChannel> getDispatchedChannel(Query query) {
Optional<SearchCluster.Group> groupInCluster = loadBalancer.getGroupForQuery(query);
return groupInCluster.flatMap(group -> {
if(group.nodes().size() == 1) {
- return Optional.of(group.nodes().get(0));
+ query.trace(false, 2, "Dispatching directly (anywhere) to ", group);
+ return Optional.of(new DispatchedChannel(fs4ResourcePool, loadBalancer, group));
} else {
+ loadBalancer.releaseGroup(group);
return Optional.empty();
}
- }).map(node -> {
- query.trace(false, 2, "Dispatching directly (anywhere) to ", node);
- Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port(), Optional.of(node.key()));
- return new CloseableChannel(backend);
});
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
index 8e90eae0eb3..652ee134a87 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
@@ -3,25 +3,108 @@ package com.yahoo.search.dispatch;
import com.yahoo.search.Query;
import com.yahoo.search.dispatch.SearchCluster.Group;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Optional;
+import java.util.logging.Level;
+import java.util.logging.Logger;
public class LoadBalancer {
+ // The implementation here is a simplistic least queries in flight + round-robin load balancer
+ // TODO: consider the options in com.yahoo.vespa.model.content.TuningDispatch
- private final SearchCluster searchCluster;
+ private final static Logger log = Logger.getLogger(LoadBalancer.class.getName());
+
+ private final boolean isInternallyDispatchable;
+ private final List<GroupSchedule> scoreboard;
+ private int needle = 0;
public LoadBalancer(SearchCluster searchCluster) {
- this.searchCluster = searchCluster;
+ if (searchCluster == null) {
+ this.isInternallyDispatchable = false;
+ this.scoreboard = null;
+ return;
+ }
+ this.isInternallyDispatchable = (searchCluster.groupSize() == 1);
+ this.scoreboard = new ArrayList<>(searchCluster.groups().size());
+
+ for (Group group : searchCluster.groups().values()) {
+ scoreboard.add(new GroupSchedule(group));
+ }
+ Collections.shuffle(scoreboard);
}
public Optional<Group> getGroupForQuery(Query query) {
- if (searchCluster.groups().size() == 1) {
- for(Group group: searchCluster.groups().values()) {
- // since the number of groups is 1, this will run only once
- if(group.nodes().size() == 1) {
- return Optional.of(group);
+ if (!isInternallyDispatchable) {
+ return Optional.empty();
+ }
+
+ return allocateNextGroup();
+ }
+
+ public void releaseGroup(Group group) {
+ synchronized (this) {
+ for (GroupSchedule sched : scoreboard) {
+ if (sched.group.id() == group.id()) {
+ sched.adjustScore(-1);
+ break;
+ }
+ }
+ }
+ }
+
+ private Optional<Group> allocateNextGroup() {
+ synchronized (this) {
+ GroupSchedule bestSchedule = null;
+
+ int index = needle;
+ for (int i = 0; i < scoreboard.size(); i++) {
+ GroupSchedule sched = scoreboard.get(index);
+ index++;
+ if (index >= scoreboard.size()) {
+ index = 0;
}
+ if (sched.group.hasSufficientCoverage() && (bestSchedule == null || sched.compareTo(bestSchedule) < 0)) {
+ bestSchedule = sched;
+ }
+ }
+ needle++;
+ if (needle >= scoreboard.size()) {
+ needle = 0;
+ }
+ Group ret = null;
+ if (bestSchedule != null) {
+ bestSchedule.adjustScore(1);
+ ret = bestSchedule.group;
+ }
+ if (log.isLoggable(Level.FINE)) {
+ log.fine("Offering <" + ret + "> for query connection");
+ }
+ return Optional.ofNullable(ret);
+ }
+ }
+
+ public static class GroupSchedule implements Comparable<GroupSchedule> {
+ private final Group group;
+ private int score;
+
+ public GroupSchedule(Group group) {
+ this.group = group;
+ this.score = 0;
+ }
+
+ @Override
+ public int compareTo(GroupSchedule that) {
+ return this.score - that.score;
+ }
+
+ public void adjustScore(int amount) {
+ this.score += amount;
+ if (score < 0) {
+ log.warning("Double free of query target group detected");
+ score = 0;
}
}
- return Optional.empty();
}
}
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java
index 448a8d0e894..443ab52d8cc 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java
@@ -4,13 +4,14 @@ package com.yahoo.search.dispatch;
import com.yahoo.search.dispatch.SearchCluster.Group;
import com.yahoo.search.dispatch.SearchCluster.Node;
import junit.framework.AssertionFailedError;
-import org.hamcrest.Matchers;
import org.junit.Test;
import java.util.Arrays;
import java.util.Optional;
+import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
public class LoadBalancerTest {
@@ -24,18 +25,21 @@ public class LoadBalancerTest {
Group group = grp.orElseGet(() -> {
throw new AssertionFailedError("Expected a SearchCluster.Group");
});
- assertThat(group.nodes().size(), Matchers.equalTo(1));
+ assertThat(group.nodes().size(), equalTo(1));
}
@Test
- public void requreThatLoadBalancerIgnoresMultiGroupSetups() {
+ public void requreThatLoadBalancerServesMultiGroupSetups() {
Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0);
Node n2 = new SearchCluster.Node(1, "test-node2", 1, 1);
SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1, n2), null, 1, null);
LoadBalancer lb = new LoadBalancer(cluster);
Optional<Group> grp = lb.getGroupForQuery(null);
- assertThat(grp.isPresent(), is(false));
+ Group group = grp.orElseGet(() -> {
+ throw new AssertionFailedError("Expected a SearchCluster.Group");
+ });
+ assertThat(group.nodes().size(), equalTo(1));
}
@Test
@@ -61,4 +65,50 @@ public class LoadBalancerTest {
Optional<Group> grp = lb.getGroupForQuery(null);
assertThat(grp.isPresent(), is(false));
}
+
+ @Test
+ public void requreThatLoadBalancerReturnsDifferentGroups() {
+ Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0);
+ Node n2 = new SearchCluster.Node(1, "test-node2", 1, 1);
+ SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1, n2), null, 1, null);
+ LoadBalancer lb = new LoadBalancer(cluster);
+
+ // get first group
+ Optional<Group> grp = lb.getGroupForQuery(null);
+ Group group = grp.get();
+ int id1 = group.id();
+ // release allocation
+ lb.releaseGroup(group);
+
+ // get second group
+ grp = lb.getGroupForQuery(null);
+ group = grp.get();
+ assertThat(group.id(), not(equalTo(id1)));
+ }
+
+ @Test
+ public void requreThatLoadBalancerReturnsGroupWithShortestQueue() {
+ Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0);
+ Node n2 = new SearchCluster.Node(1, "test-node2", 1, 1);
+ SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1, n2), null, 1, null);
+ LoadBalancer lb = new LoadBalancer(cluster);
+
+ // get first group
+ Optional<Group> grp = lb.getGroupForQuery(null);
+ Group group = grp.get();
+ int id1 = group.id();
+
+ // get second group
+ grp = lb.getGroupForQuery(null);
+ group = grp.get();
+ int id2 = group.id();
+ assertThat(id2, not(equalTo(id1)));
+ // release second allocation
+ lb.releaseGroup(group);
+
+ // get third group
+ grp = lb.getGroupForQuery(null);
+ group = grp.get();
+ assertThat(group.id(), equalTo(id2));
+ }
}