summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@oath.com>2018-10-02 09:35:17 +0200
committerGitHub <noreply@github.com>2018-10-02 09:35:17 +0200
commit553250535e399607d3363fc38753f10d9f47a78b (patch)
tree95d299989ec5273a45794aa4b7c37b6e33ee8f28
parentb7c77e487c576455c809d207d0171fa1111764cd (diff)
parent9caaebede97014fd3427a63a765932c9fbede1a8 (diff)
Merge pull request #7150 from vespa-engine/ollivir/split-search-and-fill
Java dispatch: CloseableChannel split to search and fill invokers
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumDefinitionSet.java2
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java (renamed from container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java)118
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java115
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java157
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java141
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Client.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java59
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java30
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java294
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/FillInvoker.java22
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/InterleavedCloseableChannel.java98
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/InterleavedFillInvoker.java67
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java62
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java6
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/RpcFillInvoker.java251
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/RpcResourcePool.java97
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java32
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java20
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/MockClient.java2
20 files changed, 968 insertions, 609 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumDefinitionSet.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumDefinitionSet.java
index c9e771fe48c..07997f0c8f6 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumDefinitionSet.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumDefinitionSet.java
@@ -50,7 +50,7 @@ public final class DocsumDefinitionSet {
this.emulationConfig = emulConfig;
}
- LegacyEmulationConfig legacyEmulationConfig() { return emulationConfig; }
+ public LegacyEmulationConfig legacyEmulationConfig() { return emulationConfig; }
/**
* Returns the summary definition of the given name, or the default if not found.
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java
index 10a640c54c8..90eb0b611bf 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java
@@ -7,50 +7,38 @@ import com.yahoo.fs4.DocsumPacket;
import com.yahoo.fs4.GetDocSumsPacket;
import com.yahoo.fs4.Packet;
import com.yahoo.fs4.QueryPacket;
-import com.yahoo.fs4.QueryResultPacket;
import com.yahoo.fs4.mplex.Backend;
import com.yahoo.fs4.mplex.FS4Channel;
import com.yahoo.fs4.mplex.InvalidChannelException;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher.FillHitsResult;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
-import com.yahoo.search.dispatch.CloseableChannel;
+import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.result.Hit;
import com.yahoo.search.result.HitGroup;
import java.io.IOException;
import java.util.Iterator;
-import java.util.List;
-import java.util.Optional;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import static com.yahoo.prelude.fastsearch.VespaBackEndSearcher.hitIterator;
-import static java.util.Arrays.asList;
/**
- * {@link CloseableChannel} implementation for FS4 nodes and fdispatch
+ * {@link FillInvoker} implementation for FS4 nodes and fdispatch
*
* @author ollivir
*/
-public class FS4CloseableChannel extends CloseableChannel {
+public class FS4FillInvoker extends FillInvoker {
private final VespaBackEndSearcher searcher;
private FS4Channel channel;
- private final Optional<Integer> distributionKey;
-
- private ErrorMessage pendingSearchError = null;
- private Query query = null;
- private QueryPacket queryPacket = null;
private int expectedFillResults = 0;
private CacheKey summaryCacheKey = null;
private DocsumPacketKey[] summaryPacketKeys = null;
- public FS4CloseableChannel(VespaBackEndSearcher searcher, Query query, FS4ResourcePool fs4ResourcePool, String hostname, int port,
+ public FS4FillInvoker(VespaBackEndSearcher searcher, Query query, FS4ResourcePool fs4ResourcePool, String hostname, int port,
int distributionKey) {
this.searcher = searcher;
- this.distributionKey = Optional.of(distributionKey);
Backend backend = fs4ResourcePool.getBackend(hostname, port);
this.channel = backend.openChannel();
@@ -58,96 +46,14 @@ public class FS4CloseableChannel extends CloseableChannel {
}
// fdispatch code path
- public FS4CloseableChannel(VespaBackEndSearcher searcher, Query query, Backend backend) {
+ public FS4FillInvoker(VespaBackEndSearcher searcher, Query query, Backend backend) {
this.searcher = searcher;
- this.distributionKey = Optional.empty();
this.channel = backend.openChannel();
channel.setQuery(query);
}
@Override
- protected void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException {
- if (isLoggingFine())
- getLogger().finest("sending query packet");
-
- if(queryPacket == null) {
- // query changed for subchannel
- queryPacket = searcher.createQueryPacket(query);
- }
-
- this.query = query;
- this.queryPacket = queryPacket;
-
- try {
- boolean couldSend = channel.sendPacket(queryPacket);
- if (!couldSend) {
- pendingSearchError = ErrorMessage.createBackendCommunicationError("Could not reach '" + getName() + "'");
- }
- } catch (InvalidChannelException e) {
- pendingSearchError = ErrorMessage.createBackendCommunicationError("Invalid channel " + getName());
- } catch (IllegalStateException e) {
- pendingSearchError = ErrorMessage.createBackendCommunicationError("Illegal state in FS4: " + e.getMessage());
- }
- }
-
- @Override
- protected List<Result> getSearchResults(CacheKey cacheKey) throws IOException {
- if(pendingSearchError != null) {
- return asList(new Result(query, pendingSearchError));
- }
- BasicPacket[] basicPackets;
-
- try {
- basicPackets = channel.receivePackets(query.getTimeLeft(), 1);
- } catch (ChannelTimeoutException e) {
- return asList(new Result(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName())));
- } catch (InvalidChannelException e) {
- return asList(new Result(query, ErrorMessage.createBackendCommunicationError("Invalid channel for " + getName())));
- }
-
- if (basicPackets.length == 0) {
- return asList(new Result(query, ErrorMessage.createBackendCommunicationError(getName() + " got no packets back")));
- }
-
- if (isLoggingFine())
- getLogger().finest("got packets " + basicPackets.length + " packets");
-
- basicPackets[0].ensureInstanceOf(QueryResultPacket.class, getName());
- QueryResultPacket resultPacket = (QueryResultPacket) basicPackets[0];
-
- if (isLoggingFine())
- getLogger().finest("got query packet. " + "docsumClass=" + query.getPresentation().getSummary());
-
- if (query.getPresentation().getSummary() == null)
- query.getPresentation().setSummary(searcher.getDefaultDocsumClass());
-
- Result result = new Result(query);
-
- searcher.addMetaInfo(query, queryPacket.getQueryPacketData(), resultPacket, result);
-
- searcher.addUnfilledHits(result, resultPacket.getDocuments(), false, queryPacket.getQueryPacketData(), cacheKey, distributionKey);
- Packet[] packets;
- CacheControl cacheControl = searcher.getCacheControl();
- PacketWrapper packetWrapper = cacheControl.lookup(cacheKey, query);
-
- if (packetWrapper != null) {
- cacheControl.updateCacheEntry(cacheKey, query, resultPacket);
- } else {
- if (resultPacket.getCoverageFeature() && !resultPacket.getCoverageFull()) {
- // Don't add error here, it was done in first phase
- // No check if packetWrapper already exists, since incomplete
- // first phase data won't be cached anyway.
- } else {
- packets = new Packet[1];
- packets[0] = resultPacket;
- cacheControl.cache(cacheKey, query, new DocsumPacketKey[0], packets, distributionKey);
- }
- }
- return asList(result);
- }
-
- @Override
- protected void sendPartialFillRequest(Result result, String summaryClass) {
+ protected void sendFillRequest(Result result, String summaryClass) {
summaryCacheKey = null;
if (searcher.getCacheControl().useCache(channel.getQuery())) {
summaryCacheKey = fetchCacheKeyFromHits(result.hits(), summaryClass);
@@ -185,7 +91,7 @@ public class FS4CloseableChannel extends CloseableChannel {
@Override
- protected void getPartialFillResults(Result result, String summaryClass) {
+ protected void getFillResults(Result result, String summaryClass) {
if (expectedFillResults == 0) {
return;
}
@@ -248,7 +154,7 @@ public class FS4CloseableChannel extends CloseableChannel {
}
@Override
- public void closeChannel() {
+ public void release() {
if (channel != null) {
channel.close();
channel = null;
@@ -383,12 +289,4 @@ public class FS4CloseableChannel extends CloseableChannel {
private String getName() {
return searcher.getName();
}
-
- private Logger getLogger() {
- return searcher.getLogger();
- }
-
- private boolean isLoggingFine() {
- return getLogger().isLoggable(Level.FINE);
- }
}
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java
new file mode 100644
index 00000000000..cec7fd2ce52
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java
@@ -0,0 +1,115 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.prelude.fastsearch;
+
+import com.google.common.collect.ImmutableMap;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+import com.yahoo.search.dispatch.CloseableInvoker;
+import com.yahoo.search.dispatch.FillInvoker;
+import com.yahoo.search.dispatch.InterleavedFillInvoker;
+import com.yahoo.search.dispatch.InterleavedSearchInvoker;
+import com.yahoo.search.dispatch.SearchCluster;
+import com.yahoo.search.dispatch.SearchInvoker;
+import com.yahoo.search.result.Hit;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * FS4InvokerFactory constructs {@link FillInvoker} and {@link SearchInvoker} objects that communicate with
+ * content nodes or dispatchers over the fnet/FS4 protocol
+ *
+ * @author ollivir
+ */
+public class FS4InvokerFactory {
+ private final FS4ResourcePool fs4ResourcePool;
+ private final VespaBackEndSearcher searcher;
+ private final ImmutableMap<Integer, SearchCluster.Node> nodesByKey;
+
+ public FS4InvokerFactory(FS4ResourcePool fs4ResourcePool, SearchCluster searchCluster, VespaBackEndSearcher searcher) {
+ this.fs4ResourcePool = fs4ResourcePool;
+ this.searcher = searcher;
+
+ ImmutableMap.Builder<Integer, SearchCluster.Node> builder = ImmutableMap.builder();
+ searchCluster.groups().values().forEach(group -> group.nodes().forEach(node -> builder.put(node.key(), node)));
+ this.nodesByKey = builder.build();
+ }
+
+ public SearchInvoker getSearchInvoker(Query query, SearchCluster.Node node) {
+ return new FS4SearchInvoker(searcher, query, fs4ResourcePool, node.hostname(), node.fs4port(), node.key());
+ }
+
+ public Optional<SearchInvoker> getSearchInvoker(Query query, SearchCluster.Group group) {
+ return getInvoker(group.nodes(), node -> getSearchInvoker(query, node), InterleavedSearchInvoker::new);
+ }
+
+ public FillInvoker getFillInvoker(Query query, SearchCluster.Node node) {
+ return new FS4FillInvoker(searcher, query, fs4ResourcePool, node.hostname(), node.fs4port(), node.key());
+ }
+
+ public Optional<FillInvoker> getFillInvoker(Result result) {
+ Collection<Integer> requiredNodes = requiredFillNodes(result);
+ List<SearchCluster.Node> nodes = new ArrayList<>(requiredNodes.size());
+
+ for (Integer distKey : requiredNodes) {
+ SearchCluster.Node node = nodesByKey.get(distKey);
+ if (node == null) {
+ return Optional.empty();
+ }
+ nodes.add(node);
+ }
+
+ Query query = result.getQuery();
+ return getInvoker(nodes, node -> getFillInvoker(query, node), InterleavedFillInvoker::new);
+ }
+
+ private static Collection<Integer> requiredFillNodes(Result result) {
+ Set<Integer> requiredNodes = new HashSet<>();
+ for (Iterator<Hit> i = result.hits().unorderedDeepIterator(); i.hasNext();) {
+ Hit h = i.next();
+ if (h instanceof FastHit) {
+ FastHit hit = (FastHit) h;
+ requiredNodes.add(hit.getDistributionKey());
+ }
+ }
+ return requiredNodes;
+ }
+
+ @FunctionalInterface
+ private interface InvokerConstructor<INVOKER> {
+ INVOKER construct(SearchCluster.Node node);
+ }
+
+ @FunctionalInterface
+ private interface ClusterInvokerConstructor<CLUSTERINVOKER extends INVOKER, INVOKER> {
+ CLUSTERINVOKER construct(Map<Integer, INVOKER> subinvokers);
+ }
+
+ private <INVOKER extends CloseableInvoker, CLUSTERINVOKER extends INVOKER> Optional<INVOKER> getInvoker(
+ Collection<SearchCluster.Node> nodes, InvokerConstructor<INVOKER> singleNodeCtor,
+ ClusterInvokerConstructor<CLUSTERINVOKER, INVOKER> clusterCtor) {
+ if (nodes.size() == 1) {
+ SearchCluster.Node node = nodes.iterator().next();
+ return Optional.of(singleNodeCtor.construct(node));
+ } else {
+ Map<Integer, INVOKER> nodeInvokers = new HashMap<>();
+ for (SearchCluster.Node node : nodes) {
+ if (node.isWorking()) {
+ nodeInvokers.put(node.key(), singleNodeCtor.construct(node));
+ }
+ }
+ if (nodeInvokers.size() == 1) {
+ return Optional.of(nodeInvokers.values().iterator().next());
+ } else {
+ return Optional.of(clusterCtor.construct(nodeInvokers));
+ }
+ }
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java
new file mode 100644
index 00000000000..82f87fcac19
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java
@@ -0,0 +1,157 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.prelude.fastsearch;
+
+import com.yahoo.fs4.BasicPacket;
+import com.yahoo.fs4.ChannelTimeoutException;
+import com.yahoo.fs4.Packet;
+import com.yahoo.fs4.QueryPacket;
+import com.yahoo.fs4.QueryResultPacket;
+import com.yahoo.fs4.mplex.Backend;
+import com.yahoo.fs4.mplex.FS4Channel;
+import com.yahoo.fs4.mplex.InvalidChannelException;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+import com.yahoo.search.dispatch.SearchInvoker;
+import com.yahoo.search.result.ErrorMessage;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import static java.util.Arrays.asList;
+
+/**
+ * {@link SearchInvoker} implementation for FS4 nodes and fdispatch
+ *
+ * @author ollivir
+ */
+public class FS4SearchInvoker extends SearchInvoker {
+ private final VespaBackEndSearcher searcher;
+ private FS4Channel channel;
+ private final Optional<Integer> distributionKey;
+
+ private ErrorMessage pendingSearchError = null;
+ private Query query = null;
+ private QueryPacket queryPacket = null;
+
+ public FS4SearchInvoker(VespaBackEndSearcher searcher, Query query, FS4ResourcePool fs4ResourcePool, String hostname, int port,
+ int distributionKey) {
+ this.searcher = searcher;
+ this.distributionKey = Optional.of(distributionKey);
+
+ Backend backend = fs4ResourcePool.getBackend(hostname, port);
+ this.channel = backend.openChannel();
+ channel.setQuery(query);
+ }
+
+ // fdispatch code path
+ public FS4SearchInvoker(VespaBackEndSearcher searcher, Query query, Backend backend) {
+ this.searcher = searcher;
+ this.distributionKey = Optional.empty();
+ this.channel = backend.openChannel();
+ channel.setQuery(query);
+ }
+
+ @Override
+ protected void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException {
+ if (isLoggingFine())
+ getLogger().finest("sending query packet");
+
+ if(queryPacket == null) {
+ // query changed for subchannel
+ queryPacket = searcher.createQueryPacket(query);
+ }
+
+ this.query = query;
+ this.queryPacket = queryPacket;
+
+ try {
+ boolean couldSend = channel.sendPacket(queryPacket);
+ if (!couldSend) {
+ pendingSearchError = ErrorMessage.createBackendCommunicationError("Could not reach '" + getName() + "'");
+ }
+ } catch (InvalidChannelException e) {
+ pendingSearchError = ErrorMessage.createBackendCommunicationError("Invalid channel " + getName());
+ } catch (IllegalStateException e) {
+ pendingSearchError = ErrorMessage.createBackendCommunicationError("Illegal state in FS4: " + e.getMessage());
+ }
+ }
+
+ @Override
+ protected List<Result> getSearchResults(CacheKey cacheKey) throws IOException {
+ if(pendingSearchError != null) {
+ return asList(new Result(query, pendingSearchError));
+ }
+ BasicPacket[] basicPackets;
+
+ try {
+ basicPackets = channel.receivePackets(query.getTimeLeft(), 1);
+ } catch (ChannelTimeoutException e) {
+ return asList(new Result(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName())));
+ } catch (InvalidChannelException e) {
+ return asList(new Result(query, ErrorMessage.createBackendCommunicationError("Invalid channel for " + getName())));
+ }
+
+ if (basicPackets.length == 0) {
+ return asList(new Result(query, ErrorMessage.createBackendCommunicationError(getName() + " got no packets back")));
+ }
+
+ if (isLoggingFine())
+ getLogger().finest("got packets " + basicPackets.length + " packets");
+
+ basicPackets[0].ensureInstanceOf(QueryResultPacket.class, getName());
+ QueryResultPacket resultPacket = (QueryResultPacket) basicPackets[0];
+
+ if (isLoggingFine())
+ getLogger().finest("got query packet. " + "docsumClass=" + query.getPresentation().getSummary());
+
+ if (query.getPresentation().getSummary() == null)
+ query.getPresentation().setSummary(searcher.getDefaultDocsumClass());
+
+ Result result = new Result(query);
+
+ searcher.addMetaInfo(query, queryPacket.getQueryPacketData(), resultPacket, result);
+
+ searcher.addUnfilledHits(result, resultPacket.getDocuments(), false, queryPacket.getQueryPacketData(), cacheKey, distributionKey);
+ Packet[] packets;
+ CacheControl cacheControl = searcher.getCacheControl();
+ PacketWrapper packetWrapper = cacheControl.lookup(cacheKey, query);
+
+ if (packetWrapper != null) {
+ cacheControl.updateCacheEntry(cacheKey, query, resultPacket);
+ } else {
+ if (resultPacket.getCoverageFeature() && !resultPacket.getCoverageFull()) {
+ // Don't add error here, it was done in first phase
+ // No check if packetWrapper already exists, since incomplete
+ // first phase data won't be cached anyway.
+ } else {
+ packets = new Packet[1];
+ packets[0] = resultPacket;
+ cacheControl.cache(cacheKey, query, new DocsumPacketKey[0], packets, distributionKey);
+ }
+ }
+ return asList(result);
+ }
+
+ @Override
+ public void release() {
+ if (channel != null) {
+ channel.close();
+ channel = null;
+ }
+ }
+
+ private String getName() {
+ return searcher.getName();
+ }
+
+ private Logger getLogger() {
+ return searcher.getLogger();
+ }
+
+ private boolean isLoggingFine() {
+ return getLogger().isLoggable(Level.FINE);
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
index 9acf48a7c67..b429995460d 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
@@ -1,8 +1,6 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.prelude.fastsearch;
-import com.yahoo.compress.CompressionType;
-import com.yahoo.container.search.LegacyEmulationConfig;
import com.yahoo.fs4.BasicPacket;
import com.yahoo.fs4.ChannelTimeoutException;
import com.yahoo.fs4.PingPacket;
@@ -17,9 +15,10 @@ import com.yahoo.prelude.querytransform.QueryRewrite;
import com.yahoo.processing.request.CompoundName;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
-import com.yahoo.search.dispatch.CloseableChannel;
import com.yahoo.search.dispatch.Dispatcher;
+import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.dispatch.SearchCluster;
+import com.yahoo.search.dispatch.SearchInvoker;
import com.yahoo.search.grouping.GroupingRequest;
import com.yahoo.search.grouping.request.GroupingOperation;
import com.yahoo.search.query.Ranking;
@@ -51,22 +50,12 @@ public class FastSearcher extends VespaBackEndSearcher {
/** If this is turned on this will make search queries directly to the local search node when possible */
private final static CompoundName dispatchDirect = new CompoundName("dispatch.direct");
- /** Unless turned off this will fill summaries by dispatching directly to search nodes over RPC when possible */
- private final static CompoundName dispatchSummaries = new CompoundName("dispatch.summaries");
-
- /** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */
- private final static CompoundName dispatchCompression = new CompoundName("dispatch.compression");
-
- /** If enabled, the dispatcher internal to the search container will be preferred over fdispatch
- * whenever possible */
- private static final CompoundName dispatchInternal = new CompoundName("dispatch.internal");
-
/** Used to dispatch directly to search nodes over RPC, replacing the old fnet communication path */
private final Dispatcher dispatcher;
private final Backend dispatchBackend;
- private final FS4ResourcePool fs4ResourcePool;
+ private final FS4InvokerFactory fs4InvokerFactory;
/**
* Creates a Fastsearcher.
@@ -89,8 +78,8 @@ public class FastSearcher extends VespaBackEndSearcher {
CacheParams cacheParams, DocumentdbInfoConfig documentdbInfoConfig) {
init(docSumParams, clusterParams, cacheParams, documentdbInfoConfig);
this.dispatchBackend = dispatchBackend;
- this.fs4ResourcePool = fs4ResourcePool;
this.dispatcher = dispatcher;
+ this.fs4InvokerFactory = new FS4InvokerFactory(fs4ResourcePool, dispatcher.searchCluster(), this);
}
/**
@@ -161,8 +150,8 @@ public class FastSearcher extends VespaBackEndSearcher {
public Result doSearch2(Query query, QueryPacket queryPacket, CacheKey cacheKey, Execution execution) {
if (dispatcher.searchCluster().groupSize() == 1)
forceSinglePassGrouping(query);
- try(CloseableChannel channel = getChannel(query)) {
- List<Result> results = channel.search(query, queryPacket, cacheKey);
+ try(SearchInvoker invoker = getSearchInvoker(query)) {
+ List<Result> results = invoker.search(query, queryPacket, cacheKey);
Result result = mergeResults(results, query, execution);
if (query.properties().getBoolean(Ranking.RANKFEATURES, false)) {
@@ -186,6 +175,25 @@ public class FastSearcher extends VespaBackEndSearcher {
}
}
+ /**
+ * Perform a partial docsum fill for a temporary result
+ * representing a partition of the complete fill request.
+ *
+ * @param result result containing a partition of the unfilled hits
+ * @param summaryClass the summary class we want to fill with
+ **/
+ @Override
+ protected void doPartialFill(Result result, String summaryClass) {
+ if (result.isFilled(summaryClass)) return;
+
+ Query query = result.getQuery();
+ traceQuery(getName(), "fill", query, query.getOffset(), query.getHits(), 1, quotedSummaryClass(summaryClass));
+
+ try (FillInvoker invoker = getFillInvoker(result)) {
+ invoker.fill(result, summaryClass);
+ }
+ }
+
/** When we only search a single node, doing all grouping in one pass is more efficient */
private void forceSinglePassGrouping(Query query) {
for (GroupingRequest groupingRequest : query.getSelect().getGrouping())
@@ -199,71 +207,60 @@ public class FastSearcher extends VespaBackEndSearcher {
}
/**
- * Returns a request interface object for the given query.
- * Normally this is built from the backend field of this instance, which connects to the dispatch node
- * this component talks to (which is why this instance was chosen by the cluster controller). However,
- * under certain conditions we will instead return an interface which connects directly to the relevant
- * search nodes.
+ * Returns an invocation object for use in a single search request. The specific implementation returned
+ * depends on query properties with the default being an invoker that interfaces with a dispatcher
+ * on the same host.
*/
- private CloseableChannel getChannel(Query query) {
- if (query.properties().getBoolean(dispatchInternal, false)) {
- Optional<CloseableChannel> dispatchedChannel = dispatcher.getDispatchedChannel(this, query);
- if (dispatchedChannel.isPresent()) {
- return dispatchedChannel.get();
- }
+ private SearchInvoker getSearchInvoker(Query query) {
+ Optional<SearchInvoker> invoker = dispatcher.getSearchInvoker(query, fs4InvokerFactory);
+ if (invoker.isPresent()) {
+ return invoker.get();
}
- if (!query.properties().getBoolean(dispatchDirect, true))
- return new FS4CloseableChannel(this, query, dispatchBackend);
- if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE))
- return new FS4CloseableChannel(this, query, dispatchBackend);
-
- Optional<SearchCluster.Node> directDispatchRecipient = dispatcher.searchCluster().directDispatchTarget();
- if (!directDispatchRecipient.isPresent())
- return new FS4CloseableChannel(this, query, dispatchBackend);
- // Dispatch directly to the single, local search node
- SearchCluster.Node local = directDispatchRecipient.get();
- query.trace(false, 2, "Dispatching directly to ", directDispatchRecipient.get());
- return new FS4CloseableChannel(this, query, fs4ResourcePool, local.hostname(), local.fs4port(), local.key());
+ Optional<SearchCluster.Node> direct = getDirectNode(query);
+ if(direct.isPresent()) {
+ return fs4InvokerFactory.getSearchInvoker(query, direct.get());
+ }
+ return new FS4SearchInvoker(this, query, dispatchBackend);
}
/**
- * Perform a partial docsum fill for a temporary result
- * representing a partition of the complete fill request.
- *
- * @param result result containing a partition of the unfilled hits
- * @param summaryClass the summary class we want to fill with
- **/
- @Override
- protected void doPartialFill(Result result, String summaryClass) {
- if (result.isFilled(summaryClass)) return;
-
+ * Returns an invocation object for use in a single fill request. The specific implementation returned
+ * depends on query properties with the default being an invoker that uses RPC to interface with
+ * content nodes.
+ */
+ private FillInvoker getFillInvoker(Result result) {
Query query = result.getQuery();
- traceQuery(getName(), "fill", query, query.getOffset(), query.getHits(), 1, quotedSummaryClass(summaryClass));
-
- if (query.properties().getBoolean(dispatchSummaries, true)
- && ! summaryNeedsQuery(query)
- && query.getRanking().getLocation() == null
- && ! cacheControl.useCache(query)
- && ! legacyEmulationConfigIsSet(getDocumentDatabase(query))) {
-
- CompressionType compression =
- CompressionType.valueOf(query.properties().getString(dispatchCompression, "LZ4").toUpperCase());
- dispatcher.fill(result, summaryClass, getDocumentDatabase(query), compression);
- return;
+ Optional<FillInvoker> invoker = dispatcher.getFillInvoker(result, this, getDocumentDatabase(query), fs4InvokerFactory);
+ if (invoker.isPresent()) {
+ return invoker.get();
}
- try (CloseableChannel channel = getChannel(query)) {
- channel.partialFill(result, summaryClass);
+ Optional<SearchCluster.Node> direct = getDirectNode(query);
+ if (direct.isPresent()) {
+ return fs4InvokerFactory.getFillInvoker(query, direct.get());
}
+ return new FS4FillInvoker(this, query, dispatchBackend);
}
- private boolean legacyEmulationConfigIsSet(DocumentDatabase db) {
- LegacyEmulationConfig config = db.getDocsumDefinitionSet().legacyEmulationConfig();
- if (config.forceFillEmptyFields()) return true;
- if (config.stringBackedFeatureData()) return true;
- if (config.stringBackedStructuredData()) return true;
- return false;
+ /**
+ * If the query can be directed to a single local content node, returns that node. Otherwise,
+ * returns an empty value.
+ */
+ private Optional<SearchCluster.Node> getDirectNode(Query query) {
+ if (!query.properties().getBoolean(dispatchDirect, true))
+ return Optional.empty();
+ if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE))
+ return Optional.empty();
+
+ Optional<SearchCluster.Node> directDispatchRecipient = dispatcher.searchCluster().directDispatchTarget();
+ if (!directDispatchRecipient.isPresent())
+ return Optional.empty();
+
+ // Dispatch directly to the single, local search node
+ SearchCluster.Node local = directDispatchRecipient.get();
+ query.trace(false, 2, "Dispatching directly to ", local);
+ return Optional.of(local);
}
private Result mergeResults(List<Result> results, Query query, Execution execution) {
@@ -289,8 +286,6 @@ public class FastSearcher extends VespaBackEndSearcher {
result.hits().trim(query.getOffset(), query.getHits());
}
- // TODO grouping
-
return result;
}
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java
index 409d05e3aaf..42903dcfa90 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java
@@ -102,7 +102,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher {
* Returns whether we need to send the query when fetching summaries.
* This is necessary if the query requests summary features or dynamic snippeting
*/
- boolean summaryNeedsQuery(Query query) {
+ public boolean summaryNeedsQuery(Query query) {
if (query.getRanking().getQueryCache()) return false; // Query is cached in backend
DocumentDatabase documentDb = getDocumentDatabase(query);
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/Client.java
index f2d7750ebe4..431b36c2623 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Client.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Client.java
@@ -15,7 +15,7 @@ import java.util.Optional;
interface Client {
void getDocsums(List<FastHit> hits, NodeConnection node, CompressionType compression,
- int uncompressedLength, byte[] compressedSlime, Dispatcher.GetDocsumsResponseReceiver responseReceiver,
+ int uncompressedLength, byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver,
double timeoutSeconds);
/** Creates a connection to a particular node in this */
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java
deleted file mode 100644
index fc337d589ec..00000000000
--- a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java
+++ /dev/null
@@ -1,59 +0,0 @@
-// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.search.dispatch;
-
-import com.yahoo.fs4.QueryPacket;
-import com.yahoo.prelude.fastsearch.CacheKey;
-import com.yahoo.search.Query;
-import com.yahoo.search.Result;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * CloseableChannel is an interface for running a search query and getting document summaries against some content node, node group or
- * dispatcher while abstracting the specifics of the invocation target. ClosebleChannel objects are stateful and should not be reused.
- *
- * @author ollivir
- */
-public abstract class CloseableChannel implements Closeable {
- /** Retrieve the hits for the given {@link Query}. The channel may return more than one result, in
- * which case the caller is responsible for merging the results. If multiple results are returned
- * and the search query had a hit offset other than zero, that offset will be set to zero and the
- * number of requested hits will be adjusted accordingly. */
- public List<Result> search(Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException {
- sendSearchRequest(query, queryPacket);
- return getSearchResults(cacheKey);
- }
-
- protected abstract void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException;
-
- protected abstract List<Result> getSearchResults(CacheKey cacheKey) throws IOException;
-
- /** Retrieve document summaries for the unfilled hits in the given {@link Result} */
- public void partialFill(Result result, String summaryClass) {
- sendPartialFillRequest(result, summaryClass);
- getPartialFillResults(result, summaryClass);
- }
-
- protected abstract void getPartialFillResults(Result result, String summaryClass);
-
- protected abstract void sendPartialFillRequest(Result result, String summaryClass);
-
- protected abstract void closeChannel();
-
- private Runnable teardown = null;
-
- public void teardown(Runnable teardown) {
- this.teardown = teardown;
- }
-
- @Override
- public final void close() {
- if (teardown != null) {
- teardown.run();
- teardown = null;
- }
- closeChannel();
- }
-}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java
new file mode 100644
index 00000000000..481940a33b7
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java
@@ -0,0 +1,30 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch;
+
+import java.io.Closeable;
+
+/**
+ * CloseableInvoker is an abstract implementation of {@link Closeable} with an additional hook for
+ * executing code at closing. Classes that extend CloseableInvoker need to override {@link #release()}
+ * instead of {@link #close()} which is final to avoid accidental overriding.
+ *
+ * @author ollivir
+ */
+public abstract class CloseableInvoker implements Closeable {
+ protected abstract void release();
+
+ private Runnable teardown = null;
+
+ public void teardown(Runnable teardown) {
+ this.teardown = teardown;
+ }
+
+ @Override
+ public final void close() {
+ if (teardown != null) {
+ teardown.run();
+ teardown = null;
+ }
+ release();
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index ce0d48f5638..31e6070423d 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -1,296 +1,94 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;
-import com.google.common.collect.ImmutableMap;
-import com.yahoo.collections.ListMap;
import com.yahoo.component.AbstractComponent;
-import com.yahoo.compress.CompressionType;
-import com.yahoo.compress.Compressor;
import com.yahoo.container.handler.VipStatus;
-import com.yahoo.container.protect.Error;
-import com.yahoo.data.access.Inspector;
-import com.yahoo.data.access.slime.SlimeAdapter;
import com.yahoo.prelude.fastsearch.DocumentDatabase;
-import com.yahoo.prelude.fastsearch.FS4CloseableChannel;
+import com.yahoo.prelude.fastsearch.FS4InvokerFactory;
import com.yahoo.prelude.fastsearch.FS4ResourcePool;
-import com.yahoo.prelude.fastsearch.FastHit;
-import com.yahoo.prelude.fastsearch.TimeoutException;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
+import com.yahoo.processing.request.CompoundName;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
-import com.yahoo.search.query.SessionId;
-import com.yahoo.search.result.ErrorMessage;
-import com.yahoo.search.result.Hit;
-import com.yahoo.slime.ArrayTraverser;
-import com.yahoo.slime.BinaryFormat;
-import com.yahoo.slime.Cursor;
-import com.yahoo.slime.Slime;
import com.yahoo.vespa.config.search.DispatchConfig;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.Level;
-import java.util.logging.Logger;
/**
* A dispatcher communicates with search nodes to perform queries and fill hits.
*
- * This is currently not functionally complete: Queries can only be dispatched to a single node,
- * and summaries can only be requested when they do not need the query.
+ * This class allocates {@link SearchInvoker} and {@link FillInvoker} objects based
+ * on query properties and general system status. The caller can then use the provided
+ * invocation object to execute the search or fill.
*
* This class is multithread safe.
*
* @author bratseth
+ * @author ollvir
*/
public class Dispatcher extends AbstractComponent {
-
- private final static Logger log = Logger.getLogger(Dispatcher.class.getName());
- private final Client client;
+ /** If enabled, this internal dispatcher will be preferred over fdispatch whenever possible */
+ private static final CompoundName dispatchInternal = new CompoundName("dispatch.internal");
/** A model of the search cluster this dispatches to */
private final SearchCluster searchCluster;
- /** Connections to the search nodes this talks to, indexed by node id ("partid") */
- private final ImmutableMap<Integer, Client.NodeConnection> nodeConnections;
-
- private final Compressor compressor = new Compressor();
-
private final LoadBalancer loadBalancer;
- private final FS4ResourcePool fs4ResourcePool;
+ private final RpcResourcePool rpcResourcePool;
- public Dispatcher(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool,
- int containerClusterSize, VipStatus vipStatus) {
- this.client = new RpcClient();
+ public Dispatcher(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus) {
this.searchCluster = new SearchCluster(dispatchConfig, fs4ResourcePool, containerClusterSize, vipStatus);
- this.fs4ResourcePool = fs4ResourcePool;
this.loadBalancer = new LoadBalancer(searchCluster);
-
- // Create node rpc connections, indexed by the node distribution key
- ImmutableMap.Builder<Integer, Client.NodeConnection> nodeConnectionsBuilder = new ImmutableMap.Builder<>();
- for (DispatchConfig.Node node : dispatchConfig.node()) {
- nodeConnectionsBuilder.put(node.key(), client.createConnection(node.host(), node.port()));
- }
- nodeConnections = nodeConnectionsBuilder.build();
+ this.rpcResourcePool = new RpcResourcePool(dispatchConfig);
}
/** For testing */
public Dispatcher(Map<Integer, Client.NodeConnection> nodeConnections, Client client) {
this.searchCluster = null;
- this.nodeConnections = ImmutableMap.copyOf(nodeConnections);
- this.client = client;
- this.fs4ResourcePool = null;
this.loadBalancer = new LoadBalancer(searchCluster);
+ this.rpcResourcePool = new RpcResourcePool(client, nodeConnections);
}
/** Returns the search cluster this dispatches to */
- public SearchCluster searchCluster() { return searchCluster; }
-
- /** Fills the given summary class by sending RPC requests to the right search nodes */
- public void fill(Result result, String summaryClass, DocumentDatabase documentDb, CompressionType compression) {
- try {
- ListMap<Integer, FastHit> hitsByNode = hitsByNode(result);
-
- if (result.getQuery().getTraceLevel() >=3)
- result.getQuery().trace("Sending " + hitsByNode.size() + " summary fetch RPC requests", 3);
-
- GetDocsumsResponseReceiver responseReceiver = new GetDocsumsResponseReceiver(hitsByNode.size(), compressor, result);
- for (Map.Entry<Integer, List<FastHit>> nodeHits : hitsByNode.entrySet()) {
- sendGetDocsumsRequest(nodeHits.getKey(), nodeHits.getValue(), summaryClass, compression, result, responseReceiver);
- }
- responseReceiver.processResponses(result.getQuery(), summaryClass, documentDb);
- result.hits().setSorted(false);
- result.analyzeHits();
- }
- catch (TimeoutException e) {
- result.hits().addError(ErrorMessage.createTimeout("Summary data is incomplete: " + e.getMessage()));
- }
- }
-
- /** Return a map of hits by their search node (partition) id */
- private static ListMap<Integer, FastHit> hitsByNode(Result result) {
- ListMap<Integer, FastHit> hitsByNode = new ListMap<>();
- for (Iterator<Hit> i = result.hits().unorderedDeepIterator() ; i.hasNext(); ) {
- Hit h = i.next();
- if ( ! (h instanceof FastHit)) continue;
- FastHit hit = (FastHit)h;
-
- hitsByNode.put(hit.getDistributionKey(), hit);
- }
- return hitsByNode;
- }
-
- /** Send a getDocsums request to a node. Responses will be added to the given receiver. */
- private void sendGetDocsumsRequest(int nodeId, List<FastHit> hits, String summaryClass,
- CompressionType compression,
- Result result, GetDocsumsResponseReceiver responseReceiver) {
- Client.NodeConnection node = nodeConnections.get(nodeId);
- if (node == null) {
- result.hits().addError(ErrorMessage.createEmptyDocsums("Could not fill hits from unknown node " + nodeId));
- log.warning("Got hits with partid " + nodeId + ", which is not included in the current dispatch config");
- return;
- }
-
- Query query = result.getQuery();
- String rankProfile = query.getRanking().getProfile();
- byte[] serializedSlime = BinaryFormat.encode(toSlime(rankProfile, summaryClass,
- query.getModel().getDocumentDb(), query.getSessionId(false), hits));
- double timeoutSeconds = ((double)query.getTimeLeft()-3.0)/1000.0;
- Compressor.Compression compressionResult = compressor.compress(compression, serializedSlime);
- client.getDocsums(hits, node, compressionResult.type(),
- serializedSlime.length, compressionResult.data(), responseReceiver, timeoutSeconds);
- }
-
- static private Slime toSlime(String rankProfile, String summaryClass, String docType, SessionId sessionId, List<FastHit> hits) {
- Slime slime = new Slime();
- Cursor root = slime.setObject();
- if (summaryClass != null) {
- root.setString("class", summaryClass);
- }
- if (sessionId != null) {
- root.setData("sessionid", sessionId.asUtf8String().getBytes());
- }
- if (docType != null) {
- root.setString("doctype", docType);
- }
- if (rankProfile != null) {
- root.setString("ranking", rankProfile);
- }
- Cursor gids = root.setArray("gids");
- for (FastHit hit : hits) {
- gids.addData(hit.getGlobalId().getRawId());
- }
- return slime;
+ public SearchCluster searchCluster() {
+ return searchCluster;
}
@Override
public void deconstruct() {
- for (Client.NodeConnection nodeConnection : nodeConnections.values())
- nodeConnection.close();
+ rpcResourcePool.release();
}
- /** Receiver of the responses to a set of getDocsums requests */
- public static class GetDocsumsResponseReceiver {
-
- private final BlockingQueue<Client.GetDocsumsResponseOrError> responses;
- private final Compressor compressor;
- private final Result result;
-
- /** Whether we have already logged/notified about an error - to avoid spamming */
- private boolean hasReportedError = false;
-
- /** The number of responses we should receive (and process) before this is complete */
- private int outstandingResponses;
-
- public GetDocsumsResponseReceiver(int requestCount, Compressor compressor, Result result) {
- this.compressor = compressor;
- responses = new LinkedBlockingQueue<>(requestCount);
- outstandingResponses = requestCount;
- this.result = result;
- }
-
- /** Called by a thread belonging to the client when a valid response becomes available */
- public void receive(Client.GetDocsumsResponseOrError response) {
- responses.add(response);
- }
-
- private void throwTimeout() throws TimeoutException {
- throw new TimeoutException("Timed out waiting for summary data. " + outstandingResponses + " responses outstanding.");
- }
-
- /**
- * Call this from the dispatcher thread to initiate and complete processing of responses.
- * This will block until all responses are available and processed, or to timeout.
- */
- public void processResponses(Query query, String summaryClass, DocumentDatabase documentDb) throws TimeoutException {
- try {
- int skippedHits = 0;
- while (outstandingResponses > 0) {
- long timeLeftMs = query.getTimeLeft();
- if (timeLeftMs <= 0) {
- throwTimeout();
- }
- Client.GetDocsumsResponseOrError response = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS);
- if (response == null)
- throwTimeout();
- skippedHits += processResponse(response, summaryClass, documentDb);
- outstandingResponses--;
- }
- if (skippedHits != 0) {
- result.hits().addError(com.yahoo.search.result.ErrorMessage.createEmptyDocsums("Missing hit summary data for summary " +
- summaryClass + " for " + skippedHits + " hits"));
- }
- }
- catch (InterruptedException e) {
- // TODO: Add error
- }
- }
-
- private int processResponse(Client.GetDocsumsResponseOrError responseOrError,
- String summaryClass,
- DocumentDatabase documentDb) {
- if (responseOrError.error().isPresent()) {
- if (hasReportedError) return 0;
- String error = responseOrError.error().get();
- result.hits().addError(ErrorMessage.createBackendCommunicationError(error));
- log.log(Level.WARNING, "Error fetching summary data: "+ error);
- }
- else {
- Client.GetDocsumsResponse response = responseOrError.response().get();
- CompressionType compression = CompressionType.valueOf(response.compression());
- byte[] slimeBytes = compressor.decompress(response.compressedSlimeBytes(), compression, response.uncompressedSize());
- return fill(response.hitsContext(), summaryClass, documentDb, slimeBytes);
- }
- return 0;
- }
+ @FunctionalInterface
+ private interface SearchInvokerSupplier {
+ Optional<SearchInvoker> supply(Query query, SearchCluster.Group group);
+ }
- private void addErrors(com.yahoo.slime.Inspector errors) {
- errors.traverse((ArrayTraverser) (int index, com.yahoo.slime.Inspector value) -> {
- int errorCode = ("timeout".equalsIgnoreCase(value.field("type").asString()))
- ? Error.TIMEOUT.code
- : Error.UNSPECIFIED.code;
- result.hits().addError(new ErrorMessage(errorCode,
- value.field("message").asString(), value.field("details").asString()));
- });
+ public Optional<FillInvoker> getFillInvoker(Result result, VespaBackEndSearcher searcher, DocumentDatabase documentDb,
+ FS4InvokerFactory fs4InvokerFactory) {
+ Optional<FillInvoker> rpcInvoker = rpcResourcePool.getFillInvoker(result.getQuery(), searcher, documentDb);
+ if (rpcInvoker.isPresent()) {
+ return rpcInvoker;
}
-
- private int fill(List<FastHit> hits, String summaryClass, DocumentDatabase documentDb, byte[] slimeBytes) {
- com.yahoo.slime.Inspector root = BinaryFormat.decode(slimeBytes).get();
- com.yahoo.slime.Inspector errors = root.field("errors");
- boolean hasErrors = errors.valid() && (errors.entries() > 0);
- if (hasErrors) {
- addErrors(errors);
+ if (result.getQuery().properties().getBoolean(dispatchInternal, false)) {
+ Optional<FillInvoker> fs4Invoker = fs4InvokerFactory.getFillInvoker(result);
+ if (fs4Invoker.isPresent()) {
+ return fs4Invoker;
}
-
- Inspector summaries = new SlimeAdapter(root.field("docsums"));
- if ( ! summaries.valid())
- return 0; // No summaries; Perhaps we requested a non-existing summary class
- int skippedHits = 0;
- for (int i = 0; i < hits.size(); i++) {
- Inspector summary = summaries.entry(i).field("docsum");
- if (summary.fieldCount() != 0) {
- hits.get(i).setField(Hit.SDDOCNAME_FIELD, documentDb.getName());
- hits.get(i).addSummary(documentDb.getDocsumDefinitionSet().getDocsum(summaryClass), summary);
- hits.get(i).setFilled(summaryClass);
- } else {
- skippedHits++;
- }
- }
- return skippedHits;
}
-
+ return Optional.empty();
}
- public Optional<CloseableChannel> getDispatchedChannel(VespaBackEndSearcher searcher, Query query) {
- if (!query.getSelect().getGrouping().isEmpty()) {
- return Optional.empty();
+ public Optional<SearchInvoker> getSearchInvoker(Query query, FS4InvokerFactory fs4InvokerFactory) {
+ if (query.properties().getBoolean(dispatchInternal, false)) {
+ Optional<SearchInvoker> invoker = getInternalInvoker(query, fs4InvokerFactory::getSearchInvoker);
+ return invoker;
}
+ return Optional.empty();
+ }
+ private Optional<SearchInvoker> getInternalInvoker(Query query, SearchInvokerSupplier invokerFactory) {
Optional<SearchCluster.Group> groupInCluster = loadBalancer.takeGroupForQuery(query);
if (!groupInCluster.isPresent()) {
return Optional.empty();
@@ -298,20 +96,12 @@ public class Dispatcher extends AbstractComponent {
SearchCluster.Group group = groupInCluster.get();
query.trace(false, 2, "Dispatching internally to ", group);
- if (group.nodes().size() == 1) {
- SearchCluster.Node node = group.nodes().iterator().next();
- CloseableChannel channel = new FS4CloseableChannel(searcher, query, fs4ResourcePool, node.hostname(), node.fs4port(),
- node.key());
- return Optional.of(channel);
+ Optional<SearchInvoker> invoker = invokerFactory.supply(query, group);
+ if (invoker.isPresent()) {
+ invoker.get().teardown(() -> loadBalancer.releaseGroup(group));
} else {
- query.setNoCache(true); // Note - multi-node request disables packet based caching
-
- Map<Integer, CloseableChannel> subchannels = new HashMap<>();
- for (SearchCluster.Node node : group.nodes()) {
- subchannels.put(node.key(), new FS4CloseableChannel(searcher, query, fs4ResourcePool, node.hostname(), node.fs4port(), node.key()));
- }
- CloseableChannel multinode = new InterleavedCloseableChannel(subchannels);
- return Optional.of(multinode);
+ loadBalancer.releaseGroup(group);
}
+ return invoker;
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/FillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/FillInvoker.java
new file mode 100644
index 00000000000..dd4c4494ac5
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/FillInvoker.java
@@ -0,0 +1,22 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch;
+
+import com.yahoo.search.Result;
+
+/**
+ * FillInvoker encapsulates an allocated connection for running a document summary retrieval.
+ * The invocation object can be stateful and should not be reused.
+ *
+ * @author ollivir
+ */
+public abstract class FillInvoker extends CloseableInvoker {
+ /** Retrieve document summaries for the unfilled hits in the given {@link Result} */
+ public void fill(Result result, String summaryClass) {
+ sendFillRequest(result, summaryClass);
+ getFillResults(result, summaryClass);
+ }
+
+ protected abstract void getFillResults(Result result, String summaryClass);
+
+ protected abstract void sendFillRequest(Result result, String summaryClass);
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedCloseableChannel.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedCloseableChannel.java
deleted file mode 100644
index e461f6fc725..00000000000
--- a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedCloseableChannel.java
+++ /dev/null
@@ -1,98 +0,0 @@
-// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.search.dispatch;
-
-import com.yahoo.fs4.QueryPacket;
-import com.yahoo.prelude.fastsearch.CacheKey;
-import com.yahoo.prelude.fastsearch.FastHit;
-import com.yahoo.search.Query;
-import com.yahoo.search.Result;
-import com.yahoo.search.result.Hit;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/**
- * InterleavedCloseableChannel uses multiple {@link CloseableChannel} objects to interface with
- * content nodes in parallel. Operationally it first sends requests to all channels and then
- * collects the results. The invoker of this class is responsible for merging the results if
- * needed.
- *
- * @author ollivir
- */
-public class InterleavedCloseableChannel extends CloseableChannel {
- private final Map<Integer, CloseableChannel> subchannels;
- private Map<Integer, Result> expectedFillResults = null;
-
- public InterleavedCloseableChannel(Map<Integer, CloseableChannel> subchannels) {
- this.subchannels = subchannels;
- }
-
- /** Sends search queries to the contained {@link CloseableChannel} subchannels. If the
- * search query has an offset other than zero, it will be reset to zero and the expected
- * hit amount will be adjusted accordingly. */
- @Override
- protected void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException {
- for (CloseableChannel subchannel : subchannels.values()) {
- Query subquery = query.clone();
-
- subquery.setHits(subquery.getHits() + subquery.getOffset());
- subquery.setOffset(0);
- subchannel.sendSearchRequest(subquery, null);
- }
- }
-
- @Override
- protected List<Result> getSearchResults(CacheKey cacheKey) throws IOException {
- List<Result> results = new ArrayList<>();
-
- for (CloseableChannel subchannel : subchannels.values()) {
- results.addAll(subchannel.getSearchResults(cacheKey));
- }
- return results;
- }
-
- @Override
- protected void sendPartialFillRequest(Result result, String summaryClass) {
- expectedFillResults = new HashMap<>();
-
- for (Iterator<Hit> it = result.hits().deepIterator(); it.hasNext();) {
- Hit hit = it.next();
- if (hit instanceof FastHit) {
- FastHit fhit = (FastHit) hit;
- Result res = expectedFillResults.computeIfAbsent(fhit.getDistributionKey(), dk -> new Result(result.getQuery()));
- res.hits().add(fhit);
- }
- }
- expectedFillResults.forEach((distKey, partialResult) -> {
- CloseableChannel channel = subchannels.get(distKey);
- if (channel != null) {
- channel.sendPartialFillRequest(partialResult, summaryClass);
- }
- });
- }
-
- @Override
- protected void getPartialFillResults(Result result, String summaryClass) {
- if (expectedFillResults == null) {
- return;
- }
- expectedFillResults.forEach((distKey, partialResult) -> {
- CloseableChannel channel = subchannels.get(distKey);
- if (channel != null) {
- channel.getPartialFillResults(partialResult, summaryClass);
- }
- });
- }
-
- @Override
- protected void closeChannel() {
- if (!subchannels.isEmpty()) {
- subchannels.values().forEach(CloseableChannel::close);
- subchannels.clear();
- }
- }
-}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedFillInvoker.java
new file mode 100644
index 00000000000..644e6f17bdb
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedFillInvoker.java
@@ -0,0 +1,67 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch;
+
+import com.yahoo.prelude.fastsearch.FastHit;
+import com.yahoo.search.Result;
+import com.yahoo.search.result.Hit;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * InterleavedFillInvoker uses multiple {@link FillInvoker} objects to interface with content
+ * nodes in parallel. Operationally it first sends requests with all contained invokers and then
+ * collects the results.
+ *
+ * @author ollivir
+ */
+public class InterleavedFillInvoker extends FillInvoker {
+ private final Map<Integer, FillInvoker> invokers;
+ private Map<Integer, Result> expectedFillResults = null;
+
+ public InterleavedFillInvoker(Map<Integer, FillInvoker> invokers) {
+ this.invokers = invokers;
+ }
+
+ @Override
+ protected void sendFillRequest(Result result, String summaryClass) {
+ expectedFillResults = new HashMap<>();
+
+ for (Iterator<Hit> it = result.hits().deepIterator(); it.hasNext();) {
+ Hit hit = it.next();
+ if (hit instanceof FastHit) {
+ FastHit fhit = (FastHit) hit;
+ Result res = expectedFillResults.computeIfAbsent(fhit.getDistributionKey(), dk -> new Result(result.getQuery()));
+ res.hits().add(fhit);
+ }
+ }
+ expectedFillResults.forEach((distKey, partialResult) -> {
+ FillInvoker invoker = invokers.get(distKey);
+ if (invoker != null) {
+ invoker.sendFillRequest(partialResult, summaryClass);
+ }
+ });
+ }
+
+ @Override
+ protected void getFillResults(Result result, String summaryClass) {
+ if (expectedFillResults == null) {
+ return;
+ }
+ expectedFillResults.forEach((distKey, partialResult) -> {
+ FillInvoker invoker = invokers.get(distKey);
+ if (invoker != null) {
+ invoker.getFillResults(partialResult, summaryClass);
+ }
+ });
+ }
+
+ @Override
+ protected void release() {
+ if (!invokers.isEmpty()) {
+ invokers.values().forEach(FillInvoker::close);
+ invokers.clear();
+ }
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
new file mode 100644
index 00000000000..d80f3a49213
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
@@ -0,0 +1,62 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch;
+
+import com.yahoo.fs4.QueryPacket;
+import com.yahoo.prelude.fastsearch.CacheKey;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * InterleavedSearchInvoker uses multiple {@link SearchInvoker} objects to interface with content
+ * nodes in parallel. Operationally it first sends requests to all contained invokers and then
+ * collects the results. The user of this class is responsible for merging the results if needed.
+ *
+ * @author ollivir
+ */
+public class InterleavedSearchInvoker extends SearchInvoker {
+ private final Collection<SearchInvoker> invokers;
+
+ public InterleavedSearchInvoker(Map<Integer, SearchInvoker> invokers) {
+ this.invokers = new ArrayList<>(invokers.values());
+ }
+
+ /**
+ * Sends search queries to the contained {@link SearchInvoker} sub-invokers. If the search
+ * query has an offset other than zero, it will be reset to zero and the expected hit amount
+ * will be adjusted accordingly.
+ */
+ @Override
+ protected void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException {
+ for (SearchInvoker invoker : invokers) {
+ Query subquery = query.clone();
+
+ subquery.setHits(subquery.getHits() + subquery.getOffset());
+ subquery.setOffset(0);
+ invoker.sendSearchRequest(subquery, null);
+ }
+ }
+
+ @Override
+ protected List<Result> getSearchResults(CacheKey cacheKey) throws IOException {
+ List<Result> results = new ArrayList<>();
+
+ for (SearchInvoker invoker : invokers) {
+ results.addAll(invoker.getSearchResults(cacheKey));
+ }
+ return results;
+ }
+
+ @Override
+ protected void release() {
+ if (!invokers.isEmpty()) {
+ invokers.forEach(SearchInvoker::close);
+ invokers.clear();
+ }
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java
index 67e032eca37..2a4767bc389 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java
@@ -32,7 +32,7 @@ class RpcClient implements Client {
@Override
public void getDocsums(List<FastHit> hits, NodeConnection node, CompressionType compression, int uncompressedLength,
- byte[] compressedSlime, Dispatcher.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) {
+ byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) {
Request request = new Request("proton.getDocsums");
request.parameters().add(new Int8Value(compression.getCode()));
request.parameters().add(new Int32Value(uncompressedLength));
@@ -89,9 +89,9 @@ class RpcClient implements Client {
private final RpcNodeConnection node;
/** The handler to which the response is forwarded */
- private final Dispatcher.GetDocsumsResponseReceiver handler;
+ private final RpcFillInvoker.GetDocsumsResponseReceiver handler;
- public RpcResponseWaiter(RpcNodeConnection node, Dispatcher.GetDocsumsResponseReceiver handler) {
+ public RpcResponseWaiter(RpcNodeConnection node, RpcFillInvoker.GetDocsumsResponseReceiver handler) {
this.node = node;
this.handler = handler;
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RpcFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/RpcFillInvoker.java
new file mode 100644
index 00000000000..53f6015ad2e
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/RpcFillInvoker.java
@@ -0,0 +1,251 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch;
+
+import com.yahoo.collections.ListMap;
+import com.yahoo.compress.CompressionType;
+import com.yahoo.compress.Compressor;
+import com.yahoo.container.protect.Error;
+import com.yahoo.data.access.Inspector;
+import com.yahoo.data.access.slime.SlimeAdapter;
+import com.yahoo.prelude.fastsearch.DocumentDatabase;
+import com.yahoo.prelude.fastsearch.FastHit;
+import com.yahoo.prelude.fastsearch.TimeoutException;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+import com.yahoo.search.query.SessionId;
+import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.search.result.Hit;
+import com.yahoo.slime.ArrayTraverser;
+import com.yahoo.slime.BinaryFormat;
+import com.yahoo.slime.Cursor;
+import com.yahoo.slime.Slime;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * {@link FillInvoker} implementation using RPC
+ *
+ * @author bratseth
+ * @author ollivir
+ */
+public class RpcFillInvoker extends FillInvoker {
+ private static final Logger log = Logger.getLogger(RpcFillInvoker.class.getName());
+
+ private final DocumentDatabase documentDb;
+ private final RpcResourcePool resourcePool;
+
+ private GetDocsumsResponseReceiver responseReceiver;
+
+
+ public RpcFillInvoker(RpcResourcePool resourcePool, DocumentDatabase documentDb) {
+ this.documentDb = documentDb;
+ this.resourcePool = resourcePool;
+ }
+
+ @Override
+ protected void sendFillRequest(Result result, String summaryClass) {
+ ListMap<Integer, FastHit> hitsByNode = hitsByNode(result);
+
+ CompressionType compression = CompressionType
+ .valueOf(result.getQuery().properties().getString(RpcResourcePool.dispatchCompression, "LZ4").toUpperCase());
+
+ if (result.getQuery().getTraceLevel() >= 3)
+ result.getQuery().trace("Sending " + hitsByNode.size() + " summary fetch RPC requests", 3);
+
+ responseReceiver = new GetDocsumsResponseReceiver(hitsByNode.size(), resourcePool.compressor(), result);
+ for (Map.Entry<Integer, List<FastHit>> nodeHits : hitsByNode.entrySet()) {
+ sendGetDocsumsRequest(nodeHits.getKey(), nodeHits.getValue(), summaryClass, compression, result, responseReceiver);
+ }
+ }
+
+ @Override
+ protected void getFillResults(Result result, String summaryClass) {
+ try {
+ responseReceiver.processResponses(result.getQuery(), summaryClass, documentDb);
+ result.hits().setSorted(false);
+ result.analyzeHits();
+ } catch (TimeoutException e) {
+ result.hits().addError(ErrorMessage.createTimeout("Summary data is incomplete: " + e.getMessage()));
+ }
+ }
+
+ @Override
+ protected void release() {
+ // nothing to release
+ }
+
+ /** Return a map of hits by their search node (partition) id */
+ private static ListMap<Integer, FastHit> hitsByNode(Result result) {
+ ListMap<Integer, FastHit> hitsByNode = new ListMap<>();
+ for (Iterator<Hit> i = result.hits().unorderedDeepIterator(); i.hasNext();) {
+ Hit h = i.next();
+ if (!(h instanceof FastHit))
+ continue;
+ FastHit hit = (FastHit) h;
+
+ hitsByNode.put(hit.getDistributionKey(), hit);
+ }
+ return hitsByNode;
+ }
+
+ /** Send a getDocsums request to a node. Responses will be added to the given receiver. */
+ private void sendGetDocsumsRequest(int nodeId, List<FastHit> hits, String summaryClass, CompressionType compression, Result result,
+ GetDocsumsResponseReceiver responseReceiver) {
+ Client.NodeConnection node = resourcePool.nodeConnections().get(nodeId);
+ if (node == null) {
+ result.hits().addError(ErrorMessage.createEmptyDocsums("Could not fill hits from unknown node " + nodeId));
+ log.warning("Got hits with partid " + nodeId + ", which is not included in the current dispatch config");
+ return;
+ }
+
+ Query query = result.getQuery();
+ String rankProfile = query.getRanking().getProfile();
+ byte[] serializedSlime = BinaryFormat
+ .encode(toSlime(rankProfile, summaryClass, query.getModel().getDocumentDb(), query.getSessionId(false), hits));
+ double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0;
+ Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, serializedSlime);
+ resourcePool.client().getDocsums(hits, node, compressionResult.type(), serializedSlime.length, compressionResult.data(),
+ responseReceiver, timeoutSeconds);
+ }
+
+ static private Slime toSlime(String rankProfile, String summaryClass, String docType, SessionId sessionId, List<FastHit> hits) {
+ Slime slime = new Slime();
+ Cursor root = slime.setObject();
+ if (summaryClass != null) {
+ root.setString("class", summaryClass);
+ }
+ if (sessionId != null) {
+ root.setData("sessionid", sessionId.asUtf8String().getBytes());
+ }
+ if (docType != null) {
+ root.setString("doctype", docType);
+ }
+ if (rankProfile != null) {
+ root.setString("ranking", rankProfile);
+ }
+ Cursor gids = root.setArray("gids");
+ for (FastHit hit : hits) {
+ gids.addData(hit.getGlobalId().getRawId());
+ }
+ return slime;
+ }
+
+ /** Receiver of the responses to a set of getDocsums requests */
+ public static class GetDocsumsResponseReceiver {
+
+ private final BlockingQueue<Client.GetDocsumsResponseOrError> responses;
+ private final Compressor compressor;
+ private final Result result;
+
+ /** Whether we have already logged/notified about an error - to avoid spamming */
+ private boolean hasReportedError = false;
+
+ /** The number of responses we should receive (and process) before this is complete */
+ private int outstandingResponses;
+
+ public GetDocsumsResponseReceiver(int requestCount, Compressor compressor, Result result) {
+ this.compressor = compressor;
+ responses = new LinkedBlockingQueue<>(requestCount);
+ outstandingResponses = requestCount;
+ this.result = result;
+ }
+
+ /** Called by a thread belonging to the client when a valid response becomes available */
+ public void receive(Client.GetDocsumsResponseOrError response) {
+ responses.add(response);
+ }
+
+ private void throwTimeout() throws TimeoutException {
+ throw new TimeoutException("Timed out waiting for summary data. " + outstandingResponses + " responses outstanding.");
+ }
+
+ /**
+ * Call this from the dispatcher thread to initiate and complete processing of responses.
+ * This will block until all responses are available and processed, or to timeout.
+ */
+ public void processResponses(Query query, String summaryClass, DocumentDatabase documentDb) throws TimeoutException {
+ try {
+ int skippedHits = 0;
+ while (outstandingResponses > 0) {
+ long timeLeftMs = query.getTimeLeft();
+ if (timeLeftMs <= 0) {
+ throwTimeout();
+ }
+ Client.GetDocsumsResponseOrError response = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS);
+ if (response == null)
+ throwTimeout();
+ skippedHits += processResponse(response, summaryClass, documentDb);
+ outstandingResponses--;
+ }
+ if (skippedHits != 0) {
+ result.hits().addError(com.yahoo.search.result.ErrorMessage.createEmptyDocsums("Missing hit summary data for summary " +
+ summaryClass + " for " + skippedHits + " hits"));
+ }
+ }
+ catch (InterruptedException e) {
+ // TODO: Add error
+ }
+ }
+
+ private int processResponse(Client.GetDocsumsResponseOrError responseOrError,
+ String summaryClass,
+ DocumentDatabase documentDb) {
+ if (responseOrError.error().isPresent()) {
+ if (hasReportedError) return 0;
+ String error = responseOrError.error().get();
+ result.hits().addError(ErrorMessage.createBackendCommunicationError(error));
+ log.log(Level.WARNING, "Error fetching summary data: "+ error);
+ }
+ else {
+ Client.GetDocsumsResponse response = responseOrError.response().get();
+ CompressionType compression = CompressionType.valueOf(response.compression());
+ byte[] slimeBytes = compressor.decompress(response.compressedSlimeBytes(), compression, response.uncompressedSize());
+ return fill(response.hitsContext(), summaryClass, documentDb, slimeBytes);
+ }
+ return 0;
+ }
+
+ private void addErrors(com.yahoo.slime.Inspector errors) {
+ errors.traverse((ArrayTraverser) (int index, com.yahoo.slime.Inspector value) -> {
+ int errorCode = ("timeout".equalsIgnoreCase(value.field("type").asString()))
+ ? Error.TIMEOUT.code
+ : Error.UNSPECIFIED.code;
+ result.hits().addError(new ErrorMessage(errorCode,
+ value.field("message").asString(), value.field("details").asString()));
+ });
+ }
+
+ private int fill(List<FastHit> hits, String summaryClass, DocumentDatabase documentDb, byte[] slimeBytes) {
+ com.yahoo.slime.Inspector root = BinaryFormat.decode(slimeBytes).get();
+ com.yahoo.slime.Inspector errors = root.field("errors");
+ boolean hasErrors = errors.valid() && (errors.entries() > 0);
+ if (hasErrors) {
+ addErrors(errors);
+ }
+
+ Inspector summaries = new SlimeAdapter(root.field("docsums"));
+ if ( ! summaries.valid())
+ return 0; // No summaries; Perhaps we requested a non-existing summary class
+ int skippedHits = 0;
+ for (int i = 0; i < hits.size(); i++) {
+ Inspector summary = summaries.entry(i).field("docsum");
+ if (summary.fieldCount() != 0) {
+ hits.get(i).setField(Hit.SDDOCNAME_FIELD, documentDb.getName());
+ hits.get(i).addSummary(documentDb.getDocsumDefinitionSet().getDocsum(summaryClass), summary);
+ hits.get(i).setFilled(summaryClass);
+ } else {
+ skippedHits++;
+ }
+ }
+ return skippedHits;
+ }
+
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/RpcResourcePool.java
new file mode 100644
index 00000000000..8ab80ec17dd
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/RpcResourcePool.java
@@ -0,0 +1,97 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch;
+
+import com.google.common.collect.ImmutableMap;
+import com.yahoo.compress.Compressor;
+import com.yahoo.container.search.LegacyEmulationConfig;
+import com.yahoo.prelude.fastsearch.DocumentDatabase;
+import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
+import com.yahoo.processing.request.CompoundName;
+import com.yahoo.search.Query;
+import com.yahoo.vespa.config.search.DispatchConfig;
+
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * RpcResourcePool constructs {@link FillInvoker} objects that communicate with content nodes over RPC. It also contains
+ * the RPC connection pool.
+ *
+ * @author ollivir
+ */
+public class RpcResourcePool {
+ /** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */
+ public final static CompoundName dispatchCompression = new CompoundName("dispatch.compression");
+
+ /** Unless turned off this will fill summaries by dispatching directly to search nodes over RPC when possible */
+ private final static CompoundName dispatchSummaries = new CompoundName("dispatch.summaries");
+
+ private final Compressor compressor = new Compressor();
+ private final Client client;
+
+ /** Connections to the search nodes this talks to, indexed by node id ("partid") */
+ private final ImmutableMap<Integer, Client.NodeConnection> nodeConnections;
+
+ public RpcResourcePool(Client client, Map<Integer, Client.NodeConnection> nodeConnections) {
+ this.client = client;
+ this.nodeConnections = ImmutableMap.copyOf(nodeConnections);
+ }
+
+ public RpcResourcePool(DispatchConfig dispatchConfig) {
+ this.client = new RpcClient();
+
+ // Create node rpc connections, indexed by the node distribution key
+ ImmutableMap.Builder<Integer, Client.NodeConnection> nodeConnectionsBuilder = new ImmutableMap.Builder<>();
+ for (DispatchConfig.Node node : dispatchConfig.node()) {
+ nodeConnectionsBuilder.put(node.key(), client.createConnection(node.host(), node.port()));
+ }
+ this.nodeConnections = nodeConnectionsBuilder.build();
+ }
+
+ public Optional<FillInvoker> getFillInvoker(Query query, VespaBackEndSearcher searcher, DocumentDatabase documentDb) {
+ if (query.properties().getBoolean(dispatchSummaries, true)
+ && ! searcher.summaryNeedsQuery(query)
+ && query.getRanking().getLocation() == null
+ && ! searcher.getCacheControl().useCache(query)
+ && ! legacyEmulationConfigIsSet(documentDb)) {
+
+ return Optional.of(new RpcFillInvoker(this, documentDb));
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ // for testing
+ public FillInvoker getFillInvoker(DocumentDatabase documentDb) {
+ return new RpcFillInvoker(this, documentDb);
+ }
+
+ private boolean legacyEmulationConfigIsSet(DocumentDatabase db) {
+ LegacyEmulationConfig config = db.getDocsumDefinitionSet().legacyEmulationConfig();
+ if (config.forceFillEmptyFields())
+ return true;
+ if (config.stringBackedFeatureData())
+ return true;
+ if (config.stringBackedStructuredData())
+ return true;
+ return false;
+ }
+
+ public Compressor compressor() {
+ return compressor;
+ }
+
+ public Client client() {
+ return client;
+ }
+
+ public ImmutableMap<Integer, Client.NodeConnection> nodeConnections() {
+ return nodeConnections;
+ }
+
+ public void release() {
+ for (Client.NodeConnection nodeConnection : nodeConnections.values()) {
+ nodeConnection.close();
+ }
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java
new file mode 100644
index 00000000000..53e09823f32
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java
@@ -0,0 +1,32 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch;
+
+import com.yahoo.fs4.QueryPacket;
+import com.yahoo.prelude.fastsearch.CacheKey;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * SearchInvoker encapsulates an allocated connection for running a single search query.
+ * The invocation object can be stateful and should not be reused.
+ *
+ * @author ollivir
+ */
+public abstract class SearchInvoker extends CloseableInvoker {
+ /**
+ * Retrieve the hits for the given {@link Query}. The invoker may return more than one result, in which case the caller is responsible
+ * for merging the results. If multiple results are returned and the search query had a hit offset other than zero, that offset is
+ * set to zero and the number of requested hits is adjusted accordingly.
+ */
+ public List<Result> search(Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException {
+ sendSearchRequest(query, queryPacket);
+ return getSearchResults(cacheKey);
+ }
+
+ protected abstract void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException;
+
+ protected abstract List<Result> getSearchResults(CacheKey cacheKey) throws IOException;
+}
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java b/container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java
index 5e3e0dc301e..f77a4d092ee 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java
@@ -1,7 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;
-import com.yahoo.compress.CompressionType;
import com.yahoo.prelude.fastsearch.DocsumDefinition;
import com.yahoo.prelude.fastsearch.DocsumDefinitionSet;
import com.yahoo.prelude.fastsearch.DocsumField;
@@ -10,9 +9,6 @@ import com.yahoo.prelude.fastsearch.FastHit;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-
import java.util.ArrayList;
import java.util.Collections;
@@ -20,6 +16,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
/**
* Tests using a dispatcher to fill a result
@@ -36,7 +35,7 @@ public class FillTestCase {
nodes.put(0, client.createConnection("host0", 123));
nodes.put(1, client.createConnection("host1", 123));
nodes.put(2, client.createConnection("host2", 123));
- Dispatcher dispatcher = new Dispatcher(nodes, client);
+ RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes);
Query query = new Query();
Result result = new Result(query);
@@ -52,7 +51,7 @@ public class FillTestCase {
client.setDocsumReponse("host2", 3, "summaryClass1", map("field1", "s.2.3", "field2", 3));
client.setDocsumReponse("host0", 4, "summaryClass1", map("field1", "s.0.4", "field2", 4));
- dispatcher.fill(result, "summaryClass1", db(), CompressionType.valueOf("LZ4"));
+ rpcResourcePool.getFillInvoker(db()).fill(result, "summaryClass1");
assertEquals("s.0.0", result.hits().get("hit:0").getField("field1").toString());
assertEquals("s.2.1", result.hits().get("hit:1").getField("field1").toString());
@@ -72,7 +71,7 @@ public class FillTestCase {
nodes.put(0, client.createConnection("host0", 123));
nodes.put(1, client.createConnection("host1", 123));
nodes.put(2, client.createConnection("host2", 123));
- Dispatcher dispatcher = new Dispatcher(nodes, client);
+ RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes);
Query query = new Query();
Result result = new Result(query);
@@ -87,7 +86,8 @@ public class FillTestCase {
client.setDocsumReponse("host1", 2, "summaryClass1", new HashMap<>());
client.setDocsumReponse("host2", 3, "summaryClass1", map("field1", "s.2.3", "field2", 3));
client.setDocsumReponse("host0", 4, "summaryClass1",new HashMap<>());
- dispatcher.fill(result, "summaryClass1", db(), CompressionType.valueOf("LZ4"));
+
+ rpcResourcePool.getFillInvoker(db()).fill(result, "summaryClass1");
assertEquals("s.0.0", result.hits().get("hit:0").getField("field1").toString());
assertEquals("s.2.1", result.hits().get("hit:1").getField("field1").toString());
@@ -110,13 +110,13 @@ public class FillTestCase {
Map<Integer, Client.NodeConnection> nodes = new HashMap<>();
nodes.put(0, client.createConnection("host0", 123));
- Dispatcher dispatcher = new Dispatcher(nodes, client);
+ RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes);
Query query = new Query();
Result result = new Result(query);
result.hits().add(createHit(0, 0));
- dispatcher.fill(result, "summaryClass1", db(), CompressionType.valueOf("LZ4"));
+ rpcResourcePool.getFillInvoker(db()).fill(result, "summaryClass1");
assertEquals("Malfunctioning", result.hits().getError().getDetailedMessage());
}
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/MockClient.java b/container-search/src/test/java/com/yahoo/search/dispatch/MockClient.java
index 4cac6293ae3..a4cb8ae641c 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/MockClient.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/MockClient.java
@@ -36,7 +36,7 @@ public class MockClient implements Client {
@Override
public void getDocsums(List<FastHit> hitsContext, NodeConnection node, CompressionType compression,
- int uncompressedSize, byte[] compressedSlime, Dispatcher.GetDocsumsResponseReceiver responseReceiver,
+ int uncompressedSize, byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver,
double timeoutSeconds) {
if (malfunctioning) {
responseReceiver.receive(GetDocsumsResponseOrError.fromError("Malfunctioning"));