aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-11-22 22:16:42 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2022-11-23 10:01:17 +0100
commitef637d4a7236d6570c748ba5782e0435f628bd9a (patch)
treed7c4a7ab240d2faca50eace28eb18db2cda5240c /container-search/src/main/java/com/yahoo/search
parentb8165e0e316527dc956489bc416f9ccb83cf1904 (diff)
Make a few simpler interfaces instead of carrying one huge implement all SearchCluster around.
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java9
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java27
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java12
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressPayload.java15
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressService.java32
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java11
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java19
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java16
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java9
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java16
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java22
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java16
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java54
13 files changed, 148 insertions, 110 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 36c5c8a16fa..b7a2d9f70ba 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -58,9 +58,9 @@ public class Dispatcher extends AbstractComponent {
private final ClusterMonitor<Node> clusterMonitor;
private final LoadBalancer loadBalancer;
private final InvokerFactory invokerFactory;
- private final RpcResourcePool rpcResourcePool;
private final RpcClient rpcClient;
private final int maxHitsPerNode;
+ private final RpcResourcePool rpcResourcePool;
private static final QueryProfileType argumentType;
@@ -101,7 +101,7 @@ public class Dispatcher extends AbstractComponent {
RpcClient rpcClient, RpcResourcePool rpcResourcePool) {
this(new ClusterMonitor<>(searchCluster, true),
searchCluster, dispatchConfig,
- new RpcInvokerFactory(rpcResourcePool, searchCluster),
+ new RpcInvokerFactory(rpcResourcePool, searchCluster, dispatchConfig),
rpcClient, rpcResourcePool);
}
@@ -161,9 +161,8 @@ public class Dispatcher extends AbstractComponent {
new Compressor().warmup(seconds);
}
- /** Returns the search cluster this dispatches to */
- public SearchCluster searchCluster() {
- return searchCluster;
+ public boolean allGroupsHaveSize1() {
+ return searchCluster.allGroupsHaveSize1();
}
@Override
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
index 39be91cf3e8..8751a969fe0 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
@@ -6,7 +6,6 @@ import com.yahoo.prelude.fastsearch.GroupingListHit;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.searchcluster.Group;
-import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.result.Hit;
import com.yahoo.search.searchchain.Execution;
@@ -38,27 +37,37 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
private final Timer timer;
private final Set<SearchInvoker> invokers;
- private final SearchCluster searchCluster;
+ private final DispatchConfig dispatchConfig;
private final Group group;
private final LinkedBlockingQueue<SearchInvoker> availableForProcessing;
private final Set<Integer> alreadyFailedNodes;
private final CoverageAggregator coverageAggregator;
+ private final TopKEstimator hitEstimator;
private Query query;
private TimeoutHandler timeoutHandler;
public InterleavedSearchInvoker(Timer timer, Collection<SearchInvoker> invokers,
- SearchCluster searchCluster,
+ TopKEstimator hitEstimator,
+ DispatchConfig dispatchConfig,
Group group,
Set<Integer> alreadyFailedNodes) {
super(Optional.empty());
this.timer = timer;
this.invokers = Collections.newSetFromMap(new IdentityHashMap<>());
this.invokers.addAll(invokers);
- this.searchCluster = searchCluster;
+ this.dispatchConfig = dispatchConfig;
this.group = group;
this.availableForProcessing = newQueue();
this.alreadyFailedNodes = alreadyFailedNodes;
- coverageAggregator = new CoverageAggregator(invokers.size());
+ this.coverageAggregator = new CoverageAggregator(invokers.size());
+ this.hitEstimator = hitEstimator;
+ }
+
+ private int estimateHitsToFetch(int wantedHits, int numPartitions) {
+ return hitEstimator.estimateK(wantedHits, numPartitions);
+ }
+ private int estimateHitsToFetch(int wantedHits, int numPartitions, double topKProbability) {
+ return hitEstimator.estimateK(wantedHits, numPartitions, topKProbability);
}
private TimeoutHandler createTimeoutHandler(DispatchConfig config, int askedNodes, Query query) {
@@ -84,8 +93,8 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
if (group.isBalanced() && !group.isSparse()) {
Double topkProbabilityOverrride = query.properties().getDouble(Dispatcher.topKProbability);
q = (topkProbabilityOverrride != null)
- ? searchCluster.estimateHitsToFetch(neededHits, invokers.size(), topkProbabilityOverrride)
- : searchCluster.estimateHitsToFetch(neededHits, invokers.size());
+ ? estimateHitsToFetch(neededHits, invokers.size(), topkProbabilityOverrride)
+ : estimateHitsToFetch(neededHits, invokers.size());
}
query.setHits(q);
query.setOffset(0);
@@ -94,7 +103,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
for (SearchInvoker invoker : invokers) {
context = invoker.sendSearchRequest(query, context);
}
- timeoutHandler = createTimeoutHandler(searchCluster.dispatchConfig(), invokers.size(), query);
+ timeoutHandler = createTimeoutHandler(dispatchConfig, invokers.size(), query);
query.setHits(originalHits);
query.setOffset(originalOffset);
@@ -127,7 +136,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
groupingResultAggregator.toAggregatedHit().ifPresent(h -> result.getResult().hits().add(h));
insertNetworkErrors(result.getResult());
- CoverageAggregator adjusted = coverageAggregator.adjustedDegradedCoverage((int)searchCluster.dispatchConfig().redundancy(), timeoutHandler);
+ CoverageAggregator adjusted = coverageAggregator.adjustedDegradedCoverage((int)dispatchConfig.redundancy(), timeoutHandler);
result.getResult().setCoverage(adjusted.createCoverage(timeoutHandler));
int needed = query.getOffset() + query.getHits();
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
index 02cf11c9fe7..caeae9c2c1d 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
@@ -10,6 +10,7 @@ import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import com.yahoo.search.result.Coverage;
import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.vespa.config.search.DispatchConfig;
import java.util.ArrayList;
import java.util.HashSet;
@@ -21,11 +22,16 @@ import java.util.Set;
* @author ollivir
*/
public abstract class InvokerFactory {
+ private static final double SKEW_FACTOR = 0.05;
- protected final SearchCluster searchCluster;
+ private final SearchCluster searchCluster;
+ private final DispatchConfig dispatchConfig;
+ private final TopKEstimator hitEstimator;
- public InvokerFactory(SearchCluster searchCluster) {
+ public InvokerFactory(SearchCluster searchCluster, DispatchConfig dispatchConfig) {
this.searchCluster = searchCluster;
+ this.dispatchConfig = dispatchConfig;
+ this.hitEstimator = new TopKEstimator(30.0, dispatchConfig.topKProbability(), SKEW_FACTOR);
}
protected abstract Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher,
@@ -90,7 +96,7 @@ public abstract class InvokerFactory {
if (invokers.size() == 1 && failed == null) {
return Optional.of(invokers.get(0));
} else {
- return Optional.of(new InterleavedSearchInvoker(Timer.monotonic, invokers, searchCluster, group, failed));
+ return Optional.of(new InterleavedSearchInvoker(Timer.monotonic, invokers, hitEstimator, dispatchConfig, group, failed));
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressPayload.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressPayload.java
new file mode 100644
index 00000000000..786b804edaa
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressPayload.java
@@ -0,0 +1,15 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch.rpc;
+
+import com.yahoo.compress.Compressor;
+import com.yahoo.search.Query;
+
+/**
+ * Interface for compressing and decompressing request/response
+ *
+ * @author baldersheim
+ */
+public interface CompressPayload {
+ Compressor.Compression compress(Query query, byte[] payload);
+ byte[] decompress(Client.ProtobufResponse response);
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressService.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressService.java
new file mode 100644
index 00000000000..9e7fc9b5b29
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressService.java
@@ -0,0 +1,32 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch.rpc;
+
+import com.yahoo.compress.CompressionType;
+import com.yahoo.compress.Compressor;
+import com.yahoo.processing.request.CompoundName;
+import com.yahoo.search.Query;
+
+/**
+ * Implement interface to compress/decompress request/response
+ *
+ * @author baldersheim
+ */
+public class CompressService implements CompressPayload {
+ /** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */
+ public final static CompoundName dispatchCompression = new CompoundName("dispatch.compression");
+ private final Compressor compressor = new Compressor(CompressionType.LZ4, 5, 0.95, 256);
+
+
+ @Override
+ public Compressor.Compression compress(Query query, byte[] payload) {
+ CompressionType compression = CompressionType.valueOf(query.properties().getString(dispatchCompression, "LZ4").toUpperCase());
+ return compressor.compress(compression, payload);
+ }
+
+ @Override
+ public byte[] decompress(Client.ProtobufResponse response) {
+ CompressionType compression = CompressionType.valueOf(response.compression());
+ return compressor.decompress(response.compressedPayload(), compression, response.uncompressedSize());
+ }
+ Compressor compressor() { return compressor; }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java
new file mode 100644
index 00000000000..fd8e0e4f81a
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java
@@ -0,0 +1,11 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch.rpc;
+
+/**
+ * Interface for getting a connection given a node id.
+ *
+ * @author balderersheim
+ */
+public interface RpcConnectionPool {
+ Client.NodeConnection getConnection(int nodeId);
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
index a1ebea3b695..d5b1b876540 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
@@ -9,6 +9,7 @@ import com.yahoo.search.dispatch.InvokerFactory;
import com.yahoo.search.dispatch.SearchInvoker;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
+import com.yahoo.vespa.config.search.DispatchConfig;
import java.util.Optional;
@@ -17,19 +18,18 @@ import java.util.Optional;
*/
public class RpcInvokerFactory extends InvokerFactory {
- private final RpcResourcePool rpcResourcePool;
+ private final RpcConnectionPool rpcResourcePool;
+ private final CompressPayload compressor;
- public RpcInvokerFactory(RpcResourcePool rpcResourcePool, SearchCluster searchCluster) {
- super(searchCluster);
+ public RpcInvokerFactory(RpcConnectionPool rpcResourcePool, SearchCluster searchCluster, DispatchConfig dispatchConfig) {
+ super(searchCluster, dispatchConfig);
this.rpcResourcePool = rpcResourcePool;
+ this.compressor = new CompressService();
}
@Override
- protected Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher,
- Query query,
- int maxHits,
- Node node) {
- return Optional.of(new RpcSearchInvoker(searcher, node, rpcResourcePool, maxHits));
+ protected Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher, Query query, int maxHits, Node node) {
+ return Optional.of(new RpcSearchInvoker(searcher, compressor, node, rpcResourcePool, maxHits));
}
@Override
@@ -37,7 +37,6 @@ public class RpcInvokerFactory extends InvokerFactory {
Query query = result.getQuery();
boolean summaryNeedsQuery = searcher.summaryNeedsQuery(query);
- return new RpcProtobufFillInvoker(rpcResourcePool, searcher.getDocumentDatabase(query), searcher.getServerId(), summaryNeedsQuery);
+ return new RpcProtobufFillInvoker(rpcResourcePool, compressor, searcher.getDocumentDatabase(query), searcher.getServerId(), summaryNeedsQuery);
}
-
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
index 44f0af2aca1..53dc54f7bc5 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
@@ -25,17 +25,19 @@ public class RpcPing implements Pinger, Client.ResponseReceiver {
private static final boolean triggeredClassLoading = ErrorMessage.createBackendCommunicationError("TriggerClassLoading") instanceof ErrorMessage;
private final Node node;
- private final RpcResourcePool resourcePool;
+ private final RpcConnectionPool connectionPool;
private final ClusterMonitor<Node> clusterMonitor;
private final long pingSequenceId;
private final PongHandler pongHandler;
+ private final Compressor compressor;
- public RpcPing(Node node, ClusterMonitor<Node> clusterMonitor, RpcResourcePool rpcResourcePool, PongHandler pongHandler) {
+ public RpcPing(Node node, ClusterMonitor<Node> clusterMonitor, RpcConnectionPool connectionPool, PongHandler pongHandler, Compressor compressor) {
this.node = node;
- this.resourcePool = rpcResourcePool;
+ this.connectionPool = connectionPool;
this.clusterMonitor = clusterMonitor;
- pingSequenceId = node.createPingSequenceId();
+ this.pingSequenceId = node.createPingSequenceId();
this.pongHandler = pongHandler;
+ this. compressor = compressor;
}
@Override
@@ -63,16 +65,16 @@ public class RpcPing implements Pinger, Client.ResponseReceiver {
}
private void sendPing() {
- var connection = resourcePool.getConnection(node.key());
+ var connection = connectionPool.getConnection(node.key());
var ping = SearchProtocol.MonitorRequest.newBuilder().build().toByteArray();
double timeoutSeconds = ((double) clusterMonitor.getConfiguration().getRequestTimeout()) / 1000.0;
- Compressor.Compression compressionResult = resourcePool.compressor().compress(PING_COMPRESSION, ping);
+ Compressor.Compression compressionResult = compressor.compress(PING_COMPRESSION, ping);
connection.request(RPC_METHOD, compressionResult.type(), ping.length, compressionResult.data(), this, timeoutSeconds);
}
private Pong decodeReply(ProtobufResponse response) throws InvalidProtocolBufferException {
CompressionType compression = CompressionType.valueOf(response.compression());
- byte[] responseBytes = resourcePool.compressor().decompress(response.compressedPayload(), compression, response.uncompressedSize());
+ byte[] responseBytes = compressor.decompress(response.compressedPayload(), compression, response.uncompressedSize());
var reply = SearchProtocol.MonitorReply.parseFrom(responseBytes);
if (reply.getDistributionKey() != node.key()) {
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java
index 2e690198c1c..31e152c6ff9 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java
@@ -1,6 +1,8 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch.rpc;
+import com.yahoo.compress.CompressionType;
+import com.yahoo.compress.Compressor;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.PingFactory;
@@ -9,15 +11,16 @@ import com.yahoo.search.dispatch.searchcluster.PongHandler;
public class RpcPingFactory implements PingFactory {
- private final RpcResourcePool rpcResourcePool;
+ private final RpcConnectionPool rpcResourcePool;
+ private final Compressor compressor = new Compressor(CompressionType.LZ4, 5, 0.95, 512);
- public RpcPingFactory(RpcResourcePool rpcResourcePool) {
+ public RpcPingFactory(RpcConnectionPool rpcResourcePool) {
this.rpcResourcePool = rpcResourcePool;
}
@Override
public Pinger createPinger(Node node, ClusterMonitor<Node> monitor, PongHandler pongHandler) {
- return new RpcPing(node, monitor, rpcResourcePool, pongHandler);
+ return new RpcPing(node, monitor, rpcResourcePool, pongHandler, compressor);
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
index c84795352f5..4e538fb54dc 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
@@ -5,7 +5,6 @@ import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol;
import com.google.protobuf.InvalidProtocolBufferException;
import com.yahoo.collections.ListMap;
import com.yahoo.collections.Pair;
-import com.yahoo.compress.CompressionType;
import com.yahoo.compress.Compressor;
import com.yahoo.container.protect.Error;
import com.yahoo.data.access.Inspector;
@@ -44,9 +43,10 @@ public class RpcProtobufFillInvoker extends FillInvoker {
private static final Logger log = Logger.getLogger(RpcProtobufFillInvoker.class.getName());
private final DocumentDatabase documentDb;
- private final RpcResourcePool resourcePool;
+ private final RpcConnectionPool resourcePool;
private final boolean summaryNeedsQuery;
private final String serverId;
+ private final CompressPayload compressor;
private BlockingQueue<Pair<Client.ResponseOrError<ProtobufResponse>, List<FastHit>>> responses;
@@ -56,11 +56,12 @@ public class RpcProtobufFillInvoker extends FillInvoker {
/** The number of responses we should receive (and process) before this is complete */
private int outstandingResponses;
- RpcProtobufFillInvoker(RpcResourcePool resourcePool, DocumentDatabase documentDb, String serverId, boolean summaryNeedsQuery) {
+ RpcProtobufFillInvoker(RpcConnectionPool resourcePool, CompressPayload compressor, DocumentDatabase documentDb, String serverId, boolean summaryNeedsQuery) {
this.documentDb = documentDb;
this.resourcePool = resourcePool;
this.serverId = serverId;
this.summaryNeedsQuery = summaryNeedsQuery;
+ this.compressor = compressor;
}
@Override
@@ -121,9 +122,8 @@ public class RpcProtobufFillInvoker extends FillInvoker {
ListMap<Integer, FastHit> hitsByNode = new ListMap<>();
for (Iterator<Hit> i = result.hits().unorderedDeepIterator(); i.hasNext();) {
Hit h = i.next();
- if (!(h instanceof FastHit))
+ if (!(h instanceof FastHit hit))
continue;
- FastHit hit = (FastHit) h;
hitsByNode.put(hit.getDistributionKey(), hit);
}
@@ -143,7 +143,7 @@ public class RpcProtobufFillInvoker extends FillInvoker {
}
Query query = result.getQuery();
- Compressor.Compression compressionResult = resourcePool.compress(query, payload);
+ Compressor.Compression compressionResult = compressor.compress(query, payload);
node.request(RPC_METHOD, compressionResult.type(), payload.length, compressionResult.data(),
roe -> receive(roe, hits), clientTimeout);
}
@@ -189,9 +189,7 @@ public class RpcProtobufFillInvoker extends FillInvoker {
hasReportedError = true;
} else {
Client.ProtobufResponse response = responseOrError.response().get();
- CompressionType compression = CompressionType.valueOf(response.compression());
- byte[] responseBytes = resourcePool.compressor().decompress(response.compressedPayload(), compression,
- response.uncompressedSize());
+ byte[] responseBytes = compressor.decompress(response);
return fill(result, hitsContext, summaryClass, responseBytes);
}
return 0;
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
index bbe60d0df23..71e8cc0baa8 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
@@ -2,11 +2,6 @@
package com.yahoo.search.dispatch.rpc;
import com.google.common.collect.ImmutableMap;
-import com.yahoo.compress.CompressionType;
-import com.yahoo.compress.Compressor;
-import com.yahoo.compress.Compressor.Compression;
-import com.yahoo.processing.request.CompoundName;
-import com.yahoo.search.Query;
import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.dispatch.rpc.Client.NodeConnection;
import com.yahoo.vespa.config.search.DispatchNodesConfig;
@@ -23,12 +18,7 @@ import java.util.concurrent.ThreadLocalRandom;
*
* @author ollivir
*/
-public class RpcResourcePool implements AutoCloseable {
-
- /** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */
- public final static CompoundName dispatchCompression = new CompoundName("dispatch.compression");
-
- private final Compressor compressor = new Compressor(CompressionType.LZ4, 5, 0.95, 32);
+public class RpcResourcePool implements RpcConnectionPool, AutoCloseable {
/** Connections to the search nodes this talks to, indexed by node id ("partid") */
private final ImmutableMap<Integer, NodeConnectionPool> nodeConnectionPools;
@@ -54,15 +44,7 @@ public class RpcResourcePool implements AutoCloseable {
this.nodeConnectionPools = builder.build();
}
- public Compressor compressor() {
- return compressor;
- }
-
- public Compression compress(Query query, byte[] payload) {
- CompressionType compression = CompressionType.valueOf(query.properties().getString(dispatchCompression, "LZ4").toUpperCase());
- return compressor.compress(compression, payload);
- }
-
+ @Override
public NodeConnection getConnection(int nodeId) {
var pool = nodeConnectionPools.get(nodeId);
if (pool == null) {
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
index 64e0dd666dd..31de0b147c9 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
@@ -1,7 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch.rpc;
-import com.yahoo.compress.CompressionType;
import com.yahoo.compress.Compressor;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.search.Query;
@@ -29,19 +28,21 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe
private final VespaBackEndSearcher searcher;
private final Node node;
- private final RpcResourcePool resourcePool;
+ private final RpcConnectionPool resourcePool;
private final BlockingQueue<Client.ResponseOrError<ProtobufResponse>> responses;
private final int maxHits;
+ private final CompressPayload compressor;
private Query query;
- RpcSearchInvoker(VespaBackEndSearcher searcher, Node node, RpcResourcePool resourcePool, int maxHits) {
+ RpcSearchInvoker(VespaBackEndSearcher searcher, CompressPayload compressor, Node node, RpcConnectionPool resourcePool, int maxHits) {
super(Optional.of(node));
this.searcher = searcher;
this.node = node;
this.resourcePool = resourcePool;
this.responses = new LinkedBlockingQueue<>(1);
this.maxHits = maxHits;
+ this.compressor = compressor;
}
@Override
@@ -78,7 +79,7 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe
if (incomingContext instanceof RpcContext)
return (RpcContext)incomingContext;
- return new RpcContext(resourcePool, query,
+ return new RpcContext(compressor, query,
ProtobufSerialization.serializeSearchRequest(query,
Math.min(query.getHits(), maxHits),
searcher.getServerId(), requestTimeout));
@@ -110,8 +111,7 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe
}
ProtobufResponse protobufResponse = response.response().get();
- CompressionType compression = CompressionType.valueOf(protobufResponse.compression());
- byte[] payload = resourcePool.compressor().decompress(protobufResponse.compressedPayload(), compression, protobufResponse.uncompressedSize());
+ byte[] payload = compressor.decompress(protobufResponse);
return ProtobufSerialization.deserializeToSearchResult(payload, query, searcher, node.pathIndex(), node.key());
}
@@ -133,8 +133,8 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe
final Compressor.Compression compressedPayload;
- RpcContext(RpcResourcePool resourcePool, Query query, byte[] payload) {
- compressedPayload = resourcePool.compress(query, payload);
+ RpcContext(CompressPayload compressor, Query query, byte[] payload) {
+ compressedPayload = compressor.compress(query, payload);
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
index 54a3e42b9ab..ca2fce0b32b 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
@@ -8,7 +8,6 @@ import com.yahoo.net.HostName;
import com.yahoo.prelude.Pong;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.cluster.NodeManager;
-import com.yahoo.search.dispatch.TopKEstimator;
import com.yahoo.vespa.config.search.DispatchConfig;
import com.yahoo.vespa.config.search.DispatchNodesConfig;
@@ -29,16 +28,14 @@ public class SearchCluster implements NodeManager<Node> {
private static final Logger log = Logger.getLogger(SearchCluster.class.getName());
- private final DispatchConfig dispatchConfig;
+ private final double minActivedocsPercentage;
private final String clusterId;
+ private final VipStatus vipStatus;
+ private final PingFactory pingFactory;
private final Map<Integer, Group> groups;
private final List<Group> orderedGroups;
private final List<Node> nodes;
- private final VipStatus vipStatus;
- private final PingFactory pingFactory;
- private final TopKEstimator hitEstimator;
private long nextLogTime = 0;
- private static final double SKEW_FACTOR = 0.05;
/**
* A search node on this local machine having the entire corpus, which we therefore
@@ -48,13 +45,13 @@ public class SearchCluster implements NodeManager<Node> {
* if it only queries this cluster when the local node cannot be used, to avoid unnecessary
* cross-node network traffic.
*/
- private final Optional<Node> localCorpusDispatchTarget;
+ private final Node localCorpusDispatchTarget;
public SearchCluster(String clusterId, DispatchConfig dispatchConfig,
DispatchNodesConfig nodesConfig,
VipStatus vipStatus, PingFactory pingFactory) {
this.clusterId = clusterId;
- this.dispatchConfig = dispatchConfig;
+ this.minActivedocsPercentage = dispatchConfig.minActivedocsPercentage();
this.vipStatus = vipStatus;
this.pingFactory = pingFactory;
@@ -71,7 +68,6 @@ public class SearchCluster implements NodeManager<Node> {
nodes.forEach(node -> groupIntroductionOrder.put(node.group(), groups.get(node.group())));
this.orderedGroups = List.copyOf(groupIntroductionOrder.values());
- hitEstimator = new TopKEstimator(30.0, dispatchConfig.topKProbability(), SKEW_FACTOR);
this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), nodes, groups);
}
@@ -85,7 +81,7 @@ public class SearchCluster implements NodeManager<Node> {
}
}
- private static Optional<Node> findLocalCorpusDispatchTarget(String selfHostname,
+ private static Node findLocalCorpusDispatchTarget(String selfHostname,
List<Node> nodes,
Map<Integer, Group> groups) {
// A search node in the search cluster in question is configured on the same host as the currently running container.
@@ -96,15 +92,15 @@ public class SearchCluster implements NodeManager<Node> {
.filter(node -> node.hostname().equals(selfHostname))
.toList();
// Only use direct dispatch if we have exactly 1 search node on the same machine:
- if (localSearchNodes.size() != 1) return Optional.empty();
+ if (localSearchNodes.size() != 1) return null;
Node localSearchNode = localSearchNodes.iterator().next();
Group localSearchGroup = groups.get(localSearchNode.group());
// Only use direct dispatch if the local search node has the entire corpus
- if (localSearchGroup.nodes().size() != 1) return Optional.empty();
+ if (localSearchGroup.nodes().size() != 1) return null;
- return Optional.of(localSearchNode);
+ return localSearchNode;
}
private static List<Node> toNodes(DispatchNodesConfig nodesConfig) {
@@ -113,13 +109,6 @@ public class SearchCluster implements NodeManager<Node> {
.toList();
}
- public DispatchConfig dispatchConfig() {
- return dispatchConfig;
- }
-
- /** Returns an immutable list of all nodes in this. */
- public List<Node> nodes() { return nodes; }
-
/** Returns the groups of this cluster as an immutable map indexed by group id */
public Map<Integer, Group> groups() { return groups; }
@@ -148,16 +137,16 @@ public class SearchCluster implements NodeManager<Node> {
* or empty if we should not dispatch directly.
*/
public Optional<Node> localCorpusDispatchTarget() {
- if ( localCorpusDispatchTarget.isEmpty()) return Optional.empty();
+ if ( localCorpusDispatchTarget == null) return Optional.empty();
// Only use direct dispatch if the local group has sufficient coverage
- Group localSearchGroup = groups().get(localCorpusDispatchTarget.get().group());
+ Group localSearchGroup = groups().get(localCorpusDispatchTarget.group());
if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty();
// Only use direct dispatch if the local search node is not down
- if ( localCorpusDispatchTarget.get().isWorking() == Boolean.FALSE) return Optional.empty();
+ if ( localCorpusDispatchTarget.isWorking() == Boolean.FALSE) return Optional.empty();
- return localCorpusDispatchTarget;
+ return Optional.of(localCorpusDispatchTarget);
}
private void updateWorkingState(Node node, boolean isWorking) {
@@ -185,7 +174,7 @@ public class SearchCluster implements NodeManager<Node> {
}
private void updateVipStatusOnNodeChange(Node node, boolean nodeIsWorking) {
- if (localCorpusDispatchTarget.isEmpty()) { // consider entire cluster
+ if (localCorpusDispatchTarget == null) { // consider entire cluster
if (hasInformationAboutAllNodes())
setInRotationOnlyIf(hasWorkingNodes());
}
@@ -198,7 +187,7 @@ public class SearchCluster implements NodeManager<Node> {
}
private void updateVipStatusOnCoverageChange(Group group, boolean sufficientCoverage) {
- if ( localCorpusDispatchTarget.isEmpty()) { // consider entire cluster
+ if ( localCorpusDispatchTarget == null) { // consider entire cluster
// VIP status does not depend on coverage
}
else if (usesLocalCorpusIn(group)) { // follow the status of this group
@@ -213,13 +202,6 @@ public class SearchCluster implements NodeManager<Node> {
vipStatus.removeFromRotation(clusterId);
}
- public int estimateHitsToFetch(int wantedHits, int numPartitions) {
- return hitEstimator.estimateK(wantedHits, numPartitions);
- }
- public int estimateHitsToFetch(int wantedHits, int numPartitions, double topKProbability) {
- return hitEstimator.estimateK(wantedHits, numPartitions, topKProbability);
- }
-
public boolean hasInformationAboutAllNodes() {
return nodes.stream().allMatch(node -> node.isWorking() != null);
}
@@ -229,11 +211,11 @@ public class SearchCluster implements NodeManager<Node> {
}
private boolean usesLocalCorpusIn(Node node) {
- return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().equals(node);
+ return node.equals(localCorpusDispatchTarget);
}
private boolean usesLocalCorpusIn(Group group) {
- return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().group() == group.id();
+ return (localCorpusDispatchTarget != null) && localCorpusDispatchTarget.group() == group.id();
}
/** Used by the cluster monitor to manage node status */
@@ -286,7 +268,7 @@ public class SearchCluster implements NodeManager<Node> {
private boolean isGroupCoverageSufficient(long activeDocuments, long medianDocuments) {
double documentCoverage = 100.0 * (double) activeDocuments / medianDocuments;
- if (medianDocuments > 0 && documentCoverage < dispatchConfig.minActivedocsPercentage())
+ if (medianDocuments > 0 && documentCoverage < minActivedocsPercentage)
return false;
return true;
}