diff options
author | Jon Bratseth <bratseth@oath.com> | 2018-10-02 09:35:17 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-10-02 09:35:17 +0200 |
commit | 553250535e399607d3363fc38753f10d9f47a78b (patch) | |
tree | 95d299989ec5273a45794aa4b7c37b6e33ee8f28 /container-search | |
parent | b7c77e487c576455c809d207d0171fa1111764cd (diff) | |
parent | 9caaebede97014fd3427a63a765932c9fbede1a8 (diff) |
Merge pull request #7150 from vespa-engine/ollivir/split-search-and-fill
Java dispatch: CloseableChannel split to search and fill invokers
Diffstat (limited to 'container-search')
20 files changed, 968 insertions, 609 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumDefinitionSet.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumDefinitionSet.java index c9e771fe48c..07997f0c8f6 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumDefinitionSet.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/DocsumDefinitionSet.java @@ -50,7 +50,7 @@ public final class DocsumDefinitionSet { this.emulationConfig = emulConfig; } - LegacyEmulationConfig legacyEmulationConfig() { return emulationConfig; } + public LegacyEmulationConfig legacyEmulationConfig() { return emulationConfig; } /** * Returns the summary definition of the given name, or the default if not found. diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java index 10a640c54c8..90eb0b611bf 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4CloseableChannel.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4FillInvoker.java @@ -7,50 +7,38 @@ import com.yahoo.fs4.DocsumPacket; import com.yahoo.fs4.GetDocSumsPacket; import com.yahoo.fs4.Packet; import com.yahoo.fs4.QueryPacket; -import com.yahoo.fs4.QueryResultPacket; import com.yahoo.fs4.mplex.Backend; import com.yahoo.fs4.mplex.FS4Channel; import com.yahoo.fs4.mplex.InvalidChannelException; import com.yahoo.prelude.fastsearch.VespaBackEndSearcher.FillHitsResult; import com.yahoo.search.Query; import com.yahoo.search.Result; -import com.yahoo.search.dispatch.CloseableChannel; +import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.result.ErrorMessage; import com.yahoo.search.result.Hit; import com.yahoo.search.result.HitGroup; import java.io.IOException; import java.util.Iterator; -import java.util.List; -import java.util.Optional; -import java.util.logging.Level; -import java.util.logging.Logger; import static com.yahoo.prelude.fastsearch.VespaBackEndSearcher.hitIterator; -import static java.util.Arrays.asList; /** - * {@link CloseableChannel} implementation for FS4 nodes and fdispatch + * {@link FillInvoker} implementation for FS4 nodes and fdispatch * * @author ollivir */ -public class FS4CloseableChannel extends CloseableChannel { +public class FS4FillInvoker extends FillInvoker { private final VespaBackEndSearcher searcher; private FS4Channel channel; - private final Optional<Integer> distributionKey; - - private ErrorMessage pendingSearchError = null; - private Query query = null; - private QueryPacket queryPacket = null; private int expectedFillResults = 0; private CacheKey summaryCacheKey = null; private DocsumPacketKey[] summaryPacketKeys = null; - public FS4CloseableChannel(VespaBackEndSearcher searcher, Query query, FS4ResourcePool fs4ResourcePool, String hostname, int port, + public FS4FillInvoker(VespaBackEndSearcher searcher, Query query, FS4ResourcePool fs4ResourcePool, String hostname, int port, int distributionKey) { this.searcher = searcher; - this.distributionKey = Optional.of(distributionKey); Backend backend = fs4ResourcePool.getBackend(hostname, port); this.channel = backend.openChannel(); @@ -58,96 +46,14 @@ public class FS4CloseableChannel extends CloseableChannel { } // fdispatch code path - public FS4CloseableChannel(VespaBackEndSearcher searcher, Query query, Backend backend) { + public FS4FillInvoker(VespaBackEndSearcher searcher, Query query, Backend backend) { this.searcher = searcher; - this.distributionKey = Optional.empty(); this.channel = backend.openChannel(); channel.setQuery(query); } @Override - protected void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException { - if (isLoggingFine()) - getLogger().finest("sending query packet"); - - if(queryPacket == null) { - // query changed for subchannel - queryPacket = searcher.createQueryPacket(query); - } - - this.query = query; - this.queryPacket = queryPacket; - - try { - boolean couldSend = channel.sendPacket(queryPacket); - if (!couldSend) { - pendingSearchError = ErrorMessage.createBackendCommunicationError("Could not reach '" + getName() + "'"); - } - } catch (InvalidChannelException e) { - pendingSearchError = ErrorMessage.createBackendCommunicationError("Invalid channel " + getName()); - } catch (IllegalStateException e) { - pendingSearchError = ErrorMessage.createBackendCommunicationError("Illegal state in FS4: " + e.getMessage()); - } - } - - @Override - protected List<Result> getSearchResults(CacheKey cacheKey) throws IOException { - if(pendingSearchError != null) { - return asList(new Result(query, pendingSearchError)); - } - BasicPacket[] basicPackets; - - try { - basicPackets = channel.receivePackets(query.getTimeLeft(), 1); - } catch (ChannelTimeoutException e) { - return asList(new Result(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName()))); - } catch (InvalidChannelException e) { - return asList(new Result(query, ErrorMessage.createBackendCommunicationError("Invalid channel for " + getName()))); - } - - if (basicPackets.length == 0) { - return asList(new Result(query, ErrorMessage.createBackendCommunicationError(getName() + " got no packets back"))); - } - - if (isLoggingFine()) - getLogger().finest("got packets " + basicPackets.length + " packets"); - - basicPackets[0].ensureInstanceOf(QueryResultPacket.class, getName()); - QueryResultPacket resultPacket = (QueryResultPacket) basicPackets[0]; - - if (isLoggingFine()) - getLogger().finest("got query packet. " + "docsumClass=" + query.getPresentation().getSummary()); - - if (query.getPresentation().getSummary() == null) - query.getPresentation().setSummary(searcher.getDefaultDocsumClass()); - - Result result = new Result(query); - - searcher.addMetaInfo(query, queryPacket.getQueryPacketData(), resultPacket, result); - - searcher.addUnfilledHits(result, resultPacket.getDocuments(), false, queryPacket.getQueryPacketData(), cacheKey, distributionKey); - Packet[] packets; - CacheControl cacheControl = searcher.getCacheControl(); - PacketWrapper packetWrapper = cacheControl.lookup(cacheKey, query); - - if (packetWrapper != null) { - cacheControl.updateCacheEntry(cacheKey, query, resultPacket); - } else { - if (resultPacket.getCoverageFeature() && !resultPacket.getCoverageFull()) { - // Don't add error here, it was done in first phase - // No check if packetWrapper already exists, since incomplete - // first phase data won't be cached anyway. - } else { - packets = new Packet[1]; - packets[0] = resultPacket; - cacheControl.cache(cacheKey, query, new DocsumPacketKey[0], packets, distributionKey); - } - } - return asList(result); - } - - @Override - protected void sendPartialFillRequest(Result result, String summaryClass) { + protected void sendFillRequest(Result result, String summaryClass) { summaryCacheKey = null; if (searcher.getCacheControl().useCache(channel.getQuery())) { summaryCacheKey = fetchCacheKeyFromHits(result.hits(), summaryClass); @@ -185,7 +91,7 @@ public class FS4CloseableChannel extends CloseableChannel { @Override - protected void getPartialFillResults(Result result, String summaryClass) { + protected void getFillResults(Result result, String summaryClass) { if (expectedFillResults == 0) { return; } @@ -248,7 +154,7 @@ public class FS4CloseableChannel extends CloseableChannel { } @Override - public void closeChannel() { + public void release() { if (channel != null) { channel.close(); channel = null; @@ -383,12 +289,4 @@ public class FS4CloseableChannel extends CloseableChannel { private String getName() { return searcher.getName(); } - - private Logger getLogger() { - return searcher.getLogger(); - } - - private boolean isLoggingFine() { - return getLogger().isLoggable(Level.FINE); - } } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java new file mode 100644 index 00000000000..cec7fd2ce52 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java @@ -0,0 +1,115 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.prelude.fastsearch; + +import com.google.common.collect.ImmutableMap; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.dispatch.CloseableInvoker; +import com.yahoo.search.dispatch.FillInvoker; +import com.yahoo.search.dispatch.InterleavedFillInvoker; +import com.yahoo.search.dispatch.InterleavedSearchInvoker; +import com.yahoo.search.dispatch.SearchCluster; +import com.yahoo.search.dispatch.SearchInvoker; +import com.yahoo.search.result.Hit; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * FS4InvokerFactory constructs {@link FillInvoker} and {@link SearchInvoker} objects that communicate with + * content nodes or dispatchers over the fnet/FS4 protocol + * + * @author ollivir + */ +public class FS4InvokerFactory { + private final FS4ResourcePool fs4ResourcePool; + private final VespaBackEndSearcher searcher; + private final ImmutableMap<Integer, SearchCluster.Node> nodesByKey; + + public FS4InvokerFactory(FS4ResourcePool fs4ResourcePool, SearchCluster searchCluster, VespaBackEndSearcher searcher) { + this.fs4ResourcePool = fs4ResourcePool; + this.searcher = searcher; + + ImmutableMap.Builder<Integer, SearchCluster.Node> builder = ImmutableMap.builder(); + searchCluster.groups().values().forEach(group -> group.nodes().forEach(node -> builder.put(node.key(), node))); + this.nodesByKey = builder.build(); + } + + public SearchInvoker getSearchInvoker(Query query, SearchCluster.Node node) { + return new FS4SearchInvoker(searcher, query, fs4ResourcePool, node.hostname(), node.fs4port(), node.key()); + } + + public Optional<SearchInvoker> getSearchInvoker(Query query, SearchCluster.Group group) { + return getInvoker(group.nodes(), node -> getSearchInvoker(query, node), InterleavedSearchInvoker::new); + } + + public FillInvoker getFillInvoker(Query query, SearchCluster.Node node) { + return new FS4FillInvoker(searcher, query, fs4ResourcePool, node.hostname(), node.fs4port(), node.key()); + } + + public Optional<FillInvoker> getFillInvoker(Result result) { + Collection<Integer> requiredNodes = requiredFillNodes(result); + List<SearchCluster.Node> nodes = new ArrayList<>(requiredNodes.size()); + + for (Integer distKey : requiredNodes) { + SearchCluster.Node node = nodesByKey.get(distKey); + if (node == null) { + return Optional.empty(); + } + nodes.add(node); + } + + Query query = result.getQuery(); + return getInvoker(nodes, node -> getFillInvoker(query, node), InterleavedFillInvoker::new); + } + + private static Collection<Integer> requiredFillNodes(Result result) { + Set<Integer> requiredNodes = new HashSet<>(); + for (Iterator<Hit> i = result.hits().unorderedDeepIterator(); i.hasNext();) { + Hit h = i.next(); + if (h instanceof FastHit) { + FastHit hit = (FastHit) h; + requiredNodes.add(hit.getDistributionKey()); + } + } + return requiredNodes; + } + + @FunctionalInterface + private interface InvokerConstructor<INVOKER> { + INVOKER construct(SearchCluster.Node node); + } + + @FunctionalInterface + private interface ClusterInvokerConstructor<CLUSTERINVOKER extends INVOKER, INVOKER> { + CLUSTERINVOKER construct(Map<Integer, INVOKER> subinvokers); + } + + private <INVOKER extends CloseableInvoker, CLUSTERINVOKER extends INVOKER> Optional<INVOKER> getInvoker( + Collection<SearchCluster.Node> nodes, InvokerConstructor<INVOKER> singleNodeCtor, + ClusterInvokerConstructor<CLUSTERINVOKER, INVOKER> clusterCtor) { + if (nodes.size() == 1) { + SearchCluster.Node node = nodes.iterator().next(); + return Optional.of(singleNodeCtor.construct(node)); + } else { + Map<Integer, INVOKER> nodeInvokers = new HashMap<>(); + for (SearchCluster.Node node : nodes) { + if (node.isWorking()) { + nodeInvokers.put(node.key(), singleNodeCtor.construct(node)); + } + } + if (nodeInvokers.size() == 1) { + return Optional.of(nodeInvokers.values().iterator().next()); + } else { + return Optional.of(clusterCtor.construct(nodeInvokers)); + } + } + } +} diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java new file mode 100644 index 00000000000..82f87fcac19 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java @@ -0,0 +1,157 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.prelude.fastsearch; + +import com.yahoo.fs4.BasicPacket; +import com.yahoo.fs4.ChannelTimeoutException; +import com.yahoo.fs4.Packet; +import com.yahoo.fs4.QueryPacket; +import com.yahoo.fs4.QueryResultPacket; +import com.yahoo.fs4.mplex.Backend; +import com.yahoo.fs4.mplex.FS4Channel; +import com.yahoo.fs4.mplex.InvalidChannelException; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.dispatch.SearchInvoker; +import com.yahoo.search.result.ErrorMessage; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static java.util.Arrays.asList; + +/** + * {@link SearchInvoker} implementation for FS4 nodes and fdispatch + * + * @author ollivir + */ +public class FS4SearchInvoker extends SearchInvoker { + private final VespaBackEndSearcher searcher; + private FS4Channel channel; + private final Optional<Integer> distributionKey; + + private ErrorMessage pendingSearchError = null; + private Query query = null; + private QueryPacket queryPacket = null; + + public FS4SearchInvoker(VespaBackEndSearcher searcher, Query query, FS4ResourcePool fs4ResourcePool, String hostname, int port, + int distributionKey) { + this.searcher = searcher; + this.distributionKey = Optional.of(distributionKey); + + Backend backend = fs4ResourcePool.getBackend(hostname, port); + this.channel = backend.openChannel(); + channel.setQuery(query); + } + + // fdispatch code path + public FS4SearchInvoker(VespaBackEndSearcher searcher, Query query, Backend backend) { + this.searcher = searcher; + this.distributionKey = Optional.empty(); + this.channel = backend.openChannel(); + channel.setQuery(query); + } + + @Override + protected void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException { + if (isLoggingFine()) + getLogger().finest("sending query packet"); + + if(queryPacket == null) { + // query changed for subchannel + queryPacket = searcher.createQueryPacket(query); + } + + this.query = query; + this.queryPacket = queryPacket; + + try { + boolean couldSend = channel.sendPacket(queryPacket); + if (!couldSend) { + pendingSearchError = ErrorMessage.createBackendCommunicationError("Could not reach '" + getName() + "'"); + } + } catch (InvalidChannelException e) { + pendingSearchError = ErrorMessage.createBackendCommunicationError("Invalid channel " + getName()); + } catch (IllegalStateException e) { + pendingSearchError = ErrorMessage.createBackendCommunicationError("Illegal state in FS4: " + e.getMessage()); + } + } + + @Override + protected List<Result> getSearchResults(CacheKey cacheKey) throws IOException { + if(pendingSearchError != null) { + return asList(new Result(query, pendingSearchError)); + } + BasicPacket[] basicPackets; + + try { + basicPackets = channel.receivePackets(query.getTimeLeft(), 1); + } catch (ChannelTimeoutException e) { + return asList(new Result(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName()))); + } catch (InvalidChannelException e) { + return asList(new Result(query, ErrorMessage.createBackendCommunicationError("Invalid channel for " + getName()))); + } + + if (basicPackets.length == 0) { + return asList(new Result(query, ErrorMessage.createBackendCommunicationError(getName() + " got no packets back"))); + } + + if (isLoggingFine()) + getLogger().finest("got packets " + basicPackets.length + " packets"); + + basicPackets[0].ensureInstanceOf(QueryResultPacket.class, getName()); + QueryResultPacket resultPacket = (QueryResultPacket) basicPackets[0]; + + if (isLoggingFine()) + getLogger().finest("got query packet. " + "docsumClass=" + query.getPresentation().getSummary()); + + if (query.getPresentation().getSummary() == null) + query.getPresentation().setSummary(searcher.getDefaultDocsumClass()); + + Result result = new Result(query); + + searcher.addMetaInfo(query, queryPacket.getQueryPacketData(), resultPacket, result); + + searcher.addUnfilledHits(result, resultPacket.getDocuments(), false, queryPacket.getQueryPacketData(), cacheKey, distributionKey); + Packet[] packets; + CacheControl cacheControl = searcher.getCacheControl(); + PacketWrapper packetWrapper = cacheControl.lookup(cacheKey, query); + + if (packetWrapper != null) { + cacheControl.updateCacheEntry(cacheKey, query, resultPacket); + } else { + if (resultPacket.getCoverageFeature() && !resultPacket.getCoverageFull()) { + // Don't add error here, it was done in first phase + // No check if packetWrapper already exists, since incomplete + // first phase data won't be cached anyway. + } else { + packets = new Packet[1]; + packets[0] = resultPacket; + cacheControl.cache(cacheKey, query, new DocsumPacketKey[0], packets, distributionKey); + } + } + return asList(result); + } + + @Override + public void release() { + if (channel != null) { + channel.close(); + channel = null; + } + } + + private String getName() { + return searcher.getName(); + } + + private Logger getLogger() { + return searcher.getLogger(); + } + + private boolean isLoggingFine() { + return getLogger().isLoggable(Level.FINE); + } +} diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java index 9acf48a7c67..b429995460d 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java @@ -1,8 +1,6 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.prelude.fastsearch; -import com.yahoo.compress.CompressionType; -import com.yahoo.container.search.LegacyEmulationConfig; import com.yahoo.fs4.BasicPacket; import com.yahoo.fs4.ChannelTimeoutException; import com.yahoo.fs4.PingPacket; @@ -17,9 +15,10 @@ import com.yahoo.prelude.querytransform.QueryRewrite; import com.yahoo.processing.request.CompoundName; import com.yahoo.search.Query; import com.yahoo.search.Result; -import com.yahoo.search.dispatch.CloseableChannel; import com.yahoo.search.dispatch.Dispatcher; +import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.SearchCluster; +import com.yahoo.search.dispatch.SearchInvoker; import com.yahoo.search.grouping.GroupingRequest; import com.yahoo.search.grouping.request.GroupingOperation; import com.yahoo.search.query.Ranking; @@ -51,22 +50,12 @@ public class FastSearcher extends VespaBackEndSearcher { /** If this is turned on this will make search queries directly to the local search node when possible */ private final static CompoundName dispatchDirect = new CompoundName("dispatch.direct"); - /** Unless turned off this will fill summaries by dispatching directly to search nodes over RPC when possible */ - private final static CompoundName dispatchSummaries = new CompoundName("dispatch.summaries"); - - /** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */ - private final static CompoundName dispatchCompression = new CompoundName("dispatch.compression"); - - /** If enabled, the dispatcher internal to the search container will be preferred over fdispatch - * whenever possible */ - private static final CompoundName dispatchInternal = new CompoundName("dispatch.internal"); - /** Used to dispatch directly to search nodes over RPC, replacing the old fnet communication path */ private final Dispatcher dispatcher; private final Backend dispatchBackend; - private final FS4ResourcePool fs4ResourcePool; + private final FS4InvokerFactory fs4InvokerFactory; /** * Creates a Fastsearcher. @@ -89,8 +78,8 @@ public class FastSearcher extends VespaBackEndSearcher { CacheParams cacheParams, DocumentdbInfoConfig documentdbInfoConfig) { init(docSumParams, clusterParams, cacheParams, documentdbInfoConfig); this.dispatchBackend = dispatchBackend; - this.fs4ResourcePool = fs4ResourcePool; this.dispatcher = dispatcher; + this.fs4InvokerFactory = new FS4InvokerFactory(fs4ResourcePool, dispatcher.searchCluster(), this); } /** @@ -161,8 +150,8 @@ public class FastSearcher extends VespaBackEndSearcher { public Result doSearch2(Query query, QueryPacket queryPacket, CacheKey cacheKey, Execution execution) { if (dispatcher.searchCluster().groupSize() == 1) forceSinglePassGrouping(query); - try(CloseableChannel channel = getChannel(query)) { - List<Result> results = channel.search(query, queryPacket, cacheKey); + try(SearchInvoker invoker = getSearchInvoker(query)) { + List<Result> results = invoker.search(query, queryPacket, cacheKey); Result result = mergeResults(results, query, execution); if (query.properties().getBoolean(Ranking.RANKFEATURES, false)) { @@ -186,6 +175,25 @@ public class FastSearcher extends VespaBackEndSearcher { } } + /** + * Perform a partial docsum fill for a temporary result + * representing a partition of the complete fill request. + * + * @param result result containing a partition of the unfilled hits + * @param summaryClass the summary class we want to fill with + **/ + @Override + protected void doPartialFill(Result result, String summaryClass) { + if (result.isFilled(summaryClass)) return; + + Query query = result.getQuery(); + traceQuery(getName(), "fill", query, query.getOffset(), query.getHits(), 1, quotedSummaryClass(summaryClass)); + + try (FillInvoker invoker = getFillInvoker(result)) { + invoker.fill(result, summaryClass); + } + } + /** When we only search a single node, doing all grouping in one pass is more efficient */ private void forceSinglePassGrouping(Query query) { for (GroupingRequest groupingRequest : query.getSelect().getGrouping()) @@ -199,71 +207,60 @@ public class FastSearcher extends VespaBackEndSearcher { } /** - * Returns a request interface object for the given query. - * Normally this is built from the backend field of this instance, which connects to the dispatch node - * this component talks to (which is why this instance was chosen by the cluster controller). However, - * under certain conditions we will instead return an interface which connects directly to the relevant - * search nodes. + * Returns an invocation object for use in a single search request. The specific implementation returned + * depends on query properties with the default being an invoker that interfaces with a dispatcher + * on the same host. */ - private CloseableChannel getChannel(Query query) { - if (query.properties().getBoolean(dispatchInternal, false)) { - Optional<CloseableChannel> dispatchedChannel = dispatcher.getDispatchedChannel(this, query); - if (dispatchedChannel.isPresent()) { - return dispatchedChannel.get(); - } + private SearchInvoker getSearchInvoker(Query query) { + Optional<SearchInvoker> invoker = dispatcher.getSearchInvoker(query, fs4InvokerFactory); + if (invoker.isPresent()) { + return invoker.get(); } - if (!query.properties().getBoolean(dispatchDirect, true)) - return new FS4CloseableChannel(this, query, dispatchBackend); - if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) - return new FS4CloseableChannel(this, query, dispatchBackend); - - Optional<SearchCluster.Node> directDispatchRecipient = dispatcher.searchCluster().directDispatchTarget(); - if (!directDispatchRecipient.isPresent()) - return new FS4CloseableChannel(this, query, dispatchBackend); - // Dispatch directly to the single, local search node - SearchCluster.Node local = directDispatchRecipient.get(); - query.trace(false, 2, "Dispatching directly to ", directDispatchRecipient.get()); - return new FS4CloseableChannel(this, query, fs4ResourcePool, local.hostname(), local.fs4port(), local.key()); + Optional<SearchCluster.Node> direct = getDirectNode(query); + if(direct.isPresent()) { + return fs4InvokerFactory.getSearchInvoker(query, direct.get()); + } + return new FS4SearchInvoker(this, query, dispatchBackend); } /** - * Perform a partial docsum fill for a temporary result - * representing a partition of the complete fill request. - * - * @param result result containing a partition of the unfilled hits - * @param summaryClass the summary class we want to fill with - **/ - @Override - protected void doPartialFill(Result result, String summaryClass) { - if (result.isFilled(summaryClass)) return; - + * Returns an invocation object for use in a single fill request. The specific implementation returned + * depends on query properties with the default being an invoker that uses RPC to interface with + * content nodes. + */ + private FillInvoker getFillInvoker(Result result) { Query query = result.getQuery(); - traceQuery(getName(), "fill", query, query.getOffset(), query.getHits(), 1, quotedSummaryClass(summaryClass)); - - if (query.properties().getBoolean(dispatchSummaries, true) - && ! summaryNeedsQuery(query) - && query.getRanking().getLocation() == null - && ! cacheControl.useCache(query) - && ! legacyEmulationConfigIsSet(getDocumentDatabase(query))) { - - CompressionType compression = - CompressionType.valueOf(query.properties().getString(dispatchCompression, "LZ4").toUpperCase()); - dispatcher.fill(result, summaryClass, getDocumentDatabase(query), compression); - return; + Optional<FillInvoker> invoker = dispatcher.getFillInvoker(result, this, getDocumentDatabase(query), fs4InvokerFactory); + if (invoker.isPresent()) { + return invoker.get(); } - try (CloseableChannel channel = getChannel(query)) { - channel.partialFill(result, summaryClass); + Optional<SearchCluster.Node> direct = getDirectNode(query); + if (direct.isPresent()) { + return fs4InvokerFactory.getFillInvoker(query, direct.get()); } + return new FS4FillInvoker(this, query, dispatchBackend); } - private boolean legacyEmulationConfigIsSet(DocumentDatabase db) { - LegacyEmulationConfig config = db.getDocsumDefinitionSet().legacyEmulationConfig(); - if (config.forceFillEmptyFields()) return true; - if (config.stringBackedFeatureData()) return true; - if (config.stringBackedStructuredData()) return true; - return false; + /** + * If the query can be directed to a single local content node, returns that node. Otherwise, + * returns an empty value. + */ + private Optional<SearchCluster.Node> getDirectNode(Query query) { + if (!query.properties().getBoolean(dispatchDirect, true)) + return Optional.empty(); + if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) + return Optional.empty(); + + Optional<SearchCluster.Node> directDispatchRecipient = dispatcher.searchCluster().directDispatchTarget(); + if (!directDispatchRecipient.isPresent()) + return Optional.empty(); + + // Dispatch directly to the single, local search node + SearchCluster.Node local = directDispatchRecipient.get(); + query.trace(false, 2, "Dispatching directly to ", local); + return Optional.of(local); } private Result mergeResults(List<Result> results, Query query, Execution execution) { @@ -289,8 +286,6 @@ public class FastSearcher extends VespaBackEndSearcher { result.hits().trim(query.getOffset(), query.getHits()); } - // TODO grouping - return result; } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java index 409d05e3aaf..42903dcfa90 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java @@ -102,7 +102,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { * Returns whether we need to send the query when fetching summaries. * This is necessary if the query requests summary features or dynamic snippeting */ - boolean summaryNeedsQuery(Query query) { + public boolean summaryNeedsQuery(Query query) { if (query.getRanking().getQueryCache()) return false; // Query is cached in backend DocumentDatabase documentDb = getDocumentDatabase(query); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/Client.java index f2d7750ebe4..431b36c2623 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Client.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Client.java @@ -15,7 +15,7 @@ import java.util.Optional; interface Client { void getDocsums(List<FastHit> hits, NodeConnection node, CompressionType compression, - int uncompressedLength, byte[] compressedSlime, Dispatcher.GetDocsumsResponseReceiver responseReceiver, + int uncompressedLength, byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds); /** Creates a connection to a particular node in this */ diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java deleted file mode 100644 index fc337d589ec..00000000000 --- a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableChannel.java +++ /dev/null @@ -1,59 +0,0 @@ -// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch; - -import com.yahoo.fs4.QueryPacket; -import com.yahoo.prelude.fastsearch.CacheKey; -import com.yahoo.search.Query; -import com.yahoo.search.Result; - -import java.io.Closeable; -import java.io.IOException; -import java.util.List; - -/** - * CloseableChannel is an interface for running a search query and getting document summaries against some content node, node group or - * dispatcher while abstracting the specifics of the invocation target. ClosebleChannel objects are stateful and should not be reused. - * - * @author ollivir - */ -public abstract class CloseableChannel implements Closeable { - /** Retrieve the hits for the given {@link Query}. The channel may return more than one result, in - * which case the caller is responsible for merging the results. If multiple results are returned - * and the search query had a hit offset other than zero, that offset will be set to zero and the - * number of requested hits will be adjusted accordingly. */ - public List<Result> search(Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException { - sendSearchRequest(query, queryPacket); - return getSearchResults(cacheKey); - } - - protected abstract void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException; - - protected abstract List<Result> getSearchResults(CacheKey cacheKey) throws IOException; - - /** Retrieve document summaries for the unfilled hits in the given {@link Result} */ - public void partialFill(Result result, String summaryClass) { - sendPartialFillRequest(result, summaryClass); - getPartialFillResults(result, summaryClass); - } - - protected abstract void getPartialFillResults(Result result, String summaryClass); - - protected abstract void sendPartialFillRequest(Result result, String summaryClass); - - protected abstract void closeChannel(); - - private Runnable teardown = null; - - public void teardown(Runnable teardown) { - this.teardown = teardown; - } - - @Override - public final void close() { - if (teardown != null) { - teardown.run(); - teardown = null; - } - closeChannel(); - } -} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java new file mode 100644 index 00000000000..481940a33b7 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/CloseableInvoker.java @@ -0,0 +1,30 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch; + +import java.io.Closeable; + +/** + * CloseableInvoker is an abstract implementation of {@link Closeable} with an additional hook for + * executing code at closing. Classes that extend CloseableInvoker need to override {@link #release()} + * instead of {@link #close()} which is final to avoid accidental overriding. + * + * @author ollivir + */ +public abstract class CloseableInvoker implements Closeable { + protected abstract void release(); + + private Runnable teardown = null; + + public void teardown(Runnable teardown) { + this.teardown = teardown; + } + + @Override + public final void close() { + if (teardown != null) { + teardown.run(); + teardown = null; + } + release(); + } +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index ce0d48f5638..31e6070423d 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -1,296 +1,94 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch; -import com.google.common.collect.ImmutableMap; -import com.yahoo.collections.ListMap; import com.yahoo.component.AbstractComponent; -import com.yahoo.compress.CompressionType; -import com.yahoo.compress.Compressor; import com.yahoo.container.handler.VipStatus; -import com.yahoo.container.protect.Error; -import com.yahoo.data.access.Inspector; -import com.yahoo.data.access.slime.SlimeAdapter; import com.yahoo.prelude.fastsearch.DocumentDatabase; -import com.yahoo.prelude.fastsearch.FS4CloseableChannel; +import com.yahoo.prelude.fastsearch.FS4InvokerFactory; import com.yahoo.prelude.fastsearch.FS4ResourcePool; -import com.yahoo.prelude.fastsearch.FastHit; -import com.yahoo.prelude.fastsearch.TimeoutException; import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; +import com.yahoo.processing.request.CompoundName; import com.yahoo.search.Query; import com.yahoo.search.Result; -import com.yahoo.search.query.SessionId; -import com.yahoo.search.result.ErrorMessage; -import com.yahoo.search.result.Hit; -import com.yahoo.slime.ArrayTraverser; -import com.yahoo.slime.BinaryFormat; -import com.yahoo.slime.Cursor; -import com.yahoo.slime.Slime; import com.yahoo.vespa.config.search.DispatchConfig; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; /** * A dispatcher communicates with search nodes to perform queries and fill hits. * - * This is currently not functionally complete: Queries can only be dispatched to a single node, - * and summaries can only be requested when they do not need the query. + * This class allocates {@link SearchInvoker} and {@link FillInvoker} objects based + * on query properties and general system status. The caller can then use the provided + * invocation object to execute the search or fill. * * This class is multithread safe. * * @author bratseth + * @author ollvir */ public class Dispatcher extends AbstractComponent { - - private final static Logger log = Logger.getLogger(Dispatcher.class.getName()); - private final Client client; + /** If enabled, this internal dispatcher will be preferred over fdispatch whenever possible */ + private static final CompoundName dispatchInternal = new CompoundName("dispatch.internal"); /** A model of the search cluster this dispatches to */ private final SearchCluster searchCluster; - /** Connections to the search nodes this talks to, indexed by node id ("partid") */ - private final ImmutableMap<Integer, Client.NodeConnection> nodeConnections; - - private final Compressor compressor = new Compressor(); - private final LoadBalancer loadBalancer; - private final FS4ResourcePool fs4ResourcePool; + private final RpcResourcePool rpcResourcePool; - public Dispatcher(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, - int containerClusterSize, VipStatus vipStatus) { - this.client = new RpcClient(); + public Dispatcher(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus) { this.searchCluster = new SearchCluster(dispatchConfig, fs4ResourcePool, containerClusterSize, vipStatus); - this.fs4ResourcePool = fs4ResourcePool; this.loadBalancer = new LoadBalancer(searchCluster); - - // Create node rpc connections, indexed by the node distribution key - ImmutableMap.Builder<Integer, Client.NodeConnection> nodeConnectionsBuilder = new ImmutableMap.Builder<>(); - for (DispatchConfig.Node node : dispatchConfig.node()) { - nodeConnectionsBuilder.put(node.key(), client.createConnection(node.host(), node.port())); - } - nodeConnections = nodeConnectionsBuilder.build(); + this.rpcResourcePool = new RpcResourcePool(dispatchConfig); } /** For testing */ public Dispatcher(Map<Integer, Client.NodeConnection> nodeConnections, Client client) { this.searchCluster = null; - this.nodeConnections = ImmutableMap.copyOf(nodeConnections); - this.client = client; - this.fs4ResourcePool = null; this.loadBalancer = new LoadBalancer(searchCluster); + this.rpcResourcePool = new RpcResourcePool(client, nodeConnections); } /** Returns the search cluster this dispatches to */ - public SearchCluster searchCluster() { return searchCluster; } - - /** Fills the given summary class by sending RPC requests to the right search nodes */ - public void fill(Result result, String summaryClass, DocumentDatabase documentDb, CompressionType compression) { - try { - ListMap<Integer, FastHit> hitsByNode = hitsByNode(result); - - if (result.getQuery().getTraceLevel() >=3) - result.getQuery().trace("Sending " + hitsByNode.size() + " summary fetch RPC requests", 3); - - GetDocsumsResponseReceiver responseReceiver = new GetDocsumsResponseReceiver(hitsByNode.size(), compressor, result); - for (Map.Entry<Integer, List<FastHit>> nodeHits : hitsByNode.entrySet()) { - sendGetDocsumsRequest(nodeHits.getKey(), nodeHits.getValue(), summaryClass, compression, result, responseReceiver); - } - responseReceiver.processResponses(result.getQuery(), summaryClass, documentDb); - result.hits().setSorted(false); - result.analyzeHits(); - } - catch (TimeoutException e) { - result.hits().addError(ErrorMessage.createTimeout("Summary data is incomplete: " + e.getMessage())); - } - } - - /** Return a map of hits by their search node (partition) id */ - private static ListMap<Integer, FastHit> hitsByNode(Result result) { - ListMap<Integer, FastHit> hitsByNode = new ListMap<>(); - for (Iterator<Hit> i = result.hits().unorderedDeepIterator() ; i.hasNext(); ) { - Hit h = i.next(); - if ( ! (h instanceof FastHit)) continue; - FastHit hit = (FastHit)h; - - hitsByNode.put(hit.getDistributionKey(), hit); - } - return hitsByNode; - } - - /** Send a getDocsums request to a node. Responses will be added to the given receiver. */ - private void sendGetDocsumsRequest(int nodeId, List<FastHit> hits, String summaryClass, - CompressionType compression, - Result result, GetDocsumsResponseReceiver responseReceiver) { - Client.NodeConnection node = nodeConnections.get(nodeId); - if (node == null) { - result.hits().addError(ErrorMessage.createEmptyDocsums("Could not fill hits from unknown node " + nodeId)); - log.warning("Got hits with partid " + nodeId + ", which is not included in the current dispatch config"); - return; - } - - Query query = result.getQuery(); - String rankProfile = query.getRanking().getProfile(); - byte[] serializedSlime = BinaryFormat.encode(toSlime(rankProfile, summaryClass, - query.getModel().getDocumentDb(), query.getSessionId(false), hits)); - double timeoutSeconds = ((double)query.getTimeLeft()-3.0)/1000.0; - Compressor.Compression compressionResult = compressor.compress(compression, serializedSlime); - client.getDocsums(hits, node, compressionResult.type(), - serializedSlime.length, compressionResult.data(), responseReceiver, timeoutSeconds); - } - - static private Slime toSlime(String rankProfile, String summaryClass, String docType, SessionId sessionId, List<FastHit> hits) { - Slime slime = new Slime(); - Cursor root = slime.setObject(); - if (summaryClass != null) { - root.setString("class", summaryClass); - } - if (sessionId != null) { - root.setData("sessionid", sessionId.asUtf8String().getBytes()); - } - if (docType != null) { - root.setString("doctype", docType); - } - if (rankProfile != null) { - root.setString("ranking", rankProfile); - } - Cursor gids = root.setArray("gids"); - for (FastHit hit : hits) { - gids.addData(hit.getGlobalId().getRawId()); - } - return slime; + public SearchCluster searchCluster() { + return searchCluster; } @Override public void deconstruct() { - for (Client.NodeConnection nodeConnection : nodeConnections.values()) - nodeConnection.close(); + rpcResourcePool.release(); } - /** Receiver of the responses to a set of getDocsums requests */ - public static class GetDocsumsResponseReceiver { - - private final BlockingQueue<Client.GetDocsumsResponseOrError> responses; - private final Compressor compressor; - private final Result result; - - /** Whether we have already logged/notified about an error - to avoid spamming */ - private boolean hasReportedError = false; - - /** The number of responses we should receive (and process) before this is complete */ - private int outstandingResponses; - - public GetDocsumsResponseReceiver(int requestCount, Compressor compressor, Result result) { - this.compressor = compressor; - responses = new LinkedBlockingQueue<>(requestCount); - outstandingResponses = requestCount; - this.result = result; - } - - /** Called by a thread belonging to the client when a valid response becomes available */ - public void receive(Client.GetDocsumsResponseOrError response) { - responses.add(response); - } - - private void throwTimeout() throws TimeoutException { - throw new TimeoutException("Timed out waiting for summary data. " + outstandingResponses + " responses outstanding."); - } - - /** - * Call this from the dispatcher thread to initiate and complete processing of responses. - * This will block until all responses are available and processed, or to timeout. - */ - public void processResponses(Query query, String summaryClass, DocumentDatabase documentDb) throws TimeoutException { - try { - int skippedHits = 0; - while (outstandingResponses > 0) { - long timeLeftMs = query.getTimeLeft(); - if (timeLeftMs <= 0) { - throwTimeout(); - } - Client.GetDocsumsResponseOrError response = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS); - if (response == null) - throwTimeout(); - skippedHits += processResponse(response, summaryClass, documentDb); - outstandingResponses--; - } - if (skippedHits != 0) { - result.hits().addError(com.yahoo.search.result.ErrorMessage.createEmptyDocsums("Missing hit summary data for summary " + - summaryClass + " for " + skippedHits + " hits")); - } - } - catch (InterruptedException e) { - // TODO: Add error - } - } - - private int processResponse(Client.GetDocsumsResponseOrError responseOrError, - String summaryClass, - DocumentDatabase documentDb) { - if (responseOrError.error().isPresent()) { - if (hasReportedError) return 0; - String error = responseOrError.error().get(); - result.hits().addError(ErrorMessage.createBackendCommunicationError(error)); - log.log(Level.WARNING, "Error fetching summary data: "+ error); - } - else { - Client.GetDocsumsResponse response = responseOrError.response().get(); - CompressionType compression = CompressionType.valueOf(response.compression()); - byte[] slimeBytes = compressor.decompress(response.compressedSlimeBytes(), compression, response.uncompressedSize()); - return fill(response.hitsContext(), summaryClass, documentDb, slimeBytes); - } - return 0; - } + @FunctionalInterface + private interface SearchInvokerSupplier { + Optional<SearchInvoker> supply(Query query, SearchCluster.Group group); + } - private void addErrors(com.yahoo.slime.Inspector errors) { - errors.traverse((ArrayTraverser) (int index, com.yahoo.slime.Inspector value) -> { - int errorCode = ("timeout".equalsIgnoreCase(value.field("type").asString())) - ? Error.TIMEOUT.code - : Error.UNSPECIFIED.code; - result.hits().addError(new ErrorMessage(errorCode, - value.field("message").asString(), value.field("details").asString())); - }); + public Optional<FillInvoker> getFillInvoker(Result result, VespaBackEndSearcher searcher, DocumentDatabase documentDb, + FS4InvokerFactory fs4InvokerFactory) { + Optional<FillInvoker> rpcInvoker = rpcResourcePool.getFillInvoker(result.getQuery(), searcher, documentDb); + if (rpcInvoker.isPresent()) { + return rpcInvoker; } - - private int fill(List<FastHit> hits, String summaryClass, DocumentDatabase documentDb, byte[] slimeBytes) { - com.yahoo.slime.Inspector root = BinaryFormat.decode(slimeBytes).get(); - com.yahoo.slime.Inspector errors = root.field("errors"); - boolean hasErrors = errors.valid() && (errors.entries() > 0); - if (hasErrors) { - addErrors(errors); + if (result.getQuery().properties().getBoolean(dispatchInternal, false)) { + Optional<FillInvoker> fs4Invoker = fs4InvokerFactory.getFillInvoker(result); + if (fs4Invoker.isPresent()) { + return fs4Invoker; } - - Inspector summaries = new SlimeAdapter(root.field("docsums")); - if ( ! summaries.valid()) - return 0; // No summaries; Perhaps we requested a non-existing summary class - int skippedHits = 0; - for (int i = 0; i < hits.size(); i++) { - Inspector summary = summaries.entry(i).field("docsum"); - if (summary.fieldCount() != 0) { - hits.get(i).setField(Hit.SDDOCNAME_FIELD, documentDb.getName()); - hits.get(i).addSummary(documentDb.getDocsumDefinitionSet().getDocsum(summaryClass), summary); - hits.get(i).setFilled(summaryClass); - } else { - skippedHits++; - } - } - return skippedHits; } - + return Optional.empty(); } - public Optional<CloseableChannel> getDispatchedChannel(VespaBackEndSearcher searcher, Query query) { - if (!query.getSelect().getGrouping().isEmpty()) { - return Optional.empty(); + public Optional<SearchInvoker> getSearchInvoker(Query query, FS4InvokerFactory fs4InvokerFactory) { + if (query.properties().getBoolean(dispatchInternal, false)) { + Optional<SearchInvoker> invoker = getInternalInvoker(query, fs4InvokerFactory::getSearchInvoker); + return invoker; } + return Optional.empty(); + } + private Optional<SearchInvoker> getInternalInvoker(Query query, SearchInvokerSupplier invokerFactory) { Optional<SearchCluster.Group> groupInCluster = loadBalancer.takeGroupForQuery(query); if (!groupInCluster.isPresent()) { return Optional.empty(); @@ -298,20 +96,12 @@ public class Dispatcher extends AbstractComponent { SearchCluster.Group group = groupInCluster.get(); query.trace(false, 2, "Dispatching internally to ", group); - if (group.nodes().size() == 1) { - SearchCluster.Node node = group.nodes().iterator().next(); - CloseableChannel channel = new FS4CloseableChannel(searcher, query, fs4ResourcePool, node.hostname(), node.fs4port(), - node.key()); - return Optional.of(channel); + Optional<SearchInvoker> invoker = invokerFactory.supply(query, group); + if (invoker.isPresent()) { + invoker.get().teardown(() -> loadBalancer.releaseGroup(group)); } else { - query.setNoCache(true); // Note - multi-node request disables packet based caching - - Map<Integer, CloseableChannel> subchannels = new HashMap<>(); - for (SearchCluster.Node node : group.nodes()) { - subchannels.put(node.key(), new FS4CloseableChannel(searcher, query, fs4ResourcePool, node.hostname(), node.fs4port(), node.key())); - } - CloseableChannel multinode = new InterleavedCloseableChannel(subchannels); - return Optional.of(multinode); + loadBalancer.releaseGroup(group); } + return invoker; } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/FillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/FillInvoker.java new file mode 100644 index 00000000000..dd4c4494ac5 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/FillInvoker.java @@ -0,0 +1,22 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch; + +import com.yahoo.search.Result; + +/** + * FillInvoker encapsulates an allocated connection for running a document summary retrieval. + * The invocation object can be stateful and should not be reused. + * + * @author ollivir + */ +public abstract class FillInvoker extends CloseableInvoker { + /** Retrieve document summaries for the unfilled hits in the given {@link Result} */ + public void fill(Result result, String summaryClass) { + sendFillRequest(result, summaryClass); + getFillResults(result, summaryClass); + } + + protected abstract void getFillResults(Result result, String summaryClass); + + protected abstract void sendFillRequest(Result result, String summaryClass); +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedCloseableChannel.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedCloseableChannel.java deleted file mode 100644 index e461f6fc725..00000000000 --- a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedCloseableChannel.java +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch; - -import com.yahoo.fs4.QueryPacket; -import com.yahoo.prelude.fastsearch.CacheKey; -import com.yahoo.prelude.fastsearch.FastHit; -import com.yahoo.search.Query; -import com.yahoo.search.Result; -import com.yahoo.search.result.Hit; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -/** - * InterleavedCloseableChannel uses multiple {@link CloseableChannel} objects to interface with - * content nodes in parallel. Operationally it first sends requests to all channels and then - * collects the results. The invoker of this class is responsible for merging the results if - * needed. - * - * @author ollivir - */ -public class InterleavedCloseableChannel extends CloseableChannel { - private final Map<Integer, CloseableChannel> subchannels; - private Map<Integer, Result> expectedFillResults = null; - - public InterleavedCloseableChannel(Map<Integer, CloseableChannel> subchannels) { - this.subchannels = subchannels; - } - - /** Sends search queries to the contained {@link CloseableChannel} subchannels. If the - * search query has an offset other than zero, it will be reset to zero and the expected - * hit amount will be adjusted accordingly. */ - @Override - protected void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException { - for (CloseableChannel subchannel : subchannels.values()) { - Query subquery = query.clone(); - - subquery.setHits(subquery.getHits() + subquery.getOffset()); - subquery.setOffset(0); - subchannel.sendSearchRequest(subquery, null); - } - } - - @Override - protected List<Result> getSearchResults(CacheKey cacheKey) throws IOException { - List<Result> results = new ArrayList<>(); - - for (CloseableChannel subchannel : subchannels.values()) { - results.addAll(subchannel.getSearchResults(cacheKey)); - } - return results; - } - - @Override - protected void sendPartialFillRequest(Result result, String summaryClass) { - expectedFillResults = new HashMap<>(); - - for (Iterator<Hit> it = result.hits().deepIterator(); it.hasNext();) { - Hit hit = it.next(); - if (hit instanceof FastHit) { - FastHit fhit = (FastHit) hit; - Result res = expectedFillResults.computeIfAbsent(fhit.getDistributionKey(), dk -> new Result(result.getQuery())); - res.hits().add(fhit); - } - } - expectedFillResults.forEach((distKey, partialResult) -> { - CloseableChannel channel = subchannels.get(distKey); - if (channel != null) { - channel.sendPartialFillRequest(partialResult, summaryClass); - } - }); - } - - @Override - protected void getPartialFillResults(Result result, String summaryClass) { - if (expectedFillResults == null) { - return; - } - expectedFillResults.forEach((distKey, partialResult) -> { - CloseableChannel channel = subchannels.get(distKey); - if (channel != null) { - channel.getPartialFillResults(partialResult, summaryClass); - } - }); - } - - @Override - protected void closeChannel() { - if (!subchannels.isEmpty()) { - subchannels.values().forEach(CloseableChannel::close); - subchannels.clear(); - } - } -} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedFillInvoker.java new file mode 100644 index 00000000000..644e6f17bdb --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedFillInvoker.java @@ -0,0 +1,67 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch; + +import com.yahoo.prelude.fastsearch.FastHit; +import com.yahoo.search.Result; +import com.yahoo.search.result.Hit; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** + * InterleavedFillInvoker uses multiple {@link FillInvoker} objects to interface with content + * nodes in parallel. Operationally it first sends requests with all contained invokers and then + * collects the results. + * + * @author ollivir + */ +public class InterleavedFillInvoker extends FillInvoker { + private final Map<Integer, FillInvoker> invokers; + private Map<Integer, Result> expectedFillResults = null; + + public InterleavedFillInvoker(Map<Integer, FillInvoker> invokers) { + this.invokers = invokers; + } + + @Override + protected void sendFillRequest(Result result, String summaryClass) { + expectedFillResults = new HashMap<>(); + + for (Iterator<Hit> it = result.hits().deepIterator(); it.hasNext();) { + Hit hit = it.next(); + if (hit instanceof FastHit) { + FastHit fhit = (FastHit) hit; + Result res = expectedFillResults.computeIfAbsent(fhit.getDistributionKey(), dk -> new Result(result.getQuery())); + res.hits().add(fhit); + } + } + expectedFillResults.forEach((distKey, partialResult) -> { + FillInvoker invoker = invokers.get(distKey); + if (invoker != null) { + invoker.sendFillRequest(partialResult, summaryClass); + } + }); + } + + @Override + protected void getFillResults(Result result, String summaryClass) { + if (expectedFillResults == null) { + return; + } + expectedFillResults.forEach((distKey, partialResult) -> { + FillInvoker invoker = invokers.get(distKey); + if (invoker != null) { + invoker.getFillResults(partialResult, summaryClass); + } + }); + } + + @Override + protected void release() { + if (!invokers.isEmpty()) { + invokers.values().forEach(FillInvoker::close); + invokers.clear(); + } + } +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java new file mode 100644 index 00000000000..d80f3a49213 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java @@ -0,0 +1,62 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch; + +import com.yahoo.fs4.QueryPacket; +import com.yahoo.prelude.fastsearch.CacheKey; +import com.yahoo.search.Query; +import com.yahoo.search.Result; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; + +/** + * InterleavedSearchInvoker uses multiple {@link SearchInvoker} objects to interface with content + * nodes in parallel. Operationally it first sends requests to all contained invokers and then + * collects the results. The user of this class is responsible for merging the results if needed. + * + * @author ollivir + */ +public class InterleavedSearchInvoker extends SearchInvoker { + private final Collection<SearchInvoker> invokers; + + public InterleavedSearchInvoker(Map<Integer, SearchInvoker> invokers) { + this.invokers = new ArrayList<>(invokers.values()); + } + + /** + * Sends search queries to the contained {@link SearchInvoker} sub-invokers. If the search + * query has an offset other than zero, it will be reset to zero and the expected hit amount + * will be adjusted accordingly. + */ + @Override + protected void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException { + for (SearchInvoker invoker : invokers) { + Query subquery = query.clone(); + + subquery.setHits(subquery.getHits() + subquery.getOffset()); + subquery.setOffset(0); + invoker.sendSearchRequest(subquery, null); + } + } + + @Override + protected List<Result> getSearchResults(CacheKey cacheKey) throws IOException { + List<Result> results = new ArrayList<>(); + + for (SearchInvoker invoker : invokers) { + results.addAll(invoker.getSearchResults(cacheKey)); + } + return results; + } + + @Override + protected void release() { + if (!invokers.isEmpty()) { + invokers.forEach(SearchInvoker::close); + invokers.clear(); + } + } +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java index 67e032eca37..2a4767bc389 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java @@ -32,7 +32,7 @@ class RpcClient implements Client { @Override public void getDocsums(List<FastHit> hits, NodeConnection node, CompressionType compression, int uncompressedLength, - byte[] compressedSlime, Dispatcher.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) { + byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) { Request request = new Request("proton.getDocsums"); request.parameters().add(new Int8Value(compression.getCode())); request.parameters().add(new Int32Value(uncompressedLength)); @@ -89,9 +89,9 @@ class RpcClient implements Client { private final RpcNodeConnection node; /** The handler to which the response is forwarded */ - private final Dispatcher.GetDocsumsResponseReceiver handler; + private final RpcFillInvoker.GetDocsumsResponseReceiver handler; - public RpcResponseWaiter(RpcNodeConnection node, Dispatcher.GetDocsumsResponseReceiver handler) { + public RpcResponseWaiter(RpcNodeConnection node, RpcFillInvoker.GetDocsumsResponseReceiver handler) { this.node = node; this.handler = handler; } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RpcFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/RpcFillInvoker.java new file mode 100644 index 00000000000..53f6015ad2e --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/RpcFillInvoker.java @@ -0,0 +1,251 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch; + +import com.yahoo.collections.ListMap; +import com.yahoo.compress.CompressionType; +import com.yahoo.compress.Compressor; +import com.yahoo.container.protect.Error; +import com.yahoo.data.access.Inspector; +import com.yahoo.data.access.slime.SlimeAdapter; +import com.yahoo.prelude.fastsearch.DocumentDatabase; +import com.yahoo.prelude.fastsearch.FastHit; +import com.yahoo.prelude.fastsearch.TimeoutException; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.query.SessionId; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.search.result.Hit; +import com.yahoo.slime.ArrayTraverser; +import com.yahoo.slime.BinaryFormat; +import com.yahoo.slime.Cursor; +import com.yahoo.slime.Slime; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * {@link FillInvoker} implementation using RPC + * + * @author bratseth + * @author ollivir + */ +public class RpcFillInvoker extends FillInvoker { + private static final Logger log = Logger.getLogger(RpcFillInvoker.class.getName()); + + private final DocumentDatabase documentDb; + private final RpcResourcePool resourcePool; + + private GetDocsumsResponseReceiver responseReceiver; + + + public RpcFillInvoker(RpcResourcePool resourcePool, DocumentDatabase documentDb) { + this.documentDb = documentDb; + this.resourcePool = resourcePool; + } + + @Override + protected void sendFillRequest(Result result, String summaryClass) { + ListMap<Integer, FastHit> hitsByNode = hitsByNode(result); + + CompressionType compression = CompressionType + .valueOf(result.getQuery().properties().getString(RpcResourcePool.dispatchCompression, "LZ4").toUpperCase()); + + if (result.getQuery().getTraceLevel() >= 3) + result.getQuery().trace("Sending " + hitsByNode.size() + " summary fetch RPC requests", 3); + + responseReceiver = new GetDocsumsResponseReceiver(hitsByNode.size(), resourcePool.compressor(), result); + for (Map.Entry<Integer, List<FastHit>> nodeHits : hitsByNode.entrySet()) { + sendGetDocsumsRequest(nodeHits.getKey(), nodeHits.getValue(), summaryClass, compression, result, responseReceiver); + } + } + + @Override + protected void getFillResults(Result result, String summaryClass) { + try { + responseReceiver.processResponses(result.getQuery(), summaryClass, documentDb); + result.hits().setSorted(false); + result.analyzeHits(); + } catch (TimeoutException e) { + result.hits().addError(ErrorMessage.createTimeout("Summary data is incomplete: " + e.getMessage())); + } + } + + @Override + protected void release() { + // nothing to release + } + + /** Return a map of hits by their search node (partition) id */ + private static ListMap<Integer, FastHit> hitsByNode(Result result) { + ListMap<Integer, FastHit> hitsByNode = new ListMap<>(); + for (Iterator<Hit> i = result.hits().unorderedDeepIterator(); i.hasNext();) { + Hit h = i.next(); + if (!(h instanceof FastHit)) + continue; + FastHit hit = (FastHit) h; + + hitsByNode.put(hit.getDistributionKey(), hit); + } + return hitsByNode; + } + + /** Send a getDocsums request to a node. Responses will be added to the given receiver. */ + private void sendGetDocsumsRequest(int nodeId, List<FastHit> hits, String summaryClass, CompressionType compression, Result result, + GetDocsumsResponseReceiver responseReceiver) { + Client.NodeConnection node = resourcePool.nodeConnections().get(nodeId); + if (node == null) { + result.hits().addError(ErrorMessage.createEmptyDocsums("Could not fill hits from unknown node " + nodeId)); + log.warning("Got hits with partid " + nodeId + ", which is not included in the current dispatch config"); + return; + } + + Query query = result.getQuery(); + String rankProfile = query.getRanking().getProfile(); + byte[] serializedSlime = BinaryFormat + .encode(toSlime(rankProfile, summaryClass, query.getModel().getDocumentDb(), query.getSessionId(false), hits)); + double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0; + Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, serializedSlime); + resourcePool.client().getDocsums(hits, node, compressionResult.type(), serializedSlime.length, compressionResult.data(), + responseReceiver, timeoutSeconds); + } + + static private Slime toSlime(String rankProfile, String summaryClass, String docType, SessionId sessionId, List<FastHit> hits) { + Slime slime = new Slime(); + Cursor root = slime.setObject(); + if (summaryClass != null) { + root.setString("class", summaryClass); + } + if (sessionId != null) { + root.setData("sessionid", sessionId.asUtf8String().getBytes()); + } + if (docType != null) { + root.setString("doctype", docType); + } + if (rankProfile != null) { + root.setString("ranking", rankProfile); + } + Cursor gids = root.setArray("gids"); + for (FastHit hit : hits) { + gids.addData(hit.getGlobalId().getRawId()); + } + return slime; + } + + /** Receiver of the responses to a set of getDocsums requests */ + public static class GetDocsumsResponseReceiver { + + private final BlockingQueue<Client.GetDocsumsResponseOrError> responses; + private final Compressor compressor; + private final Result result; + + /** Whether we have already logged/notified about an error - to avoid spamming */ + private boolean hasReportedError = false; + + /** The number of responses we should receive (and process) before this is complete */ + private int outstandingResponses; + + public GetDocsumsResponseReceiver(int requestCount, Compressor compressor, Result result) { + this.compressor = compressor; + responses = new LinkedBlockingQueue<>(requestCount); + outstandingResponses = requestCount; + this.result = result; + } + + /** Called by a thread belonging to the client when a valid response becomes available */ + public void receive(Client.GetDocsumsResponseOrError response) { + responses.add(response); + } + + private void throwTimeout() throws TimeoutException { + throw new TimeoutException("Timed out waiting for summary data. " + outstandingResponses + " responses outstanding."); + } + + /** + * Call this from the dispatcher thread to initiate and complete processing of responses. + * This will block until all responses are available and processed, or to timeout. + */ + public void processResponses(Query query, String summaryClass, DocumentDatabase documentDb) throws TimeoutException { + try { + int skippedHits = 0; + while (outstandingResponses > 0) { + long timeLeftMs = query.getTimeLeft(); + if (timeLeftMs <= 0) { + throwTimeout(); + } + Client.GetDocsumsResponseOrError response = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS); + if (response == null) + throwTimeout(); + skippedHits += processResponse(response, summaryClass, documentDb); + outstandingResponses--; + } + if (skippedHits != 0) { + result.hits().addError(com.yahoo.search.result.ErrorMessage.createEmptyDocsums("Missing hit summary data for summary " + + summaryClass + " for " + skippedHits + " hits")); + } + } + catch (InterruptedException e) { + // TODO: Add error + } + } + + private int processResponse(Client.GetDocsumsResponseOrError responseOrError, + String summaryClass, + DocumentDatabase documentDb) { + if (responseOrError.error().isPresent()) { + if (hasReportedError) return 0; + String error = responseOrError.error().get(); + result.hits().addError(ErrorMessage.createBackendCommunicationError(error)); + log.log(Level.WARNING, "Error fetching summary data: "+ error); + } + else { + Client.GetDocsumsResponse response = responseOrError.response().get(); + CompressionType compression = CompressionType.valueOf(response.compression()); + byte[] slimeBytes = compressor.decompress(response.compressedSlimeBytes(), compression, response.uncompressedSize()); + return fill(response.hitsContext(), summaryClass, documentDb, slimeBytes); + } + return 0; + } + + private void addErrors(com.yahoo.slime.Inspector errors) { + errors.traverse((ArrayTraverser) (int index, com.yahoo.slime.Inspector value) -> { + int errorCode = ("timeout".equalsIgnoreCase(value.field("type").asString())) + ? Error.TIMEOUT.code + : Error.UNSPECIFIED.code; + result.hits().addError(new ErrorMessage(errorCode, + value.field("message").asString(), value.field("details").asString())); + }); + } + + private int fill(List<FastHit> hits, String summaryClass, DocumentDatabase documentDb, byte[] slimeBytes) { + com.yahoo.slime.Inspector root = BinaryFormat.decode(slimeBytes).get(); + com.yahoo.slime.Inspector errors = root.field("errors"); + boolean hasErrors = errors.valid() && (errors.entries() > 0); + if (hasErrors) { + addErrors(errors); + } + + Inspector summaries = new SlimeAdapter(root.field("docsums")); + if ( ! summaries.valid()) + return 0; // No summaries; Perhaps we requested a non-existing summary class + int skippedHits = 0; + for (int i = 0; i < hits.size(); i++) { + Inspector summary = summaries.entry(i).field("docsum"); + if (summary.fieldCount() != 0) { + hits.get(i).setField(Hit.SDDOCNAME_FIELD, documentDb.getName()); + hits.get(i).addSummary(documentDb.getDocsumDefinitionSet().getDocsum(summaryClass), summary); + hits.get(i).setFilled(summaryClass); + } else { + skippedHits++; + } + } + return skippedHits; + } + + } +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/RpcResourcePool.java new file mode 100644 index 00000000000..8ab80ec17dd --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/RpcResourcePool.java @@ -0,0 +1,97 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch; + +import com.google.common.collect.ImmutableMap; +import com.yahoo.compress.Compressor; +import com.yahoo.container.search.LegacyEmulationConfig; +import com.yahoo.prelude.fastsearch.DocumentDatabase; +import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; +import com.yahoo.processing.request.CompoundName; +import com.yahoo.search.Query; +import com.yahoo.vespa.config.search.DispatchConfig; + +import java.util.Map; +import java.util.Optional; + +/** + * RpcResourcePool constructs {@link FillInvoker} objects that communicate with content nodes over RPC. It also contains + * the RPC connection pool. + * + * @author ollivir + */ +public class RpcResourcePool { + /** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */ + public final static CompoundName dispatchCompression = new CompoundName("dispatch.compression"); + + /** Unless turned off this will fill summaries by dispatching directly to search nodes over RPC when possible */ + private final static CompoundName dispatchSummaries = new CompoundName("dispatch.summaries"); + + private final Compressor compressor = new Compressor(); + private final Client client; + + /** Connections to the search nodes this talks to, indexed by node id ("partid") */ + private final ImmutableMap<Integer, Client.NodeConnection> nodeConnections; + + public RpcResourcePool(Client client, Map<Integer, Client.NodeConnection> nodeConnections) { + this.client = client; + this.nodeConnections = ImmutableMap.copyOf(nodeConnections); + } + + public RpcResourcePool(DispatchConfig dispatchConfig) { + this.client = new RpcClient(); + + // Create node rpc connections, indexed by the node distribution key + ImmutableMap.Builder<Integer, Client.NodeConnection> nodeConnectionsBuilder = new ImmutableMap.Builder<>(); + for (DispatchConfig.Node node : dispatchConfig.node()) { + nodeConnectionsBuilder.put(node.key(), client.createConnection(node.host(), node.port())); + } + this.nodeConnections = nodeConnectionsBuilder.build(); + } + + public Optional<FillInvoker> getFillInvoker(Query query, VespaBackEndSearcher searcher, DocumentDatabase documentDb) { + if (query.properties().getBoolean(dispatchSummaries, true) + && ! searcher.summaryNeedsQuery(query) + && query.getRanking().getLocation() == null + && ! searcher.getCacheControl().useCache(query) + && ! legacyEmulationConfigIsSet(documentDb)) { + + return Optional.of(new RpcFillInvoker(this, documentDb)); + } else { + return Optional.empty(); + } + } + + // for testing + public FillInvoker getFillInvoker(DocumentDatabase documentDb) { + return new RpcFillInvoker(this, documentDb); + } + + private boolean legacyEmulationConfigIsSet(DocumentDatabase db) { + LegacyEmulationConfig config = db.getDocsumDefinitionSet().legacyEmulationConfig(); + if (config.forceFillEmptyFields()) + return true; + if (config.stringBackedFeatureData()) + return true; + if (config.stringBackedStructuredData()) + return true; + return false; + } + + public Compressor compressor() { + return compressor; + } + + public Client client() { + return client; + } + + public ImmutableMap<Integer, Client.NodeConnection> nodeConnections() { + return nodeConnections; + } + + public void release() { + for (Client.NodeConnection nodeConnection : nodeConnections.values()) { + nodeConnection.close(); + } + } +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java new file mode 100644 index 00000000000..53e09823f32 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java @@ -0,0 +1,32 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch; + +import com.yahoo.fs4.QueryPacket; +import com.yahoo.prelude.fastsearch.CacheKey; +import com.yahoo.search.Query; +import com.yahoo.search.Result; + +import java.io.IOException; +import java.util.List; + +/** + * SearchInvoker encapsulates an allocated connection for running a single search query. + * The invocation object can be stateful and should not be reused. + * + * @author ollivir + */ +public abstract class SearchInvoker extends CloseableInvoker { + /** + * Retrieve the hits for the given {@link Query}. The invoker may return more than one result, in which case the caller is responsible + * for merging the results. If multiple results are returned and the search query had a hit offset other than zero, that offset is + * set to zero and the number of requested hits is adjusted accordingly. + */ + public List<Result> search(Query query, QueryPacket queryPacket, CacheKey cacheKey) throws IOException { + sendSearchRequest(query, queryPacket); + return getSearchResults(cacheKey); + } + + protected abstract void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException; + + protected abstract List<Result> getSearchResults(CacheKey cacheKey) throws IOException; +} diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java b/container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java index 5e3e0dc301e..f77a4d092ee 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch; -import com.yahoo.compress.CompressionType; import com.yahoo.prelude.fastsearch.DocsumDefinition; import com.yahoo.prelude.fastsearch.DocsumDefinitionSet; import com.yahoo.prelude.fastsearch.DocsumField; @@ -10,9 +9,6 @@ import com.yahoo.prelude.fastsearch.FastHit; import com.yahoo.search.Query; import com.yahoo.search.Result; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - import java.util.ArrayList; import java.util.Collections; @@ -20,6 +16,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + /** * Tests using a dispatcher to fill a result @@ -36,7 +35,7 @@ public class FillTestCase { nodes.put(0, client.createConnection("host0", 123)); nodes.put(1, client.createConnection("host1", 123)); nodes.put(2, client.createConnection("host2", 123)); - Dispatcher dispatcher = new Dispatcher(nodes, client); + RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes); Query query = new Query(); Result result = new Result(query); @@ -52,7 +51,7 @@ public class FillTestCase { client.setDocsumReponse("host2", 3, "summaryClass1", map("field1", "s.2.3", "field2", 3)); client.setDocsumReponse("host0", 4, "summaryClass1", map("field1", "s.0.4", "field2", 4)); - dispatcher.fill(result, "summaryClass1", db(), CompressionType.valueOf("LZ4")); + rpcResourcePool.getFillInvoker(db()).fill(result, "summaryClass1"); assertEquals("s.0.0", result.hits().get("hit:0").getField("field1").toString()); assertEquals("s.2.1", result.hits().get("hit:1").getField("field1").toString()); @@ -72,7 +71,7 @@ public class FillTestCase { nodes.put(0, client.createConnection("host0", 123)); nodes.put(1, client.createConnection("host1", 123)); nodes.put(2, client.createConnection("host2", 123)); - Dispatcher dispatcher = new Dispatcher(nodes, client); + RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes); Query query = new Query(); Result result = new Result(query); @@ -87,7 +86,8 @@ public class FillTestCase { client.setDocsumReponse("host1", 2, "summaryClass1", new HashMap<>()); client.setDocsumReponse("host2", 3, "summaryClass1", map("field1", "s.2.3", "field2", 3)); client.setDocsumReponse("host0", 4, "summaryClass1",new HashMap<>()); - dispatcher.fill(result, "summaryClass1", db(), CompressionType.valueOf("LZ4")); + + rpcResourcePool.getFillInvoker(db()).fill(result, "summaryClass1"); assertEquals("s.0.0", result.hits().get("hit:0").getField("field1").toString()); assertEquals("s.2.1", result.hits().get("hit:1").getField("field1").toString()); @@ -110,13 +110,13 @@ public class FillTestCase { Map<Integer, Client.NodeConnection> nodes = new HashMap<>(); nodes.put(0, client.createConnection("host0", 123)); - Dispatcher dispatcher = new Dispatcher(nodes, client); + RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes); Query query = new Query(); Result result = new Result(query); result.hits().add(createHit(0, 0)); - dispatcher.fill(result, "summaryClass1", db(), CompressionType.valueOf("LZ4")); + rpcResourcePool.getFillInvoker(db()).fill(result, "summaryClass1"); assertEquals("Malfunctioning", result.hits().getError().getDetailedMessage()); } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/MockClient.java b/container-search/src/test/java/com/yahoo/search/dispatch/MockClient.java index 4cac6293ae3..a4cb8ae641c 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/MockClient.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/MockClient.java @@ -36,7 +36,7 @@ public class MockClient implements Client { @Override public void getDocsums(List<FastHit> hitsContext, NodeConnection node, CompressionType compression, - int uncompressedSize, byte[] compressedSlime, Dispatcher.GetDocsumsResponseReceiver responseReceiver, + int uncompressedSize, byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) { if (malfunctioning) { responseReceiver.receive(GetDocsumsResponseOrError.fromError("Malfunctioning")); |