diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch')
13 files changed, 51 insertions, 722 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/Client.java index 019e07221a6..431b36c2623 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Client.java @@ -1,5 +1,5 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch.rpc; +package com.yahoo.search.dispatch; import com.yahoo.compress.CompressionType; import com.yahoo.prelude.fastsearch.FastHit; @@ -18,10 +18,6 @@ interface Client { int uncompressedLength, byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds); - void search(NodeConnection node, CompressionType compression, - int uncompressedLength, byte[] compressedPayload, RpcSearchInvoker responseReceiver, - double timeoutSeconds); - /** Creates a connection to a particular node in this */ NodeConnection createConnection(String hostname, int port); @@ -91,54 +87,4 @@ interface Client { } - class SearchResponseOrError { - // One of these will be non empty and the other not - private Optional<SearchResponse> response; - private Optional<String> error; - - public static SearchResponseOrError fromResponse(SearchResponse response) { - return new SearchResponseOrError(Optional.of(response), Optional.empty()); - } - - public static SearchResponseOrError fromError(String error) { - return new SearchResponseOrError(Optional.empty(), Optional.of(error)); - } - - private SearchResponseOrError(Optional<SearchResponse> response, Optional<String> error) { - this.response = response; - this.error = error; - } - - /** Returns the response, or empty if there is an error */ - public Optional<SearchResponse> response() { return response; } - - /** Returns the error or empty if there is a response */ - public Optional<String> error() { return error; } - - } - - class SearchResponse { - private final byte compression; - private final int uncompressedSize; - private final byte[] compressedPayload; - - public SearchResponse(byte compression, int uncompressedSize, byte[] compressedPayload) { - this.compression = compression; - this.uncompressedSize = uncompressedSize; - this.compressedPayload = compressedPayload; - } - - public byte compression() { - return compression; - } - - public int uncompressedSize() { - return uncompressedSize; - } - - public byte[] compressedPayload() { - return compressedPayload; - } - } - } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 0aee51e1e32..146b132be22 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -3,6 +3,7 @@ package com.yahoo.search.dispatch; import com.yahoo.component.AbstractComponent; import com.yahoo.container.handler.VipStatus; +import com.yahoo.prelude.fastsearch.DocumentDatabase; import com.yahoo.prelude.fastsearch.FS4InvokerFactory; import com.yahoo.prelude.fastsearch.FS4ResourcePool; import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; @@ -10,8 +11,6 @@ import com.yahoo.processing.request.CompoundName; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException; -import com.yahoo.search.dispatch.rpc.RpcInvokerFactory; -import com.yahoo.search.dispatch.rpc.RpcResourcePool; import com.yahoo.search.dispatch.searchcluster.Group; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.SearchCluster; @@ -43,38 +42,25 @@ public class Dispatcher extends AbstractComponent { /** If enabled, this internal dispatcher will be preferred over fdispatch whenever possible */ private static final CompoundName dispatchInternal = new CompoundName("dispatch.internal"); - /** If enabled, search queries will use protobuf rpc */ - private static final CompoundName dispatchProtobuf = new CompoundName("dispatch.protobuf"); - /** A model of the search cluster this dispatches to */ private final SearchCluster searchCluster; private final LoadBalancer loadBalancer; + private final RpcResourcePool rpcResourcePool; private final boolean multilevelDispatch; private final boolean internalDispatchByDefault; - private final FS4InvokerFactory fs4InvokerFactory; - private final RpcInvokerFactory rpcInvokerFactory; - public Dispatcher(String clusterId, DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus) { - this(new SearchCluster(clusterId, dispatchConfig, fs4ResourcePool, containerClusterSize, vipStatus), dispatchConfig, - fs4ResourcePool, new RpcResourcePool(dispatchConfig)); - } - - public Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, RpcResourcePool rpcResourcePool) { - this(searchCluster, dispatchConfig, new FS4InvokerFactory(fs4ResourcePool, searchCluster), - new RpcInvokerFactory(rpcResourcePool, searchCluster)); + this(new SearchCluster(clusterId, dispatchConfig, fs4ResourcePool, containerClusterSize, vipStatus), dispatchConfig); } - public Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, FS4InvokerFactory fs4InvokerFactory, RpcInvokerFactory rpcInvokerFactory) { + public Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig) { this.searchCluster = searchCluster; this.loadBalancer = new LoadBalancer(searchCluster, dispatchConfig.distributionPolicy() == DispatchConfig.DistributionPolicy.ROUNDROBIN); + this.rpcResourcePool = new RpcResourcePool(dispatchConfig); this.multilevelDispatch = dispatchConfig.useMultilevelDispatch(); this.internalDispatchByDefault = !dispatchConfig.useFdispatchByDefault(); - - this.fs4InvokerFactory = fs4InvokerFactory; - this.rpcInvokerFactory = rpcInvokerFactory; } /** Returns the search cluster this dispatches to */ @@ -84,16 +70,17 @@ public class Dispatcher extends AbstractComponent { @Override public void deconstruct() { - rpcInvokerFactory.release(); + rpcResourcePool.release(); } - public Optional<FillInvoker> getFillInvoker(Result result, VespaBackEndSearcher searcher) { - Optional<FillInvoker> rpcInvoker = rpcInvokerFactory.createFillInvoker(searcher, result); + public Optional<FillInvoker> getFillInvoker(Result result, VespaBackEndSearcher searcher, DocumentDatabase documentDb, + FS4InvokerFactory fs4InvokerFactory) { + Optional<FillInvoker> rpcInvoker = rpcResourcePool.getFillInvoker(result.getQuery(), searcher, documentDb); if (rpcInvoker.isPresent()) { return rpcInvoker; } if (result.getQuery().properties().getBoolean(dispatchInternal, internalDispatchByDefault)) { - Optional<FillInvoker> fs4Invoker = fs4InvokerFactory.createFillInvoker(searcher, result); + Optional<FillInvoker> fs4Invoker = fs4InvokerFactory.getFillInvoker(result); if (fs4Invoker.isPresent()) { return fs4Invoker; } @@ -101,17 +88,15 @@ public class Dispatcher extends AbstractComponent { return Optional.empty(); } - public Optional<SearchInvoker> getSearchInvoker(Query query, VespaBackEndSearcher searcher) { + public Optional<SearchInvoker> getSearchInvoker(Query query, FS4InvokerFactory fs4InvokerFactory) { if (multilevelDispatch || ! query.properties().getBoolean(dispatchInternal, internalDispatchByDefault)) { return Optional.empty(); } - InvokerFactory factory = query.properties().getBoolean(dispatchProtobuf, false) ? rpcInvokerFactory : fs4InvokerFactory; - - Optional<SearchInvoker> invoker = getSearchPathInvoker(query, factory, searcher); + Optional<SearchInvoker> invoker = getSearchPathInvoker(query, fs4InvokerFactory::getSearchInvoker); if (!invoker.isPresent()) { - invoker = getInternalInvoker(query, factory, searcher); + invoker = getInternalInvoker(query, fs4InvokerFactory::getSearchInvoker); } if (invoker.isPresent() && query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) { query.setHits(0); @@ -120,12 +105,13 @@ public class Dispatcher extends AbstractComponent { return invoker; } - public FS4InvokerFactory getFS4InvokerFactory() { - return fs4InvokerFactory; + @FunctionalInterface + private interface SearchInvokerSupplier { + Optional<SearchInvoker> supply(Query query, OptionalInt groupId, List<Node> nodes, boolean acceptIncompleteCoverage); } // build invoker based on searchpath - private Optional<SearchInvoker> getSearchPathInvoker(Query query, InvokerFactory invokerFactory, VespaBackEndSearcher searcher) { + private Optional<SearchInvoker> getSearchPathInvoker(Query query, SearchInvokerSupplier invokerFactory) { String searchPath = query.getModel().getSearchPath(); if(searchPath == null) { return Optional.empty(); @@ -136,19 +122,19 @@ public class Dispatcher extends AbstractComponent { return Optional.empty(); } else { query.trace(false, 2, "Dispatching internally with search path ", searchPath); - return invokerFactory.createSearchInvoker(searcher, query, OptionalInt.empty(), nodes, true); + return invokerFactory.supply(query, OptionalInt.empty(), nodes, true); } } catch (InvalidSearchPathException e) { return Optional.of(new SearchErrorInvoker(ErrorMessage.createIllegalQuery(e.getMessage()))); } } - private Optional<SearchInvoker> getInternalInvoker(Query query, InvokerFactory invokerFactory, VespaBackEndSearcher searcher) { + private Optional<SearchInvoker> getInternalInvoker(Query query, SearchInvokerSupplier invokerFactory) { Optional<Node> directNode = searchCluster.directDispatchTarget(); if (directNode.isPresent()) { Node node = directNode.get(); query.trace(false, 2, "Dispatching directly to ", node); - return invokerFactory.createSearchInvoker(searcher, query, OptionalInt.empty(), Arrays.asList(node), true); + return invokerFactory.supply(query, OptionalInt.empty(), Arrays.asList(node), true); } int covered = searchCluster.groupsWithSufficientCoverage(); @@ -163,7 +149,7 @@ public class Dispatcher extends AbstractComponent { } Group group = groupInCluster.get(); boolean acceptIncompleteCoverage = (i == max - 1); - Optional<SearchInvoker> invoker = invokerFactory.createSearchInvoker(searcher, query, OptionalInt.of(group.id()), group.nodes(), + Optional<SearchInvoker> invoker = invokerFactory.supply(query, OptionalInt.of(group.id()), group.nodes(), acceptIncompleteCoverage); if (invoker.isPresent()) { query.trace(false, 2, "Dispatching internally to search group ", group.id()); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java deleted file mode 100644 index ca471fb2baa..00000000000 --- a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch; - -import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; -import com.yahoo.search.Query; -import com.yahoo.search.Result; -import com.yahoo.search.dispatch.searchcluster.Node; -import com.yahoo.search.result.Coverage; -import com.yahoo.search.result.ErrorMessage; - -import java.util.List; -import java.util.Optional; -import java.util.OptionalInt; -import java.util.Set; - -/** - * @author ollivir - */ -public abstract class InvokerFactory { - public abstract Optional<SearchInvoker> createSearchInvoker(VespaBackEndSearcher searcher, Query query, OptionalInt groupId, - List<Node> nodes, boolean acceptIncompleteCoverage); - - public abstract Optional<FillInvoker> createFillInvoker(VespaBackEndSearcher searcher, Result result); - - protected static SearchInvoker createCoverageErrorInvoker(List<Node> nodes, Set<Integer> failed) { - StringBuilder down = new StringBuilder("Connection failure on nodes with distribution-keys: "); - int count = 0; - for (Node node : nodes) { - if (failed.contains(node.key())) { - if (count > 0) { - down.append(", "); - } - count++; - down.append(node.key()); - } - } - Coverage coverage = new Coverage(0, 0, 0); - coverage.setNodesTried(count); - return new SearchErrorInvoker(ErrorMessage.createBackendCommunicationError(down.toString()), coverage); - } -} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java index 32a7917d43c..2a4767bc389 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java @@ -1,5 +1,5 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch.rpc; +package com.yahoo.search.dispatch; import com.yahoo.compress.CompressionType; import com.yahoo.jrt.DataValue; @@ -40,19 +40,7 @@ class RpcClient implements Client { request.setContext(hits); RpcNodeConnection rpcNode = ((RpcNodeConnection) node); - rpcNode.invokeAsync(request, timeoutSeconds, new RpcDocsumResponseWaiter(rpcNode, responseReceiver)); - } - - @Override - public void search(NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload, - RpcSearchInvoker responseReceiver, double timeoutSeconds) { - Request request = new Request("vespa.searchprotocol.search"); - request.parameters().add(new Int8Value(compression.getCode())); - request.parameters().add(new Int32Value(uncompressedLength)); - request.parameters().add(new DataValue(compressedPayload)); - - RpcNodeConnection rpcNode = ((RpcNodeConnection) node); - rpcNode.invokeAsync(request, timeoutSeconds, new RpcSearchResponseWaiter(rpcNode, responseReceiver)); + rpcNode.invokeAsync(request, timeoutSeconds, new RpcResponseWaiter(rpcNode, responseReceiver)); } private static class RpcNodeConnection implements NodeConnection { @@ -95,7 +83,7 @@ class RpcClient implements Client { } - private static class RpcDocsumResponseWaiter implements RequestWaiter { + private static class RpcResponseWaiter implements RequestWaiter { /** The node to which we made the request we are waiting for - for error messages only */ private final RpcNodeConnection node; @@ -103,7 +91,7 @@ class RpcClient implements Client { /** The handler to which the response is forwarded */ private final RpcFillInvoker.GetDocsumsResponseReceiver handler; - public RpcDocsumResponseWaiter(RpcNodeConnection node, RpcFillInvoker.GetDocsumsResponseReceiver handler) { + public RpcResponseWaiter(RpcNodeConnection node, RpcFillInvoker.GetDocsumsResponseReceiver handler) { this.node = node; this.handler = handler; } @@ -136,39 +124,4 @@ class RpcClient implements Client { } - private static class RpcSearchResponseWaiter implements RequestWaiter { - - /** The node to which we made the request we are waiting for - for error messages only */ - private final RpcNodeConnection node; - - /** The handler to which the response is forwarded */ - private final RpcSearchInvoker handler; - - public RpcSearchResponseWaiter(RpcNodeConnection node, RpcSearchInvoker handler) { - this.node = node; - this.handler = handler; - } - - @Override - public void handleRequestDone(Request requestWithResponse) { - if (requestWithResponse.isError()) { - handler.receive(SearchResponseOrError.fromError("Error response from " + node + ": " + requestWithResponse.errorMessage())); - return; - } - - Values returnValues = requestWithResponse.returnValues(); - if (returnValues.size() < 3) { - handler.receive(SearchResponseOrError.fromError( - "Invalid getDocsums response from " + node + ": Expected 3 return arguments, got " + returnValues.size())); - return; - } - - byte compression = returnValues.get(0).asInt8(); - int uncompressedSize = returnValues.get(1).asInt32(); - byte[] compressedPayload = returnValues.get(2).asData(); - handler.receive(SearchResponseOrError.fromResponse(new SearchResponse(compression, uncompressedSize, compressedPayload))); - } - - } - } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/RpcFillInvoker.java index b7286997514..578c447dfbe 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/RpcFillInvoker.java @@ -1,5 +1,5 @@ // Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch.rpc; +package com.yahoo.search.dispatch; import com.yahoo.collections.ListMap; import com.yahoo.compress.CompressionType; @@ -12,7 +12,6 @@ import com.yahoo.prelude.fastsearch.FastHit; import com.yahoo.prelude.fastsearch.TimeoutException; import com.yahoo.search.Query; import com.yahoo.search.Result; -import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.query.SessionId; import com.yahoo.search.result.ErrorMessage; import com.yahoo.search.result.Hit; diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/RpcResourcePool.java index 830ba45ef0f..29641080ba6 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/RpcResourcePool.java @@ -1,13 +1,16 @@ // Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch.rpc; +package com.yahoo.search.dispatch; import com.google.common.collect.ImmutableMap; import com.yahoo.compress.Compressor; +import com.yahoo.prelude.fastsearch.DocumentDatabase; +import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; import com.yahoo.processing.request.CompoundName; -import com.yahoo.search.dispatch.FillInvoker; +import com.yahoo.search.Query; import com.yahoo.vespa.config.search.DispatchConfig; import java.util.Map; +import java.util.Optional; /** * RpcResourcePool constructs {@link FillInvoker} objects that communicate with content nodes over RPC. It also contains @@ -19,6 +22,9 @@ public class RpcResourcePool { /** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */ public final static CompoundName dispatchCompression = new CompoundName("dispatch.compression"); + /** Unless turned off this will fill summaries by dispatching directly to search nodes over RPC when possible */ + private final static CompoundName dispatchSummaries = new CompoundName("dispatch.summaries"); + private final Compressor compressor = new Compressor(); private final Client client; @@ -41,6 +47,22 @@ public class RpcResourcePool { this.nodeConnections = nodeConnectionsBuilder.build(); } + public Optional<FillInvoker> getFillInvoker(Query query, VespaBackEndSearcher searcher, DocumentDatabase documentDb) { + if (query.properties().getBoolean(dispatchSummaries, true) + && ! searcher.summaryNeedsQuery(query) + && query.getRanking().getLocation() == null) + { + return Optional.of(new RpcFillInvoker(this, documentDb)); + } else { + return Optional.empty(); + } + } + + // for testing + public FillInvoker getFillInvoker(DocumentDatabase documentDb) { + return new RpcFillInvoker(this, documentDb); + } + public Compressor compressor() { return compressor; } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java index 1650494db3a..0d7ef53bb50 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java @@ -5,8 +5,6 @@ import com.yahoo.fs4.QueryPacket; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.dispatch.searchcluster.Node; -import com.yahoo.search.result.Coverage; -import com.yahoo.search.result.ErrorMessage; import com.yahoo.search.searchchain.Execution; import java.io.IOException; @@ -55,13 +53,4 @@ public abstract class SearchInvoker extends CloseableInvoker { protected Optional<Integer> distributionKey() { return node.map(Node::key); } - - protected Result errorResult(Query query, ErrorMessage errorMessage) { - Result error = new Result(query, errorMessage); - Coverage errorCoverage = new Coverage(0, 0, 0); - errorCoverage.setNodesTried(1); - error.setCoverage(errorCoverage); - return error; - } - } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/MapConverter.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/MapConverter.java deleted file mode 100644 index 757315cb108..00000000000 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/MapConverter.java +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch.rpc; - -import ai.vespa.searchlib.searchprotocol.protobuf.Search.StringProperty; -import ai.vespa.searchlib.searchprotocol.protobuf.Search.TensorProperty; -import com.google.protobuf.ByteString; -import com.yahoo.tensor.Tensor; -import com.yahoo.tensor.serialization.TypedBinaryFormat; - -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -/** - * @author ollivir - */ -public class MapConverter { - @FunctionalInterface - public interface PropertyInserter<T> { - void add(T prop); - } - - public static void convertMapTensors(Map<String, Object> map, PropertyInserter<TensorProperty.Builder> inserter) { - for (var entry : map.entrySet()) { - var value = entry.getValue(); - if (value instanceof Tensor) { - byte[] tensor = TypedBinaryFormat.encode((Tensor) value); - inserter.add(TensorProperty.newBuilder().setName(entry.getKey()).setValue(ByteString.copyFrom(tensor))); - } - } - } - - public static void convertMapStrings(Map<String, Object> map, PropertyInserter<StringProperty.Builder> inserter) { - for (var entry : map.entrySet()) { - var value = entry.getValue(); - if (!(value instanceof Tensor)) { - inserter.add(StringProperty.newBuilder().setName(entry.getKey()).addValues(value.toString())); - } - } - } - - public static void convertStringMultiMap(Map<String, List<String>> map, PropertyInserter<StringProperty.Builder> inserter) { - for (var entry : map.entrySet()) { - var values = entry.getValue(); - if (values != null) { - inserter.add(StringProperty.newBuilder().setName(entry.getKey()).addAllValues(values)); - } - } - } - - public static void convertMultiMap(Map<String, List<Object>> map, PropertyInserter<StringProperty.Builder> stringInserter, - PropertyInserter<TensorProperty.Builder> tensorInserter) { - for (var entry : map.entrySet()) { - if (entry.getValue() != null) { - var key = entry.getKey(); - var stringValues = new LinkedList<String>(); - for (var value : entry.getValue()) { - if (value != null) { - if (value instanceof Tensor) { - byte[] tensor = TypedBinaryFormat.encode((Tensor) value); - tensorInserter.add(TensorProperty.newBuilder().setName(key).setValue(ByteString.copyFrom(tensor))); - } else { - stringValues.add(value.toString()); - } - } - } - if (!stringValues.isEmpty()) { - stringInserter.add(StringProperty.newBuilder().setName(key).addAllValues(stringValues)); - } - } - } - } -} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java deleted file mode 100644 index 12be9a5c058..00000000000 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java +++ /dev/null @@ -1,223 +0,0 @@ -package com.yahoo.search.dispatch.rpc; - -import ai.vespa.searchlib.searchprotocol.protobuf.Search; -import ai.vespa.searchlib.searchprotocol.protobuf.Search.Request.Builder; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import com.yahoo.document.GlobalId; -import com.yahoo.fs4.GetDocSumsPacket; -import com.yahoo.io.GrowableByteBuffer; -import com.yahoo.prelude.fastsearch.DocumentDatabase; -import com.yahoo.prelude.fastsearch.FastHit; -import com.yahoo.prelude.fastsearch.GroupingListHit; -import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; -import com.yahoo.search.Query; -import com.yahoo.search.Result; -import com.yahoo.search.grouping.vespa.GroupingExecutor; -import com.yahoo.search.query.Model; -import com.yahoo.search.query.Presentation; -import com.yahoo.search.query.Ranking; -import com.yahoo.search.query.Sorting; -import com.yahoo.search.query.Sorting.Order; -import com.yahoo.search.query.ranking.RankFeatures; -import com.yahoo.search.query.ranking.RankProperties; -import com.yahoo.search.result.Coverage; -import com.yahoo.search.result.Relevance; -import com.yahoo.searchlib.aggregation.Grouping; -import com.yahoo.vespa.objects.BufferSerializer; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -public class ProtobufSerialization { - private static final int INITIAL_SERIALIZATION_BUFFER_SIZE = 10 * 1024; - - public static byte[] serializeQuery(Query query, String serverId, boolean includeQueryData) { - return convertFromQuery(query, serverId, includeQueryData).toByteArray(); - } - - public static byte[] serializeResult(Result searchResult) { - return convertFromResult(searchResult).toByteArray(); - } - - public static Result deserializeToResult(byte[] payload, Query query, VespaBackEndSearcher searcher) - throws InvalidProtocolBufferException { - var protobuf = Search.Reply.parseFrom(payload); - var result = convertToResult(query, protobuf, searcher.getDocumentDatabase(query)); - return result; - } - - private static Search.Request convertFromQuery(Query query, String serverId, boolean includeQueryData) { - var builder = Search.Request.newBuilder().setHits(query.getHits()).setOffset(query.getOffset()) - .setTimeout((int) query.getTimeLeft()); - - mergeToRequestFromRanking(query.getRanking(), builder, includeQueryData); - mergeToRequestFromModel(query.getModel(), builder); - - if (query.getGroupingSessionCache() || query.getRanking().getQueryCache()) { - // TODO verify that the session key is included whenever rank properties would have been - builder.setSessionKey(query.getSessionId(serverId).toString()); - } - if (query.properties().getBoolean(Model.ESTIMATE)) { - builder.setHits(0); - } - if (GroupingExecutor.hasGroupingList(query)) { - List<Grouping> groupingList = GroupingExecutor.getGroupingList(query); - BufferSerializer gbuf = new BufferSerializer(new GrowableByteBuffer()); - gbuf.putInt(null, groupingList.size()); - for (Grouping g : groupingList) { - g.serialize(gbuf); - } - gbuf.getBuf().flip(); - builder.setGroupingBlob(ByteString.copyFrom(gbuf.getBuf().getByteBuffer())); - } - - mergeToRequestFromPresentation(query.getPresentation(), builder, includeQueryData); - if (query.getGroupingSessionCache()) { - builder.setCacheGrouping(true); - } - - return builder.build(); - } - - private static void mergeToRequestFromModel(Model model, Search.Request.Builder builder) { - if (model.getDocumentDb() != null) { - builder.setDocumentType(model.getDocumentDb()); - } - int bufferSize = INITIAL_SERIALIZATION_BUFFER_SIZE; - boolean success = false; - while (!success) { - try { - ByteBuffer treeBuffer = ByteBuffer.allocate(bufferSize); - model.getQueryTree().encode(treeBuffer); - treeBuffer.flip(); - builder.setQueryTreeBlob(ByteString.copyFrom(treeBuffer)); - success = true; - } catch (java.nio.BufferOverflowException e) { - bufferSize *= 2; - } - } - } - - private static void mergeToRequestFromPresentation(Presentation presentation, Search.Request.Builder builder, - boolean includeQueryData) { - if (includeQueryData && presentation.getHighlight() != null) { - MapConverter.convertStringMultiMap(presentation.getHighlight().getHighlightTerms(), builder::addHighlightTerms); - } - } - - private static void mergeToRequestFromSorting(Sorting sorting, Search.Request.Builder builder, boolean includeQueryData) { - for (var field : sorting.fieldOrders()) { - var sortField = Search.SortField.newBuilder().setField(field.getSorter().getName()) - .setAscending(field.getSortOrder() == Order.ASCENDING).build(); - builder.addSorting(sortField); - } - } - - private static void mergeToRequestFromRanking(Ranking ranking, Search.Request.Builder builder, boolean includeQueryData) { - builder.setRankProfile(ranking.getProfile()); - if (ranking.getQueryCache()) { - builder.setCacheQuery(true); - } - if (ranking.getSorting() != null) { - mergeToRequestFromSorting(ranking.getSorting(), builder, includeQueryData); - } - if (ranking.getLocation() != null) { - builder.setGeoLocation(ranking.getLocation().toString()); - } - mergeToRequestFromRankFeatures(ranking.getFeatures(), builder, includeQueryData); - mergeToRequestFromRankProperties(ranking.getProperties(), builder, includeQueryData); - } - - private static void mergeToRequestFromRankFeatures(RankFeatures features, Search.Request.Builder builder, boolean includeQueryData) { - if (includeQueryData) { - MapConverter.convertMapStrings(features.asMap(), builder::addFeatureOverrides); - MapConverter.convertMapTensors(features.asMap(), builder::addTensorFeatureOverrides); - } - } - - private static void mergeToRequestFromRankProperties(RankProperties properties, Builder builder, boolean includeQueryData) { - if (includeQueryData) { - MapConverter.convertMultiMap(properties.asMap(), propB -> { - if (!GetDocSumsPacket.sessionIdKey.equals(propB.getName())) { - builder.addRankProperties(propB); - } - }, builder::addTensorRankProperties); - } - } - - private static Result convertToResult(Query query, Search.Reply protobuf, DocumentDatabase documentDatabase) { - var result = new Result(query); - - result.setTotalHitCount(protobuf.getTotalHitCount()); - result.setCoverage(convertToCoverage(protobuf)); - - if (protobuf.getGroupingBlob() != null && !protobuf.getGroupingBlob().isEmpty()) { - ArrayList<Grouping> list = new ArrayList<>(); - BufferSerializer buf = new BufferSerializer(new GrowableByteBuffer(protobuf.getGroupingBlob().asReadOnlyByteBuffer())); - int cnt = buf.getInt(null); - for (int i = 0; i < cnt; i++) { - Grouping g = new Grouping(); - g.deserialize(buf); - list.add(g); - } - GroupingListHit hit = new GroupingListHit(list, documentDatabase.getDocsumDefinitionSet()); - hit.setQuery(query); - result.hits().add(hit); - } - - for (var replyHit : protobuf.getHitsList()) { - FastHit hit = new FastHit(); - hit.setQuery(query); - - hit.setRelevance(new Relevance(replyHit.getRelevance())); - hit.setGlobalId(new GlobalId(replyHit.getGlobalId().toByteArray())); - - hit.setFillable(); - hit.setCached(false); - - result.hits().add(hit); - } - - return result; - } - - private static Coverage convertToCoverage(Search.Reply protobuf) { - var coverage = new Coverage(protobuf.getCoverageDocs(), protobuf.getActiveDocs(), 1); - coverage.setNodesTried(1).setSoonActive(protobuf.getSoonActiveDocs()); - - int degradedReason = 0; - if (protobuf.getDegradedByMatchPhase()) - degradedReason |= Coverage.DEGRADED_BY_MATCH_PHASE; - if (protobuf.getDegradedBySoftTimeout()) - degradedReason |= Coverage.DEGRADED_BY_TIMEOUT; - coverage.setDegradedReason(degradedReason); - - return coverage; - } - - private static Search.Reply convertFromResult(Result result) { - var builder = Search.Reply.newBuilder(); - - var coverage = result.getCoverage(false); - if (coverage != null) { - builder.setCoverageDocs(coverage.getDocs()).setActiveDocs(coverage.getActive()).setSoonActiveDocs(coverage.getSoonActive()) - .setDegradedBySoftTimeout(coverage.isDegradedByTimeout()).setDegradedByMatchPhase(coverage.isDegradedByMatchPhase()); - } - - result.hits().iterator().forEachRemaining(hit -> { - var hitBuilder = Search.Hit.newBuilder(); - if (hit.getRelevance() != null) { - hitBuilder.setRelevance(hit.getRelevance().getScore()); - } - if (hit instanceof FastHit) { - FastHit fhit = (FastHit) hit; - hitBuilder.setGlobalId(ByteString.copyFrom(fhit.getGlobalId().getRawId())); - } - builder.addHits(hitBuilder); - }); - return builder.build(); - } - -} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java deleted file mode 100644 index c8019278710..00000000000 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch.rpc; - -import com.yahoo.prelude.fastsearch.DocumentDatabase; -import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; -import com.yahoo.processing.request.CompoundName; -import com.yahoo.search.Query; -import com.yahoo.search.Result; -import com.yahoo.search.dispatch.FillInvoker; -import com.yahoo.search.dispatch.InterleavedSearchInvoker; -import com.yahoo.search.dispatch.InvokerFactory; -import com.yahoo.search.dispatch.SearchInvoker; -import com.yahoo.search.dispatch.searchcluster.Node; -import com.yahoo.search.dispatch.searchcluster.SearchCluster; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Optional; -import java.util.OptionalInt; -import java.util.Set; - -/** - * @author ollivir - */ -public class RpcInvokerFactory extends InvokerFactory { - /** Unless turned off this will fill summaries by dispatching directly to search nodes over RPC when possible */ - private final static CompoundName dispatchSummaries = new CompoundName("dispatch.summaries"); - - private final RpcResourcePool rpcResourcePool; - private final SearchCluster searchCluster; - - public RpcInvokerFactory(RpcResourcePool rpcResourcePool, SearchCluster searchCluster) { - this.rpcResourcePool = rpcResourcePool; - this.searchCluster = searchCluster; - } - - @Override - public Optional<SearchInvoker> createSearchInvoker(VespaBackEndSearcher searcher, Query query, OptionalInt groupId, List<Node> nodes, - boolean acceptIncompleteCoverage) { - List<SearchInvoker> invokers = new ArrayList<>(nodes.size()); - Set<Integer> failed = null; - for (Node node : nodes) { - if (node.isWorking()) { - invokers.add(new RpcSearchInvoker(searcher, node, rpcResourcePool)); - } else { - if (failed == null) { - failed = new HashSet<>(); - } - failed.add(node.key()); - } - } - - if (failed != null) { - List<Node> success = new ArrayList<>(nodes.size() - failed.size()); - for (Node node : nodes) { - if (!failed.contains(node.key())) { - success.add(node); - } - } - if (!searchCluster.isPartialGroupCoverageSufficient(groupId, success)) { - if (acceptIncompleteCoverage) { - invokers.add(createCoverageErrorInvoker(nodes, failed)); - } else { - return Optional.empty(); - } - } - } - - if (invokers.size() == 1) { - return Optional.of(invokers.get(0)); - } else { - return Optional.of(new InterleavedSearchInvoker(invokers, searcher, searchCluster)); - } - } - - @Override - public Optional<FillInvoker> createFillInvoker(VespaBackEndSearcher searcher, Result result) { - Query query = result.getQuery(); - if (query.properties().getBoolean(dispatchSummaries, true) - && ! searcher.summaryNeedsQuery(query) - && query.getRanking().getLocation() == null) - { - return Optional.of(new RpcFillInvoker(rpcResourcePool, searcher.getDocumentDatabase(query))); - } else { - return Optional.empty(); - } - } - - // for testing - public FillInvoker createFillInvoker(DocumentDatabase documentDb) { - return new RpcFillInvoker(rpcResourcePool, documentDb); - } - - public void release() { - rpcResourcePool.release(); - } -} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java deleted file mode 100644 index 88d77c760e3..00000000000 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch.rpc; - -import com.yahoo.compress.CompressionType; -import com.yahoo.compress.Compressor; -import com.yahoo.fs4.QueryPacket; -import com.yahoo.prelude.fastsearch.FastHit; -import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; -import com.yahoo.search.Query; -import com.yahoo.search.Result; -import com.yahoo.search.dispatch.SearchInvoker; -import com.yahoo.search.dispatch.rpc.Client.SearchResponse; -import com.yahoo.search.dispatch.searchcluster.Node; -import com.yahoo.search.result.ErrorMessage; -import com.yahoo.search.searchchain.Execution; - -import java.io.IOException; -import java.util.Optional; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -/** - * {@link SearchInvoker} implementation using RPC - * - * @author ollivir - */ -public class RpcSearchInvoker extends SearchInvoker { - private final VespaBackEndSearcher searcher; - private final Node node; - private final RpcResourcePool resourcePool; - private final BlockingQueue<Client.SearchResponseOrError> responses; - - private Query query; - - RpcSearchInvoker(VespaBackEndSearcher searcher, Node node, RpcResourcePool resourcePool) { - super(Optional.of(node)); - this.searcher = searcher; - this.node = node; - this.resourcePool = resourcePool; - this.responses = new LinkedBlockingQueue<>(1); - } - - @Override - protected void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException { - this.query = query; - - CompressionType compression = CompressionType - .valueOf(query.properties().getString(RpcResourcePool.dispatchCompression, "LZ4").toUpperCase()); - - Client.NodeConnection nodeConnection = resourcePool.nodeConnections().get(node.key()); - if (nodeConnection == null) { - responses.add(Client.SearchResponseOrError.fromError("Could send search to unknown node " + node.key())); - responseAvailable(); - return; - } - - var payload = ProtobufSerialization.serializeQuery(query, searcher.getServerId(), true); - double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0; - Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, payload); - resourcePool.client().search(nodeConnection, compressionResult.type(), payload.length, compressionResult.data(), this, - timeoutSeconds); - } - - @Override - protected Result getSearchResult(Execution execution) throws IOException { - long timeLeftMs = query.getTimeLeft(); - if (timeLeftMs <= 0) { - return errorResult(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName())); - } - Client.SearchResponseOrError response = null; - try { - response = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - // handled as timeout - } - if (response == null) { - return errorResult(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName())); - } - if (response.error().isPresent()) { - return errorResult(query, ErrorMessage.createBackendCommunicationError(response.error().get())); - } - if (response.response().isEmpty()) { - return errorResult(query, ErrorMessage.createInternalServerError("Neither error nor result available")); - } - - SearchResponse searchResponse = response.response().get(); - CompressionType compression = CompressionType.valueOf(searchResponse.compression()); - byte[] payload = resourcePool.compressor().decompress(searchResponse.compressedPayload(), compression, - searchResponse.uncompressedSize()); - var result = ProtobufSerialization.deserializeToResult(payload, query, searcher); - result.hits().unorderedIterator().forEachRemaining(hit -> { - if(hit instanceof FastHit) { - FastHit fhit = (FastHit) hit; - fhit.setPartId(node.pathIndex()); - fhit.setDistributionKey(node.key()); - } - hit.setSource(getName()); - }); - - return result; - } - - @Override - protected void release() { - // nothing to release - } - - public void receive(Client.SearchResponseOrError response) { - responses.add(response); - responseAvailable(); - } - - private String getName() { - return searcher.getName(); - } - -} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java index bafc72b9b43..cb86f19e761 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java @@ -25,12 +25,6 @@ public class Group { public Group(int id, List<Node> nodes) { this.id = id; this.nodes = ImmutableList.copyOf(nodes); - - int idx = 0; - for(var node: nodes) { - node.setPathIndex(idx); - idx++; - } } /** Returns the unique identity of this group */ diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java index 7e0e3117628..98deb9e3199 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java @@ -14,7 +14,6 @@ import java.util.concurrent.atomic.AtomicLong; public class Node { private final int key; - private int pathIndex; private final String hostname; private final int fs4port; final int group; @@ -32,12 +31,6 @@ public class Node { /** Returns the unique and stable distribution key of this node */ public int key() { return key; } - public int pathIndex() { return pathIndex; } - - void setPathIndex(int index) { - pathIndex = index; - } - public String hostname() { return hostname; } public int fs4port() { return fs4port; } |