aboutsummaryrefslogtreecommitdiffstats
path: root/container-search
diff options
context:
space:
mode:
authorOlli Virtanen <olli.virtanen@oath.com>2018-08-30 10:09:42 +0200
committerOlli Virtanen <olli.virtanen@oath.com>2018-08-30 10:09:42 +0200
commit6e56ca5b6f2064094933859f530df849f2e28716 (patch)
tree2bd367366f23800f7f88436f3c45b319e3cba84d /container-search
parent1f554f8bb745cb3b5ea60e9dd2aafd10ceb1f22e (diff)
Create code path for extending java side dispatcher
Diffstat (limited to 'container-search')
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java17
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java23
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java27
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java64
4 files changed, 127 insertions, 4 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
index c93220f0a85..cc5c007522c 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
@@ -62,6 +62,10 @@ public class FastSearcher extends VespaBackEndSearcher {
/** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */
private final static CompoundName dispatchCompression = new CompoundName("dispatch.compression");
+ /** If enabled, the dispatcher internal to the search container will be preferred over fdispatch
+ * whenever possible */
+ private static final CompoundName dispatchInternal = new CompoundName("dispatch.internal");
+
/** Used to dispatch directly to search nodes over RPC, replacing the old fnet communication path */
private final Dispatcher dispatcher;
@@ -215,12 +219,17 @@ public class FastSearcher extends VespaBackEndSearcher {
/**
* Returns the backend object to issue a search request over.
- * Normally this is the backend field of this instance, which connects to the dispatch node this talk to
- * (which is why this instance was chosen by the cluster controller). However, when certain conditions obtain
- * (see below), we will instead return a backend instance which connects directly to the local search node
- * for efficiency.
+ * Normally this is the backend field of this instance, which connects to the dispatch node this talks to
+ * (which is why this instance was chosen by the cluster controller). However, under certain conditions
+ * we will instead return a backend instance which connects directly to the relevant search nodes.
*/
private Backend chooseBackend(Query query) {
+ if (query.properties().getBoolean(dispatchInternal, false)) {
+ Optional<Backend> directDispatchBackend = dispatcher.getDispatchBackend(query);
+ if(directDispatchBackend.isPresent()) {
+ return directDispatchBackend.get();
+ }
+ }
if ( ! query.properties().getBoolean(dispatchDirect, true)) return dispatchBackend;
if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) return dispatchBackend;
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 5ef81403f26..473f3932680 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -11,6 +11,7 @@ import com.yahoo.container.protect.Error;
import com.yahoo.prelude.fastsearch.DocumentDatabase;
import com.yahoo.slime.ArrayTraverser;
import com.yahoo.data.access.slime.SlimeAdapter;
+import com.yahoo.fs4.mplex.Backend;
import com.yahoo.prelude.fastsearch.FS4ResourcePool;
import com.yahoo.prelude.fastsearch.FastHit;
import com.yahoo.prelude.fastsearch.TimeoutException;
@@ -28,6 +29,7 @@ import com.yahoo.vespa.config.search.DispatchConfig;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -57,10 +59,15 @@ public class Dispatcher extends AbstractComponent {
private final Compressor compressor = new Compressor();
+ private final LoadBalancer loadBalancer;
+ private final FS4ResourcePool fs4ResourcePool;
+
public Dispatcher(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool,
int containerClusterSize, VipStatus vipStatus) {
this.client = new RpcClient();
this.searchCluster = new SearchCluster(dispatchConfig, fs4ResourcePool, containerClusterSize, vipStatus);
+ this.fs4ResourcePool = fs4ResourcePool;
+ this.loadBalancer = new LoadBalancer(searchCluster);
// Create node rpc connections, indexed by the node distribution key
ImmutableMap.Builder<Integer, Client.NodeConnection> nodeConnectionsBuilder = new ImmutableMap.Builder<>();
@@ -75,6 +82,8 @@ public class Dispatcher extends AbstractComponent {
this.searchCluster = null;
this.nodeConnections = ImmutableMap.copyOf(nodeConnections);
this.client = client;
+ this.fs4ResourcePool = null;
+ this.loadBalancer = new LoadBalancer(searchCluster);
}
/** Returns the search cluster this dispatches to */
@@ -275,4 +284,18 @@ public class Dispatcher extends AbstractComponent {
}
+ public Optional<Backend> getDispatchBackend(Query query) {
+ Optional<SearchCluster.Group> groupInCluster = loadBalancer.getGroupForQuery(query);
+
+ return groupInCluster.flatMap(group -> {
+ if(group.nodes().size() == 1) {
+ return Optional.of(group.nodes().get(0));
+ } else {
+ return Optional.empty();
+ }
+ }).map(node -> {
+ query.trace(false, 2, "Dispatching directly (anywhere) to ", node);
+ return fs4ResourcePool.getBackend(node.hostname(), node.fs4port(), Optional.of(node.key()));
+ });
+ }
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
new file mode 100644
index 00000000000..8e90eae0eb3
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
@@ -0,0 +1,27 @@
+package com.yahoo.search.dispatch;
+
+import com.yahoo.search.Query;
+import com.yahoo.search.dispatch.SearchCluster.Group;
+
+import java.util.Optional;
+
+public class LoadBalancer {
+
+ private final SearchCluster searchCluster;
+
+ public LoadBalancer(SearchCluster searchCluster) {
+ this.searchCluster = searchCluster;
+ }
+
+ public Optional<Group> getGroupForQuery(Query query) {
+ if (searchCluster.groups().size() == 1) {
+ for(Group group: searchCluster.groups().values()) {
+ // since the number of groups is 1, this will run only once
+ if(group.nodes().size() == 1) {
+ return Optional.of(group);
+ }
+ }
+ }
+ return Optional.empty();
+ }
+}
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java
new file mode 100644
index 00000000000..448a8d0e894
--- /dev/null
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java
@@ -0,0 +1,64 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch;
+
+import com.yahoo.search.dispatch.SearchCluster.Group;
+import com.yahoo.search.dispatch.SearchCluster.Node;
+import junit.framework.AssertionFailedError;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Optional;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+public class LoadBalancerTest {
+ @Test
+ public void requreThatLoadBalancerServesSingleNodeSetups() {
+ Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0);
+ SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1), null, 1, null);
+ LoadBalancer lb = new LoadBalancer(cluster);
+
+ Optional<Group> grp = lb.getGroupForQuery(null);
+ Group group = grp.orElseGet(() -> {
+ throw new AssertionFailedError("Expected a SearchCluster.Group");
+ });
+ assertThat(group.nodes().size(), Matchers.equalTo(1));
+ }
+
+ @Test
+ public void requreThatLoadBalancerIgnoresMultiGroupSetups() {
+ Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0);
+ Node n2 = new SearchCluster.Node(1, "test-node2", 1, 1);
+ SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1, n2), null, 1, null);
+ LoadBalancer lb = new LoadBalancer(cluster);
+
+ Optional<Group> grp = lb.getGroupForQuery(null);
+ assertThat(grp.isPresent(), is(false));
+ }
+
+ @Test
+ public void requreThatLoadBalancerIgnoresClusteredSingleGroup() {
+ Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0);
+ Node n2 = new SearchCluster.Node(1, "test-node2", 1, 0);
+ SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1, n2), null, 2, null);
+ LoadBalancer lb = new LoadBalancer(cluster);
+
+ Optional<Group> grp = lb.getGroupForQuery(null);
+ assertThat(grp.isPresent(), is(false));
+ }
+
+ @Test
+ public void requreThatLoadBalancerIgnoresClusteredGroups() {
+ Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0);
+ Node n2 = new SearchCluster.Node(1, "test-node2", 1, 0);
+ Node n3 = new SearchCluster.Node(0, "test-node3", 0, 1);
+ Node n4 = new SearchCluster.Node(1, "test-node4", 1, 1);
+ SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1, n2, n3, n4), null, 2, null);
+ LoadBalancer lb = new LoadBalancer(cluster);
+
+ Optional<Group> grp = lb.getGroupForQuery(null);
+ assertThat(grp.isPresent(), is(false));
+ }
+}