From ef637d4a7236d6570c748ba5782e0435f628bd9a Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 22 Nov 2022 22:16:42 +0100 Subject: Make a few simpler interfaces instead of carrying one huge implement all SearchCluster around. --- .../java/com/yahoo/search/dispatch/Dispatcher.java | 9 ++-- .../search/dispatch/InterleavedSearchInvoker.java | 27 +++++++---- .../com/yahoo/search/dispatch/InvokerFactory.java | 12 +++-- .../yahoo/search/dispatch/rpc/CompressPayload.java | 15 ++++++ .../yahoo/search/dispatch/rpc/CompressService.java | 32 +++++++++++++ .../search/dispatch/rpc/RpcConnectionPool.java | 11 +++++ .../search/dispatch/rpc/RpcInvokerFactory.java | 19 ++++---- .../com/yahoo/search/dispatch/rpc/RpcPing.java | 16 ++++--- .../yahoo/search/dispatch/rpc/RpcPingFactory.java | 9 ++-- .../dispatch/rpc/RpcProtobufFillInvoker.java | 16 +++---- .../yahoo/search/dispatch/rpc/RpcResourcePool.java | 22 +-------- .../search/dispatch/rpc/RpcSearchInvoker.java | 16 +++---- .../dispatch/searchcluster/SearchCluster.java | 54 ++++++++-------------- 13 files changed, 148 insertions(+), 110 deletions(-) create mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressPayload.java create mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressService.java create mode 100644 container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java (limited to 'container-search/src/main/java/com/yahoo/search') diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 36c5c8a16fa..b7a2d9f70ba 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -58,9 +58,9 @@ public class Dispatcher extends AbstractComponent { private final ClusterMonitor clusterMonitor; private final LoadBalancer loadBalancer; private final InvokerFactory invokerFactory; - private final RpcResourcePool rpcResourcePool; private final RpcClient rpcClient; private final int maxHitsPerNode; + private final RpcResourcePool rpcResourcePool; private static final QueryProfileType argumentType; @@ -101,7 +101,7 @@ public class Dispatcher extends AbstractComponent { RpcClient rpcClient, RpcResourcePool rpcResourcePool) { this(new ClusterMonitor<>(searchCluster, true), searchCluster, dispatchConfig, - new RpcInvokerFactory(rpcResourcePool, searchCluster), + new RpcInvokerFactory(rpcResourcePool, searchCluster, dispatchConfig), rpcClient, rpcResourcePool); } @@ -161,9 +161,8 @@ public class Dispatcher extends AbstractComponent { new Compressor().warmup(seconds); } - /** Returns the search cluster this dispatches to */ - public SearchCluster searchCluster() { - return searchCluster; + public boolean allGroupsHaveSize1() { + return searchCluster.allGroupsHaveSize1(); } @Override diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java index 39be91cf3e8..8751a969fe0 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java @@ -6,7 +6,6 @@ import com.yahoo.prelude.fastsearch.GroupingListHit; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.dispatch.searchcluster.Group; -import com.yahoo.search.dispatch.searchcluster.SearchCluster; import com.yahoo.search.result.ErrorMessage; import com.yahoo.search.result.Hit; import com.yahoo.search.searchchain.Execution; @@ -38,27 +37,37 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM private final Timer timer; private final Set invokers; - private final SearchCluster searchCluster; + private final DispatchConfig dispatchConfig; private final Group group; private final LinkedBlockingQueue availableForProcessing; private final Set alreadyFailedNodes; private final CoverageAggregator coverageAggregator; + private final TopKEstimator hitEstimator; private Query query; private TimeoutHandler timeoutHandler; public InterleavedSearchInvoker(Timer timer, Collection invokers, - SearchCluster searchCluster, + TopKEstimator hitEstimator, + DispatchConfig dispatchConfig, Group group, Set alreadyFailedNodes) { super(Optional.empty()); this.timer = timer; this.invokers = Collections.newSetFromMap(new IdentityHashMap<>()); this.invokers.addAll(invokers); - this.searchCluster = searchCluster; + this.dispatchConfig = dispatchConfig; this.group = group; this.availableForProcessing = newQueue(); this.alreadyFailedNodes = alreadyFailedNodes; - coverageAggregator = new CoverageAggregator(invokers.size()); + this.coverageAggregator = new CoverageAggregator(invokers.size()); + this.hitEstimator = hitEstimator; + } + + private int estimateHitsToFetch(int wantedHits, int numPartitions) { + return hitEstimator.estimateK(wantedHits, numPartitions); + } + private int estimateHitsToFetch(int wantedHits, int numPartitions, double topKProbability) { + return hitEstimator.estimateK(wantedHits, numPartitions, topKProbability); } private TimeoutHandler createTimeoutHandler(DispatchConfig config, int askedNodes, Query query) { @@ -84,8 +93,8 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM if (group.isBalanced() && !group.isSparse()) { Double topkProbabilityOverrride = query.properties().getDouble(Dispatcher.topKProbability); q = (topkProbabilityOverrride != null) - ? searchCluster.estimateHitsToFetch(neededHits, invokers.size(), topkProbabilityOverrride) - : searchCluster.estimateHitsToFetch(neededHits, invokers.size()); + ? estimateHitsToFetch(neededHits, invokers.size(), topkProbabilityOverrride) + : estimateHitsToFetch(neededHits, invokers.size()); } query.setHits(q); query.setOffset(0); @@ -94,7 +103,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM for (SearchInvoker invoker : invokers) { context = invoker.sendSearchRequest(query, context); } - timeoutHandler = createTimeoutHandler(searchCluster.dispatchConfig(), invokers.size(), query); + timeoutHandler = createTimeoutHandler(dispatchConfig, invokers.size(), query); query.setHits(originalHits); query.setOffset(originalOffset); @@ -127,7 +136,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM groupingResultAggregator.toAggregatedHit().ifPresent(h -> result.getResult().hits().add(h)); insertNetworkErrors(result.getResult()); - CoverageAggregator adjusted = coverageAggregator.adjustedDegradedCoverage((int)searchCluster.dispatchConfig().redundancy(), timeoutHandler); + CoverageAggregator adjusted = coverageAggregator.adjustedDegradedCoverage((int)dispatchConfig.redundancy(), timeoutHandler); result.getResult().setCoverage(adjusted.createCoverage(timeoutHandler)); int needed = query.getOffset() + query.getHits(); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java index 02cf11c9fe7..caeae9c2c1d 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java @@ -10,6 +10,7 @@ import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.SearchCluster; import com.yahoo.search.result.Coverage; import com.yahoo.search.result.ErrorMessage; +import com.yahoo.vespa.config.search.DispatchConfig; import java.util.ArrayList; import java.util.HashSet; @@ -21,11 +22,16 @@ import java.util.Set; * @author ollivir */ public abstract class InvokerFactory { + private static final double SKEW_FACTOR = 0.05; - protected final SearchCluster searchCluster; + private final SearchCluster searchCluster; + private final DispatchConfig dispatchConfig; + private final TopKEstimator hitEstimator; - public InvokerFactory(SearchCluster searchCluster) { + public InvokerFactory(SearchCluster searchCluster, DispatchConfig dispatchConfig) { this.searchCluster = searchCluster; + this.dispatchConfig = dispatchConfig; + this.hitEstimator = new TopKEstimator(30.0, dispatchConfig.topKProbability(), SKEW_FACTOR); } protected abstract Optional createNodeSearchInvoker(VespaBackEndSearcher searcher, @@ -90,7 +96,7 @@ public abstract class InvokerFactory { if (invokers.size() == 1 && failed == null) { return Optional.of(invokers.get(0)); } else { - return Optional.of(new InterleavedSearchInvoker(Timer.monotonic, invokers, searchCluster, group, failed)); + return Optional.of(new InterleavedSearchInvoker(Timer.monotonic, invokers, hitEstimator, dispatchConfig, group, failed)); } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressPayload.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressPayload.java new file mode 100644 index 00000000000..786b804edaa --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressPayload.java @@ -0,0 +1,15 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch.rpc; + +import com.yahoo.compress.Compressor; +import com.yahoo.search.Query; + +/** + * Interface for compressing and decompressing request/response + * + * @author baldersheim + */ +public interface CompressPayload { + Compressor.Compression compress(Query query, byte[] payload); + byte[] decompress(Client.ProtobufResponse response); +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressService.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressService.java new file mode 100644 index 00000000000..9e7fc9b5b29 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressService.java @@ -0,0 +1,32 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch.rpc; + +import com.yahoo.compress.CompressionType; +import com.yahoo.compress.Compressor; +import com.yahoo.processing.request.CompoundName; +import com.yahoo.search.Query; + +/** + * Implement interface to compress/decompress request/response + * + * @author baldersheim + */ +public class CompressService implements CompressPayload { + /** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */ + public final static CompoundName dispatchCompression = new CompoundName("dispatch.compression"); + private final Compressor compressor = new Compressor(CompressionType.LZ4, 5, 0.95, 256); + + + @Override + public Compressor.Compression compress(Query query, byte[] payload) { + CompressionType compression = CompressionType.valueOf(query.properties().getString(dispatchCompression, "LZ4").toUpperCase()); + return compressor.compress(compression, payload); + } + + @Override + public byte[] decompress(Client.ProtobufResponse response) { + CompressionType compression = CompressionType.valueOf(response.compression()); + return compressor.decompress(response.compressedPayload(), compression, response.uncompressedSize()); + } + Compressor compressor() { return compressor; } +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java new file mode 100644 index 00000000000..fd8e0e4f81a --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java @@ -0,0 +1,11 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.dispatch.rpc; + +/** + * Interface for getting a connection given a node id. + * + * @author balderersheim + */ +public interface RpcConnectionPool { + Client.NodeConnection getConnection(int nodeId); +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java index a1ebea3b695..d5b1b876540 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java @@ -9,6 +9,7 @@ import com.yahoo.search.dispatch.InvokerFactory; import com.yahoo.search.dispatch.SearchInvoker; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.SearchCluster; +import com.yahoo.vespa.config.search.DispatchConfig; import java.util.Optional; @@ -17,19 +18,18 @@ import java.util.Optional; */ public class RpcInvokerFactory extends InvokerFactory { - private final RpcResourcePool rpcResourcePool; + private final RpcConnectionPool rpcResourcePool; + private final CompressPayload compressor; - public RpcInvokerFactory(RpcResourcePool rpcResourcePool, SearchCluster searchCluster) { - super(searchCluster); + public RpcInvokerFactory(RpcConnectionPool rpcResourcePool, SearchCluster searchCluster, DispatchConfig dispatchConfig) { + super(searchCluster, dispatchConfig); this.rpcResourcePool = rpcResourcePool; + this.compressor = new CompressService(); } @Override - protected Optional createNodeSearchInvoker(VespaBackEndSearcher searcher, - Query query, - int maxHits, - Node node) { - return Optional.of(new RpcSearchInvoker(searcher, node, rpcResourcePool, maxHits)); + protected Optional createNodeSearchInvoker(VespaBackEndSearcher searcher, Query query, int maxHits, Node node) { + return Optional.of(new RpcSearchInvoker(searcher, compressor, node, rpcResourcePool, maxHits)); } @Override @@ -37,7 +37,6 @@ public class RpcInvokerFactory extends InvokerFactory { Query query = result.getQuery(); boolean summaryNeedsQuery = searcher.summaryNeedsQuery(query); - return new RpcProtobufFillInvoker(rpcResourcePool, searcher.getDocumentDatabase(query), searcher.getServerId(), summaryNeedsQuery); + return new RpcProtobufFillInvoker(rpcResourcePool, compressor, searcher.getDocumentDatabase(query), searcher.getServerId(), summaryNeedsQuery); } - } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java index 44f0af2aca1..53dc54f7bc5 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java @@ -25,17 +25,19 @@ public class RpcPing implements Pinger, Client.ResponseReceiver { private static final boolean triggeredClassLoading = ErrorMessage.createBackendCommunicationError("TriggerClassLoading") instanceof ErrorMessage; private final Node node; - private final RpcResourcePool resourcePool; + private final RpcConnectionPool connectionPool; private final ClusterMonitor clusterMonitor; private final long pingSequenceId; private final PongHandler pongHandler; + private final Compressor compressor; - public RpcPing(Node node, ClusterMonitor clusterMonitor, RpcResourcePool rpcResourcePool, PongHandler pongHandler) { + public RpcPing(Node node, ClusterMonitor clusterMonitor, RpcConnectionPool connectionPool, PongHandler pongHandler, Compressor compressor) { this.node = node; - this.resourcePool = rpcResourcePool; + this.connectionPool = connectionPool; this.clusterMonitor = clusterMonitor; - pingSequenceId = node.createPingSequenceId(); + this.pingSequenceId = node.createPingSequenceId(); this.pongHandler = pongHandler; + this. compressor = compressor; } @Override @@ -63,16 +65,16 @@ public class RpcPing implements Pinger, Client.ResponseReceiver { } private void sendPing() { - var connection = resourcePool.getConnection(node.key()); + var connection = connectionPool.getConnection(node.key()); var ping = SearchProtocol.MonitorRequest.newBuilder().build().toByteArray(); double timeoutSeconds = ((double) clusterMonitor.getConfiguration().getRequestTimeout()) / 1000.0; - Compressor.Compression compressionResult = resourcePool.compressor().compress(PING_COMPRESSION, ping); + Compressor.Compression compressionResult = compressor.compress(PING_COMPRESSION, ping); connection.request(RPC_METHOD, compressionResult.type(), ping.length, compressionResult.data(), this, timeoutSeconds); } private Pong decodeReply(ProtobufResponse response) throws InvalidProtocolBufferException { CompressionType compression = CompressionType.valueOf(response.compression()); - byte[] responseBytes = resourcePool.compressor().decompress(response.compressedPayload(), compression, response.uncompressedSize()); + byte[] responseBytes = compressor.decompress(response.compressedPayload(), compression, response.uncompressedSize()); var reply = SearchProtocol.MonitorReply.parseFrom(responseBytes); if (reply.getDistributionKey() != node.key()) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java index 2e690198c1c..31e152c6ff9 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java @@ -1,6 +1,8 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch.rpc; +import com.yahoo.compress.CompressionType; +import com.yahoo.compress.Compressor; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.PingFactory; @@ -9,15 +11,16 @@ import com.yahoo.search.dispatch.searchcluster.PongHandler; public class RpcPingFactory implements PingFactory { - private final RpcResourcePool rpcResourcePool; + private final RpcConnectionPool rpcResourcePool; + private final Compressor compressor = new Compressor(CompressionType.LZ4, 5, 0.95, 512); - public RpcPingFactory(RpcResourcePool rpcResourcePool) { + public RpcPingFactory(RpcConnectionPool rpcResourcePool) { this.rpcResourcePool = rpcResourcePool; } @Override public Pinger createPinger(Node node, ClusterMonitor monitor, PongHandler pongHandler) { - return new RpcPing(node, monitor, rpcResourcePool, pongHandler); + return new RpcPing(node, monitor, rpcResourcePool, pongHandler, compressor); } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java index c84795352f5..4e538fb54dc 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java @@ -5,7 +5,6 @@ import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol; import com.google.protobuf.InvalidProtocolBufferException; import com.yahoo.collections.ListMap; import com.yahoo.collections.Pair; -import com.yahoo.compress.CompressionType; import com.yahoo.compress.Compressor; import com.yahoo.container.protect.Error; import com.yahoo.data.access.Inspector; @@ -44,9 +43,10 @@ public class RpcProtobufFillInvoker extends FillInvoker { private static final Logger log = Logger.getLogger(RpcProtobufFillInvoker.class.getName()); private final DocumentDatabase documentDb; - private final RpcResourcePool resourcePool; + private final RpcConnectionPool resourcePool; private final boolean summaryNeedsQuery; private final String serverId; + private final CompressPayload compressor; private BlockingQueue, List>> responses; @@ -56,11 +56,12 @@ public class RpcProtobufFillInvoker extends FillInvoker { /** The number of responses we should receive (and process) before this is complete */ private int outstandingResponses; - RpcProtobufFillInvoker(RpcResourcePool resourcePool, DocumentDatabase documentDb, String serverId, boolean summaryNeedsQuery) { + RpcProtobufFillInvoker(RpcConnectionPool resourcePool, CompressPayload compressor, DocumentDatabase documentDb, String serverId, boolean summaryNeedsQuery) { this.documentDb = documentDb; this.resourcePool = resourcePool; this.serverId = serverId; this.summaryNeedsQuery = summaryNeedsQuery; + this.compressor = compressor; } @Override @@ -121,9 +122,8 @@ public class RpcProtobufFillInvoker extends FillInvoker { ListMap hitsByNode = new ListMap<>(); for (Iterator i = result.hits().unorderedDeepIterator(); i.hasNext();) { Hit h = i.next(); - if (!(h instanceof FastHit)) + if (!(h instanceof FastHit hit)) continue; - FastHit hit = (FastHit) h; hitsByNode.put(hit.getDistributionKey(), hit); } @@ -143,7 +143,7 @@ public class RpcProtobufFillInvoker extends FillInvoker { } Query query = result.getQuery(); - Compressor.Compression compressionResult = resourcePool.compress(query, payload); + Compressor.Compression compressionResult = compressor.compress(query, payload); node.request(RPC_METHOD, compressionResult.type(), payload.length, compressionResult.data(), roe -> receive(roe, hits), clientTimeout); } @@ -189,9 +189,7 @@ public class RpcProtobufFillInvoker extends FillInvoker { hasReportedError = true; } else { Client.ProtobufResponse response = responseOrError.response().get(); - CompressionType compression = CompressionType.valueOf(response.compression()); - byte[] responseBytes = resourcePool.compressor().decompress(response.compressedPayload(), compression, - response.uncompressedSize()); + byte[] responseBytes = compressor.decompress(response); return fill(result, hitsContext, summaryClass, responseBytes); } return 0; diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java index bbe60d0df23..71e8cc0baa8 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java @@ -2,11 +2,6 @@ package com.yahoo.search.dispatch.rpc; import com.google.common.collect.ImmutableMap; -import com.yahoo.compress.CompressionType; -import com.yahoo.compress.Compressor; -import com.yahoo.compress.Compressor.Compression; -import com.yahoo.processing.request.CompoundName; -import com.yahoo.search.Query; import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.rpc.Client.NodeConnection; import com.yahoo.vespa.config.search.DispatchNodesConfig; @@ -23,12 +18,7 @@ import java.util.concurrent.ThreadLocalRandom; * * @author ollivir */ -public class RpcResourcePool implements AutoCloseable { - - /** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */ - public final static CompoundName dispatchCompression = new CompoundName("dispatch.compression"); - - private final Compressor compressor = new Compressor(CompressionType.LZ4, 5, 0.95, 32); +public class RpcResourcePool implements RpcConnectionPool, AutoCloseable { /** Connections to the search nodes this talks to, indexed by node id ("partid") */ private final ImmutableMap nodeConnectionPools; @@ -54,15 +44,7 @@ public class RpcResourcePool implements AutoCloseable { this.nodeConnectionPools = builder.build(); } - public Compressor compressor() { - return compressor; - } - - public Compression compress(Query query, byte[] payload) { - CompressionType compression = CompressionType.valueOf(query.properties().getString(dispatchCompression, "LZ4").toUpperCase()); - return compressor.compress(compression, payload); - } - + @Override public NodeConnection getConnection(int nodeId) { var pool = nodeConnectionPools.get(nodeId); if (pool == null) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java index 64e0dd666dd..31de0b147c9 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java @@ -1,7 +1,6 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch.rpc; -import com.yahoo.compress.CompressionType; import com.yahoo.compress.Compressor; import com.yahoo.prelude.fastsearch.VespaBackEndSearcher; import com.yahoo.search.Query; @@ -29,19 +28,21 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe private final VespaBackEndSearcher searcher; private final Node node; - private final RpcResourcePool resourcePool; + private final RpcConnectionPool resourcePool; private final BlockingQueue> responses; private final int maxHits; + private final CompressPayload compressor; private Query query; - RpcSearchInvoker(VespaBackEndSearcher searcher, Node node, RpcResourcePool resourcePool, int maxHits) { + RpcSearchInvoker(VespaBackEndSearcher searcher, CompressPayload compressor, Node node, RpcConnectionPool resourcePool, int maxHits) { super(Optional.of(node)); this.searcher = searcher; this.node = node; this.resourcePool = resourcePool; this.responses = new LinkedBlockingQueue<>(1); this.maxHits = maxHits; + this.compressor = compressor; } @Override @@ -78,7 +79,7 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe if (incomingContext instanceof RpcContext) return (RpcContext)incomingContext; - return new RpcContext(resourcePool, query, + return new RpcContext(compressor, query, ProtobufSerialization.serializeSearchRequest(query, Math.min(query.getHits(), maxHits), searcher.getServerId(), requestTimeout)); @@ -110,8 +111,7 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe } ProtobufResponse protobufResponse = response.response().get(); - CompressionType compression = CompressionType.valueOf(protobufResponse.compression()); - byte[] payload = resourcePool.compressor().decompress(protobufResponse.compressedPayload(), compression, protobufResponse.uncompressedSize()); + byte[] payload = compressor.decompress(protobufResponse); return ProtobufSerialization.deserializeToSearchResult(payload, query, searcher, node.pathIndex(), node.key()); } @@ -133,8 +133,8 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe final Compressor.Compression compressedPayload; - RpcContext(RpcResourcePool resourcePool, Query query, byte[] payload) { - compressedPayload = resourcePool.compress(query, payload); + RpcContext(CompressPayload compressor, Query query, byte[] payload) { + compressedPayload = compressor.compress(query, payload); } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 54a3e42b9ab..ca2fce0b32b 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -8,7 +8,6 @@ import com.yahoo.net.HostName; import com.yahoo.prelude.Pong; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.cluster.NodeManager; -import com.yahoo.search.dispatch.TopKEstimator; import com.yahoo.vespa.config.search.DispatchConfig; import com.yahoo.vespa.config.search.DispatchNodesConfig; @@ -29,16 +28,14 @@ public class SearchCluster implements NodeManager { private static final Logger log = Logger.getLogger(SearchCluster.class.getName()); - private final DispatchConfig dispatchConfig; + private final double minActivedocsPercentage; private final String clusterId; + private final VipStatus vipStatus; + private final PingFactory pingFactory; private final Map groups; private final List orderedGroups; private final List nodes; - private final VipStatus vipStatus; - private final PingFactory pingFactory; - private final TopKEstimator hitEstimator; private long nextLogTime = 0; - private static final double SKEW_FACTOR = 0.05; /** * A search node on this local machine having the entire corpus, which we therefore @@ -48,13 +45,13 @@ public class SearchCluster implements NodeManager { * if it only queries this cluster when the local node cannot be used, to avoid unnecessary * cross-node network traffic. */ - private final Optional localCorpusDispatchTarget; + private final Node localCorpusDispatchTarget; public SearchCluster(String clusterId, DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig, VipStatus vipStatus, PingFactory pingFactory) { this.clusterId = clusterId; - this.dispatchConfig = dispatchConfig; + this.minActivedocsPercentage = dispatchConfig.minActivedocsPercentage(); this.vipStatus = vipStatus; this.pingFactory = pingFactory; @@ -71,7 +68,6 @@ public class SearchCluster implements NodeManager { nodes.forEach(node -> groupIntroductionOrder.put(node.group(), groups.get(node.group()))); this.orderedGroups = List.copyOf(groupIntroductionOrder.values()); - hitEstimator = new TopKEstimator(30.0, dispatchConfig.topKProbability(), SKEW_FACTOR); this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), nodes, groups); } @@ -85,7 +81,7 @@ public class SearchCluster implements NodeManager { } } - private static Optional findLocalCorpusDispatchTarget(String selfHostname, + private static Node findLocalCorpusDispatchTarget(String selfHostname, List nodes, Map groups) { // A search node in the search cluster in question is configured on the same host as the currently running container. @@ -96,15 +92,15 @@ public class SearchCluster implements NodeManager { .filter(node -> node.hostname().equals(selfHostname)) .toList(); // Only use direct dispatch if we have exactly 1 search node on the same machine: - if (localSearchNodes.size() != 1) return Optional.empty(); + if (localSearchNodes.size() != 1) return null; Node localSearchNode = localSearchNodes.iterator().next(); Group localSearchGroup = groups.get(localSearchNode.group()); // Only use direct dispatch if the local search node has the entire corpus - if (localSearchGroup.nodes().size() != 1) return Optional.empty(); + if (localSearchGroup.nodes().size() != 1) return null; - return Optional.of(localSearchNode); + return localSearchNode; } private static List toNodes(DispatchNodesConfig nodesConfig) { @@ -113,13 +109,6 @@ public class SearchCluster implements NodeManager { .toList(); } - public DispatchConfig dispatchConfig() { - return dispatchConfig; - } - - /** Returns an immutable list of all nodes in this. */ - public List nodes() { return nodes; } - /** Returns the groups of this cluster as an immutable map indexed by group id */ public Map groups() { return groups; } @@ -148,16 +137,16 @@ public class SearchCluster implements NodeManager { * or empty if we should not dispatch directly. */ public Optional localCorpusDispatchTarget() { - if ( localCorpusDispatchTarget.isEmpty()) return Optional.empty(); + if ( localCorpusDispatchTarget == null) return Optional.empty(); // Only use direct dispatch if the local group has sufficient coverage - Group localSearchGroup = groups().get(localCorpusDispatchTarget.get().group()); + Group localSearchGroup = groups().get(localCorpusDispatchTarget.group()); if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty(); // Only use direct dispatch if the local search node is not down - if ( localCorpusDispatchTarget.get().isWorking() == Boolean.FALSE) return Optional.empty(); + if ( localCorpusDispatchTarget.isWorking() == Boolean.FALSE) return Optional.empty(); - return localCorpusDispatchTarget; + return Optional.of(localCorpusDispatchTarget); } private void updateWorkingState(Node node, boolean isWorking) { @@ -185,7 +174,7 @@ public class SearchCluster implements NodeManager { } private void updateVipStatusOnNodeChange(Node node, boolean nodeIsWorking) { - if (localCorpusDispatchTarget.isEmpty()) { // consider entire cluster + if (localCorpusDispatchTarget == null) { // consider entire cluster if (hasInformationAboutAllNodes()) setInRotationOnlyIf(hasWorkingNodes()); } @@ -198,7 +187,7 @@ public class SearchCluster implements NodeManager { } private void updateVipStatusOnCoverageChange(Group group, boolean sufficientCoverage) { - if ( localCorpusDispatchTarget.isEmpty()) { // consider entire cluster + if ( localCorpusDispatchTarget == null) { // consider entire cluster // VIP status does not depend on coverage } else if (usesLocalCorpusIn(group)) { // follow the status of this group @@ -213,13 +202,6 @@ public class SearchCluster implements NodeManager { vipStatus.removeFromRotation(clusterId); } - public int estimateHitsToFetch(int wantedHits, int numPartitions) { - return hitEstimator.estimateK(wantedHits, numPartitions); - } - public int estimateHitsToFetch(int wantedHits, int numPartitions, double topKProbability) { - return hitEstimator.estimateK(wantedHits, numPartitions, topKProbability); - } - public boolean hasInformationAboutAllNodes() { return nodes.stream().allMatch(node -> node.isWorking() != null); } @@ -229,11 +211,11 @@ public class SearchCluster implements NodeManager { } private boolean usesLocalCorpusIn(Node node) { - return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().equals(node); + return node.equals(localCorpusDispatchTarget); } private boolean usesLocalCorpusIn(Group group) { - return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().group() == group.id(); + return (localCorpusDispatchTarget != null) && localCorpusDispatchTarget.group() == group.id(); } /** Used by the cluster monitor to manage node status */ @@ -286,7 +268,7 @@ public class SearchCluster implements NodeManager { private boolean isGroupCoverageSufficient(long activeDocuments, long medianDocuments) { double documentCoverage = 100.0 * (double) activeDocuments / medianDocuments; - if (medianDocuments > 0 && documentCoverage < dispatchConfig.minActivedocsPercentage()) + if (medianDocuments > 0 && documentCoverage < minActivedocsPercentage) return false; return true; } -- cgit v1.2.3