From 7f56c30703f5319f5368653f6313a8f12f07b3cb Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Fri, 15 Mar 2019 18:14:17 +0100 Subject: Revert "Protobuf over jrt support in search" --- container-search/pom.xml | 5 - .../java/com/yahoo/fs4/mplex/FS4Connection.java | 1 + .../prelude/fastsearch/FS4InvokerFactory.java | 46 ++-- .../yahoo/prelude/fastsearch/FS4SearchInvoker.java | 36 ++- .../com/yahoo/prelude/fastsearch/FastSearcher.java | 11 +- .../prelude/fastsearch/VespaBackEndSearcher.java | 2 +- .../java/com/yahoo/prelude/query/Highlight.java | 8 +- .../src/main/java/com/yahoo/search/Query.java | 2 +- .../src/main/java/com/yahoo/search/Result.java | 8 +- .../java/com/yahoo/search/dispatch/Client.java | 90 ++++++++ .../java/com/yahoo/search/dispatch/Dispatcher.java | 56 ++--- .../com/yahoo/search/dispatch/InvokerFactory.java | 41 ---- .../java/com/yahoo/search/dispatch/RpcClient.java | 127 +++++++++++ .../com/yahoo/search/dispatch/RpcFillInvoker.java | 253 ++++++++++++++++++++ .../com/yahoo/search/dispatch/RpcResourcePool.java | 83 +++++++ .../com/yahoo/search/dispatch/SearchInvoker.java | 11 - .../java/com/yahoo/search/dispatch/rpc/Client.java | 144 ------------ .../yahoo/search/dispatch/rpc/MapConverter.java | 73 ------ .../search/dispatch/rpc/ProtobufSerialization.java | 223 ------------------ .../com/yahoo/search/dispatch/rpc/RpcClient.java | 174 -------------- .../yahoo/search/dispatch/rpc/RpcFillInvoker.java | 254 --------------------- .../search/dispatch/rpc/RpcInvokerFactory.java | 98 -------- .../yahoo/search/dispatch/rpc/RpcResourcePool.java | 61 ----- .../search/dispatch/rpc/RpcSearchInvoker.java | 118 ---------- .../yahoo/search/dispatch/searchcluster/Group.java | 6 - .../yahoo/search/dispatch/searchcluster/Node.java | 7 - .../main/java/com/yahoo/search/query/Model.java | 11 +- .../java/com/yahoo/search/query/Presentation.java | 4 +- .../main/java/com/yahoo/search/query/Ranking.java | 2 +- .../com/yahoo/search/dispatch/DispatcherTest.java | 35 +-- .../com/yahoo/search/dispatch/FillTestCase.java | 167 ++++++++++++++ .../java/com/yahoo/search/dispatch/MockClient.java | 121 ++++++++++ .../yahoo/search/dispatch/rpc/FillTestCase.java | 174 -------------- .../com/yahoo/search/dispatch/rpc/MockClient.java | 144 ------------ .../search/dispatch/rpc/RpcSearchInvokerTest.java | 95 -------- 35 files changed, 956 insertions(+), 1735 deletions(-) create mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/Client.java delete mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java create mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java create mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/RpcFillInvoker.java create mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/RpcResourcePool.java delete mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java delete mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/rpc/MapConverter.java delete mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java delete mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java delete mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java delete mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java delete mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java delete mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java create mode 100644 container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java create mode 100644 container-search/src/test/java/com/yahoo/search/dispatch/MockClient.java delete mode 100644 container-search/src/test/java/com/yahoo/search/dispatch/rpc/FillTestCase.java delete mode 100644 container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java delete mode 100644 container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java (limited to 'container-search') diff --git a/container-search/pom.xml b/container-search/pom.xml index 1abaee91d20..35b1477f478 100644 --- a/container-search/pom.xml +++ b/container-search/pom.xml @@ -138,11 +138,6 @@ - - com.google.protobuf - protobuf-java - provided - javax.xml.bind jaxb-api diff --git a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java index 69267f4a6b2..11e7d4e3d35 100644 --- a/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java +++ b/container-search/src/main/java/com/yahoo/fs4/mplex/FS4Connection.java @@ -18,6 +18,7 @@ import com.yahoo.fs4.PacketListener; import com.yahoo.io.Connection; import com.yahoo.io.Listener; import com.yahoo.log.LogLevel; +import com.yahoo.search.Query; /** * diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java index 66088faed79..5700e316493 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4InvokerFactory.java @@ -8,10 +8,12 @@ import com.yahoo.search.Result; import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.InterleavedFillInvoker; import com.yahoo.search.dispatch.InterleavedSearchInvoker; -import com.yahoo.search.dispatch.InvokerFactory; +import com.yahoo.search.dispatch.SearchErrorInvoker; import com.yahoo.search.dispatch.SearchInvoker; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.SearchCluster; +import com.yahoo.search.result.Coverage; +import com.yahoo.search.result.ErrorMessage; import com.yahoo.search.result.Hit; import java.util.ArrayList; @@ -31,13 +33,15 @@ import java.util.Set; * * @author ollivir */ -public class FS4InvokerFactory extends InvokerFactory { +public class FS4InvokerFactory { private final FS4ResourcePool fs4ResourcePool; + private final VespaBackEndSearcher searcher; private final SearchCluster searchCluster; private final ImmutableMap nodesByKey; - public FS4InvokerFactory(FS4ResourcePool fs4ResourcePool, SearchCluster searchCluster) { + public FS4InvokerFactory(FS4ResourcePool fs4ResourcePool, SearchCluster searchCluster, VespaBackEndSearcher searcher) { this.fs4ResourcePool = fs4ResourcePool; + this.searcher = searcher; this.searchCluster = searchCluster; ImmutableMap.Builder builder = ImmutableMap.builder(); @@ -45,7 +49,7 @@ public class FS4InvokerFactory extends InvokerFactory { this.nodesByKey = builder.build(); } - public SearchInvoker createSearchInvoker(VespaBackEndSearcher searcher, Query query, Node node) { + public SearchInvoker getSearchInvoker(Query query, Node node) { Backend backend = fs4ResourcePool.getBackend(node.hostname(), node.fs4port()); return new FS4SearchInvoker(searcher, query, backend.openChannel(), Optional.of(node)); } @@ -53,8 +57,6 @@ public class FS4InvokerFactory extends InvokerFactory { /** * Create a {@link SearchInvoker} for a list of content nodes. * - * @param searcher - * the searcher processing the query * @param query * the search query being processed * @param groupId @@ -68,9 +70,7 @@ public class FS4InvokerFactory extends InvokerFactory { * @return Optional containing the SearchInvoker or empty if some node in the * list is invalid and the remaining coverage is not sufficient */ - @Override - public Optional createSearchInvoker(VespaBackEndSearcher searcher, Query query, OptionalInt groupId, List nodes, - boolean acceptIncompleteCoverage) { + public Optional getSearchInvoker(Query query, OptionalInt groupId, List nodes, boolean acceptIncompleteCoverage) { List invokers = new ArrayList<>(nodes.size()); Set failed = null; for (Node node : nodes) { @@ -114,27 +114,44 @@ public class FS4InvokerFactory extends InvokerFactory { } } - public FillInvoker createFillInvoker(VespaBackEndSearcher searcher, Result result, Node node) { - return new FS4FillInvoker(searcher, result.getQuery(), fs4ResourcePool, node.hostname(), node.fs4port()); + private SearchInvoker createCoverageErrorInvoker(List nodes, Set failed) { + StringBuilder down = new StringBuilder("Connection failure on nodes with distribution-keys: "); + int count = 0; + for (Node node : nodes) { + if (failed.contains(node.key())) { + if (count > 0) { + down.append(", "); + } + count++; + down.append(node.key()); + } + } + Coverage coverage = new Coverage(0, 0, 0); + coverage.setNodesTried(count); + return new SearchErrorInvoker(ErrorMessage.createBackendCommunicationError(down.toString()), coverage); + } + + public FillInvoker getFillInvoker(Query query, Node node) { + return new FS4FillInvoker(searcher, query, fs4ResourcePool, node.hostname(), node.fs4port()); } /** * Create a {@link FillInvoker} for a the hits in a {@link Result}. * - * @param searcher the searcher processing the query * @param result the Result containing hits that need to be filled * @return Optional containing the FillInvoker or empty if some hit is from an unknown content node */ - public Optional createFillInvoker(VespaBackEndSearcher searcher, Result result) { + public Optional getFillInvoker(Result result) { Collection requiredNodes = requiredFillNodes(result); + Query query = result.getQuery(); Map invokers = new HashMap<>(); for (Integer distKey : requiredNodes) { Node node = nodesByKey.get(distKey); if (node == null) { return Optional.empty(); } - invokers.put(distKey, createFillInvoker(searcher, result, node)); + invokers.put(distKey, getFillInvoker(query, node)); } if (invokers.size() == 1) { @@ -155,5 +172,4 @@ public class FS4InvokerFactory extends InvokerFactory { } return requiredNodes; } - } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java index 24653db5671..d2910ba3fbc 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FS4SearchInvoker.java @@ -12,11 +12,13 @@ import com.yahoo.search.Result; import com.yahoo.search.dispatch.ResponseMonitor; import com.yahoo.search.dispatch.SearchInvoker; import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.result.Coverage; import com.yahoo.search.result.ErrorMessage; import com.yahoo.search.searchchain.Execution; import java.io.IOException; import java.util.Optional; +import java.util.logging.Level; import java.util.logging.Logger; /** @@ -25,7 +27,6 @@ import java.util.logging.Logger; * @author ollivir */ public class FS4SearchInvoker extends SearchInvoker implements ResponseMonitor { - private static final Logger log = Logger.getLogger(FS4SearchInvoker.class.getName()); private final VespaBackEndSearcher searcher; private FS4Channel channel; @@ -45,7 +46,8 @@ public class FS4SearchInvoker extends SearchInvoker implements ResponseMonitor "got packets " + basicPackets.length + " packets"); + if (isLoggingFine()) + getLogger().finest("got packets " + basicPackets.length + " packets"); basicPackets[0].ensureInstanceOf(QueryResultPacket.class, getName()); QueryResultPacket resultPacket = (QueryResultPacket) basicPackets[0]; - log.finest(() -> "got query packet. " + "docsumClass=" + query.getPresentation().getSummary()); + if (isLoggingFine()) + getLogger().finest("got query packet. " + "docsumClass=" + query.getPresentation().getSummary()); if (query.getPresentation().getSummary() == null) query.getPresentation().setSummary(searcher.getDefaultDocsumClass()); @@ -110,6 +114,14 @@ public class FS4SearchInvoker extends SearchInvoker implements ResponseMonitor invoker = dispatcher.getSearchInvoker(query, this); + Optional invoker = dispatcher.getSearchInvoker(query, fs4InvokerFactory); if (invoker.isPresent()) { return invoker.get(); } Optional direct = getDirectNode(query); if(direct.isPresent()) { - return dispatcher.getFS4InvokerFactory().createSearchInvoker(this, query, direct.get()); + return fs4InvokerFactory.getSearchInvoker(query, direct.get()); } return new FS4SearchInvoker(this, query, dispatchBackend.openChannel(), Optional.empty()); } @@ -225,14 +228,14 @@ public class FastSearcher extends VespaBackEndSearcher { */ private FillInvoker getFillInvoker(Result result) { Query query = result.getQuery(); - Optional invoker = dispatcher.getFillInvoker(result, this); + Optional invoker = dispatcher.getFillInvoker(result, this, getDocumentDatabase(query), fs4InvokerFactory); if (invoker.isPresent()) { return invoker.get(); } Optional direct = getDirectNode(query); if (direct.isPresent()) { - return dispatcher.getFS4InvokerFactory().createFillInvoker(this, result, direct.get()); + return fs4InvokerFactory.getFillInvoker(query, direct.get()); } return new FS4FillInvoker(this, query, dispatchBackend); } diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java index dccda0bf733..bee1fab5686 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/VespaBackEndSearcher.java @@ -122,7 +122,7 @@ public abstract class VespaBackEndSearcher extends PingableSearcher { public String getServerId() { return serverId; } - public DocumentDatabase getDocumentDatabase(Query query) { + protected DocumentDatabase getDocumentDatabase(Query query) { if (query.getModel().getRestrict().size() == 1) { String docTypeName = (String)query.getModel().getRestrict().toArray()[0]; DocumentDatabase db = documentDbs.get(docTypeName); diff --git a/container-search/src/main/java/com/yahoo/prelude/query/Highlight.java b/container-search/src/main/java/com/yahoo/prelude/query/Highlight.java index 44691b04b84..81c68ccc2b9 100644 --- a/container-search/src/main/java/com/yahoo/prelude/query/Highlight.java +++ b/container-search/src/main/java/com/yahoo/prelude/query/Highlight.java @@ -1,11 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.prelude.query; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import static com.yahoo.language.LinguisticsCase.toLowerCase; @@ -135,6 +131,8 @@ public class Highlight implements Cloneable { } } + + } diff --git a/container-search/src/main/java/com/yahoo/search/Query.java b/container-search/src/main/java/com/yahoo/search/Query.java index 1fb199aa26a..97a8c35bfa3 100644 --- a/container-search/src/main/java/com/yahoo/search/Query.java +++ b/container-search/src/main/java/com/yahoo/search/Query.java @@ -46,7 +46,7 @@ import com.yahoo.search.yql.NullItemException; import com.yahoo.search.yql.VespaSerializer; import com.yahoo.search.yql.YqlParser; import com.yahoo.yolean.Exceptions; - +import edu.umd.cs.findbugs.annotations.Nullable; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; diff --git a/container-search/src/main/java/com/yahoo/search/Result.java b/container-search/src/main/java/com/yahoo/search/Result.java index 4080b09f40b..364e60e6263 100644 --- a/container-search/src/main/java/com/yahoo/search/Result.java +++ b/container-search/src/main/java/com/yahoo/search/Result.java @@ -5,12 +5,7 @@ import com.yahoo.collections.ListMap; import com.yahoo.net.URI; import com.yahoo.protect.Validator; import com.yahoo.search.query.context.QueryContext; -import com.yahoo.search.result.Coverage; -import com.yahoo.search.result.ErrorMessage; -import com.yahoo.search.result.Hit; -import com.yahoo.search.result.HitGroup; -import com.yahoo.search.result.HitOrderer; -import com.yahoo.search.result.HitSortOrderer; +import com.yahoo.search.result.*; import com.yahoo.search.statistics.ElapsedTime; import java.util.Iterator; @@ -330,4 +325,5 @@ public final class Result extends com.yahoo.processing.Response implements Clone headers = new ListMap<>(); return headers; } + } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/Client.java new file mode 100644 index 00000000000..431b36c2623 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Client.java @@ -0,0 +1,90 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch; + +import com.yahoo.compress.CompressionType; +import com.yahoo.prelude.fastsearch.FastHit; + +import java.util.List; +import java.util.Optional; + +/** + * A dispatch client. + * + * @author bratseth + */ +interface Client { + + void getDocsums(List hits, NodeConnection node, CompressionType compression, + int uncompressedLength, byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, + double timeoutSeconds); + + /** Creates a connection to a particular node in this */ + NodeConnection createConnection(String hostname, int port); + + class GetDocsumsResponseOrError { + + // One of these will be non empty and the other not + private Optional response; + private Optional error; + + public static GetDocsumsResponseOrError fromResponse(GetDocsumsResponse response) { + return new GetDocsumsResponseOrError(Optional.of(response), Optional.empty()); + } + + public static GetDocsumsResponseOrError fromError(String error) { + return new GetDocsumsResponseOrError(Optional.empty(), Optional.of(error)); + } + + private GetDocsumsResponseOrError(Optional response, Optional error) { + this.response = response; + this.error = error; + } + + /** Returns the response, or empty if there is an error */ + public Optional response() { return response; } + + /** Returns the error or empty if there is a response */ + public Optional error() { return error; } + + } + + class GetDocsumsResponse { + + private final byte compression; + private final int uncompressedSize; + private final byte[] compressedSlimeBytes; + private final List hitsContext; + + public GetDocsumsResponse(byte compression, int uncompressedSize, byte[] compressedSlimeBytes, List hitsContext) { + this.compression = compression; + this.uncompressedSize = uncompressedSize; + this.compressedSlimeBytes = compressedSlimeBytes; + this.hitsContext = hitsContext; + } + + public byte compression() { + return compression; + } + + public int uncompressedSize() { + return uncompressedSize; + } + + public byte[] compressedSlimeBytes() { + return compressedSlimeBytes; + } + + public List hitsContext() { + return hitsContext; + } + + } + + interface NodeConnection { + + /** Closes this connection */ + void close(); + + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 0aee51e1e32..146b132be22 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -3,6 +3,7 @@ package com.yahoo.search.dispatch; import com.yahoo.component.AbstractComponent; import com.yahoo.container.handler.VipStatus; +import com.yahoo.prelude.fastsearch.DocumentDatabase; import com.yahoo.prelude.fastsearch.FS4InvokerFactory; import com.yahoo.prelude.fastsearch.FS4ResourcePool; import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; @@ -10,8 +11,6 @@ import com.yahoo.processing.request.CompoundName; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException; -import com.yahoo.search.dispatch.rpc.RpcInvokerFactory; -import com.yahoo.search.dispatch.rpc.RpcResourcePool; import com.yahoo.search.dispatch.searchcluster.Group; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.SearchCluster; @@ -43,38 +42,25 @@ public class Dispatcher extends AbstractComponent { /** If enabled, this internal dispatcher will be preferred over fdispatch whenever possible */ private static final CompoundName dispatchInternal = new CompoundName("dispatch.internal"); - /** If enabled, search queries will use protobuf rpc */ - private static final CompoundName dispatchProtobuf = new CompoundName("dispatch.protobuf"); - /** A model of the search cluster this dispatches to */ private final SearchCluster searchCluster; private final LoadBalancer loadBalancer; + private final RpcResourcePool rpcResourcePool; private final boolean multilevelDispatch; private final boolean internalDispatchByDefault; - private final FS4InvokerFactory fs4InvokerFactory; - private final RpcInvokerFactory rpcInvokerFactory; - public Dispatcher(String clusterId, DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus) { - this(new SearchCluster(clusterId, dispatchConfig, fs4ResourcePool, containerClusterSize, vipStatus), dispatchConfig, - fs4ResourcePool, new RpcResourcePool(dispatchConfig)); - } - - public Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, RpcResourcePool rpcResourcePool) { - this(searchCluster, dispatchConfig, new FS4InvokerFactory(fs4ResourcePool, searchCluster), - new RpcInvokerFactory(rpcResourcePool, searchCluster)); + this(new SearchCluster(clusterId, dispatchConfig, fs4ResourcePool, containerClusterSize, vipStatus), dispatchConfig); } - public Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, FS4InvokerFactory fs4InvokerFactory, RpcInvokerFactory rpcInvokerFactory) { + public Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig) { this.searchCluster = searchCluster; this.loadBalancer = new LoadBalancer(searchCluster, dispatchConfig.distributionPolicy() == DispatchConfig.DistributionPolicy.ROUNDROBIN); + this.rpcResourcePool = new RpcResourcePool(dispatchConfig); this.multilevelDispatch = dispatchConfig.useMultilevelDispatch(); this.internalDispatchByDefault = !dispatchConfig.useFdispatchByDefault(); - - this.fs4InvokerFactory = fs4InvokerFactory; - this.rpcInvokerFactory = rpcInvokerFactory; } /** Returns the search cluster this dispatches to */ @@ -84,16 +70,17 @@ public class Dispatcher extends AbstractComponent { @Override public void deconstruct() { - rpcInvokerFactory.release(); + rpcResourcePool.release(); } - public Optional getFillInvoker(Result result, VespaBackEndSearcher searcher) { - Optional rpcInvoker = rpcInvokerFactory.createFillInvoker(searcher, result); + public Optional getFillInvoker(Result result, VespaBackEndSearcher searcher, DocumentDatabase documentDb, + FS4InvokerFactory fs4InvokerFactory) { + Optional rpcInvoker = rpcResourcePool.getFillInvoker(result.getQuery(), searcher, documentDb); if (rpcInvoker.isPresent()) { return rpcInvoker; } if (result.getQuery().properties().getBoolean(dispatchInternal, internalDispatchByDefault)) { - Optional fs4Invoker = fs4InvokerFactory.createFillInvoker(searcher, result); + Optional fs4Invoker = fs4InvokerFactory.getFillInvoker(result); if (fs4Invoker.isPresent()) { return fs4Invoker; } @@ -101,17 +88,15 @@ public class Dispatcher extends AbstractComponent { return Optional.empty(); } - public Optional getSearchInvoker(Query query, VespaBackEndSearcher searcher) { + public Optional getSearchInvoker(Query query, FS4InvokerFactory fs4InvokerFactory) { if (multilevelDispatch || ! query.properties().getBoolean(dispatchInternal, internalDispatchByDefault)) { return Optional.empty(); } - InvokerFactory factory = query.properties().getBoolean(dispatchProtobuf, false) ? rpcInvokerFactory : fs4InvokerFactory; - - Optional invoker = getSearchPathInvoker(query, factory, searcher); + Optional invoker = getSearchPathInvoker(query, fs4InvokerFactory::getSearchInvoker); if (!invoker.isPresent()) { - invoker = getInternalInvoker(query, factory, searcher); + invoker = getInternalInvoker(query, fs4InvokerFactory::getSearchInvoker); } if (invoker.isPresent() && query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) { query.setHits(0); @@ -120,12 +105,13 @@ public class Dispatcher extends AbstractComponent { return invoker; } - public FS4InvokerFactory getFS4InvokerFactory() { - return fs4InvokerFactory; + @FunctionalInterface + private interface SearchInvokerSupplier { + Optional supply(Query query, OptionalInt groupId, List nodes, boolean acceptIncompleteCoverage); } // build invoker based on searchpath - private Optional getSearchPathInvoker(Query query, InvokerFactory invokerFactory, VespaBackEndSearcher searcher) { + private Optional getSearchPathInvoker(Query query, SearchInvokerSupplier invokerFactory) { String searchPath = query.getModel().getSearchPath(); if(searchPath == null) { return Optional.empty(); @@ -136,19 +122,19 @@ public class Dispatcher extends AbstractComponent { return Optional.empty(); } else { query.trace(false, 2, "Dispatching internally with search path ", searchPath); - return invokerFactory.createSearchInvoker(searcher, query, OptionalInt.empty(), nodes, true); + return invokerFactory.supply(query, OptionalInt.empty(), nodes, true); } } catch (InvalidSearchPathException e) { return Optional.of(new SearchErrorInvoker(ErrorMessage.createIllegalQuery(e.getMessage()))); } } - private Optional getInternalInvoker(Query query, InvokerFactory invokerFactory, VespaBackEndSearcher searcher) { + private Optional getInternalInvoker(Query query, SearchInvokerSupplier invokerFactory) { Optional directNode = searchCluster.directDispatchTarget(); if (directNode.isPresent()) { Node node = directNode.get(); query.trace(false, 2, "Dispatching directly to ", node); - return invokerFactory.createSearchInvoker(searcher, query, OptionalInt.empty(), Arrays.asList(node), true); + return invokerFactory.supply(query, OptionalInt.empty(), Arrays.asList(node), true); } int covered = searchCluster.groupsWithSufficientCoverage(); @@ -163,7 +149,7 @@ public class Dispatcher extends AbstractComponent { } Group group = groupInCluster.get(); boolean acceptIncompleteCoverage = (i == max - 1); - Optional invoker = invokerFactory.createSearchInvoker(searcher, query, OptionalInt.of(group.id()), group.nodes(), + Optional invoker = invokerFactory.supply(query, OptionalInt.of(group.id()), group.nodes(), acceptIncompleteCoverage); if (invoker.isPresent()) { query.trace(false, 2, "Dispatching internally to search group ", group.id()); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java deleted file mode 100644 index ca471fb2baa..00000000000 --- a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch; - -import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; -import com.yahoo.search.Query; -import com.yahoo.search.Result; -import com.yahoo.search.dispatch.searchcluster.Node; -import com.yahoo.search.result.Coverage; -import com.yahoo.search.result.ErrorMessage; - -import java.util.List; -import java.util.Optional; -import java.util.OptionalInt; -import java.util.Set; - -/** - * @author ollivir - */ -public abstract class InvokerFactory { - public abstract Optional createSearchInvoker(VespaBackEndSearcher searcher, Query query, OptionalInt groupId, - List nodes, boolean acceptIncompleteCoverage); - - public abstract Optional createFillInvoker(VespaBackEndSearcher searcher, Result result); - - protected static SearchInvoker createCoverageErrorInvoker(List nodes, Set failed) { - StringBuilder down = new StringBuilder("Connection failure on nodes with distribution-keys: "); - int count = 0; - for (Node node : nodes) { - if (failed.contains(node.key())) { - if (count > 0) { - down.append(", "); - } - count++; - down.append(node.key()); - } - } - Coverage coverage = new Coverage(0, 0, 0); - coverage.setNodesTried(count); - return new SearchErrorInvoker(ErrorMessage.createBackendCommunicationError(down.toString()), coverage); - } -} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java new file mode 100644 index 00000000000..2a4767bc389 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/RpcClient.java @@ -0,0 +1,127 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch; + +import com.yahoo.compress.CompressionType; +import com.yahoo.jrt.DataValue; +import com.yahoo.jrt.Int32Value; +import com.yahoo.jrt.Int8Value; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.RequestWaiter; +import com.yahoo.jrt.Spec; +import com.yahoo.jrt.Supervisor; +import com.yahoo.jrt.Target; +import com.yahoo.jrt.Transport; +import com.yahoo.jrt.Values; +import com.yahoo.prelude.fastsearch.FastHit; + +import java.util.List; + +/** + * A client which uses rpc request to search nodes to implement the Client API. + * + * @author bratseth + */ +class RpcClient implements Client { + + private final Supervisor supervisor = new Supervisor(new Transport()); + + @Override + public NodeConnection createConnection(String hostname, int port) { + return new RpcNodeConnection(hostname, port, supervisor); + } + + @Override + public void getDocsums(List hits, NodeConnection node, CompressionType compression, int uncompressedLength, + byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) { + Request request = new Request("proton.getDocsums"); + request.parameters().add(new Int8Value(compression.getCode())); + request.parameters().add(new Int32Value(uncompressedLength)); + request.parameters().add(new DataValue(compressedSlime)); + + request.setContext(hits); + RpcNodeConnection rpcNode = ((RpcNodeConnection) node); + rpcNode.invokeAsync(request, timeoutSeconds, new RpcResponseWaiter(rpcNode, responseReceiver)); + } + + private static class RpcNodeConnection implements NodeConnection { + + // Information about the connected node + private final Supervisor supervisor; + private final String hostname; + private final int port; + private final String description; + + // The current shared connection. This will be recycled when it becomes invalid. + // All access to this must be synchronized + private Target target = null; + + public RpcNodeConnection(String hostname, int port, Supervisor supervisor) { + this.supervisor = supervisor; + this.hostname = hostname; + this.port = port; + description = "rpc node connection to " + hostname + ":" + port; + } + + public void invokeAsync(Request req, double timeout, RequestWaiter waiter) { + // TODO: Consider replacing this by a watcher on the target + synchronized(this) { // ensure we have exactly 1 valid connection across threads + if (target == null || ! target.isValid()) + target = supervisor.connect(new Spec(hostname, port)); + } + target.invokeAsync(req, timeout, waiter); + } + + @Override + public void close() { + target.close(); + } + + @Override + public String toString() { + return description; + } + + } + + private static class RpcResponseWaiter implements RequestWaiter { + + /** The node to which we made the request we are waiting for - for error messages only */ + private final RpcNodeConnection node; + + /** The handler to which the response is forwarded */ + private final RpcFillInvoker.GetDocsumsResponseReceiver handler; + + public RpcResponseWaiter(RpcNodeConnection node, RpcFillInvoker.GetDocsumsResponseReceiver handler) { + this.node = node; + this.handler = handler; + } + + @Override + public void handleRequestDone(Request requestWithResponse) { + if (requestWithResponse.isError()) { + handler.receive(GetDocsumsResponseOrError.fromError("Error response from " + node + ": " + + requestWithResponse.errorMessage())); + return; + } + + Values returnValues = requestWithResponse.returnValues(); + if (returnValues.size() < 3) { + handler.receive(GetDocsumsResponseOrError.fromError("Invalid getDocsums response from " + node + + ": Expected 3 return arguments, got " + + returnValues.size())); + return; + } + + byte compression = returnValues.get(0).asInt8(); + int uncompressedSize = returnValues.get(1).asInt32(); + byte[] compressedSlimeBytes = returnValues.get(2).asData(); + List hits = (List) requestWithResponse.getContext(); + handler.receive(GetDocsumsResponseOrError.fromResponse(new GetDocsumsResponse(compression, + uncompressedSize, + compressedSlimeBytes, + hits))); + } + + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RpcFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/RpcFillInvoker.java new file mode 100644 index 00000000000..578c447dfbe --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/RpcFillInvoker.java @@ -0,0 +1,253 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch; + +import com.yahoo.collections.ListMap; +import com.yahoo.compress.CompressionType; +import com.yahoo.compress.Compressor; +import com.yahoo.container.protect.Error; +import com.yahoo.data.access.Inspector; +import com.yahoo.data.access.slime.SlimeAdapter; +import com.yahoo.prelude.fastsearch.DocumentDatabase; +import com.yahoo.prelude.fastsearch.FastHit; +import com.yahoo.prelude.fastsearch.TimeoutException; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.query.SessionId; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.search.result.Hit; +import com.yahoo.slime.ArrayTraverser; +import com.yahoo.slime.BinaryFormat; +import com.yahoo.slime.Cursor; +import com.yahoo.slime.Slime; + +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * {@link FillInvoker} implementation using RPC + * + * @author bratseth + * @author ollivir + */ +public class RpcFillInvoker extends FillInvoker { + private static final Logger log = Logger.getLogger(RpcFillInvoker.class.getName()); + + private final DocumentDatabase documentDb; + private final RpcResourcePool resourcePool; + + private GetDocsumsResponseReceiver responseReceiver; + + + RpcFillInvoker(RpcResourcePool resourcePool, DocumentDatabase documentDb) { + this.documentDb = documentDb; + this.resourcePool = resourcePool; + } + + @Override + protected void sendFillRequest(Result result, String summaryClass) { + ListMap hitsByNode = hitsByNode(result); + + CompressionType compression = CompressionType + .valueOf(result.getQuery().properties().getString(RpcResourcePool.dispatchCompression, "LZ4").toUpperCase()); + + if (result.getQuery().getTraceLevel() >= 3) + result.getQuery().trace("Sending " + hitsByNode.size() + " summary fetch RPC requests", 3); + + responseReceiver = new GetDocsumsResponseReceiver(hitsByNode.size(), resourcePool.compressor(), result); + for (Map.Entry> nodeHits : hitsByNode.entrySet()) { + sendGetDocsumsRequest(nodeHits.getKey(), nodeHits.getValue(), summaryClass, compression, result, responseReceiver); + } + } + + @Override + protected void getFillResults(Result result, String summaryClass) { + try { + responseReceiver.processResponses(result.getQuery(), summaryClass, documentDb); + result.hits().setSorted(false); + result.analyzeHits(); + } catch (TimeoutException e) { + result.hits().addError(ErrorMessage.createTimeout("Summary data is incomplete: " + e.getMessage())); + } + } + + @Override + protected void release() { + // nothing to release + } + + /** Return a map of hits by their search node (partition) id */ + private static ListMap hitsByNode(Result result) { + ListMap hitsByNode = new ListMap<>(); + for (Iterator i = result.hits().unorderedDeepIterator(); i.hasNext();) { + Hit h = i.next(); + if (!(h instanceof FastHit)) + continue; + FastHit hit = (FastHit) h; + + hitsByNode.put(hit.getDistributionKey(), hit); + } + return hitsByNode; + } + + /** Send a getDocsums request to a node. Responses will be added to the given receiver. */ + private void sendGetDocsumsRequest(int nodeId, List hits, String summaryClass, CompressionType compression, + Result result, GetDocsumsResponseReceiver responseReceiver) { + Client.NodeConnection node = resourcePool.nodeConnections().get(nodeId); + if (node == null) { + String error = "Could not fill hits from unknown node " + nodeId; + responseReceiver.receive(Client.GetDocsumsResponseOrError.fromError(error)); + result.hits().addError(ErrorMessage.createEmptyDocsums(error)); + log.warning("Got hits with partid " + nodeId + ", which is not included in the current dispatch config"); + return; + } + + Query query = result.getQuery(); + String rankProfile = query.getRanking().getProfile(); + byte[] serializedSlime = BinaryFormat + .encode(toSlime(rankProfile, summaryClass, query.getModel().getDocumentDb(), query.getSessionId(), hits)); + double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0; + Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, serializedSlime); + resourcePool.client().getDocsums(hits, node, compressionResult.type(), serializedSlime.length, compressionResult.data(), + responseReceiver, timeoutSeconds); + } + + static private Slime toSlime(String rankProfile, String summaryClass, String docType, SessionId sessionId, List hits) { + Slime slime = new Slime(); + Cursor root = slime.setObject(); + if (summaryClass != null) { + root.setString("class", summaryClass); + } + if (sessionId != null) { + root.setData("sessionid", sessionId.asUtf8String().getBytes()); + } + if (docType != null) { + root.setString("doctype", docType); + } + if (rankProfile != null) { + root.setString("ranking", rankProfile); + } + Cursor gids = root.setArray("gids"); + for (FastHit hit : hits) { + gids.addData(hit.getGlobalId().getRawId()); + } + return slime; + } + + /** Receiver of the responses to a set of getDocsums requests */ + public static class GetDocsumsResponseReceiver { + + private final BlockingQueue responses; + private final Compressor compressor; + private final Result result; + + /** Whether we have already logged/notified about an error - to avoid spamming */ + private boolean hasReportedError = false; + + /** The number of responses we should receive (and process) before this is complete */ + private int outstandingResponses; + + GetDocsumsResponseReceiver(int requestCount, Compressor compressor, Result result) { + this.compressor = compressor; + responses = new LinkedBlockingQueue<>(requestCount); + outstandingResponses = requestCount; + this.result = result; + } + + /** Called by a thread belonging to the client when a valid response becomes available */ + public void receive(Client.GetDocsumsResponseOrError response) { + responses.add(response); + } + + private void throwTimeout() throws TimeoutException { + throw new TimeoutException("Timed out waiting for summary data. " + outstandingResponses + " responses outstanding."); + } + + /** + * Call this from the dispatcher thread to initiate and complete processing of responses. + * This will block until all responses are available and processed, or to timeout. + */ + void processResponses(Query query, String summaryClass, DocumentDatabase documentDb) throws TimeoutException { + try { + int skippedHits = 0; + while (outstandingResponses > 0) { + long timeLeftMs = query.getTimeLeft(); + if (timeLeftMs <= 0) { + throwTimeout(); + } + Client.GetDocsumsResponseOrError response = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS); + if (response == null) + throwTimeout(); + skippedHits += processResponse(response, summaryClass, documentDb); + outstandingResponses--; + } + if (skippedHits != 0) { + result.hits().addError(com.yahoo.search.result.ErrorMessage.createEmptyDocsums("Missing hit summary data for summary " + + summaryClass + " for " + skippedHits + " hits")); + } + } + catch (InterruptedException e) { + // TODO: Add error + } + } + + private int processResponse(Client.GetDocsumsResponseOrError responseOrError, + String summaryClass, + DocumentDatabase documentDb) { + if (responseOrError.error().isPresent()) { + if (hasReportedError) return 0; + String error = responseOrError.error().get(); + result.hits().addError(ErrorMessage.createBackendCommunicationError(error)); + log.log(Level.WARNING, "Error fetching summary data: "+ error); + } + else { + Client.GetDocsumsResponse response = responseOrError.response().get(); + CompressionType compression = CompressionType.valueOf(response.compression()); + byte[] slimeBytes = compressor.decompress(response.compressedSlimeBytes(), compression, response.uncompressedSize()); + return fill(response.hitsContext(), summaryClass, documentDb, slimeBytes); + } + return 0; + } + + private void addErrors(com.yahoo.slime.Inspector errors) { + errors.traverse((ArrayTraverser) (int index, com.yahoo.slime.Inspector value) -> { + int errorCode = ("timeout".equalsIgnoreCase(value.field("type").asString())) + ? Error.TIMEOUT.code + : Error.UNSPECIFIED.code; + result.hits().addError(new ErrorMessage(errorCode, + value.field("message").asString(), value.field("details").asString())); + }); + } + + private int fill(List hits, String summaryClass, DocumentDatabase documentDb, byte[] slimeBytes) { + com.yahoo.slime.Inspector root = BinaryFormat.decode(slimeBytes).get(); + com.yahoo.slime.Inspector errors = root.field("errors"); + boolean hasErrors = errors.valid() && (errors.entries() > 0); + if (hasErrors) { + addErrors(errors); + } + + Inspector summaries = new SlimeAdapter(root.field("docsums")); + if ( ! summaries.valid()) + return 0; // No summaries; Perhaps we requested a non-existing summary class + int skippedHits = 0; + for (int i = 0; i < hits.size(); i++) { + Inspector summary = summaries.entry(i).field("docsum"); + if (summary.fieldCount() != 0) { + hits.get(i).setField(Hit.SDDOCNAME_FIELD, documentDb.getName()); + hits.get(i).addSummary(documentDb.getDocsumDefinitionSet().getDocsum(summaryClass), summary); + hits.get(i).setFilled(summaryClass); + } else { + skippedHits++; + } + } + return skippedHits; + } + + } +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/RpcResourcePool.java new file mode 100644 index 00000000000..29641080ba6 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/RpcResourcePool.java @@ -0,0 +1,83 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch; + +import com.google.common.collect.ImmutableMap; +import com.yahoo.compress.Compressor; +import com.yahoo.prelude.fastsearch.DocumentDatabase; +import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; +import com.yahoo.processing.request.CompoundName; +import com.yahoo.search.Query; +import com.yahoo.vespa.config.search.DispatchConfig; + +import java.util.Map; +import java.util.Optional; + +/** + * RpcResourcePool constructs {@link FillInvoker} objects that communicate with content nodes over RPC. It also contains + * the RPC connection pool. + * + * @author ollivir + */ +public class RpcResourcePool { + /** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */ + public final static CompoundName dispatchCompression = new CompoundName("dispatch.compression"); + + /** Unless turned off this will fill summaries by dispatching directly to search nodes over RPC when possible */ + private final static CompoundName dispatchSummaries = new CompoundName("dispatch.summaries"); + + private final Compressor compressor = new Compressor(); + private final Client client; + + /** Connections to the search nodes this talks to, indexed by node id ("partid") */ + private final ImmutableMap nodeConnections; + + public RpcResourcePool(Client client, Map nodeConnections) { + this.client = client; + this.nodeConnections = ImmutableMap.copyOf(nodeConnections); + } + + public RpcResourcePool(DispatchConfig dispatchConfig) { + this.client = new RpcClient(); + + // Create node rpc connections, indexed by the node distribution key + ImmutableMap.Builder nodeConnectionsBuilder = new ImmutableMap.Builder<>(); + for (DispatchConfig.Node node : dispatchConfig.node()) { + nodeConnectionsBuilder.put(node.key(), client.createConnection(node.host(), node.port())); + } + this.nodeConnections = nodeConnectionsBuilder.build(); + } + + public Optional getFillInvoker(Query query, VespaBackEndSearcher searcher, DocumentDatabase documentDb) { + if (query.properties().getBoolean(dispatchSummaries, true) + && ! searcher.summaryNeedsQuery(query) + && query.getRanking().getLocation() == null) + { + return Optional.of(new RpcFillInvoker(this, documentDb)); + } else { + return Optional.empty(); + } + } + + // for testing + public FillInvoker getFillInvoker(DocumentDatabase documentDb) { + return new RpcFillInvoker(this, documentDb); + } + + public Compressor compressor() { + return compressor; + } + + public Client client() { + return client; + } + + public ImmutableMap nodeConnections() { + return nodeConnections; + } + + public void release() { + for (Client.NodeConnection nodeConnection : nodeConnections.values()) { + nodeConnection.close(); + } + } +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java index 1650494db3a..0d7ef53bb50 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchInvoker.java @@ -5,8 +5,6 @@ import com.yahoo.fs4.QueryPacket; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.dispatch.searchcluster.Node; -import com.yahoo.search.result.Coverage; -import com.yahoo.search.result.ErrorMessage; import com.yahoo.search.searchchain.Execution; import java.io.IOException; @@ -55,13 +53,4 @@ public abstract class SearchInvoker extends CloseableInvoker { protected Optional distributionKey() { return node.map(Node::key); } - - protected Result errorResult(Query query, ErrorMessage errorMessage) { - Result error = new Result(query, errorMessage); - Coverage errorCoverage = new Coverage(0, 0, 0); - errorCoverage.setNodesTried(1); - error.setCoverage(errorCoverage); - return error; - } - } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java deleted file mode 100644 index 019e07221a6..00000000000 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/Client.java +++ /dev/null @@ -1,144 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch.rpc; - -import com.yahoo.compress.CompressionType; -import com.yahoo.prelude.fastsearch.FastHit; - -import java.util.List; -import java.util.Optional; - -/** - * A dispatch client. - * - * @author bratseth - */ -interface Client { - - void getDocsums(List hits, NodeConnection node, CompressionType compression, - int uncompressedLength, byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, - double timeoutSeconds); - - void search(NodeConnection node, CompressionType compression, - int uncompressedLength, byte[] compressedPayload, RpcSearchInvoker responseReceiver, - double timeoutSeconds); - - /** Creates a connection to a particular node in this */ - NodeConnection createConnection(String hostname, int port); - - class GetDocsumsResponseOrError { - - // One of these will be non empty and the other not - private Optional response; - private Optional error; - - public static GetDocsumsResponseOrError fromResponse(GetDocsumsResponse response) { - return new GetDocsumsResponseOrError(Optional.of(response), Optional.empty()); - } - - public static GetDocsumsResponseOrError fromError(String error) { - return new GetDocsumsResponseOrError(Optional.empty(), Optional.of(error)); - } - - private GetDocsumsResponseOrError(Optional response, Optional error) { - this.response = response; - this.error = error; - } - - /** Returns the response, or empty if there is an error */ - public Optional response() { return response; } - - /** Returns the error or empty if there is a response */ - public Optional error() { return error; } - - } - - class GetDocsumsResponse { - - private final byte compression; - private final int uncompressedSize; - private final byte[] compressedSlimeBytes; - private final List hitsContext; - - public GetDocsumsResponse(byte compression, int uncompressedSize, byte[] compressedSlimeBytes, List hitsContext) { - this.compression = compression; - this.uncompressedSize = uncompressedSize; - this.compressedSlimeBytes = compressedSlimeBytes; - this.hitsContext = hitsContext; - } - - public byte compression() { - return compression; - } - - public int uncompressedSize() { - return uncompressedSize; - } - - public byte[] compressedSlimeBytes() { - return compressedSlimeBytes; - } - - public List hitsContext() { - return hitsContext; - } - - } - - interface NodeConnection { - - /** Closes this connection */ - void close(); - - } - - class SearchResponseOrError { - // One of these will be non empty and the other not - private Optional response; - private Optional error; - - public static SearchResponseOrError fromResponse(SearchResponse response) { - return new SearchResponseOrError(Optional.of(response), Optional.empty()); - } - - public static SearchResponseOrError fromError(String error) { - return new SearchResponseOrError(Optional.empty(), Optional.of(error)); - } - - private SearchResponseOrError(Optional response, Optional error) { - this.response = response; - this.error = error; - } - - /** Returns the response, or empty if there is an error */ - public Optional response() { return response; } - - /** Returns the error or empty if there is a response */ - public Optional error() { return error; } - - } - - class SearchResponse { - private final byte compression; - private final int uncompressedSize; - private final byte[] compressedPayload; - - public SearchResponse(byte compression, int uncompressedSize, byte[] compressedPayload) { - this.compression = compression; - this.uncompressedSize = uncompressedSize; - this.compressedPayload = compressedPayload; - } - - public byte compression() { - return compression; - } - - public int uncompressedSize() { - return uncompressedSize; - } - - public byte[] compressedPayload() { - return compressedPayload; - } - } - -} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/MapConverter.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/MapConverter.java deleted file mode 100644 index 757315cb108..00000000000 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/MapConverter.java +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch.rpc; - -import ai.vespa.searchlib.searchprotocol.protobuf.Search.StringProperty; -import ai.vespa.searchlib.searchprotocol.protobuf.Search.TensorProperty; -import com.google.protobuf.ByteString; -import com.yahoo.tensor.Tensor; -import com.yahoo.tensor.serialization.TypedBinaryFormat; - -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -/** - * @author ollivir - */ -public class MapConverter { - @FunctionalInterface - public interface PropertyInserter { - void add(T prop); - } - - public static void convertMapTensors(Map map, PropertyInserter inserter) { - for (var entry : map.entrySet()) { - var value = entry.getValue(); - if (value instanceof Tensor) { - byte[] tensor = TypedBinaryFormat.encode((Tensor) value); - inserter.add(TensorProperty.newBuilder().setName(entry.getKey()).setValue(ByteString.copyFrom(tensor))); - } - } - } - - public static void convertMapStrings(Map map, PropertyInserter inserter) { - for (var entry : map.entrySet()) { - var value = entry.getValue(); - if (!(value instanceof Tensor)) { - inserter.add(StringProperty.newBuilder().setName(entry.getKey()).addValues(value.toString())); - } - } - } - - public static void convertStringMultiMap(Map> map, PropertyInserter inserter) { - for (var entry : map.entrySet()) { - var values = entry.getValue(); - if (values != null) { - inserter.add(StringProperty.newBuilder().setName(entry.getKey()).addAllValues(values)); - } - } - } - - public static void convertMultiMap(Map> map, PropertyInserter stringInserter, - PropertyInserter tensorInserter) { - for (var entry : map.entrySet()) { - if (entry.getValue() != null) { - var key = entry.getKey(); - var stringValues = new LinkedList(); - for (var value : entry.getValue()) { - if (value != null) { - if (value instanceof Tensor) { - byte[] tensor = TypedBinaryFormat.encode((Tensor) value); - tensorInserter.add(TensorProperty.newBuilder().setName(key).setValue(ByteString.copyFrom(tensor))); - } else { - stringValues.add(value.toString()); - } - } - } - if (!stringValues.isEmpty()) { - stringInserter.add(StringProperty.newBuilder().setName(key).addAllValues(stringValues)); - } - } - } - } -} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java deleted file mode 100644 index 12be9a5c058..00000000000 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/ProtobufSerialization.java +++ /dev/null @@ -1,223 +0,0 @@ -package com.yahoo.search.dispatch.rpc; - -import ai.vespa.searchlib.searchprotocol.protobuf.Search; -import ai.vespa.searchlib.searchprotocol.protobuf.Search.Request.Builder; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import com.yahoo.document.GlobalId; -import com.yahoo.fs4.GetDocSumsPacket; -import com.yahoo.io.GrowableByteBuffer; -import com.yahoo.prelude.fastsearch.DocumentDatabase; -import com.yahoo.prelude.fastsearch.FastHit; -import com.yahoo.prelude.fastsearch.GroupingListHit; -import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; -import com.yahoo.search.Query; -import com.yahoo.search.Result; -import com.yahoo.search.grouping.vespa.GroupingExecutor; -import com.yahoo.search.query.Model; -import com.yahoo.search.query.Presentation; -import com.yahoo.search.query.Ranking; -import com.yahoo.search.query.Sorting; -import com.yahoo.search.query.Sorting.Order; -import com.yahoo.search.query.ranking.RankFeatures; -import com.yahoo.search.query.ranking.RankProperties; -import com.yahoo.search.result.Coverage; -import com.yahoo.search.result.Relevance; -import com.yahoo.searchlib.aggregation.Grouping; -import com.yahoo.vespa.objects.BufferSerializer; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -public class ProtobufSerialization { - private static final int INITIAL_SERIALIZATION_BUFFER_SIZE = 10 * 1024; - - public static byte[] serializeQuery(Query query, String serverId, boolean includeQueryData) { - return convertFromQuery(query, serverId, includeQueryData).toByteArray(); - } - - public static byte[] serializeResult(Result searchResult) { - return convertFromResult(searchResult).toByteArray(); - } - - public static Result deserializeToResult(byte[] payload, Query query, VespaBackEndSearcher searcher) - throws InvalidProtocolBufferException { - var protobuf = Search.Reply.parseFrom(payload); - var result = convertToResult(query, protobuf, searcher.getDocumentDatabase(query)); - return result; - } - - private static Search.Request convertFromQuery(Query query, String serverId, boolean includeQueryData) { - var builder = Search.Request.newBuilder().setHits(query.getHits()).setOffset(query.getOffset()) - .setTimeout((int) query.getTimeLeft()); - - mergeToRequestFromRanking(query.getRanking(), builder, includeQueryData); - mergeToRequestFromModel(query.getModel(), builder); - - if (query.getGroupingSessionCache() || query.getRanking().getQueryCache()) { - // TODO verify that the session key is included whenever rank properties would have been - builder.setSessionKey(query.getSessionId(serverId).toString()); - } - if (query.properties().getBoolean(Model.ESTIMATE)) { - builder.setHits(0); - } - if (GroupingExecutor.hasGroupingList(query)) { - List groupingList = GroupingExecutor.getGroupingList(query); - BufferSerializer gbuf = new BufferSerializer(new GrowableByteBuffer()); - gbuf.putInt(null, groupingList.size()); - for (Grouping g : groupingList) { - g.serialize(gbuf); - } - gbuf.getBuf().flip(); - builder.setGroupingBlob(ByteString.copyFrom(gbuf.getBuf().getByteBuffer())); - } - - mergeToRequestFromPresentation(query.getPresentation(), builder, includeQueryData); - if (query.getGroupingSessionCache()) { - builder.setCacheGrouping(true); - } - - return builder.build(); - } - - private static void mergeToRequestFromModel(Model model, Search.Request.Builder builder) { - if (model.getDocumentDb() != null) { - builder.setDocumentType(model.getDocumentDb()); - } - int bufferSize = INITIAL_SERIALIZATION_BUFFER_SIZE; - boolean success = false; - while (!success) { - try { - ByteBuffer treeBuffer = ByteBuffer.allocate(bufferSize); - model.getQueryTree().encode(treeBuffer); - treeBuffer.flip(); - builder.setQueryTreeBlob(ByteString.copyFrom(treeBuffer)); - success = true; - } catch (java.nio.BufferOverflowException e) { - bufferSize *= 2; - } - } - } - - private static void mergeToRequestFromPresentation(Presentation presentation, Search.Request.Builder builder, - boolean includeQueryData) { - if (includeQueryData && presentation.getHighlight() != null) { - MapConverter.convertStringMultiMap(presentation.getHighlight().getHighlightTerms(), builder::addHighlightTerms); - } - } - - private static void mergeToRequestFromSorting(Sorting sorting, Search.Request.Builder builder, boolean includeQueryData) { - for (var field : sorting.fieldOrders()) { - var sortField = Search.SortField.newBuilder().setField(field.getSorter().getName()) - .setAscending(field.getSortOrder() == Order.ASCENDING).build(); - builder.addSorting(sortField); - } - } - - private static void mergeToRequestFromRanking(Ranking ranking, Search.Request.Builder builder, boolean includeQueryData) { - builder.setRankProfile(ranking.getProfile()); - if (ranking.getQueryCache()) { - builder.setCacheQuery(true); - } - if (ranking.getSorting() != null) { - mergeToRequestFromSorting(ranking.getSorting(), builder, includeQueryData); - } - if (ranking.getLocation() != null) { - builder.setGeoLocation(ranking.getLocation().toString()); - } - mergeToRequestFromRankFeatures(ranking.getFeatures(), builder, includeQueryData); - mergeToRequestFromRankProperties(ranking.getProperties(), builder, includeQueryData); - } - - private static void mergeToRequestFromRankFeatures(RankFeatures features, Search.Request.Builder builder, boolean includeQueryData) { - if (includeQueryData) { - MapConverter.convertMapStrings(features.asMap(), builder::addFeatureOverrides); - MapConverter.convertMapTensors(features.asMap(), builder::addTensorFeatureOverrides); - } - } - - private static void mergeToRequestFromRankProperties(RankProperties properties, Builder builder, boolean includeQueryData) { - if (includeQueryData) { - MapConverter.convertMultiMap(properties.asMap(), propB -> { - if (!GetDocSumsPacket.sessionIdKey.equals(propB.getName())) { - builder.addRankProperties(propB); - } - }, builder::addTensorRankProperties); - } - } - - private static Result convertToResult(Query query, Search.Reply protobuf, DocumentDatabase documentDatabase) { - var result = new Result(query); - - result.setTotalHitCount(protobuf.getTotalHitCount()); - result.setCoverage(convertToCoverage(protobuf)); - - if (protobuf.getGroupingBlob() != null && !protobuf.getGroupingBlob().isEmpty()) { - ArrayList list = new ArrayList<>(); - BufferSerializer buf = new BufferSerializer(new GrowableByteBuffer(protobuf.getGroupingBlob().asReadOnlyByteBuffer())); - int cnt = buf.getInt(null); - for (int i = 0; i < cnt; i++) { - Grouping g = new Grouping(); - g.deserialize(buf); - list.add(g); - } - GroupingListHit hit = new GroupingListHit(list, documentDatabase.getDocsumDefinitionSet()); - hit.setQuery(query); - result.hits().add(hit); - } - - for (var replyHit : protobuf.getHitsList()) { - FastHit hit = new FastHit(); - hit.setQuery(query); - - hit.setRelevance(new Relevance(replyHit.getRelevance())); - hit.setGlobalId(new GlobalId(replyHit.getGlobalId().toByteArray())); - - hit.setFillable(); - hit.setCached(false); - - result.hits().add(hit); - } - - return result; - } - - private static Coverage convertToCoverage(Search.Reply protobuf) { - var coverage = new Coverage(protobuf.getCoverageDocs(), protobuf.getActiveDocs(), 1); - coverage.setNodesTried(1).setSoonActive(protobuf.getSoonActiveDocs()); - - int degradedReason = 0; - if (protobuf.getDegradedByMatchPhase()) - degradedReason |= Coverage.DEGRADED_BY_MATCH_PHASE; - if (protobuf.getDegradedBySoftTimeout()) - degradedReason |= Coverage.DEGRADED_BY_TIMEOUT; - coverage.setDegradedReason(degradedReason); - - return coverage; - } - - private static Search.Reply convertFromResult(Result result) { - var builder = Search.Reply.newBuilder(); - - var coverage = result.getCoverage(false); - if (coverage != null) { - builder.setCoverageDocs(coverage.getDocs()).setActiveDocs(coverage.getActive()).setSoonActiveDocs(coverage.getSoonActive()) - .setDegradedBySoftTimeout(coverage.isDegradedByTimeout()).setDegradedByMatchPhase(coverage.isDegradedByMatchPhase()); - } - - result.hits().iterator().forEachRemaining(hit -> { - var hitBuilder = Search.Hit.newBuilder(); - if (hit.getRelevance() != null) { - hitBuilder.setRelevance(hit.getRelevance().getScore()); - } - if (hit instanceof FastHit) { - FastHit fhit = (FastHit) hit; - hitBuilder.setGlobalId(ByteString.copyFrom(fhit.getGlobalId().getRawId())); - } - builder.addHits(hitBuilder); - }); - return builder.build(); - } - -} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java deleted file mode 100644 index 32a7917d43c..00000000000 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java +++ /dev/null @@ -1,174 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch.rpc; - -import com.yahoo.compress.CompressionType; -import com.yahoo.jrt.DataValue; -import com.yahoo.jrt.Int32Value; -import com.yahoo.jrt.Int8Value; -import com.yahoo.jrt.Request; -import com.yahoo.jrt.RequestWaiter; -import com.yahoo.jrt.Spec; -import com.yahoo.jrt.Supervisor; -import com.yahoo.jrt.Target; -import com.yahoo.jrt.Transport; -import com.yahoo.jrt.Values; -import com.yahoo.prelude.fastsearch.FastHit; - -import java.util.List; - -/** - * A client which uses rpc request to search nodes to implement the Client API. - * - * @author bratseth - */ -class RpcClient implements Client { - - private final Supervisor supervisor = new Supervisor(new Transport()); - - @Override - public NodeConnection createConnection(String hostname, int port) { - return new RpcNodeConnection(hostname, port, supervisor); - } - - @Override - public void getDocsums(List hits, NodeConnection node, CompressionType compression, int uncompressedLength, - byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) { - Request request = new Request("proton.getDocsums"); - request.parameters().add(new Int8Value(compression.getCode())); - request.parameters().add(new Int32Value(uncompressedLength)); - request.parameters().add(new DataValue(compressedSlime)); - - request.setContext(hits); - RpcNodeConnection rpcNode = ((RpcNodeConnection) node); - rpcNode.invokeAsync(request, timeoutSeconds, new RpcDocsumResponseWaiter(rpcNode, responseReceiver)); - } - - @Override - public void search(NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload, - RpcSearchInvoker responseReceiver, double timeoutSeconds) { - Request request = new Request("vespa.searchprotocol.search"); - request.parameters().add(new Int8Value(compression.getCode())); - request.parameters().add(new Int32Value(uncompressedLength)); - request.parameters().add(new DataValue(compressedPayload)); - - RpcNodeConnection rpcNode = ((RpcNodeConnection) node); - rpcNode.invokeAsync(request, timeoutSeconds, new RpcSearchResponseWaiter(rpcNode, responseReceiver)); - } - - private static class RpcNodeConnection implements NodeConnection { - - // Information about the connected node - private final Supervisor supervisor; - private final String hostname; - private final int port; - private final String description; - - // The current shared connection. This will be recycled when it becomes invalid. - // All access to this must be synchronized - private Target target = null; - - public RpcNodeConnection(String hostname, int port, Supervisor supervisor) { - this.supervisor = supervisor; - this.hostname = hostname; - this.port = port; - description = "rpc node connection to " + hostname + ":" + port; - } - - public void invokeAsync(Request req, double timeout, RequestWaiter waiter) { - // TODO: Consider replacing this by a watcher on the target - synchronized(this) { // ensure we have exactly 1 valid connection across threads - if (target == null || ! target.isValid()) - target = supervisor.connect(new Spec(hostname, port)); - } - target.invokeAsync(req, timeout, waiter); - } - - @Override - public void close() { - target.close(); - } - - @Override - public String toString() { - return description; - } - - } - - private static class RpcDocsumResponseWaiter implements RequestWaiter { - - /** The node to which we made the request we are waiting for - for error messages only */ - private final RpcNodeConnection node; - - /** The handler to which the response is forwarded */ - private final RpcFillInvoker.GetDocsumsResponseReceiver handler; - - public RpcDocsumResponseWaiter(RpcNodeConnection node, RpcFillInvoker.GetDocsumsResponseReceiver handler) { - this.node = node; - this.handler = handler; - } - - @Override - public void handleRequestDone(Request requestWithResponse) { - if (requestWithResponse.isError()) { - handler.receive(GetDocsumsResponseOrError.fromError("Error response from " + node + ": " + - requestWithResponse.errorMessage())); - return; - } - - Values returnValues = requestWithResponse.returnValues(); - if (returnValues.size() < 3) { - handler.receive(GetDocsumsResponseOrError.fromError("Invalid getDocsums response from " + node + - ": Expected 3 return arguments, got " + - returnValues.size())); - return; - } - - byte compression = returnValues.get(0).asInt8(); - int uncompressedSize = returnValues.get(1).asInt32(); - byte[] compressedSlimeBytes = returnValues.get(2).asData(); - List hits = (List) requestWithResponse.getContext(); - handler.receive(GetDocsumsResponseOrError.fromResponse(new GetDocsumsResponse(compression, - uncompressedSize, - compressedSlimeBytes, - hits))); - } - - } - - private static class RpcSearchResponseWaiter implements RequestWaiter { - - /** The node to which we made the request we are waiting for - for error messages only */ - private final RpcNodeConnection node; - - /** The handler to which the response is forwarded */ - private final RpcSearchInvoker handler; - - public RpcSearchResponseWaiter(RpcNodeConnection node, RpcSearchInvoker handler) { - this.node = node; - this.handler = handler; - } - - @Override - public void handleRequestDone(Request requestWithResponse) { - if (requestWithResponse.isError()) { - handler.receive(SearchResponseOrError.fromError("Error response from " + node + ": " + requestWithResponse.errorMessage())); - return; - } - - Values returnValues = requestWithResponse.returnValues(); - if (returnValues.size() < 3) { - handler.receive(SearchResponseOrError.fromError( - "Invalid getDocsums response from " + node + ": Expected 3 return arguments, got " + returnValues.size())); - return; - } - - byte compression = returnValues.get(0).asInt8(); - int uncompressedSize = returnValues.get(1).asInt32(); - byte[] compressedPayload = returnValues.get(2).asData(); - handler.receive(SearchResponseOrError.fromResponse(new SearchResponse(compression, uncompressedSize, compressedPayload))); - } - - } - -} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java deleted file mode 100644 index b7286997514..00000000000 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcFillInvoker.java +++ /dev/null @@ -1,254 +0,0 @@ -// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch.rpc; - -import com.yahoo.collections.ListMap; -import com.yahoo.compress.CompressionType; -import com.yahoo.compress.Compressor; -import com.yahoo.container.protect.Error; -import com.yahoo.data.access.Inspector; -import com.yahoo.data.access.slime.SlimeAdapter; -import com.yahoo.prelude.fastsearch.DocumentDatabase; -import com.yahoo.prelude.fastsearch.FastHit; -import com.yahoo.prelude.fastsearch.TimeoutException; -import com.yahoo.search.Query; -import com.yahoo.search.Result; -import com.yahoo.search.dispatch.FillInvoker; -import com.yahoo.search.query.SessionId; -import com.yahoo.search.result.ErrorMessage; -import com.yahoo.search.result.Hit; -import com.yahoo.slime.ArrayTraverser; -import com.yahoo.slime.BinaryFormat; -import com.yahoo.slime.Cursor; -import com.yahoo.slime.Slime; - -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.logging.Level; -import java.util.logging.Logger; - -/** - * {@link FillInvoker} implementation using RPC - * - * @author bratseth - * @author ollivir - */ -public class RpcFillInvoker extends FillInvoker { - private static final Logger log = Logger.getLogger(RpcFillInvoker.class.getName()); - - private final DocumentDatabase documentDb; - private final RpcResourcePool resourcePool; - - private GetDocsumsResponseReceiver responseReceiver; - - - RpcFillInvoker(RpcResourcePool resourcePool, DocumentDatabase documentDb) { - this.documentDb = documentDb; - this.resourcePool = resourcePool; - } - - @Override - protected void sendFillRequest(Result result, String summaryClass) { - ListMap hitsByNode = hitsByNode(result); - - CompressionType compression = CompressionType - .valueOf(result.getQuery().properties().getString(RpcResourcePool.dispatchCompression, "LZ4").toUpperCase()); - - if (result.getQuery().getTraceLevel() >= 3) - result.getQuery().trace("Sending " + hitsByNode.size() + " summary fetch RPC requests", 3); - - responseReceiver = new GetDocsumsResponseReceiver(hitsByNode.size(), resourcePool.compressor(), result); - for (Map.Entry> nodeHits : hitsByNode.entrySet()) { - sendGetDocsumsRequest(nodeHits.getKey(), nodeHits.getValue(), summaryClass, compression, result, responseReceiver); - } - } - - @Override - protected void getFillResults(Result result, String summaryClass) { - try { - responseReceiver.processResponses(result.getQuery(), summaryClass, documentDb); - result.hits().setSorted(false); - result.analyzeHits(); - } catch (TimeoutException e) { - result.hits().addError(ErrorMessage.createTimeout("Summary data is incomplete: " + e.getMessage())); - } - } - - @Override - protected void release() { - // nothing to release - } - - /** Return a map of hits by their search node (partition) id */ - private static ListMap hitsByNode(Result result) { - ListMap hitsByNode = new ListMap<>(); - for (Iterator i = result.hits().unorderedDeepIterator(); i.hasNext();) { - Hit h = i.next(); - if (!(h instanceof FastHit)) - continue; - FastHit hit = (FastHit) h; - - hitsByNode.put(hit.getDistributionKey(), hit); - } - return hitsByNode; - } - - /** Send a getDocsums request to a node. Responses will be added to the given receiver. */ - private void sendGetDocsumsRequest(int nodeId, List hits, String summaryClass, CompressionType compression, - Result result, GetDocsumsResponseReceiver responseReceiver) { - Client.NodeConnection node = resourcePool.nodeConnections().get(nodeId); - if (node == null) { - String error = "Could not fill hits from unknown node " + nodeId; - responseReceiver.receive(Client.GetDocsumsResponseOrError.fromError(error)); - result.hits().addError(ErrorMessage.createEmptyDocsums(error)); - log.warning("Got hits with partid " + nodeId + ", which is not included in the current dispatch config"); - return; - } - - Query query = result.getQuery(); - String rankProfile = query.getRanking().getProfile(); - byte[] serializedSlime = BinaryFormat - .encode(toSlime(rankProfile, summaryClass, query.getModel().getDocumentDb(), query.getSessionId(), hits)); - double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0; - Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, serializedSlime); - resourcePool.client().getDocsums(hits, node, compressionResult.type(), serializedSlime.length, compressionResult.data(), - responseReceiver, timeoutSeconds); - } - - static private Slime toSlime(String rankProfile, String summaryClass, String docType, SessionId sessionId, List hits) { - Slime slime = new Slime(); - Cursor root = slime.setObject(); - if (summaryClass != null) { - root.setString("class", summaryClass); - } - if (sessionId != null) { - root.setData("sessionid", sessionId.asUtf8String().getBytes()); - } - if (docType != null) { - root.setString("doctype", docType); - } - if (rankProfile != null) { - root.setString("ranking", rankProfile); - } - Cursor gids = root.setArray("gids"); - for (FastHit hit : hits) { - gids.addData(hit.getGlobalId().getRawId()); - } - return slime; - } - - /** Receiver of the responses to a set of getDocsums requests */ - public static class GetDocsumsResponseReceiver { - - private final BlockingQueue responses; - private final Compressor compressor; - private final Result result; - - /** Whether we have already logged/notified about an error - to avoid spamming */ - private boolean hasReportedError = false; - - /** The number of responses we should receive (and process) before this is complete */ - private int outstandingResponses; - - GetDocsumsResponseReceiver(int requestCount, Compressor compressor, Result result) { - this.compressor = compressor; - responses = new LinkedBlockingQueue<>(requestCount); - outstandingResponses = requestCount; - this.result = result; - } - - /** Called by a thread belonging to the client when a valid response becomes available */ - public void receive(Client.GetDocsumsResponseOrError response) { - responses.add(response); - } - - private void throwTimeout() throws TimeoutException { - throw new TimeoutException("Timed out waiting for summary data. " + outstandingResponses + " responses outstanding."); - } - - /** - * Call this from the dispatcher thread to initiate and complete processing of responses. - * This will block until all responses are available and processed, or to timeout. - */ - void processResponses(Query query, String summaryClass, DocumentDatabase documentDb) throws TimeoutException { - try { - int skippedHits = 0; - while (outstandingResponses > 0) { - long timeLeftMs = query.getTimeLeft(); - if (timeLeftMs <= 0) { - throwTimeout(); - } - Client.GetDocsumsResponseOrError response = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS); - if (response == null) - throwTimeout(); - skippedHits += processResponse(response, summaryClass, documentDb); - outstandingResponses--; - } - if (skippedHits != 0) { - result.hits().addError(com.yahoo.search.result.ErrorMessage.createEmptyDocsums("Missing hit summary data for summary " + - summaryClass + " for " + skippedHits + " hits")); - } - } - catch (InterruptedException e) { - // TODO: Add error - } - } - - private int processResponse(Client.GetDocsumsResponseOrError responseOrError, - String summaryClass, - DocumentDatabase documentDb) { - if (responseOrError.error().isPresent()) { - if (hasReportedError) return 0; - String error = responseOrError.error().get(); - result.hits().addError(ErrorMessage.createBackendCommunicationError(error)); - log.log(Level.WARNING, "Error fetching summary data: "+ error); - } - else { - Client.GetDocsumsResponse response = responseOrError.response().get(); - CompressionType compression = CompressionType.valueOf(response.compression()); - byte[] slimeBytes = compressor.decompress(response.compressedSlimeBytes(), compression, response.uncompressedSize()); - return fill(response.hitsContext(), summaryClass, documentDb, slimeBytes); - } - return 0; - } - - private void addErrors(com.yahoo.slime.Inspector errors) { - errors.traverse((ArrayTraverser) (int index, com.yahoo.slime.Inspector value) -> { - int errorCode = ("timeout".equalsIgnoreCase(value.field("type").asString())) - ? Error.TIMEOUT.code - : Error.UNSPECIFIED.code; - result.hits().addError(new ErrorMessage(errorCode, - value.field("message").asString(), value.field("details").asString())); - }); - } - - private int fill(List hits, String summaryClass, DocumentDatabase documentDb, byte[] slimeBytes) { - com.yahoo.slime.Inspector root = BinaryFormat.decode(slimeBytes).get(); - com.yahoo.slime.Inspector errors = root.field("errors"); - boolean hasErrors = errors.valid() && (errors.entries() > 0); - if (hasErrors) { - addErrors(errors); - } - - Inspector summaries = new SlimeAdapter(root.field("docsums")); - if ( ! summaries.valid()) - return 0; // No summaries; Perhaps we requested a non-existing summary class - int skippedHits = 0; - for (int i = 0; i < hits.size(); i++) { - Inspector summary = summaries.entry(i).field("docsum"); - if (summary.fieldCount() != 0) { - hits.get(i).setField(Hit.SDDOCNAME_FIELD, documentDb.getName()); - hits.get(i).addSummary(documentDb.getDocsumDefinitionSet().getDocsum(summaryClass), summary); - hits.get(i).setFilled(summaryClass); - } else { - skippedHits++; - } - } - return skippedHits; - } - - } -} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java deleted file mode 100644 index c8019278710..00000000000 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch.rpc; - -import com.yahoo.prelude.fastsearch.DocumentDatabase; -import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; -import com.yahoo.processing.request.CompoundName; -import com.yahoo.search.Query; -import com.yahoo.search.Result; -import com.yahoo.search.dispatch.FillInvoker; -import com.yahoo.search.dispatch.InterleavedSearchInvoker; -import com.yahoo.search.dispatch.InvokerFactory; -import com.yahoo.search.dispatch.SearchInvoker; -import com.yahoo.search.dispatch.searchcluster.Node; -import com.yahoo.search.dispatch.searchcluster.SearchCluster; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Optional; -import java.util.OptionalInt; -import java.util.Set; - -/** - * @author ollivir - */ -public class RpcInvokerFactory extends InvokerFactory { - /** Unless turned off this will fill summaries by dispatching directly to search nodes over RPC when possible */ - private final static CompoundName dispatchSummaries = new CompoundName("dispatch.summaries"); - - private final RpcResourcePool rpcResourcePool; - private final SearchCluster searchCluster; - - public RpcInvokerFactory(RpcResourcePool rpcResourcePool, SearchCluster searchCluster) { - this.rpcResourcePool = rpcResourcePool; - this.searchCluster = searchCluster; - } - - @Override - public Optional createSearchInvoker(VespaBackEndSearcher searcher, Query query, OptionalInt groupId, List nodes, - boolean acceptIncompleteCoverage) { - List invokers = new ArrayList<>(nodes.size()); - Set failed = null; - for (Node node : nodes) { - if (node.isWorking()) { - invokers.add(new RpcSearchInvoker(searcher, node, rpcResourcePool)); - } else { - if (failed == null) { - failed = new HashSet<>(); - } - failed.add(node.key()); - } - } - - if (failed != null) { - List success = new ArrayList<>(nodes.size() - failed.size()); - for (Node node : nodes) { - if (!failed.contains(node.key())) { - success.add(node); - } - } - if (!searchCluster.isPartialGroupCoverageSufficient(groupId, success)) { - if (acceptIncompleteCoverage) { - invokers.add(createCoverageErrorInvoker(nodes, failed)); - } else { - return Optional.empty(); - } - } - } - - if (invokers.size() == 1) { - return Optional.of(invokers.get(0)); - } else { - return Optional.of(new InterleavedSearchInvoker(invokers, searcher, searchCluster)); - } - } - - @Override - public Optional createFillInvoker(VespaBackEndSearcher searcher, Result result) { - Query query = result.getQuery(); - if (query.properties().getBoolean(dispatchSummaries, true) - && ! searcher.summaryNeedsQuery(query) - && query.getRanking().getLocation() == null) - { - return Optional.of(new RpcFillInvoker(rpcResourcePool, searcher.getDocumentDatabase(query))); - } else { - return Optional.empty(); - } - } - - // for testing - public FillInvoker createFillInvoker(DocumentDatabase documentDb) { - return new RpcFillInvoker(rpcResourcePool, documentDb); - } - - public void release() { - rpcResourcePool.release(); - } -} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java deleted file mode 100644 index 830ba45ef0f..00000000000 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch.rpc; - -import com.google.common.collect.ImmutableMap; -import com.yahoo.compress.Compressor; -import com.yahoo.processing.request.CompoundName; -import com.yahoo.search.dispatch.FillInvoker; -import com.yahoo.vespa.config.search.DispatchConfig; - -import java.util.Map; - -/** - * RpcResourcePool constructs {@link FillInvoker} objects that communicate with content nodes over RPC. It also contains - * the RPC connection pool. - * - * @author ollivir - */ -public class RpcResourcePool { - /** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */ - public final static CompoundName dispatchCompression = new CompoundName("dispatch.compression"); - - private final Compressor compressor = new Compressor(); - private final Client client; - - /** Connections to the search nodes this talks to, indexed by node id ("partid") */ - private final ImmutableMap nodeConnections; - - public RpcResourcePool(Client client, Map nodeConnections) { - this.client = client; - this.nodeConnections = ImmutableMap.copyOf(nodeConnections); - } - - public RpcResourcePool(DispatchConfig dispatchConfig) { - this.client = new RpcClient(); - - // Create node rpc connections, indexed by the node distribution key - ImmutableMap.Builder nodeConnectionsBuilder = new ImmutableMap.Builder<>(); - for (DispatchConfig.Node node : dispatchConfig.node()) { - nodeConnectionsBuilder.put(node.key(), client.createConnection(node.host(), node.port())); - } - this.nodeConnections = nodeConnectionsBuilder.build(); - } - - public Compressor compressor() { - return compressor; - } - - public Client client() { - return client; - } - - public ImmutableMap nodeConnections() { - return nodeConnections; - } - - public void release() { - for (Client.NodeConnection nodeConnection : nodeConnections.values()) { - nodeConnection.close(); - } - } -} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java deleted file mode 100644 index 88d77c760e3..00000000000 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch.rpc; - -import com.yahoo.compress.CompressionType; -import com.yahoo.compress.Compressor; -import com.yahoo.fs4.QueryPacket; -import com.yahoo.prelude.fastsearch.FastHit; -import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; -import com.yahoo.search.Query; -import com.yahoo.search.Result; -import com.yahoo.search.dispatch.SearchInvoker; -import com.yahoo.search.dispatch.rpc.Client.SearchResponse; -import com.yahoo.search.dispatch.searchcluster.Node; -import com.yahoo.search.result.ErrorMessage; -import com.yahoo.search.searchchain.Execution; - -import java.io.IOException; -import java.util.Optional; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -/** - * {@link SearchInvoker} implementation using RPC - * - * @author ollivir - */ -public class RpcSearchInvoker extends SearchInvoker { - private final VespaBackEndSearcher searcher; - private final Node node; - private final RpcResourcePool resourcePool; - private final BlockingQueue responses; - - private Query query; - - RpcSearchInvoker(VespaBackEndSearcher searcher, Node node, RpcResourcePool resourcePool) { - super(Optional.of(node)); - this.searcher = searcher; - this.node = node; - this.resourcePool = resourcePool; - this.responses = new LinkedBlockingQueue<>(1); - } - - @Override - protected void sendSearchRequest(Query query, QueryPacket queryPacket) throws IOException { - this.query = query; - - CompressionType compression = CompressionType - .valueOf(query.properties().getString(RpcResourcePool.dispatchCompression, "LZ4").toUpperCase()); - - Client.NodeConnection nodeConnection = resourcePool.nodeConnections().get(node.key()); - if (nodeConnection == null) { - responses.add(Client.SearchResponseOrError.fromError("Could send search to unknown node " + node.key())); - responseAvailable(); - return; - } - - var payload = ProtobufSerialization.serializeQuery(query, searcher.getServerId(), true); - double timeoutSeconds = ((double) query.getTimeLeft() - 3.0) / 1000.0; - Compressor.Compression compressionResult = resourcePool.compressor().compress(compression, payload); - resourcePool.client().search(nodeConnection, compressionResult.type(), payload.length, compressionResult.data(), this, - timeoutSeconds); - } - - @Override - protected Result getSearchResult(Execution execution) throws IOException { - long timeLeftMs = query.getTimeLeft(); - if (timeLeftMs <= 0) { - return errorResult(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName())); - } - Client.SearchResponseOrError response = null; - try { - response = responses.poll(timeLeftMs, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - // handled as timeout - } - if (response == null) { - return errorResult(query, ErrorMessage.createTimeout("Timeout while waiting for " + getName())); - } - if (response.error().isPresent()) { - return errorResult(query, ErrorMessage.createBackendCommunicationError(response.error().get())); - } - if (response.response().isEmpty()) { - return errorResult(query, ErrorMessage.createInternalServerError("Neither error nor result available")); - } - - SearchResponse searchResponse = response.response().get(); - CompressionType compression = CompressionType.valueOf(searchResponse.compression()); - byte[] payload = resourcePool.compressor().decompress(searchResponse.compressedPayload(), compression, - searchResponse.uncompressedSize()); - var result = ProtobufSerialization.deserializeToResult(payload, query, searcher); - result.hits().unorderedIterator().forEachRemaining(hit -> { - if(hit instanceof FastHit) { - FastHit fhit = (FastHit) hit; - fhit.setPartId(node.pathIndex()); - fhit.setDistributionKey(node.key()); - } - hit.setSource(getName()); - }); - - return result; - } - - @Override - protected void release() { - // nothing to release - } - - public void receive(Client.SearchResponseOrError response) { - responses.add(response); - responseAvailable(); - } - - private String getName() { - return searcher.getName(); - } - -} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java index bafc72b9b43..cb86f19e761 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Group.java @@ -25,12 +25,6 @@ public class Group { public Group(int id, List nodes) { this.id = id; this.nodes = ImmutableList.copyOf(nodes); - - int idx = 0; - for(var node: nodes) { - node.setPathIndex(idx); - idx++; - } } /** Returns the unique identity of this group */ diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java index 7e0e3117628..98deb9e3199 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java @@ -14,7 +14,6 @@ import java.util.concurrent.atomic.AtomicLong; public class Node { private final int key; - private int pathIndex; private final String hostname; private final int fs4port; final int group; @@ -32,12 +31,6 @@ public class Node { /** Returns the unique and stable distribution key of this node */ public int key() { return key; } - public int pathIndex() { return pathIndex; } - - void setPathIndex(int index) { - pathIndex = index; - } - public String hostname() { return hostname; } public int fs4port() { return fs4port; } diff --git a/container-search/src/main/java/com/yahoo/search/query/Model.java b/container-search/src/main/java/com/yahoo/search/query/Model.java index a874ed45e30..f01951047d0 100644 --- a/container-search/src/main/java/com/yahoo/search/query/Model.java +++ b/container-search/src/main/java/com/yahoo/search/query/Model.java @@ -34,6 +34,7 @@ import static com.yahoo.text.Lowercase.toLowerCase; * @author bratseth */ public class Model implements Cloneable { + /** The type representing the property arguments consumed by this */ private static final QueryProfileType argumentType; private static final CompoundName argumentTypeName; @@ -100,9 +101,9 @@ public class Model implements Cloneable { /** * Gets the language to use for parsing. If this is explicitly set in the model, that language is returned. - * Otherwise, if a query tree is already produced and any node in it specifies a language the first such - * node encountered in a depth first - * left to right search is returned. Otherwise the language is guessed from the query string. + * Otherwise, if a query tree is already produced and any node in it specifies a language the first such + * node encountered in a depth first + * left to right search is returned. Otherwise the language is guessed from the query string. * If this does not yield an actual language, English is returned as the default. * * @return the language determined, never null @@ -120,7 +121,7 @@ public class Model implements Cloneable { if (queryTree != null) language = languageBelow(queryTree); if (language != Language.UNKNOWN) return language; - + Linguistics linguistics = execution.context().getLinguistics(); if (linguistics != null) language = linguistics.getDetector().detect(languageDetectionText, null).getLanguage(); // TODO: Set language if detected @@ -128,7 +129,7 @@ public class Model implements Cloneable { return Language.ENGLISH; } - + private Language languageBelow(Item item) { if (item.getLanguage() != Language.UNKNOWN) return item.getLanguage(); if (item instanceof CompositeItem) { diff --git a/container-search/src/main/java/com/yahoo/search/query/Presentation.java b/container-search/src/main/java/com/yahoo/search/query/Presentation.java index 6edef386d49..6b10fd0847c 100644 --- a/container-search/src/main/java/com/yahoo/search/query/Presentation.java +++ b/container-search/src/main/java/com/yahoo/search/query/Presentation.java @@ -4,8 +4,8 @@ package com.yahoo.search.query; import com.google.common.base.Splitter; import com.yahoo.collections.LazySet; import com.yahoo.component.ComponentSpecification; -import com.yahoo.prelude.query.Highlight; -import com.yahoo.prelude.query.IndexedItem; +import com.yahoo.processing.request.CompoundName; +import com.yahoo.prelude.query.*; import com.yahoo.search.Query; import com.yahoo.search.query.profile.types.FieldDescription; import com.yahoo.search.query.profile.types.QueryProfileType; diff --git a/container-search/src/main/java/com/yahoo/search/query/Ranking.java b/container-search/src/main/java/com/yahoo/search/query/Ranking.java index 7444c94f491..903eedfe870 100644 --- a/container-search/src/main/java/com/yahoo/search/query/Ranking.java +++ b/container-search/src/main/java/com/yahoo/search/query/Ranking.java @@ -1,9 +1,9 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.query; +import com.yahoo.processing.request.CompoundName; import com.yahoo.prelude.Freshness; import com.yahoo.prelude.Location; -import com.yahoo.processing.request.CompoundName; import com.yahoo.search.Query; import com.yahoo.search.query.profile.types.FieldDescription; import com.yahoo.search.query.profile.types.QueryProfileFieldType; diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java index 0cc58801298..708caafa3f5 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java @@ -2,10 +2,8 @@ package com.yahoo.search.dispatch; import com.yahoo.prelude.fastsearch.FS4InvokerFactory; -import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; import com.yahoo.processing.request.CompoundName; import com.yahoo.search.Query; -import com.yahoo.search.dispatch.rpc.RpcInvokerFactory; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.SearchCluster; import com.yahoo.vespa.config.search.DispatchConfig; @@ -38,7 +36,7 @@ public class DispatcherTest { builder.useMultilevelDispatch(true); DispatchConfig dc = new DispatchConfig(builder); - Dispatcher disp = new Dispatcher(cl, dc, new MockFS4InvokerFactory(cl), new MockRpcInvokerFactory()); + Dispatcher disp = new Dispatcher(cl, dc); assertThat(disp.getSearchInvoker(query(), null).isPresent(), is(false)); } @@ -47,13 +45,13 @@ public class DispatcherTest { SearchCluster cl = new MockSearchCluster("1", 2, 2); Query q = query(); q.getModel().setSearchPath("1/0"); // second node in first group + Dispatcher disp = new Dispatcher(cl, createDispatchConfig()); MockFS4InvokerFactory invokerFactory = new MockFS4InvokerFactory(cl, (nodes, a) -> { assertThat(nodes.size(), is(1)); assertThat(nodes.get(0).key(), is(2)); return true; }); - Dispatcher disp = new Dispatcher(cl, createDispatchConfig(), invokerFactory, new MockRpcInvokerFactory()); - Optional invoker = disp.getSearchInvoker(q, null); + Optional invoker = disp.getSearchInvoker(q, invokerFactory); assertThat(invoker.isPresent(), is(true)); invokerFactory.verifyAllEventsProcessed(); } @@ -66,9 +64,9 @@ public class DispatcherTest { return Optional.of(new Node(1, "test", 123, 1)); } }; + Dispatcher disp = new Dispatcher(cl, createDispatchConfig()); MockFS4InvokerFactory invokerFactory = new MockFS4InvokerFactory(cl, (n, a) -> true); - Dispatcher disp = new Dispatcher(cl, createDispatchConfig(), invokerFactory, new MockRpcInvokerFactory()); - Optional invoker = disp.getSearchInvoker(query(), null); + Optional invoker = disp.getSearchInvoker(query(), invokerFactory); assertThat(invoker.isPresent(), is(true)); invokerFactory.verifyAllEventsProcessed(); } @@ -77,6 +75,7 @@ public class DispatcherTest { public void requireThatInvokerConstructionIsRetriedAndLastAcceptsAnyCoverage() { SearchCluster cl = new MockSearchCluster("1", 2, 1); + Dispatcher disp = new Dispatcher(cl, createDispatchConfig()); MockFS4InvokerFactory invokerFactory = new MockFS4InvokerFactory(cl, (n, acceptIncompleteCoverage) -> { assertThat(acceptIncompleteCoverage, is(false)); return false; @@ -84,8 +83,7 @@ public class DispatcherTest { assertThat(acceptIncompleteCoverage, is(true)); return true; }); - Dispatcher disp = new Dispatcher(cl, createDispatchConfig(), invokerFactory, new MockRpcInvokerFactory()); - Optional invoker = disp.getSearchInvoker(query(), null); + Optional invoker = disp.getSearchInvoker(query(), invokerFactory); assertThat(invoker.isPresent(), is(true)); invokerFactory.verifyAllEventsProcessed(); } @@ -94,9 +92,9 @@ public class DispatcherTest { public void requireThatInvokerConstructionDoesNotRepeatGroups() { SearchCluster cl = new MockSearchCluster("1", 2, 1); + Dispatcher disp = new Dispatcher(cl, createDispatchConfig()); MockFS4InvokerFactory invokerFactory = new MockFS4InvokerFactory(cl, (n, a) -> false, (n, a) -> false); - Dispatcher disp = new Dispatcher(cl, createDispatchConfig(), invokerFactory, null); - Optional invoker = disp.getSearchInvoker(query(), null); + Optional invoker = disp.getSearchInvoker(query(), invokerFactory); assertThat(invoker.isPresent(), is(false)); invokerFactory.verifyAllEventsProcessed(); } @@ -110,13 +108,12 @@ public class DispatcherTest { private int step = 0; public MockFS4InvokerFactory(SearchCluster cl, FactoryStep... events) { - super(null, cl); + super(null, cl, null); this.events = events; } @Override - public Optional createSearchInvoker(VespaBackEndSearcher searcher, Query query, OptionalInt groupId, - List nodes, boolean acceptIncompleteCoverage) { + public Optional getSearchInvoker(Query query, OptionalInt groupId, List nodes, boolean acceptIncompleteCoverage) { if (step >= events.length) { throw new RuntimeException("Was not expecting more calls to getSearchInvoker"); } @@ -133,14 +130,4 @@ public class DispatcherTest { assertThat(step, is(events.length)); } } - - public class MockRpcInvokerFactory extends RpcInvokerFactory { - public MockRpcInvokerFactory() { - super(null, null); - } - - @Override - public void release() { - } - } } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java b/container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java new file mode 100644 index 00000000000..173b2de494f --- /dev/null +++ b/container-search/src/test/java/com/yahoo/search/dispatch/FillTestCase.java @@ -0,0 +1,167 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch; + +import com.yahoo.prelude.fastsearch.DocsumDefinition; +import com.yahoo.prelude.fastsearch.DocsumDefinitionSet; +import com.yahoo.prelude.fastsearch.DocsumField; +import com.yahoo.prelude.fastsearch.DocumentDatabase; +import com.yahoo.prelude.fastsearch.FastHit; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + + +/** + * Tests using a dispatcher to fill a result + * + * @author bratseth + */ +public class FillTestCase { + + private MockClient client = new MockClient(); + + @Test + public void testFilling() { + Map nodes = new HashMap<>(); + nodes.put(0, client.createConnection("host0", 123)); + nodes.put(1, client.createConnection("host1", 123)); + nodes.put(2, client.createConnection("host2", 123)); + RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes); + + Query query = new Query(); + Result result = new Result(query); + result.hits().add(createHit(0, 0)); + result.hits().add(createHit(2, 1)); + result.hits().add(createHit(1, 2)); + result.hits().add(createHit(2, 3)); + result.hits().add(createHit(0, 4)); + + client.setDocsumReponse("host0", 0, "summaryClass1", map("field1", "s.0.0", "field2", 0)); + client.setDocsumReponse("host2", 1, "summaryClass1", map("field1", "s.2.1", "field2", 1)); + client.setDocsumReponse("host1", 2, "summaryClass1", map("field1", "s.1.2", "field2", 2)); + client.setDocsumReponse("host2", 3, "summaryClass1", map("field1", "s.2.3", "field2", 3)); + client.setDocsumReponse("host0", 4, "summaryClass1", map("field1", "s.0.4", "field2", 4)); + + rpcResourcePool.getFillInvoker(db()).fill(result, "summaryClass1"); + + assertEquals("s.0.0", result.hits().get("hit:0").getField("field1").toString()); + assertEquals("s.2.1", result.hits().get("hit:1").getField("field1").toString()); + assertEquals("s.1.2", result.hits().get("hit:2").getField("field1").toString()); + assertEquals("s.2.3", result.hits().get("hit:3").getField("field1").toString()); + assertEquals("s.0.4", result.hits().get("hit:4").getField("field1").toString()); + assertEquals(0L, result.hits().get("hit:0").getField("field2")); + assertEquals(1L, result.hits().get("hit:1").getField("field2")); + assertEquals(2L, result.hits().get("hit:2").getField("field2")); + assertEquals(3L, result.hits().get("hit:3").getField("field2")); + assertEquals(4L, result.hits().get("hit:4").getField("field2")); + } + + @Test + public void testEmptyHits() { + Map nodes = new HashMap<>(); + nodes.put(0, client.createConnection("host0", 123)); + nodes.put(1, client.createConnection("host1", 123)); + nodes.put(2, client.createConnection("host2", 123)); + RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes); + + Query query = new Query(); + Result result = new Result(query); + result.hits().add(createHit(0, 0)); + result.hits().add(createHit(2, 1)); + result.hits().add(createHit(1, 2)); + result.hits().add(createHit(2, 3)); + result.hits().add(createHit(0, 4)); + + client.setDocsumReponse("host0", 0, "summaryClass1", map("field1", "s.0.0", "field2", 0)); + client.setDocsumReponse("host2", 1, "summaryClass1", map("field1", "s.2.1", "field2", 1)); + client.setDocsumReponse("host1", 2, "summaryClass1", new HashMap<>()); + client.setDocsumReponse("host2", 3, "summaryClass1", map("field1", "s.2.3", "field2", 3)); + client.setDocsumReponse("host0", 4, "summaryClass1",new HashMap<>()); + + rpcResourcePool.getFillInvoker(db()).fill(result, "summaryClass1"); + + assertEquals("s.0.0", result.hits().get("hit:0").getField("field1").toString()); + assertEquals("s.2.1", result.hits().get("hit:1").getField("field1").toString()); + assertNull(result.hits().get("hit:2").getField("field1")); + assertEquals("s.2.3", result.hits().get("hit:3").getField("field1").toString()); + assertNull(result.hits().get("hit:4").getField("field1")); + + assertEquals(0L, result.hits().get("hit:0").getField("field2")); + assertEquals(1L, result.hits().get("hit:1").getField("field2")); + assertNull(result.hits().get("hit:2").getField("field2")); + assertEquals(3L, result.hits().get("hit:3").getField("field2")); + assertNull(result.hits().get("hit:4").getField("field2")); + + assertEquals("Missing hit summary data for summary summaryClass1 for 2 hits", result.hits().getError().getDetailedMessage()); + } + + @Test + public void testErrorHandling() { + client.setMalfunctioning(true); + + Map nodes = new HashMap<>(); + nodes.put(0, client.createConnection("host0", 123)); + RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes); + + Query query = new Query(); + Result result = new Result(query); + result.hits().add(createHit(0, 0)); + + rpcResourcePool.getFillInvoker(db()).fill(result, "summaryClass1"); + + assertEquals("Malfunctioning", result.hits().getError().getDetailedMessage()); + } + + @Test + public void testSendingFill2UnknownNode() { + client.setMalfunctioning(true); + + Map nodes = new HashMap<>(); + nodes.put(0, client.createConnection("host0", 123)); + RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes); + + Query query = new Query(); + Result result = new Result(query); + result.hits().add(createHit(0, 0)); + result.hits().add(createHit(1, 1)); + + + rpcResourcePool.getFillInvoker(db()).fill(result, "summaryClass1"); + + assertEquals("Could not fill hits from unknown node 1", result.hits().getError().getDetailedMessage()); + } + + private DocumentDatabase db() { + List fields = new ArrayList<>(); + fields.add(DocsumField.create("field1", "string")); + fields.add(DocsumField.create("field2", "int64")); + DocsumDefinitionSet docsums = new DocsumDefinitionSet(Collections.singleton(new DocsumDefinition("summaryClass1", + fields))); + return new DocumentDatabase("default", docsums, Collections.emptySet()); + } + + private FastHit createHit(int sourceNodeId, int hitId) { + FastHit hit = new FastHit("hit:" + hitId, 1.0); + hit.setPartId(sourceNodeId); + hit.setDistributionKey(sourceNodeId); + hit.setGlobalId(client.globalIdFrom(hitId)); + return hit; + } + + private Map map(String stringKey, String stringValue, String intKey, int intValue) { + Map map = new HashMap<>(); + map.put(stringKey, stringValue); + map.put(intKey, intValue); + return map; + } + +} diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/MockClient.java b/container-search/src/test/java/com/yahoo/search/dispatch/MockClient.java new file mode 100644 index 00000000000..a4cb8ae641c --- /dev/null +++ b/container-search/src/test/java/com/yahoo/search/dispatch/MockClient.java @@ -0,0 +1,121 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch; + +import com.yahoo.compress.CompressionType; +import com.yahoo.compress.Compressor; +import com.yahoo.document.GlobalId; +import com.yahoo.document.idstring.IdIdString; +import com.yahoo.prelude.fastsearch.FastHit; +import com.yahoo.slime.ArrayTraverser; +import com.yahoo.slime.BinaryFormat; +import com.yahoo.slime.Cursor; +import com.yahoo.slime.Inspector; +import com.yahoo.slime.Slime; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author bratseth + */ +public class MockClient implements Client { + + private final Map> docsums = new HashMap<>(); + private final Compressor compressor = new Compressor(); + private boolean malfunctioning = false; + + /** Set to true to cause this to produce an error instead of a regular response */ + public void setMalfunctioning(boolean malfunctioning) { this.malfunctioning = malfunctioning; } + + @Override + public NodeConnection createConnection(String hostname, int port) { + return new MockNodeConnection(hostname, port); + } + + @Override + public void getDocsums(List hitsContext, NodeConnection node, CompressionType compression, + int uncompressedSize, byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, + double timeoutSeconds) { + if (malfunctioning) { + responseReceiver.receive(GetDocsumsResponseOrError.fromError("Malfunctioning")); + return; + } + + Inspector request = BinaryFormat.decode(compressor.decompress(compressedSlime, compression, uncompressedSize)).get(); + String docsumClass = request.field("class").asString(); + List> docsumsToReturn = new ArrayList<>(); + request.field("gids").traverse((ArrayTraverser)(index, gid) -> { + GlobalId docId = new GlobalId(gid.asData()); + docsumsToReturn.add(docsums.get(new DocsumKey(node.toString(), docId, docsumClass))); + }); + Slime responseSlime = new Slime(); + Cursor root = responseSlime.setObject(); + Cursor docsums = root.setArray("docsums"); + for (Map docsumFields : docsumsToReturn) { + Cursor docsumItem = docsums.addObject(); + Cursor docsum = docsumItem.setObject("docsum"); + for (Map.Entry field : docsumFields.entrySet()) { + if (field.getValue() instanceof Integer) + docsum.setLong(field.getKey(), (Integer)field.getValue()); + else if (field.getValue() instanceof String) + docsum.setString(field.getKey(), (String)field.getValue()); + else + throw new RuntimeException(); + } + } + byte[] slimeBytes = BinaryFormat.encode(responseSlime); + Compressor.Compression compressionResult = compressor.compress(compression, slimeBytes); + GetDocsumsResponse response = new GetDocsumsResponse(compressionResult.type().getCode(), slimeBytes.length, + compressionResult.data(), hitsContext); + responseReceiver.receive(GetDocsumsResponseOrError.fromResponse(response)); + } + + public void setDocsumReponse(String nodeId, int docId, String docsumClass, Map docsumValues) { + docsums.put(new DocsumKey(nodeId, globalIdFrom(docId), docsumClass), docsumValues); + } + + public GlobalId globalIdFrom(int hitId) { + return new GlobalId(new IdIdString("", "test", "", String.valueOf(hitId))); + } + + private static class MockNodeConnection implements Client.NodeConnection { + + private final String hostname; + + public MockNodeConnection(String hostname, int port) { + this.hostname = hostname; + } + + @Override + public void close() { } + + @Override + public String toString() { return hostname; } + + } + + private static class DocsumKey { + + private final String internalKey; + + public DocsumKey(String nodeId, GlobalId docId, String docsumClass) { + internalKey = docsumClass + "." + nodeId + "." + docId; + } + + @Override + public int hashCode() { return internalKey.hashCode(); } + + @Override + public boolean equals(Object other) { + if ( ! (other instanceof DocsumKey)) return false; + return ((DocsumKey)other).internalKey.equals(this.internalKey); + } + + @Override + public String toString() { return internalKey; } + + } + +} diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/FillTestCase.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/FillTestCase.java deleted file mode 100644 index 2adbd12a2aa..00000000000 --- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/FillTestCase.java +++ /dev/null @@ -1,174 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch.rpc; - -import com.yahoo.prelude.fastsearch.DocsumDefinition; -import com.yahoo.prelude.fastsearch.DocsumDefinitionSet; -import com.yahoo.prelude.fastsearch.DocsumField; -import com.yahoo.prelude.fastsearch.DocumentDatabase; -import com.yahoo.prelude.fastsearch.FastHit; -import com.yahoo.search.Query; -import com.yahoo.search.Result; -import com.yahoo.search.dispatch.rpc.Client; -import com.yahoo.search.dispatch.rpc.RpcInvokerFactory; -import com.yahoo.search.dispatch.rpc.RpcResourcePool; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; - - -/** - * Tests using a dispatcher to fill a result - * - * @author bratseth - */ -public class FillTestCase { - - private MockClient client = new MockClient(); - - @Test - public void testFilling() { - Map nodes = new HashMap<>(); - nodes.put(0, client.createConnection("host0", 123)); - nodes.put(1, client.createConnection("host1", 123)); - nodes.put(2, client.createConnection("host2", 123)); - RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes); - RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null); - - Query query = new Query(); - Result result = new Result(query); - result.hits().add(createHit(0, 0)); - result.hits().add(createHit(2, 1)); - result.hits().add(createHit(1, 2)); - result.hits().add(createHit(2, 3)); - result.hits().add(createHit(0, 4)); - - client.setDocsumReponse("host0", 0, "summaryClass1", map("field1", "s.0.0", "field2", 0)); - client.setDocsumReponse("host2", 1, "summaryClass1", map("field1", "s.2.1", "field2", 1)); - client.setDocsumReponse("host1", 2, "summaryClass1", map("field1", "s.1.2", "field2", 2)); - client.setDocsumReponse("host2", 3, "summaryClass1", map("field1", "s.2.3", "field2", 3)); - client.setDocsumReponse("host0", 4, "summaryClass1", map("field1", "s.0.4", "field2", 4)); - - factory.createFillInvoker(db()).fill(result, "summaryClass1"); - - assertEquals("s.0.0", result.hits().get("hit:0").getField("field1").toString()); - assertEquals("s.2.1", result.hits().get("hit:1").getField("field1").toString()); - assertEquals("s.1.2", result.hits().get("hit:2").getField("field1").toString()); - assertEquals("s.2.3", result.hits().get("hit:3").getField("field1").toString()); - assertEquals("s.0.4", result.hits().get("hit:4").getField("field1").toString()); - assertEquals(0L, result.hits().get("hit:0").getField("field2")); - assertEquals(1L, result.hits().get("hit:1").getField("field2")); - assertEquals(2L, result.hits().get("hit:2").getField("field2")); - assertEquals(3L, result.hits().get("hit:3").getField("field2")); - assertEquals(4L, result.hits().get("hit:4").getField("field2")); - } - - @Test - public void testEmptyHits() { - Map nodes = new HashMap<>(); - nodes.put(0, client.createConnection("host0", 123)); - nodes.put(1, client.createConnection("host1", 123)); - nodes.put(2, client.createConnection("host2", 123)); - RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes); - RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null); - - Query query = new Query(); - Result result = new Result(query); - result.hits().add(createHit(0, 0)); - result.hits().add(createHit(2, 1)); - result.hits().add(createHit(1, 2)); - result.hits().add(createHit(2, 3)); - result.hits().add(createHit(0, 4)); - - client.setDocsumReponse("host0", 0, "summaryClass1", map("field1", "s.0.0", "field2", 0)); - client.setDocsumReponse("host2", 1, "summaryClass1", map("field1", "s.2.1", "field2", 1)); - client.setDocsumReponse("host1", 2, "summaryClass1", new HashMap<>()); - client.setDocsumReponse("host2", 3, "summaryClass1", map("field1", "s.2.3", "field2", 3)); - client.setDocsumReponse("host0", 4, "summaryClass1",new HashMap<>()); - - factory.createFillInvoker(db()).fill(result, "summaryClass1"); - - assertEquals("s.0.0", result.hits().get("hit:0").getField("field1").toString()); - assertEquals("s.2.1", result.hits().get("hit:1").getField("field1").toString()); - assertNull(result.hits().get("hit:2").getField("field1")); - assertEquals("s.2.3", result.hits().get("hit:3").getField("field1").toString()); - assertNull(result.hits().get("hit:4").getField("field1")); - - assertEquals(0L, result.hits().get("hit:0").getField("field2")); - assertEquals(1L, result.hits().get("hit:1").getField("field2")); - assertNull(result.hits().get("hit:2").getField("field2")); - assertEquals(3L, result.hits().get("hit:3").getField("field2")); - assertNull(result.hits().get("hit:4").getField("field2")); - - assertEquals("Missing hit summary data for summary summaryClass1 for 2 hits", result.hits().getError().getDetailedMessage()); - } - - @Test - public void testErrorHandling() { - client.setMalfunctioning(true); - - Map nodes = new HashMap<>(); - nodes.put(0, client.createConnection("host0", 123)); - RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes); - RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null); - - Query query = new Query(); - Result result = new Result(query); - result.hits().add(createHit(0, 0)); - - factory.createFillInvoker(db()).fill(result, "summaryClass1"); - - assertEquals("Malfunctioning", result.hits().getError().getDetailedMessage()); - } - - @Test - public void testSendingFill2UnknownNode() { - client.setMalfunctioning(true); - - Map nodes = new HashMap<>(); - nodes.put(0, client.createConnection("host0", 123)); - RpcResourcePool rpcResourcePool = new RpcResourcePool(client, nodes); - RpcInvokerFactory factory = new RpcInvokerFactory(rpcResourcePool, null); - - Query query = new Query(); - Result result = new Result(query); - result.hits().add(createHit(0, 0)); - result.hits().add(createHit(1, 1)); - - - factory.createFillInvoker(db()).fill(result, "summaryClass1"); - - assertEquals("Could not fill hits from unknown node 1", result.hits().getError().getDetailedMessage()); - } - - private DocumentDatabase db() { - List fields = new ArrayList<>(); - fields.add(DocsumField.create("field1", "string")); - fields.add(DocsumField.create("field2", "int64")); - DocsumDefinitionSet docsums = new DocsumDefinitionSet(Collections.singleton(new DocsumDefinition("summaryClass1", - fields))); - return new DocumentDatabase("default", docsums, Collections.emptySet()); - } - - private FastHit createHit(int sourceNodeId, int hitId) { - FastHit hit = new FastHit("hit:" + hitId, 1.0); - hit.setPartId(sourceNodeId); - hit.setDistributionKey(sourceNodeId); - hit.setGlobalId(client.globalIdFrom(hitId)); - return hit; - } - - private Map map(String stringKey, String stringValue, String intKey, int intValue) { - Map map = new HashMap<>(); - map.put(stringKey, stringValue); - map.put(intKey, intValue); - return map; - } - -} diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java deleted file mode 100644 index f9b628e594a..00000000000 --- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/MockClient.java +++ /dev/null @@ -1,144 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.search.dispatch.rpc; - -import com.yahoo.compress.CompressionType; -import com.yahoo.compress.Compressor; -import com.yahoo.document.GlobalId; -import com.yahoo.document.idstring.IdIdString; -import com.yahoo.prelude.fastsearch.FastHit; -import com.yahoo.search.Result; -import com.yahoo.search.dispatch.rpc.Client; -import com.yahoo.search.dispatch.rpc.RpcFillInvoker; -import com.yahoo.search.dispatch.rpc.RpcSearchInvoker; -import com.yahoo.slime.ArrayTraverser; -import com.yahoo.slime.BinaryFormat; -import com.yahoo.slime.Cursor; -import com.yahoo.slime.Inspector; -import com.yahoo.slime.Slime; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * @author bratseth - */ -public class MockClient implements Client { - - private final Map> docsums = new HashMap<>(); - private final Compressor compressor = new Compressor(); - private boolean malfunctioning = false; - private Result searchResult; - - /** Set to true to cause this to produce an error instead of a regular response */ - public void setMalfunctioning(boolean malfunctioning) { this.malfunctioning = malfunctioning; } - - @Override - public NodeConnection createConnection(String hostname, int port) { - return new MockNodeConnection(hostname, port); - } - - @Override - public void getDocsums(List hitsContext, NodeConnection node, CompressionType compression, - int uncompressedSize, byte[] compressedSlime, RpcFillInvoker.GetDocsumsResponseReceiver responseReceiver, - double timeoutSeconds) { - if (malfunctioning) { - responseReceiver.receive(GetDocsumsResponseOrError.fromError("Malfunctioning")); - return; - } - - Inspector request = BinaryFormat.decode(compressor.decompress(compressedSlime, compression, uncompressedSize)).get(); - String docsumClass = request.field("class").asString(); - List> docsumsToReturn = new ArrayList<>(); - request.field("gids").traverse((ArrayTraverser)(index, gid) -> { - GlobalId docId = new GlobalId(gid.asData()); - docsumsToReturn.add(docsums.get(new DocsumKey(node.toString(), docId, docsumClass))); - }); - Slime responseSlime = new Slime(); - Cursor root = responseSlime.setObject(); - Cursor docsums = root.setArray("docsums"); - for (Map docsumFields : docsumsToReturn) { - Cursor docsumItem = docsums.addObject(); - Cursor docsum = docsumItem.setObject("docsum"); - for (Map.Entry field : docsumFields.entrySet()) { - if (field.getValue() instanceof Integer) - docsum.setLong(field.getKey(), (Integer)field.getValue()); - else if (field.getValue() instanceof String) - docsum.setString(field.getKey(), (String)field.getValue()); - else - throw new RuntimeException(); - } - } - byte[] slimeBytes = BinaryFormat.encode(responseSlime); - Compressor.Compression compressionResult = compressor.compress(compression, slimeBytes); - GetDocsumsResponse response = new GetDocsumsResponse(compressionResult.type().getCode(), slimeBytes.length, - compressionResult.data(), hitsContext); - responseReceiver.receive(GetDocsumsResponseOrError.fromResponse(response)); - } - - @Override - public void search(NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload, - RpcSearchInvoker responseReceiver, double timeoutSeconds) { - if (malfunctioning) { - responseReceiver.receive(SearchResponseOrError.fromError("Malfunctioning")); - return; - } - - if(searchResult == null) { - responseReceiver.receive(SearchResponseOrError.fromError("No result defined")); - return; - } - var payload = ProtobufSerialization.serializeResult(searchResult); - var compressionResult = compressor.compress(compression, payload); - var response = new SearchResponse(compressionResult.type().getCode(), payload.length, compressionResult.data()); - responseReceiver.receive(SearchResponseOrError.fromResponse(response)); - } - - public void setDocsumReponse(String nodeId, int docId, String docsumClass, Map docsumValues) { - docsums.put(new DocsumKey(nodeId, globalIdFrom(docId), docsumClass), docsumValues); - } - - public GlobalId globalIdFrom(int hitId) { - return new GlobalId(new IdIdString("", "test", "", String.valueOf(hitId))); - } - - private static class MockNodeConnection implements Client.NodeConnection { - - private final String hostname; - - public MockNodeConnection(String hostname, int port) { - this.hostname = hostname; - } - - @Override - public void close() { } - - @Override - public String toString() { return hostname; } - - } - - private static class DocsumKey { - - private final String internalKey; - - public DocsumKey(String nodeId, GlobalId docId, String docsumClass) { - internalKey = docsumClass + "." + nodeId + "." + docId; - } - - @Override - public int hashCode() { return internalKey.hashCode(); } - - @Override - public boolean equals(Object other) { - if ( ! (other instanceof DocsumKey)) return false; - return ((DocsumKey)other).internalKey.equals(this.internalKey); - } - - @Override - public String toString() { return internalKey; } - - } - -} diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java deleted file mode 100644 index 4792b45281f..00000000000 --- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java +++ /dev/null @@ -1,95 +0,0 @@ -// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -package com.yahoo.search.dispatch.rpc; - -import ai.vespa.searchlib.searchprotocol.protobuf.Search; -import com.google.common.collect.ImmutableMap; -import com.yahoo.compress.CompressionType; -import com.yahoo.fs4.QueryPacket; -import com.yahoo.prelude.fastsearch.FastHit; -import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; -import com.yahoo.search.Query; -import com.yahoo.search.Result; -import com.yahoo.search.dispatch.rpc.Client; -import com.yahoo.search.dispatch.rpc.RpcResourcePool; -import com.yahoo.search.dispatch.rpc.RpcSearchInvoker; -import com.yahoo.search.dispatch.rpc.RpcFillInvoker.GetDocsumsResponseReceiver; -import com.yahoo.search.dispatch.searchcluster.Node; -import com.yahoo.search.searchchain.Execution; -import org.junit.Test; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; - -/** - * @author ollivir - */ -public class RpcSearchInvokerTest { - @Test - public void testProtobufSerialization() throws IOException { - var compressionTypeHolder = new AtomicReference(); - var payloadHolder = new AtomicReference(); - var lengthHolder = new AtomicInteger(); - var mockClient = parameterCollectorClient(compressionTypeHolder, payloadHolder, lengthHolder); - var mockPool = new RpcResourcePool(mockClient, ImmutableMap.of(7, () -> {})); - @SuppressWarnings("resource") - var invoker = new RpcSearchInvoker(mockSearcher(), new Node(7, "seven", 77, 1), mockPool); - - Query q = new Query("search/?query=test&hits=10&offset=3"); - invoker.sendSearchRequest(q, null); - - var bytes = mockPool.compressor().decompress(payloadHolder.get(), compressionTypeHolder.get(), lengthHolder.get()); - var request = Search.Request.newBuilder().mergeFrom(bytes).build(); - - assertThat(request.getHits(), equalTo(10)); - assertThat(request.getOffset(), equalTo(3)); - assertThat(request.getQueryTreeBlob().size(), greaterThan(0)); - } - - private Client parameterCollectorClient(AtomicReference compressionTypeHolder, AtomicReference payloadHolder, - AtomicInteger lengthHolder) { - return new Client() { - @Override - public void search(NodeConnection node, CompressionType compression, int uncompressedLength, byte[] compressedPayload, - RpcSearchInvoker responseReceiver, double timeoutSeconds) { - compressionTypeHolder.set(compression); - payloadHolder.set(compressedPayload); - lengthHolder.set(uncompressedLength); - } - - @Override - public void getDocsums(List hits, NodeConnection node, CompressionType compression, int uncompressedLength, - byte[] compressedSlime, GetDocsumsResponseReceiver responseReceiver, double timeoutSeconds) { - fail("Unexpected call"); - } - - @Override - public NodeConnection createConnection(String hostname, int port) { - fail("Unexpected call"); - return null; - } - }; - } - - private VespaBackEndSearcher mockSearcher() { - return new VespaBackEndSearcher() { - @Override - protected Result doSearch2(Query query, QueryPacket queryPacket, Execution execution) { - fail("Unexpected call"); - return null; - } - - @Override - protected void doPartialFill(Result result, String summaryClass) { - fail("Unexpected call"); - } - }; - } -} -- cgit v1.2.3