diff options
19 files changed, 194 insertions, 167 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java index 34ddd8e16cc..a2155cc8649 100644 --- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java +++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java @@ -84,7 +84,7 @@ public class FastSearcher extends VespaBackEndSearcher { @Override public Result doSearch2(Query query, Execution execution) { - if (dispatcher.searchCluster().allGroupsHaveSize1()) + if (dispatcher.allGroupsHaveSize1()) forceSinglePassGrouping(query); try (SearchInvoker invoker = getSearchInvoker(query)) { Result result = invoker.search(query, execution); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 36c5c8a16fa..b7a2d9f70ba 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -58,9 +58,9 @@ public class Dispatcher extends AbstractComponent { private final ClusterMonitor<Node> clusterMonitor; private final LoadBalancer loadBalancer; private final InvokerFactory invokerFactory; - private final RpcResourcePool rpcResourcePool; private final RpcClient rpcClient; private final int maxHitsPerNode; + private final RpcResourcePool rpcResourcePool; private static final QueryProfileType argumentType; @@ -101,7 +101,7 @@ public class Dispatcher extends AbstractComponent { RpcClient rpcClient, RpcResourcePool rpcResourcePool) { this(new ClusterMonitor<>(searchCluster, true), searchCluster, dispatchConfig, - new RpcInvokerFactory(rpcResourcePool, searchCluster), + new RpcInvokerFactory(rpcResourcePool, searchCluster, dispatchConfig), rpcClient, rpcResourcePool); } @@ -161,9 +161,8 @@ public class Dispatcher extends AbstractComponent { new Compressor().warmup(seconds); } - /** Returns the search cluster this dispatches to */ - public SearchCluster searchCluster() { - return searchCluster; + public boolean allGroupsHaveSize1() { + return searchCluster.allGroupsHaveSize1(); } @Override diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java index 39be91cf3e8..8751a969fe0 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java @@ -6,7 +6,6 @@ import com.yahoo.prelude.fastsearch.GroupingListHit; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.dispatch.searchcluster.Group; -import com.yahoo.search.dispatch.searchcluster.SearchCluster; import com.yahoo.search.result.ErrorMessage; import com.yahoo.search.result.Hit; import com.yahoo.search.searchchain.Execution; @@ -38,27 +37,37 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM private final Timer timer; private final Set<SearchInvoker> invokers; - private final SearchCluster searchCluster; + private final DispatchConfig dispatchConfig; private final Group group; private final LinkedBlockingQueue<SearchInvoker> availableForProcessing; private final Set<Integer> alreadyFailedNodes; private final CoverageAggregator coverageAggregator; + private final TopKEstimator hitEstimator; private Query query; private TimeoutHandler timeoutHandler; public InterleavedSearchInvoker(Timer timer, Collection<SearchInvoker> invokers, - SearchCluster searchCluster, + TopKEstimator hitEstimator, + DispatchConfig dispatchConfig, Group group, Set<Integer> alreadyFailedNodes) { super(Optional.empty()); this.timer = timer; this.invokers = Collections.newSetFromMap(new IdentityHashMap<>()); this.invokers.addAll(invokers); - this.searchCluster = searchCluster; + this.dispatchConfig = dispatchConfig; this.group = group; this.availableForProcessing = newQueue(); this.alreadyFailedNodes = alreadyFailedNodes; - coverageAggregator = new CoverageAggregator(invokers.size()); + this.coverageAggregator = new CoverageAggregator(invokers.size()); + this.hitEstimator = hitEstimator; + } + + private int estimateHitsToFetch(int wantedHits, int numPartitions) { + return hitEstimator.estimateK(wantedHits, numPartitions); + } + private int estimateHitsToFetch(int wantedHits, int numPartitions, double topKProbability) { + return hitEstimator.estimateK(wantedHits, numPartitions, topKProbability); } private TimeoutHandler createTimeoutHandler(DispatchConfig config, int askedNodes, Query query) { @@ -84,8 +93,8 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM if (group.isBalanced() && !group.isSparse()) { Double topkProbabilityOverrride = query.properties().getDouble(Dispatcher.topKProbability); q = (topkProbabilityOverrride != null) - ? searchCluster.estimateHitsToFetch(neededHits, invokers.size(), topkProbabilityOverrride) - : searchCluster.estimateHitsToFetch(neededHits, invokers.size()); + ? estimateHitsToFetch(neededHits, invokers.size(), topkProbabilityOverrride) + : estimateHitsToFetch(neededHits, invokers.size()); } query.setHits(q); query.setOffset(0); @@ -94,7 +103,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM for (SearchInvoker invoker : invokers) { context = invoker.sendSearchRequest(query, context); } - timeoutHandler = createTimeoutHandler(searchCluster.dispatchConfig(), invokers.size(), query); + timeoutHandler = createTimeoutHandler(dispatchConfig, invokers.size(), query); query.setHits(originalHits); query.setOffset(originalOffset); @@ -127,7 +136,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM groupingResultAggregator.toAggregatedHit().ifPresent(h -> result.getResult().hits().add(h)); insertNetworkErrors(result.getResult()); - CoverageAggregator adjusted = coverageAggregator.adjustedDegradedCoverage((int)searchCluster.dispatchConfig().redundancy(), timeoutHandler); + CoverageAggregator adjusted = coverageAggregator.adjustedDegradedCoverage((int)dispatchConfig.redundancy(), timeoutHandler); result.getResult().setCoverage(adjusted.createCoverage(timeoutHandler)); int needed = query.getOffset() + query.getHits(); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java index 02cf11c9fe7..caeae9c2c1d 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java @@ -10,6 +10,7 @@ import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.SearchCluster; import com.yahoo.search.result.Coverage; import com.yahoo.search.result.ErrorMessage; +import com.yahoo.vespa.config.search.DispatchConfig; import java.util.ArrayList; import java.util.HashSet; @@ -21,11 +22,16 @@ import java.util.Set; * @author ollivir */ public abstract class InvokerFactory { + private static final double SKEW_FACTOR = 0.05; - protected final SearchCluster searchCluster; + private final SearchCluster searchCluster; + private final DispatchConfig dispatchConfig; + private final TopKEstimator hitEstimator; - public InvokerFactory(SearchCluster searchCluster) { + public InvokerFactory(SearchCluster searchCluster, DispatchConfig dispatchConfig) { this.searchCluster = searchCluster; + this.dispatchConfig = dispatchConfig; + this.hitEstimator = new TopKEstimator(30.0, dispatchConfig.topKProbability(), SKEW_FACTOR); } protected abstract Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher, @@ -90,7 +96,7 @@ public abstract class InvokerFactory { if (invokers.size() == 1 && failed == null) { return Optional.of(invokers.get(0)); } else { - return Optional.of(new InterleavedSearchInvoker(Timer.monotonic, invokers, searchCluster, group, failed)); + return Optional.of(new InterleavedSearchInvoker(Timer.monotonic, invokers, hitEstimator, dispatchConfig, group, failed)); } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressPayload.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressPayload.java new file mode 100644 index 00000000000..786b804edaa --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressPayload.java @@ -0,0 +1,15 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch.rpc; + +import com.yahoo.compress.Compressor; +import com.yahoo.search.Query; + +/** + * Interface for compressing and decompressing request/response + * + * @author baldersheim + */ +public interface CompressPayload { + Compressor.Compression compress(Query query, byte[] payload); + byte[] decompress(Client.ProtobufResponse response); +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressService.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressService.java new file mode 100644 index 00000000000..9e7fc9b5b29 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressService.java @@ -0,0 +1,32 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch.rpc; + +import com.yahoo.compress.CompressionType; +import com.yahoo.compress.Compressor; +import com.yahoo.processing.request.CompoundName; +import com.yahoo.search.Query; + +/** + * Implement interface to compress/decompress request/response + * + * @author baldersheim + */ +public class CompressService implements CompressPayload { + /** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */ + public final static CompoundName dispatchCompression = new CompoundName("dispatch.compression"); + private final Compressor compressor = new Compressor(CompressionType.LZ4, 5, 0.95, 256); + + + @Override + public Compressor.Compression compress(Query query, byte[] payload) { + CompressionType compression = CompressionType.valueOf(query.properties().getString(dispatchCompression, "LZ4").toUpperCase()); + return compressor.compress(compression, payload); + } + + @Override + public byte[] decompress(Client.ProtobufResponse response) { + CompressionType compression = CompressionType.valueOf(response.compression()); + return compressor.decompress(response.compressedPayload(), compression, response.uncompressedSize()); + } + Compressor compressor() { return compressor; } +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java new file mode 100644 index 00000000000..fd8e0e4f81a --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java @@ -0,0 +1,11 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch.rpc; + +/** + * Interface for getting a connection given a node id. + * + * @author balderersheim + */ +public interface RpcConnectionPool { + Client.NodeConnection getConnection(int nodeId); +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java index a1ebea3b695..d5b1b876540 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java @@ -9,6 +9,7 @@ import com.yahoo.search.dispatch.InvokerFactory; import com.yahoo.search.dispatch.SearchInvoker; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.SearchCluster; +import com.yahoo.vespa.config.search.DispatchConfig; import java.util.Optional; @@ -17,19 +18,18 @@ import java.util.Optional; */ public class RpcInvokerFactory extends InvokerFactory { - private final RpcResourcePool rpcResourcePool; + private final RpcConnectionPool rpcResourcePool; + private final CompressPayload compressor; - public RpcInvokerFactory(RpcResourcePool rpcResourcePool, SearchCluster searchCluster) { - super(searchCluster); + public RpcInvokerFactory(RpcConnectionPool rpcResourcePool, SearchCluster searchCluster, DispatchConfig dispatchConfig) { + super(searchCluster, dispatchConfig); this.rpcResourcePool = rpcResourcePool; + this.compressor = new CompressService(); } @Override - protected Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher, - Query query, - int maxHits, - Node node) { - return Optional.of(new RpcSearchInvoker(searcher, node, rpcResourcePool, maxHits)); + protected Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher, Query query, int maxHits, Node node) { + return Optional.of(new RpcSearchInvoker(searcher, compressor, node, rpcResourcePool, maxHits)); } @Override @@ -37,7 +37,6 @@ public class RpcInvokerFactory extends InvokerFactory { Query query = result.getQuery(); boolean summaryNeedsQuery = searcher.summaryNeedsQuery(query); - return new RpcProtobufFillInvoker(rpcResourcePool, searcher.getDocumentDatabase(query), searcher.getServerId(), summaryNeedsQuery); + return new RpcProtobufFillInvoker(rpcResourcePool, compressor, searcher.getDocumentDatabase(query), searcher.getServerId(), summaryNeedsQuery); } - } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java index 44f0af2aca1..53dc54f7bc5 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java @@ -25,17 +25,19 @@ public class RpcPing implements Pinger, Client.ResponseReceiver { private static final boolean triggeredClassLoading = ErrorMessage.createBackendCommunicationError("TriggerClassLoading") instanceof ErrorMessage; private final Node node; - private final RpcResourcePool resourcePool; + private final RpcConnectionPool connectionPool; private final ClusterMonitor<Node> clusterMonitor; private final long pingSequenceId; private final PongHandler pongHandler; + private final Compressor compressor; - public RpcPing(Node node, ClusterMonitor<Node> clusterMonitor, RpcResourcePool rpcResourcePool, PongHandler pongHandler) { + public RpcPing(Node node, ClusterMonitor<Node> clusterMonitor, RpcConnectionPool connectionPool, PongHandler pongHandler, Compressor compressor) { this.node = node; - this.resourcePool = rpcResourcePool; + this.connectionPool = connectionPool; this.clusterMonitor = clusterMonitor; - pingSequenceId = node.createPingSequenceId(); + this.pingSequenceId = node.createPingSequenceId(); this.pongHandler = pongHandler; + this. compressor = compressor; } @Override @@ -63,16 +65,16 @@ public class RpcPing implements Pinger, Client.ResponseReceiver { } private void sendPing() { - var connection = resourcePool.getConnection(node.key()); + var connection = connectionPool.getConnection(node.key()); var ping = SearchProtocol.MonitorRequest.newBuilder().build().toByteArray(); double timeoutSeconds = ((double) clusterMonitor.getConfiguration().getRequestTimeout()) / 1000.0; - Compressor.Compression compressionResult = resourcePool.compressor().compress(PING_COMPRESSION, ping); + Compressor.Compression compressionResult = compressor.compress(PING_COMPRESSION, ping); connection.request(RPC_METHOD, compressionResult.type(), ping.length, compressionResult.data(), this, timeoutSeconds); } private Pong decodeReply(ProtobufResponse response) throws InvalidProtocolBufferException { CompressionType compression = CompressionType.valueOf(response.compression()); - byte[] responseBytes = resourcePool.compressor().decompress(response.compressedPayload(), compression, response.uncompressedSize()); + byte[] responseBytes = compressor.decompress(response.compressedPayload(), compression, response.uncompressedSize()); var reply = SearchProtocol.MonitorReply.parseFrom(responseBytes); if (reply.getDistributionKey() != node.key()) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java index 2e690198c1c..31e152c6ff9 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java @@ -1,6 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch.rpc; +import com.yahoo.compress.CompressionType; +import com.yahoo.compress.Compressor; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.PingFactory; @@ -9,15 +11,16 @@ import com.yahoo.search.dispatch.searchcluster.PongHandler; public class RpcPingFactory implements PingFactory { - private final RpcResourcePool rpcResourcePool; + private final RpcConnectionPool rpcResourcePool; + private final Compressor compressor = new Compressor(CompressionType.LZ4, 5, 0.95, 512); - public RpcPingFactory(RpcResourcePool rpcResourcePool) { + public RpcPingFactory(RpcConnectionPool rpcResourcePool) { this.rpcResourcePool = rpcResourcePool; } @Override public Pinger createPinger(Node node, ClusterMonitor<Node> monitor, PongHandler pongHandler) { - return new RpcPing(node, monitor, rpcResourcePool, pongHandler); + return new RpcPing(node, monitor, rpcResourcePool, pongHandler, compressor); } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java index c84795352f5..4e538fb54dc 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java @@ -5,7 +5,6 @@ import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol; import com.google.protobuf.InvalidProtocolBufferException; import com.yahoo.collections.ListMap; import com.yahoo.collections.Pair; -import com.yahoo.compress.CompressionType; import com.yahoo.compress.Compressor; import com.yahoo.container.protect.Error; import com.yahoo.data.access.Inspector; @@ -44,9 +43,10 @@ public class RpcProtobufFillInvoker extends FillInvoker { private static final Logger log = Logger.getLogger(RpcProtobufFillInvoker.class.getName()); private final DocumentDatabase documentDb; - private final RpcResourcePool resourcePool; + private final RpcConnectionPool resourcePool; private final boolean summaryNeedsQuery; private final String serverId; + private final CompressPayload compressor; private BlockingQueue<Pair<Client.ResponseOrError<ProtobufResponse>, List<FastHit>>> responses; @@ -56,11 +56,12 @@ public class RpcProtobufFillInvoker extends FillInvoker { /** The number of responses we should receive (and process) before this is complete */ private int outstandingResponses; - RpcProtobufFillInvoker(RpcResourcePool resourcePool, DocumentDatabase documentDb, String serverId, boolean summaryNeedsQuery) { + RpcProtobufFillInvoker(RpcConnectionPool resourcePool, CompressPayload compressor, DocumentDatabase documentDb, String serverId, boolean summaryNeedsQuery) { this.documentDb = documentDb; this.resourcePool = resourcePool; this.serverId = serverId; this.summaryNeedsQuery = summaryNeedsQuery; + this.compressor = compressor; } @Override @@ -121,9 +122,8 @@ public class RpcProtobufFillInvoker extends FillInvoker { ListMap<Integer, FastHit> hitsByNode = new ListMap<>(); for (Iterator<Hit> i = result.hits().unorderedDeepIterator(); i.hasNext();) { Hit h = i.next(); - if (!(h instanceof FastHit)) + if (!(h instanceof FastHit hit)) continue; - FastHit hit = (FastHit) h; hitsByNode.put(hit.getDistributionKey(), hit); } @@ -143,7 +143,7 @@ public class RpcProtobufFillInvoker extends FillInvoker { } Query query = result.getQuery(); - Compressor.Compression compressionResult = resourcePool.compress(query, payload); + Compressor.Compression compressionResult = compressor.compress(query, payload); node.request(RPC_METHOD, compressionResult.type(), payload.length, compressionResult.data(), roe -> receive(roe, hits), clientTimeout); } @@ -189,9 +189,7 @@ public class RpcProtobufFillInvoker extends FillInvoker { hasReportedError = true; } else { Client.ProtobufResponse response = responseOrError.response().get(); - CompressionType compression = CompressionType.valueOf(response.compression()); - byte[] responseBytes = resourcePool.compressor().decompress(response.compressedPayload(), compression, - response.uncompressedSize()); + byte[] responseBytes = compressor.decompress(response); return fill(result, hitsContext, summaryClass, responseBytes); } return 0; diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java index bbe60d0df23..71e8cc0baa8 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java @@ -2,11 +2,6 @@ package com.yahoo.search.dispatch.rpc; import com.google.common.collect.ImmutableMap; -import com.yahoo.compress.CompressionType; -import com.yahoo.compress.Compressor; -import com.yahoo.compress.Compressor.Compression; -import com.yahoo.processing.request.CompoundName; -import com.yahoo.search.Query; import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.rpc.Client.NodeConnection; import com.yahoo.vespa.config.search.DispatchNodesConfig; @@ -23,12 +18,7 @@ import java.util.concurrent.ThreadLocalRandom; * * @author ollivir */ -public class RpcResourcePool implements AutoCloseable { - - /** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */ - public final static CompoundName dispatchCompression = new CompoundName("dispatch.compression"); - - private final Compressor compressor = new Compressor(CompressionType.LZ4, 5, 0.95, 32); +public class RpcResourcePool implements RpcConnectionPool, AutoCloseable { /** Connections to the search nodes this talks to, indexed by node id ("partid") */ private final ImmutableMap<Integer, NodeConnectionPool> nodeConnectionPools; @@ -54,15 +44,7 @@ public class RpcResourcePool implements AutoCloseable { this.nodeConnectionPools = builder.build(); } - public Compressor compressor() { - return compressor; - } - - public Compression compress(Query query, byte[] payload) { - CompressionType compression = CompressionType.valueOf(query.properties().getString(dispatchCompression, "LZ4").toUpperCase()); - return compressor.compress(compression, payload); - } - + @Override public NodeConnection getConnection(int nodeId) { var pool = nodeConnectionPools.get(nodeId); if (pool == null) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java index 64e0dd666dd..31de0b147c9 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java @@ -1,7 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch.rpc; -import com.yahoo.compress.CompressionType; import com.yahoo.compress.Compressor; import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; import com.yahoo.search.Query; @@ -29,19 +28,21 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe private final VespaBackEndSearcher searcher; private final Node node; - private final RpcResourcePool resourcePool; + private final RpcConnectionPool resourcePool; private final BlockingQueue<Client.ResponseOrError<ProtobufResponse>> responses; private final int maxHits; + private final CompressPayload compressor; private Query query; - RpcSearchInvoker(VespaBackEndSearcher searcher, Node node, RpcResourcePool resourcePool, int maxHits) { + RpcSearchInvoker(VespaBackEndSearcher searcher, CompressPayload compressor, Node node, RpcConnectionPool resourcePool, int maxHits) { super(Optional.of(node)); this.searcher = searcher; this.node = node; this.resourcePool = resourcePool; this.responses = new LinkedBlockingQueue<>(1); this.maxHits = maxHits; + this.compressor = compressor; } @Override @@ -78,7 +79,7 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe if (incomingContext instanceof RpcContext) return (RpcContext)incomingContext; - return new RpcContext(resourcePool, query, + return new RpcContext(compressor, query, ProtobufSerialization.serializeSearchRequest(query, Math.min(query.getHits(), maxHits), searcher.getServerId(), requestTimeout)); @@ -110,8 +111,7 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe } ProtobufResponse protobufResponse = response.response().get(); - CompressionType compression = CompressionType.valueOf(protobufResponse.compression()); - byte[] payload = resourcePool.compressor().decompress(protobufResponse.compressedPayload(), compression, protobufResponse.uncompressedSize()); + byte[] payload = compressor.decompress(protobufResponse); return ProtobufSerialization.deserializeToSearchResult(payload, query, searcher, node.pathIndex(), node.key()); } @@ -133,8 +133,8 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe final Compressor.Compression compressedPayload; - RpcContext(RpcResourcePool resourcePool, Query query, byte[] payload) { - compressedPayload = resourcePool.compress(query, payload); + RpcContext(CompressPayload compressor, Query query, byte[] payload) { + compressedPayload = compressor.compress(query, payload); } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 54a3e42b9ab..ca2fce0b32b 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -8,7 +8,6 @@ import com.yahoo.net.HostName; import com.yahoo.prelude.Pong; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.cluster.NodeManager; -import com.yahoo.search.dispatch.TopKEstimator; import com.yahoo.vespa.config.search.DispatchConfig; import com.yahoo.vespa.config.search.DispatchNodesConfig; @@ -29,16 +28,14 @@ public class SearchCluster implements NodeManager<Node> { private static final Logger log = Logger.getLogger(SearchCluster.class.getName()); - private final DispatchConfig dispatchConfig; + private final double minActivedocsPercentage; private final String clusterId; + private final VipStatus vipStatus; + private final PingFactory pingFactory; private final Map<Integer, Group> groups; private final List<Group> orderedGroups; private final List<Node> nodes; - private final VipStatus vipStatus; - private final PingFactory pingFactory; - private final TopKEstimator hitEstimator; private long nextLogTime = 0; - private static final double SKEW_FACTOR = 0.05; /** * A search node on this local machine having the entire corpus, which we therefore @@ -48,13 +45,13 @@ public class SearchCluster implements NodeManager<Node> { * if it only queries this cluster when the local node cannot be used, to avoid unnecessary * cross-node network traffic. */ - private final Optional<Node> localCorpusDispatchTarget; + private final Node localCorpusDispatchTarget; public SearchCluster(String clusterId, DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig, VipStatus vipStatus, PingFactory pingFactory) { this.clusterId = clusterId; - this.dispatchConfig = dispatchConfig; + this.minActivedocsPercentage = dispatchConfig.minActivedocsPercentage(); this.vipStatus = vipStatus; this.pingFactory = pingFactory; @@ -71,7 +68,6 @@ public class SearchCluster implements NodeManager<Node> { nodes.forEach(node -> groupIntroductionOrder.put(node.group(), groups.get(node.group()))); this.orderedGroups = List.copyOf(groupIntroductionOrder.values()); - hitEstimator = new TopKEstimator(30.0, dispatchConfig.topKProbability(), SKEW_FACTOR); this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), nodes, groups); } @@ -85,7 +81,7 @@ public class SearchCluster implements NodeManager<Node> { } } - private static Optional<Node> findLocalCorpusDispatchTarget(String selfHostname, + private static Node findLocalCorpusDispatchTarget(String selfHostname, List<Node> nodes, Map<Integer, Group> groups) { // A search node in the search cluster in question is configured on the same host as the currently running container. @@ -96,15 +92,15 @@ public class SearchCluster implements NodeManager<Node> { .filter(node -> node.hostname().equals(selfHostname)) .toList(); // Only use direct dispatch if we have exactly 1 search node on the same machine: - if (localSearchNodes.size() != 1) return Optional.empty(); + if (localSearchNodes.size() != 1) return null; Node localSearchNode = localSearchNodes.iterator().next(); Group localSearchGroup = groups.get(localSearchNode.group()); // Only use direct dispatch if the local search node has the entire corpus - if (localSearchGroup.nodes().size() != 1) return Optional.empty(); + if (localSearchGroup.nodes().size() != 1) return null; - return Optional.of(localSearchNode); + return localSearchNode; } private static List<Node> toNodes(DispatchNodesConfig nodesConfig) { @@ -113,13 +109,6 @@ public class SearchCluster implements NodeManager<Node> { .toList(); } - public DispatchConfig dispatchConfig() { - return dispatchConfig; - } - - /** Returns an immutable list of all nodes in this. */ - public List<Node> nodes() { return nodes; } - /** Returns the groups of this cluster as an immutable map indexed by group id */ public Map<Integer, Group> groups() { return groups; } @@ -148,16 +137,16 @@ public class SearchCluster implements NodeManager<Node> { * or empty if we should not dispatch directly. */ public Optional<Node> localCorpusDispatchTarget() { - if ( localCorpusDispatchTarget.isEmpty()) return Optional.empty(); + if ( localCorpusDispatchTarget == null) return Optional.empty(); // Only use direct dispatch if the local group has sufficient coverage - Group localSearchGroup = groups().get(localCorpusDispatchTarget.get().group()); + Group localSearchGroup = groups().get(localCorpusDispatchTarget.group()); if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty(); // Only use direct dispatch if the local search node is not down - if ( localCorpusDispatchTarget.get().isWorking() == Boolean.FALSE) return Optional.empty(); + if ( localCorpusDispatchTarget.isWorking() == Boolean.FALSE) return Optional.empty(); - return localCorpusDispatchTarget; + return Optional.of(localCorpusDispatchTarget); } private void updateWorkingState(Node node, boolean isWorking) { @@ -185,7 +174,7 @@ public class SearchCluster implements NodeManager<Node> { } private void updateVipStatusOnNodeChange(Node node, boolean nodeIsWorking) { - if (localCorpusDispatchTarget.isEmpty()) { // consider entire cluster + if (localCorpusDispatchTarget == null) { // consider entire cluster if (hasInformationAboutAllNodes()) setInRotationOnlyIf(hasWorkingNodes()); } @@ -198,7 +187,7 @@ public class SearchCluster implements NodeManager<Node> { } private void updateVipStatusOnCoverageChange(Group group, boolean sufficientCoverage) { - if ( localCorpusDispatchTarget.isEmpty()) { // consider entire cluster + if ( localCorpusDispatchTarget == null) { // consider entire cluster // VIP status does not depend on coverage } else if (usesLocalCorpusIn(group)) { // follow the status of this group @@ -213,13 +202,6 @@ public class SearchCluster implements NodeManager<Node> { vipStatus.removeFromRotation(clusterId); } - public int estimateHitsToFetch(int wantedHits, int numPartitions) { - return hitEstimator.estimateK(wantedHits, numPartitions); - } - public int estimateHitsToFetch(int wantedHits, int numPartitions, double topKProbability) { - return hitEstimator.estimateK(wantedHits, numPartitions, topKProbability); - } - public boolean hasInformationAboutAllNodes() { return nodes.stream().allMatch(node -> node.isWorking() != null); } @@ -229,11 +211,11 @@ public class SearchCluster implements NodeManager<Node> { } private boolean usesLocalCorpusIn(Node node) { - return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().equals(node); + return node.equals(localCorpusDispatchTarget); } private boolean usesLocalCorpusIn(Group group) { - return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().group() == group.id(); + return (localCorpusDispatchTarget != null) && localCorpusDispatchTarget.group() == group.id(); } /** Used by the cluster monitor to manage node status */ @@ -286,7 +268,7 @@ public class SearchCluster implements NodeManager<Node> { private boolean isGroupCoverageSufficient(long activeDocuments, long medianDocuments) { double documentCoverage = 100.0 * (double) activeDocuments / medianDocuments; - if (medianDocuments > 0 && documentCoverage < dispatchConfig.minActivedocsPercentage()) + if (medianDocuments > 0 && documentCoverage < minActivedocsPercentage) return false; return true; } diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java index 974441fc5fc..10e188d092c 100644 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java @@ -33,7 +33,7 @@ class MockDispatcher extends Dispatcher { } private MockDispatcher(ClusterMonitor clusterMonitor, SearchCluster searchCluster, DispatchConfig dispatchConfig, RpcResourcePool rpcResourcePool) { - this(clusterMonitor, searchCluster, dispatchConfig, new RpcInvokerFactory(rpcResourcePool, searchCluster)); + this(clusterMonitor, searchCluster, dispatchConfig, new RpcInvokerFactory(rpcResourcePool, searchCluster, dispatchConfig)); } private MockDispatcher(ClusterMonitor clusterMonitor, SearchCluster searchCluster, DispatchConfig dispatchConfig, RpcInvokerFactory invokerFactory) { diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java index a6c07b74a92..bc92afdb8fc 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java @@ -10,6 +10,7 @@ import com.yahoo.search.dispatch.searchcluster.PingFactory; import com.yahoo.search.dispatch.searchcluster.Pinger; import com.yahoo.search.dispatch.searchcluster.PongHandler; import com.yahoo.search.dispatch.searchcluster.SearchCluster; +import com.yahoo.vespa.config.search.DispatchConfig; import org.junit.jupiter.api.Test; import java.util.List; @@ -26,18 +27,19 @@ import static org.junit.jupiter.api.Assertions.fail; * @author ollivir */ public class DispatcherTest { + private final DispatchConfig dispatchConfig = createDispatchConfig(); @Test void requireThatDispatcherSupportsSearchPath() { SearchCluster cl = new MockSearchCluster("1", 2, 2); Query q = new Query(); q.getModel().setSearchPath("1/0"); // second node in first group - MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, (nodes, a) -> { + MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, dispatchConfig, (nodes, a) -> { assertEquals(1, nodes.size()); assertEquals(1, nodes.get(0).key()); return true; }); - Dispatcher disp = new Dispatcher(new ClusterMonitor<>(cl, false), cl, createDispatchConfig(), invokerFactory); + Dispatcher disp = new Dispatcher(new ClusterMonitor<>(cl, false), cl, dispatchConfig, invokerFactory); SearchInvoker invoker = disp.getSearchInvoker(q, null); assertNotNull(invoker); invokerFactory.verifyAllEventsProcessed(); @@ -52,8 +54,8 @@ public class DispatcherTest { return Optional.of(new Node(1, "test", 1)); } }; - MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, (n, a) -> true); - Dispatcher disp = new Dispatcher(new ClusterMonitor<>(cl, false), cl, createDispatchConfig(), invokerFactory); + MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, dispatchConfig, (n, a) -> true); + Dispatcher disp = new Dispatcher(new ClusterMonitor<>(cl, false), cl, dispatchConfig, invokerFactory); SearchInvoker invoker = disp.getSearchInvoker(new Query(), null); assertNotNull(invoker); invokerFactory.verifyAllEventsProcessed(); @@ -64,14 +66,14 @@ public class DispatcherTest { void requireThatInvokerConstructionIsRetriedAndLastAcceptsAnyCoverage() { SearchCluster cl = new MockSearchCluster("1", 2, 1); - MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, (n, acceptIncompleteCoverage) -> { + MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, dispatchConfig, (n, acceptIncompleteCoverage) -> { assertFalse(acceptIncompleteCoverage); return false; }, (n, acceptIncompleteCoverage) -> { assertTrue(acceptIncompleteCoverage); return true; }); - Dispatcher disp = new Dispatcher(new ClusterMonitor<>(cl, false), cl, createDispatchConfig(), invokerFactory); + Dispatcher disp = new Dispatcher(new ClusterMonitor<>(cl, false), cl, dispatchConfig, invokerFactory); SearchInvoker invoker = disp.getSearchInvoker(new Query(), null); assertNotNull(invoker); invokerFactory.verifyAllEventsProcessed(); @@ -83,8 +85,8 @@ public class DispatcherTest { try { SearchCluster cl = new MockSearchCluster("1", 2, 1); - MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, (n, a) -> false, (n, a) -> false); - Dispatcher disp = new Dispatcher(new ClusterMonitor<>(cl, false), cl, createDispatchConfig(), invokerFactory); + MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, dispatchConfig, (n, a) -> false, (n, a) -> false); + Dispatcher disp = new Dispatcher(new ClusterMonitor<>(cl, false), cl, dispatchConfig, invokerFactory); disp.getSearchInvoker(new Query(), null); disp.deconstruct(); fail("Expected exception"); @@ -97,7 +99,7 @@ public class DispatcherTest { @Test void testGroup0IsSelected() { SearchCluster cluster = new MockSearchCluster("1", 3, 1); - Dispatcher dispatcher = new Dispatcher(new ClusterMonitor<>(cluster, false), cluster, createDispatchConfig(), new MockInvokerFactory(cluster, (n, a) -> true)); + Dispatcher dispatcher = new Dispatcher(new ClusterMonitor<>(cluster, false), cluster, dispatchConfig, new MockInvokerFactory(cluster, dispatchConfig, (n, a) -> true)); cluster.pingIterationCompleted(); assertEquals(0, dispatcher.getSearchInvoker(new Query(), null).distributionKey().get().longValue()); @@ -107,7 +109,7 @@ public class DispatcherTest { @Test void testGroup0IsSkippedWhenItIsBlockingFeed() { SearchCluster cluster = new MockSearchCluster("1", 3, 1); - Dispatcher dispatcher = new Dispatcher(new ClusterMonitor<>(cluster, false), cluster, createDispatchConfig(), new MockInvokerFactory(cluster, (n, a) -> true)); + Dispatcher dispatcher = new Dispatcher(new ClusterMonitor<>(cluster, false), cluster, dispatchConfig, new MockInvokerFactory(cluster, dispatchConfig, (n, a) -> true)); cluster.group(0).get().nodes().get(0).setBlockingWrites(true); cluster.pingIterationCompleted(); assertEquals(1, @@ -119,7 +121,7 @@ public class DispatcherTest { @Test void testGroup0IsSelectedWhenMoreAreBlockingFeed() { SearchCluster cluster = new MockSearchCluster("1", 3, 1); - Dispatcher dispatcher = new Dispatcher(new ClusterMonitor<>(cluster, false), cluster, createDispatchConfig(), new MockInvokerFactory(cluster, (n, a) -> true)); + Dispatcher dispatcher = new Dispatcher(new ClusterMonitor<>(cluster, false), cluster, dispatchConfig, new MockInvokerFactory(cluster, dispatchConfig, (n, a) -> true)); cluster.group(0).get().nodes().get(0).setBlockingWrites(true); cluster.group(1).get().nodes().get(0).setBlockingWrites(true); cluster.pingIterationCompleted(); @@ -132,7 +134,7 @@ public class DispatcherTest { @Test void testGroup0IsSelectedWhenItIsBlockingFeedWhenNoOthers() { SearchCluster cluster = new MockSearchCluster("1", 1, 1); - Dispatcher dispatcher = new Dispatcher(new ClusterMonitor<>(cluster, false), cluster, createDispatchConfig(), new MockInvokerFactory(cluster, (n, a) -> true)); + Dispatcher dispatcher = new Dispatcher(new ClusterMonitor<>(cluster, false), cluster, dispatchConfig, new MockInvokerFactory(cluster, dispatchConfig, (n, a) -> true)); cluster.group(0).get().nodes().get(0).setBlockingWrites(true); cluster.pingIterationCompleted(); assertEquals(0, @@ -150,8 +152,8 @@ public class DispatcherTest { private final FactoryStep[] events; private int step = 0; - public MockInvokerFactory(SearchCluster cl, FactoryStep... events) { - super(cl); + public MockInvokerFactory(SearchCluster cl, DispatchConfig disptachConfig, FactoryStep... events) { + super(cl, disptachConfig); this.events = events; } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/InterleavedSearchInvokerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/InterleavedSearchInvokerTest.java index 15656ffb457..178d3383805 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/InterleavedSearchInvokerTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/InterleavedSearchInvokerTest.java @@ -10,7 +10,6 @@ import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.dispatch.searchcluster.Group; import com.yahoo.search.dispatch.searchcluster.Node; -import com.yahoo.search.dispatch.searchcluster.SearchCluster; import com.yahoo.search.result.Coverage; import com.yahoo.search.result.ErrorMessage; import com.yahoo.search.result.Hit; @@ -22,7 +21,6 @@ import com.yahoo.searchlib.expression.IntegerResultNode; import com.yahoo.searchlib.expression.StringResultNode; import com.yahoo.test.ManualClock; import com.yahoo.vespa.config.search.DispatchConfig; -import com.yahoo.vespa.config.search.DispatchNodesConfig; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -40,8 +38,6 @@ import java.util.stream.StreamSupport; import static com.yahoo.container.handler.Coverage.DEGRADED_BY_MATCH_PHASE; import static com.yahoo.container.handler.Coverage.DEGRADED_BY_TIMEOUT; -import static com.yahoo.search.dispatch.MockSearchCluster.createDispatchConfig; -import static com.yahoo.search.dispatch.MockSearchCluster.createNodesConfig; import static org.junit.jupiter.api.Assertions.*; /** @@ -53,11 +49,12 @@ public class InterleavedSearchInvokerTest { private final Query query = new TestQuery(); private final LinkedList<Event> expectedEvents = new LinkedList<>(); private final List<SearchInvoker> invokers = new ArrayList<>(); + DispatchConfig dispatchConfig = new DispatchConfig.Builder().build(); + TopKEstimator hitEstimator = new TopKEstimator(30, dispatchConfig.topKProbability(), 0.05); @Test void requireThatAdaptiveTimeoutsAreNotUsedWithFullCoverageRequirement() throws IOException { - SearchCluster cluster = new MockSearchCluster("!", createDispatchConfig(100.0), createNodesConfig(), 1, 3); - try (SearchInvoker invoker = createInterleavedInvoker(cluster, new Group(0, List.of()), 3)) { + try (SearchInvoker invoker = createInterleavedInvoker(new Group(0, List.of()), 3)) { expectedEvents.add(new Event(5000, 100, 0)); expectedEvents.add(new Event(4900, 100, 1)); @@ -71,8 +68,7 @@ public class InterleavedSearchInvokerTest { @Test void requireThatTimeoutsAreNotMarkedAsAdaptive() throws IOException { - SearchCluster cluster = new MockSearchCluster("!", createDispatchConfig(100.0), createNodesConfig(), 1, 3); - try (SearchInvoker invoker = createInterleavedInvoker(cluster, new Group(0, List.of()), 3)) { + try (SearchInvoker invoker = createInterleavedInvoker(new Group(0, List.of()), 3)) { expectedEvents.add(new Event(5000, 300, 0)); expectedEvents.add(new Event(4700, 300, 1)); @@ -90,8 +86,7 @@ public class InterleavedSearchInvokerTest { @Test void requireThatAdaptiveTimeoutDecreasesTimeoutWhenCoverageIsReached() throws IOException { - SearchCluster cluster = new MockSearchCluster("!", createDispatchConfig(50.0), createNodesConfig(), 1, 4); - try (SearchInvoker invoker = createInterleavedInvoker(cluster, new Group(0, List.of()), 4)) { + try (SearchInvoker invoker = createInterleavedInvoker(hitEstimator, MockSearchCluster.createDispatchConfig(50.0), new Group(0, List.of()), 4)) { expectedEvents.add(new Event(5000, 100, 0)); expectedEvents.add(new Event(4900, 100, 1)); @@ -110,10 +105,9 @@ public class InterleavedSearchInvokerTest { @Test void requireCorrectCoverageCalculationWhenAllNodesOk() throws IOException { - SearchCluster cluster = new MockSearchCluster("!", 1, 2); invokers.add(new MockInvoker(0, createCoverage(50155, 50155, 50155, 1, 1, 0))); invokers.add(new MockInvoker(1, createCoverage(49845, 49845, 49845, 1, 1, 0))); - try (SearchInvoker invoker = createInterleavedInvoker(cluster, new Group(0, List.of()), 0)) { + try (SearchInvoker invoker = createInterleavedInvoker(new Group(0, List.of()), 0)) { expectedEvents.add(new Event(null, 100, 0)); expectedEvents.add(new Event(null, 200, 1)); @@ -132,10 +126,9 @@ public class InterleavedSearchInvokerTest { @Test void requireCorrectCoverageCalculationWhenResultsAreLimitedByMatchPhase() throws IOException { - SearchCluster cluster = new MockSearchCluster("!", 1, 2); invokers.add(new MockInvoker(0, createCoverage(10101, 50155, 50155, 1, 1, DEGRADED_BY_MATCH_PHASE))); invokers.add(new MockInvoker(1, createCoverage(13319, 49845, 49845, 1, 1, DEGRADED_BY_MATCH_PHASE))); - try (SearchInvoker invoker = createInterleavedInvoker(cluster, new Group(0, List.of()), 0)) { + try (SearchInvoker invoker = createInterleavedInvoker(new Group(0, List.of()), 0)) { expectedEvents.add(new Event(null, 100, 0)); expectedEvents.add(new Event(null, 200, 1)); @@ -155,10 +148,9 @@ public class InterleavedSearchInvokerTest { @Test void requireCorrectCoverageCalculationWhenResultsAreLimitedBySoftTimeout() throws IOException { - SearchCluster cluster = new MockSearchCluster("!", 1, 2); invokers.add(new MockInvoker(0, createCoverage(5000, 50155, 50155, 1, 1, DEGRADED_BY_TIMEOUT))); invokers.add(new MockInvoker(1, createCoverage(4900, 49845, 49845, 1, 1, DEGRADED_BY_TIMEOUT))); - try (SearchInvoker invoker = createInterleavedInvoker(cluster, new Group(0, List.of()), 0)) { + try (SearchInvoker invoker = createInterleavedInvoker(new Group(0, List.of()), 0)) { expectedEvents.add(new Event(null, 100, 0)); expectedEvents.add(new Event(null, 200, 1)); @@ -178,10 +170,9 @@ public class InterleavedSearchInvokerTest { @Test void requireCorrectCoverageCalculationWhenOneNodeIsUnexpectedlyDown() throws IOException { - SearchCluster cluster = new MockSearchCluster("!", 1, 2); invokers.add(new MockInvoker(0, createCoverage(50155, 50155, 50155, 1, 1, 0))); invokers.add(new MockInvoker(1, createCoverage(49845, 49845, 49845, 1, 1, 0))); - try (SearchInvoker invoker = createInterleavedInvoker(cluster, new Group(0, List.of()), 0)) { + try (SearchInvoker invoker = createInterleavedInvoker(new Group(0, List.of()), 0)) { expectedEvents.add(new Event(null, 100, 0)); expectedEvents.add(null); @@ -342,7 +333,6 @@ public class InterleavedSearchInvokerTest { @Test void requireThatGroupingsAreMerged() throws IOException { - SearchCluster cluster = new MockSearchCluster("!", 1, 2); List<SearchInvoker> invokers = new ArrayList<>(); Grouping grouping1 = new Grouping(0); @@ -365,7 +355,7 @@ public class InterleavedSearchInvokerTest { .addAggregationResult(new MinAggregationResult().setMin(new IntegerResultNode(6)).setTag(3)))); invokers.add(new MockInvoker(0).setHits(List.of(new GroupingListHit(List.of(grouping2))))); - try (InterleavedSearchInvoker invoker = new InterleavedSearchInvoker(Timer.monotonic, invokers, cluster, new Group(0, List.of()), Collections.emptySet())) { + try (InterleavedSearchInvoker invoker = new InterleavedSearchInvoker(Timer.monotonic, invokers, hitEstimator, dispatchConfig, new Group(0, List.of()), Collections.emptySet())) { invoker.responseAvailable(invokers.get(0)); invoker.responseAvailable(invokers.get(1)); Result result = invoker.search(query, null); @@ -377,11 +367,12 @@ public class InterleavedSearchInvokerTest { } private static InterleavedSearchInvoker createInterLeavedTestInvoker(List<Double> a, List<Double> b, Group group) { - SearchCluster cluster = new MockSearchCluster("!", 1, 2); + DispatchConfig dispatchConfig = new DispatchConfig.Builder().build(); + TopKEstimator hitEstimator = new TopKEstimator(30, dispatchConfig.topKProbability(), 0.05); List<SearchInvoker> invokers = new ArrayList<>(); invokers.add(createInvoker(a, 0)); invokers.add(createInvoker(b, 1)); - InterleavedSearchInvoker invoker = new InterleavedSearchInvoker(Timer.monotonic, invokers, cluster, group, Collections.emptySet()); + InterleavedSearchInvoker invoker = new InterleavedSearchInvoker(Timer.monotonic, invokers, hitEstimator, dispatchConfig, group, Collections.emptySet()); invoker.responseAvailable(invokers.get(0)); invoker.responseAvailable(invokers.get(1)); return invoker; @@ -402,13 +393,12 @@ public class InterleavedSearchInvokerTest { return hits; } - void verifyCorrectCoverageCalculationWhenDegradedCoverageIsExpected(DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig, int expectedCoverage) throws IOException { - SearchCluster cluster = new MockSearchCluster("!", dispatchConfig, nodesConfig, 1, 2); + void verifyCorrectCoverageCalculationWhenDegradedCoverageIsExpected(int expectedCoverage) throws IOException { invokers.add(new MockInvoker(0, createCoverage(50155, 50155, 60000, 1, 1, 0))); Coverage errorCoverage = new Coverage(0, 0, 0); errorCoverage.setNodesTried(1); invokers.add(new SearchErrorInvoker(ErrorMessage.createBackendCommunicationError("node is down"), errorCoverage)); - try (SearchInvoker invoker = createInterleavedInvoker(cluster, new Group(0, List.of()), 0)) { + try (SearchInvoker invoker = createInterleavedInvoker(new Group(0, List.of()), 0)) { expectedEvents.add(new Event(null, 1, 1)); expectedEvents.add(new Event(null, 100, 0)); @@ -429,19 +419,20 @@ public class InterleavedSearchInvokerTest { @Test void requireCorrectCoverageCalculationWhenDegradedCoverageIsExpectedUsingTargetActiveDocs() throws IOException { - verifyCorrectCoverageCalculationWhenDegradedCoverageIsExpected(MockSearchCluster.createDispatchConfigBuilder(100.0) - .redundancy(1) - .build(), - MockSearchCluster.createNodesConfig(), + verifyCorrectCoverageCalculationWhenDegradedCoverageIsExpected( 42); } - private InterleavedSearchInvoker createInterleavedInvoker(SearchCluster searchCluster, Group group, int numInvokers) { + private InterleavedSearchInvoker createInterleavedInvoker(Group group, int numInvokers) { + return createInterleavedInvoker(hitEstimator, dispatchConfig, group, numInvokers); + } + private InterleavedSearchInvoker createInterleavedInvoker(TopKEstimator hitEstimator, DispatchConfig dispatchConfig, + Group group, int numInvokers) { for (int i = 0; i < numInvokers; i++) { invokers.add(new MockInvoker(i)); } - return new InterleavedSearchInvoker(Timer.wrap(clock), invokers, searchCluster, group,null) { + return new InterleavedSearchInvoker(Timer.wrap(clock), invokers, hitEstimator, dispatchConfig, group,null) { @Override protected LinkedBlockingQueue<SearchInvoker> newQueue() { diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java index c90153e8008..32ca63693b4 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java @@ -62,9 +62,6 @@ public class MockSearchCluster extends SearchCluster { } @Override - public List<Node> nodes() { return nodes; } - - @Override public ImmutableMap<Integer, Group> groups() { return groups; } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java index 6789d0347d5..82b7845d63d 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java @@ -5,7 +5,6 @@ package com.yahoo.search.dispatch.rpc; import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol; import com.google.common.collect.ImmutableMap; import com.yahoo.compress.CompressionType; -import com.yahoo.prelude.fastsearch.FastHit; import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; import com.yahoo.search.Query; import com.yahoo.search.Result; @@ -14,7 +13,6 @@ import com.yahoo.search.searchchain.Execution; import org.junit.jupiter.api.Test; import java.io.IOException; -import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -25,6 +23,7 @@ import static org.junit.jupiter.api.Assertions.*; */ public class RpcSearchInvokerTest { + private final CompressService compressor = new CompressService(); @Test void testProtobufSerialization() throws IOException { var compressionTypeHolder = new AtomicReference<CompressionType>(); @@ -32,21 +31,21 @@ public class RpcSearchInvokerTest { var lengthHolder = new AtomicInteger(); var mockClient = parameterCollectorClient(compressionTypeHolder, payloadHolder, lengthHolder); var mockPool = new RpcResourcePool(ImmutableMap.of(7, mockClient.createConnection("foo", 123))); - var invoker = new RpcSearchInvoker(mockSearcher(), new Node(7, "seven", 1), mockPool, 1000); + var invoker = new RpcSearchInvoker(mockSearcher(), compressor, new Node(7, "seven", 1), mockPool, 1000); Query q = new Query("search/?query=test&hits=10&offset=3"); RpcSearchInvoker.RpcContext context = (RpcSearchInvoker.RpcContext) invoker.sendSearchRequest(q, null); assertEquals(lengthHolder.get(), context.compressedPayload.uncompressedSize()); assertSame(context.compressedPayload.data(), payloadHolder.get()); - var bytes = mockPool.compressor().decompress(payloadHolder.get(), compressionTypeHolder.get(), lengthHolder.get()); + var bytes = compressor.compressor().decompress(payloadHolder.get(), compressionTypeHolder.get(), lengthHolder.get()); var request = SearchProtocol.SearchRequest.newBuilder().mergeFrom(bytes).build(); assertEquals(10, request.getHits()); assertEquals(3, request.getOffset()); assertTrue(request.getQueryTreeBlob().size() > 0); - var invoker2 = new RpcSearchInvoker(mockSearcher(), new Node(8, "eight", 1), mockPool, 1000); + var invoker2 = new RpcSearchInvoker(mockSearcher(), compressor, new Node(8, "eight", 1), mockPool, 1000); RpcSearchInvoker.RpcContext context2 = (RpcSearchInvoker.RpcContext) invoker2.sendSearchRequest(q, context); assertSame(context, context2); assertEquals(lengthHolder.get(), context.compressedPayload.uncompressedSize()); @@ -61,12 +60,12 @@ public class RpcSearchInvokerTest { var lengthHolder = new AtomicInteger(); var mockClient = parameterCollectorClient(compressionTypeHolder, payloadHolder, lengthHolder); var mockPool = new RpcResourcePool(ImmutableMap.of(7, mockClient.createConnection("foo", 123))); - var invoker = new RpcSearchInvoker(mockSearcher(), new Node(7, "seven", 1), mockPool, maxHits); + var invoker = new RpcSearchInvoker(mockSearcher(), compressor, new Node(7, "seven", 1), mockPool, maxHits); Query q = new Query("search/?query=test&hits=10&offset=3"); invoker.sendSearchRequest(q, null); - var bytes = mockPool.compressor().decompress(payloadHolder.get(), compressionTypeHolder.get(), lengthHolder.get()); + var bytes = compressor.compressor().decompress(payloadHolder.get(), compressionTypeHolder.get(), lengthHolder.get()); var request = SearchProtocol.SearchRequest.newBuilder().mergeFrom(bytes).build(); assertEquals(maxHits, request.getHits()); |