summaryrefslogtreecommitdiffstats
path: root/container-search
diff options
context:
space:
mode:
authorOlli Virtanen <ovirtanen@gmail.com>2018-08-31 13:01:11 +0200
committerGitHub <noreply@github.com>2018-08-31 13:01:11 +0200
commit610e0406ee5c4e9a9a1f0093ad659fd3d4ff8daa (patch)
treeea0d54df6fde4fbe55ea59aa582d4a0f021ddbba /container-search
parenta0e1762341111284f40ae09b1bdba5c5501f22cb (diff)
parent9593b7025cfc053d2a57e584cfb639b579c99ceb (diff)
Merge pull request #6749 from vespa-engine/ollivir/dispatch-in-container-search
Java dispatcher extensions
Diffstat (limited to 'container-search')
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java59
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java44
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java24
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java27
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java1
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java1
-rw-r--r--container-search/src/main/java/com/yahoo/search/yql/TypeCheckers.java3
-rw-r--r--container-search/src/test/java/com/yahoo/prelude/hitfield/test/JSONStringTestCase.java4
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java1
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java64
10 files changed, 195 insertions, 33 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
index c93220f0a85..0f067f33b79 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
@@ -22,6 +22,7 @@ import com.yahoo.prelude.querytransform.QueryRewrite;
import com.yahoo.processing.request.CompoundName;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
+import com.yahoo.search.dispatch.CloseableChannel;
import com.yahoo.search.dispatch.Dispatcher;
import com.yahoo.search.dispatch.SearchCluster;
import com.yahoo.search.grouping.GroupingRequest;
@@ -62,6 +63,10 @@ public class FastSearcher extends VespaBackEndSearcher {
/** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */
private final static CompoundName dispatchCompression = new CompoundName("dispatch.compression");
+ /** If enabled, the dispatcher internal to the search container will be preferred over fdispatch
+ * whenever possible */
+ private static final CompoundName dispatchInternal = new CompoundName("dispatch.internal");
+
/** Used to dispatch directly to search nodes over RPC, replacing the old fnet communication path */
private final Dispatcher dispatcher;
@@ -168,11 +173,9 @@ public class FastSearcher extends VespaBackEndSearcher {
@Override
public Result doSearch2(Query query, QueryPacket queryPacket, CacheKey cacheKey, Execution execution) {
- FS4Channel channel = null;
- try {
- if (dispatcher.searchCluster().groupSize() == 1)
- forceSinglePassGrouping(query);
- channel = chooseBackend(query).openChannel();
+ if (dispatcher.searchCluster().groupSize() == 1)
+ forceSinglePassGrouping(query);
+ try(CloseableChannel channel = getChannel(query)) {
channel.setQuery(query);
Result result = searchTwoPhase(channel, query, queryPacket, cacheKey);
@@ -195,9 +198,6 @@ public class FastSearcher extends VespaBackEndSearcher {
query.trace(getName() + " error response: " + result, false, 1);
result.hits().addError(ErrorMessage.createBackendCommunicationError(getName() + " failed: "+ e.getMessage()));
return result;
- } finally {
- if (channel != null)
- channel.close();
}
}
@@ -214,24 +214,32 @@ public class FastSearcher extends VespaBackEndSearcher {
}
/**
- * Returns the backend object to issue a search request over.
- * Normally this is the backend field of this instance, which connects to the dispatch node this talk to
- * (which is why this instance was chosen by the cluster controller). However, when certain conditions obtain
- * (see below), we will instead return a backend instance which connects directly to the local search node
- * for efficiency.
+ * Returns an interface object to issue a search request over.
+ * Normally this is built from the backend field of this instance, which connects to the dispatch node
+ * this component talks to (which is why this instance was chosen by the cluster controller). However,
+ * under certain conditions we will instead return an interface which connects directly to the relevant
+ * search nodes.
*/
- private Backend chooseBackend(Query query) {
- if ( ! query.properties().getBoolean(dispatchDirect, true)) return dispatchBackend;
- if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) return dispatchBackend;
+ private CloseableChannel getChannel(Query query) {
+ if (query.properties().getBoolean(dispatchInternal, false)) {
+ Optional<CloseableChannel> directDispatchChannel = dispatcher.getDispatchBackend(query);
+ if(directDispatchChannel.isPresent()) {
+ return directDispatchChannel.get();
+ }
+ }
+ if (!query.properties().getBoolean(dispatchDirect, true))
+ return new CloseableChannel(dispatchBackend);
+ if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE))
+ return new CloseableChannel(dispatchBackend);
Optional<SearchCluster.Node> directDispatchRecipient = dispatcher.searchCluster().directDispatchTarget();
- if ( ! directDispatchRecipient.isPresent()) return dispatchBackend;
+ if (!directDispatchRecipient.isPresent())
+ return new CloseableChannel(dispatchBackend);
// Dispatch directly to the single, local search node
query.trace(false, 2, "Dispatching directly to ", directDispatchRecipient.get());
- return fs4ResourcePool.getBackend(directDispatchRecipient.get().hostname(),
- directDispatchRecipient.get().fs4port(),
- Optional.of(directDispatchRecipient.get().key()));
+ return new CloseableChannel(fs4ResourcePool.getBackend(directDispatchRecipient.get().hostname(),
+ directDispatchRecipient.get().fs4port(), Optional.of(directDispatchRecipient.get().key())));
}
/**
@@ -270,10 +278,9 @@ public class FastSearcher extends VespaBackEndSearcher {
packetWrapper = cacheLookupTwoPhase(cacheKey, result, summaryClass);
}
- FS4Channel channel = chooseBackend(query).openChannel();
- channel.setQuery(query);
Packet[] receivedPackets;
- try {
+ try(CloseableChannel channel = getChannel(query)) {
+ channel.setQuery(query);
DocsumPacketKey[] packetKeys;
if (countFastHits(result) > 0) {
@@ -340,8 +347,6 @@ public class FastSearcher extends VespaBackEndSearcher {
query.trace(traceMsg, false, 3);
}
}
- } finally {
- channel.close();
}
}
@@ -373,7 +378,7 @@ public class FastSearcher extends VespaBackEndSearcher {
return null;
}
- private Result searchTwoPhase(FS4Channel channel, Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException {
+ private Result searchTwoPhase(CloseableChannel channel, Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException {
if (isLoggingFine())
getLogger().finest("sending query packet");
@@ -453,7 +458,7 @@ public class FastSearcher extends VespaBackEndSearcher {
return packets;
}
- private Packet[] fetchSummaries(FS4Channel channel, Result result, String summaryClass)
+ private Packet[] fetchSummaries(CloseableChannel channel, Result result, String summaryClass)
throws InvalidChannelException, ChannelTimeoutException, ClassCastException, IOException {
BasicPacket[] receivedPackets;
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java
new file mode 100644
index 00000000000..9a3e7e71031
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java
@@ -0,0 +1,44 @@
+package com.yahoo.search.dispatch;
+
+import com.yahoo.fs4.BasicPacket;
+import com.yahoo.fs4.ChannelTimeoutException;
+import com.yahoo.fs4.mplex.Backend;
+import com.yahoo.fs4.mplex.FS4Channel;
+import com.yahoo.fs4.mplex.InvalidChannelException;
+import com.yahoo.search.Query;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Optional;
+
+public class CloseableChannel implements Closeable {
+ private FS4Channel channel;
+
+ public CloseableChannel(Backend backend) {
+ this.channel = backend.openChannel();
+ }
+
+ public void setQuery(Query query) {
+ channel.setQuery(query);
+ }
+
+ public boolean sendPacket(BasicPacket packet) throws InvalidChannelException, IOException {
+ return channel.sendPacket(packet);
+ }
+
+ public BasicPacket[] receivePackets(long timeout, int packetCount) throws InvalidChannelException, ChannelTimeoutException {
+ return channel.receivePackets(timeout, packetCount);
+ }
+
+ public Optional<Integer> distributionKey() {
+ return channel.distributionKey();
+ }
+
+ @Override
+ public void close() {
+ if (channel != null) {
+ channel.close();
+ channel = null;
+ }
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 5ef81403f26..d0f03dde3dd 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -11,6 +11,7 @@ import com.yahoo.container.protect.Error;
import com.yahoo.prelude.fastsearch.DocumentDatabase;
import com.yahoo.slime.ArrayTraverser;
import com.yahoo.data.access.slime.SlimeAdapter;
+import com.yahoo.fs4.mplex.Backend;
import com.yahoo.prelude.fastsearch.FS4ResourcePool;
import com.yahoo.prelude.fastsearch.FastHit;
import com.yahoo.prelude.fastsearch.TimeoutException;
@@ -28,6 +29,7 @@ import com.yahoo.vespa.config.search.DispatchConfig;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -57,10 +59,15 @@ public class Dispatcher extends AbstractComponent {
private final Compressor compressor = new Compressor();
+ private final LoadBalancer loadBalancer;
+ private final FS4ResourcePool fs4ResourcePool;
+
public Dispatcher(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool,
int containerClusterSize, VipStatus vipStatus) {
this.client = new RpcClient();
this.searchCluster = new SearchCluster(dispatchConfig, fs4ResourcePool, containerClusterSize, vipStatus);
+ this.fs4ResourcePool = fs4ResourcePool;
+ this.loadBalancer = new LoadBalancer(searchCluster);
// Create node rpc connections, indexed by the node distribution key
ImmutableMap.Builder<Integer, Client.NodeConnection> nodeConnectionsBuilder = new ImmutableMap.Builder<>();
@@ -75,6 +82,8 @@ public class Dispatcher extends AbstractComponent {
this.searchCluster = null;
this.nodeConnections = ImmutableMap.copyOf(nodeConnections);
this.client = client;
+ this.fs4ResourcePool = null;
+ this.loadBalancer = new LoadBalancer(searchCluster);
}
/** Returns the search cluster this dispatches to */
@@ -275,4 +284,19 @@ public class Dispatcher extends AbstractComponent {
}
+ public Optional<CloseableChannel> getDispatchBackend(Query query) {
+ Optional<SearchCluster.Group> groupInCluster = loadBalancer.getGroupForQuery(query);
+
+ return groupInCluster.flatMap(group -> {
+ if(group.nodes().size() == 1) {
+ return Optional.of(group.nodes().get(0));
+ } else {
+ return Optional.empty();
+ }
+ }).map(node -> {
+ query.trace(false, 2, "Dispatching directly (anywhere) to ", node);
+ Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port(), Optional.of(node.key()));
+ return new CloseableChannel(backend);
+ });
+ }
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
new file mode 100644
index 00000000000..8e90eae0eb3
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/LoadBalancer.java
@@ -0,0 +1,27 @@
+package com.yahoo.search.dispatch;
+
+import com.yahoo.search.Query;
+import com.yahoo.search.dispatch.SearchCluster.Group;
+
+import java.util.Optional;
+
+public class LoadBalancer {
+
+ private final SearchCluster searchCluster;
+
+ public LoadBalancer(SearchCluster searchCluster) {
+ this.searchCluster = searchCluster;
+ }
+
+ public Optional<Group> getGroupForQuery(Query query) {
+ if (searchCluster.groups().size() == 1) {
+ for(Group group: searchCluster.groups().values()) {
+ // since the number of groups is 1, this will run only once
+ if(group.nodes().size() == 1) {
+ return Optional.of(group);
+ }
+ }
+ }
+ return Optional.empty();
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java
index 8107a50e8c0..67e032eca37 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java
@@ -15,7 +15,6 @@ import com.yahoo.jrt.Values;
import com.yahoo.prelude.fastsearch.FastHit;
import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
/**
* A client which uses rpc request to search nodes to implement the Client API.
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java
index efce2fdac9c..48ddba6c301 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;
-import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
diff --git a/container-search/src/main/java/com/yahoo/search/yql/TypeCheckers.java b/container-search/src/main/java/com/yahoo/search/yql/TypeCheckers.java
index cb693d6801d..af54d28c2ac 100644
--- a/container-search/src/main/java/com/yahoo/search/yql/TypeCheckers.java
+++ b/container-search/src/main/java/com/yahoo/search/yql/TypeCheckers.java
@@ -64,7 +64,8 @@ final class TypeCheckers {
TypeLiteral<?> arg = TypeLiteral.get(type.getActualTypeArguments()[0]);
if (OperatorNode.class.isAssignableFrom(arg.getRawType())) {
Preconditions.checkArgument(arg.getType() instanceof ParameterizedType, "Type spec must be List<OperatorNode<?>>");
- Class<? extends Operator> optype = (Class<? extends Operator>) TypeLiteral.get(((ParameterizedType) arg.getType()).getActualTypeArguments()[0]).getRawType();
+ Class<?> rawType = (Class<?>) TypeLiteral.get(((ParameterizedType) arg.getType()).getActualTypeArguments()[0]).getRawType();
+ Class<? extends Operator> optype = (Class<? extends Operator>) rawType;
return new OperatorNodeListTypeChecker(parent, idx, optype, ImmutableSet.<Operator>of());
} else {
return new JavaListTypeChecker(parent, idx, arg.getRawType());
diff --git a/container-search/src/test/java/com/yahoo/prelude/hitfield/test/JSONStringTestCase.java b/container-search/src/test/java/com/yahoo/prelude/hitfield/test/JSONStringTestCase.java
index 09a439c7bc9..18231785a26 100644
--- a/container-search/src/test/java/com/yahoo/prelude/hitfield/test/JSONStringTestCase.java
+++ b/container-search/src/test/java/com/yahoo/prelude/hitfield/test/JSONStringTestCase.java
@@ -329,8 +329,8 @@ public class JSONStringTestCase {
String rendered = js.renderFromInspector();
assertTrue(-1 < rendered.indexOf(f1));
- int offsetF2;
- assertTrue(-1 < (offsetF2 = rendered.indexOf(f2)));
+ int offsetF2 = rendered.indexOf(f2);
+ assertTrue(-1 < offsetF2);
offsetF2 += f2.length();
assertTrue(-1 < rendered.indexOf(f2_1, offsetF2));
assertTrue(-1 < rendered.indexOf(f2_2, offsetF2));
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java b/container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java
index 0191b1a799b..5e3e0dc301e 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java
@@ -2,7 +2,6 @@
package com.yahoo.search.dispatch;
import com.yahoo.compress.CompressionType;
-import com.yahoo.log.event.Collection;
import com.yahoo.prelude.fastsearch.DocsumDefinition;
import com.yahoo.prelude.fastsearch.DocsumDefinitionSet;
import com.yahoo.prelude.fastsearch.DocsumField;
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java
new file mode 100644
index 00000000000..448a8d0e894
--- /dev/null
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/LoadBalancerTest.java
@@ -0,0 +1,64 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch;
+
+import com.yahoo.search.dispatch.SearchCluster.Group;
+import com.yahoo.search.dispatch.SearchCluster.Node;
+import junit.framework.AssertionFailedError;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Optional;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+public class LoadBalancerTest {
+ @Test
+ public void requreThatLoadBalancerServesSingleNodeSetups() {
+ Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0);
+ SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1), null, 1, null);
+ LoadBalancer lb = new LoadBalancer(cluster);
+
+ Optional<Group> grp = lb.getGroupForQuery(null);
+ Group group = grp.orElseGet(() -> {
+ throw new AssertionFailedError("Expected a SearchCluster.Group");
+ });
+ assertThat(group.nodes().size(), Matchers.equalTo(1));
+ }
+
+ @Test
+ public void requreThatLoadBalancerIgnoresMultiGroupSetups() {
+ Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0);
+ Node n2 = new SearchCluster.Node(1, "test-node2", 1, 1);
+ SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1, n2), null, 1, null);
+ LoadBalancer lb = new LoadBalancer(cluster);
+
+ Optional<Group> grp = lb.getGroupForQuery(null);
+ assertThat(grp.isPresent(), is(false));
+ }
+
+ @Test
+ public void requreThatLoadBalancerIgnoresClusteredSingleGroup() {
+ Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0);
+ Node n2 = new SearchCluster.Node(1, "test-node2", 1, 0);
+ SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1, n2), null, 2, null);
+ LoadBalancer lb = new LoadBalancer(cluster);
+
+ Optional<Group> grp = lb.getGroupForQuery(null);
+ assertThat(grp.isPresent(), is(false));
+ }
+
+ @Test
+ public void requreThatLoadBalancerIgnoresClusteredGroups() {
+ Node n1 = new SearchCluster.Node(0, "test-node1", 0, 0);
+ Node n2 = new SearchCluster.Node(1, "test-node2", 1, 0);
+ Node n3 = new SearchCluster.Node(0, "test-node3", 0, 1);
+ Node n4 = new SearchCluster.Node(1, "test-node4", 1, 1);
+ SearchCluster cluster = new SearchCluster(88.0, Arrays.asList(n1, n2, n3, n4), null, 2, null);
+ LoadBalancer lb = new LoadBalancer(cluster);
+
+ Optional<Group> grp = lb.getGroupForQuery(null);
+ assertThat(grp.isPresent(), is(false));
+ }
+}