summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java9
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java27
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java12
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressPayload.java15
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressService.java32
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java11
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java19
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java16
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java9
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java16
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java22
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java16
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java54
-rw-r--r--container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java2
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java30
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/InterleavedSearchInvokerTest.java53
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java3
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java13
19 files changed, 194 insertions, 167 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
index 34ddd8e16cc..a2155cc8649 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
@@ -84,7 +84,7 @@ public class FastSearcher extends VespaBackEndSearcher {
@Override
public Result doSearch2(Query query, Execution execution) {
- if (dispatcher.searchCluster().allGroupsHaveSize1())
+ if (dispatcher.allGroupsHaveSize1())
forceSinglePassGrouping(query);
try (SearchInvoker invoker = getSearchInvoker(query)) {
Result result = invoker.search(query, execution);
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 36c5c8a16fa..b7a2d9f70ba 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -58,9 +58,9 @@ public class Dispatcher extends AbstractComponent {
private final ClusterMonitor<Node> clusterMonitor;
private final LoadBalancer loadBalancer;
private final InvokerFactory invokerFactory;
- private final RpcResourcePool rpcResourcePool;
private final RpcClient rpcClient;
private final int maxHitsPerNode;
+ private final RpcResourcePool rpcResourcePool;
private static final QueryProfileType argumentType;
@@ -101,7 +101,7 @@ public class Dispatcher extends AbstractComponent {
RpcClient rpcClient, RpcResourcePool rpcResourcePool) {
this(new ClusterMonitor<>(searchCluster, true),
searchCluster, dispatchConfig,
- new RpcInvokerFactory(rpcResourcePool, searchCluster),
+ new RpcInvokerFactory(rpcResourcePool, searchCluster, dispatchConfig),
rpcClient, rpcResourcePool);
}
@@ -161,9 +161,8 @@ public class Dispatcher extends AbstractComponent {
new Compressor().warmup(seconds);
}
- /** Returns the search cluster this dispatches to */
- public SearchCluster searchCluster() {
- return searchCluster;
+ public boolean allGroupsHaveSize1() {
+ return searchCluster.allGroupsHaveSize1();
}
@Override
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
index 39be91cf3e8..8751a969fe0 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
@@ -6,7 +6,6 @@ import com.yahoo.prelude.fastsearch.GroupingListHit;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.searchcluster.Group;
-import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.result.Hit;
import com.yahoo.search.searchchain.Execution;
@@ -38,27 +37,37 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
private final Timer timer;
private final Set<SearchInvoker> invokers;
- private final SearchCluster searchCluster;
+ private final DispatchConfig dispatchConfig;
private final Group group;
private final LinkedBlockingQueue<SearchInvoker> availableForProcessing;
private final Set<Integer> alreadyFailedNodes;
private final CoverageAggregator coverageAggregator;
+ private final TopKEstimator hitEstimator;
private Query query;
private TimeoutHandler timeoutHandler;
public InterleavedSearchInvoker(Timer timer, Collection<SearchInvoker> invokers,
- SearchCluster searchCluster,
+ TopKEstimator hitEstimator,
+ DispatchConfig dispatchConfig,
Group group,
Set<Integer> alreadyFailedNodes) {
super(Optional.empty());
this.timer = timer;
this.invokers = Collections.newSetFromMap(new IdentityHashMap<>());
this.invokers.addAll(invokers);
- this.searchCluster = searchCluster;
+ this.dispatchConfig = dispatchConfig;
this.group = group;
this.availableForProcessing = newQueue();
this.alreadyFailedNodes = alreadyFailedNodes;
- coverageAggregator = new CoverageAggregator(invokers.size());
+ this.coverageAggregator = new CoverageAggregator(invokers.size());
+ this.hitEstimator = hitEstimator;
+ }
+
+ private int estimateHitsToFetch(int wantedHits, int numPartitions) {
+ return hitEstimator.estimateK(wantedHits, numPartitions);
+ }
+ private int estimateHitsToFetch(int wantedHits, int numPartitions, double topKProbability) {
+ return hitEstimator.estimateK(wantedHits, numPartitions, topKProbability);
}
private TimeoutHandler createTimeoutHandler(DispatchConfig config, int askedNodes, Query query) {
@@ -84,8 +93,8 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
if (group.isBalanced() && !group.isSparse()) {
Double topkProbabilityOverrride = query.properties().getDouble(Dispatcher.topKProbability);
q = (topkProbabilityOverrride != null)
- ? searchCluster.estimateHitsToFetch(neededHits, invokers.size(), topkProbabilityOverrride)
- : searchCluster.estimateHitsToFetch(neededHits, invokers.size());
+ ? estimateHitsToFetch(neededHits, invokers.size(), topkProbabilityOverrride)
+ : estimateHitsToFetch(neededHits, invokers.size());
}
query.setHits(q);
query.setOffset(0);
@@ -94,7 +103,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
for (SearchInvoker invoker : invokers) {
context = invoker.sendSearchRequest(query, context);
}
- timeoutHandler = createTimeoutHandler(searchCluster.dispatchConfig(), invokers.size(), query);
+ timeoutHandler = createTimeoutHandler(dispatchConfig, invokers.size(), query);
query.setHits(originalHits);
query.setOffset(originalOffset);
@@ -127,7 +136,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
groupingResultAggregator.toAggregatedHit().ifPresent(h -> result.getResult().hits().add(h));
insertNetworkErrors(result.getResult());
- CoverageAggregator adjusted = coverageAggregator.adjustedDegradedCoverage((int)searchCluster.dispatchConfig().redundancy(), timeoutHandler);
+ CoverageAggregator adjusted = coverageAggregator.adjustedDegradedCoverage((int)dispatchConfig.redundancy(), timeoutHandler);
result.getResult().setCoverage(adjusted.createCoverage(timeoutHandler));
int needed = query.getOffset() + query.getHits();
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
index 02cf11c9fe7..caeae9c2c1d 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
@@ -10,6 +10,7 @@ import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import com.yahoo.search.result.Coverage;
import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.vespa.config.search.DispatchConfig;
import java.util.ArrayList;
import java.util.HashSet;
@@ -21,11 +22,16 @@ import java.util.Set;
* @author ollivir
*/
public abstract class InvokerFactory {
+ private static final double SKEW_FACTOR = 0.05;
- protected final SearchCluster searchCluster;
+ private final SearchCluster searchCluster;
+ private final DispatchConfig dispatchConfig;
+ private final TopKEstimator hitEstimator;
- public InvokerFactory(SearchCluster searchCluster) {
+ public InvokerFactory(SearchCluster searchCluster, DispatchConfig dispatchConfig) {
this.searchCluster = searchCluster;
+ this.dispatchConfig = dispatchConfig;
+ this.hitEstimator = new TopKEstimator(30.0, dispatchConfig.topKProbability(), SKEW_FACTOR);
}
protected abstract Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher,
@@ -90,7 +96,7 @@ public abstract class InvokerFactory {
if (invokers.size() == 1 && failed == null) {
return Optional.of(invokers.get(0));
} else {
- return Optional.of(new InterleavedSearchInvoker(Timer.monotonic, invokers, searchCluster, group, failed));
+ return Optional.of(new InterleavedSearchInvoker(Timer.monotonic, invokers, hitEstimator, dispatchConfig, group, failed));
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressPayload.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressPayload.java
new file mode 100644
index 00000000000..786b804edaa
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressPayload.java
@@ -0,0 +1,15 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch.rpc;
+
+import com.yahoo.compress.Compressor;
+import com.yahoo.search.Query;
+
+/**
+ * Interface for compressing and decompressing request/response
+ *
+ * @author baldersheim
+ */
+public interface CompressPayload {
+ Compressor.Compression compress(Query query, byte[] payload);
+ byte[] decompress(Client.ProtobufResponse response);
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressService.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressService.java
new file mode 100644
index 00000000000..9e7fc9b5b29
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressService.java
@@ -0,0 +1,32 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch.rpc;
+
+import com.yahoo.compress.CompressionType;
+import com.yahoo.compress.Compressor;
+import com.yahoo.processing.request.CompoundName;
+import com.yahoo.search.Query;
+
+/**
+ * Implement interface to compress/decompress request/response
+ *
+ * @author baldersheim
+ */
+public class CompressService implements CompressPayload {
+ /** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */
+ public final static CompoundName dispatchCompression = new CompoundName("dispatch.compression");
+ private final Compressor compressor = new Compressor(CompressionType.LZ4, 5, 0.95, 256);
+
+
+ @Override
+ public Compressor.Compression compress(Query query, byte[] payload) {
+ CompressionType compression = CompressionType.valueOf(query.properties().getString(dispatchCompression, "LZ4").toUpperCase());
+ return compressor.compress(compression, payload);
+ }
+
+ @Override
+ public byte[] decompress(Client.ProtobufResponse response) {
+ CompressionType compression = CompressionType.valueOf(response.compression());
+ return compressor.decompress(response.compressedPayload(), compression, response.uncompressedSize());
+ }
+ Compressor compressor() { return compressor; }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java
new file mode 100644
index 00000000000..fd8e0e4f81a
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java
@@ -0,0 +1,11 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch.rpc;
+
+/**
+ * Interface for getting a connection given a node id.
+ *
+ * @author balderersheim
+ */
+public interface RpcConnectionPool {
+ Client.NodeConnection getConnection(int nodeId);
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
index a1ebea3b695..d5b1b876540 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
@@ -9,6 +9,7 @@ import com.yahoo.search.dispatch.InvokerFactory;
import com.yahoo.search.dispatch.SearchInvoker;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
+import com.yahoo.vespa.config.search.DispatchConfig;
import java.util.Optional;
@@ -17,19 +18,18 @@ import java.util.Optional;
*/
public class RpcInvokerFactory extends InvokerFactory {
- private final RpcResourcePool rpcResourcePool;
+ private final RpcConnectionPool rpcResourcePool;
+ private final CompressPayload compressor;
- public RpcInvokerFactory(RpcResourcePool rpcResourcePool, SearchCluster searchCluster) {
- super(searchCluster);
+ public RpcInvokerFactory(RpcConnectionPool rpcResourcePool, SearchCluster searchCluster, DispatchConfig dispatchConfig) {
+ super(searchCluster, dispatchConfig);
this.rpcResourcePool = rpcResourcePool;
+ this.compressor = new CompressService();
}
@Override
- protected Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher,
- Query query,
- int maxHits,
- Node node) {
- return Optional.of(new RpcSearchInvoker(searcher, node, rpcResourcePool, maxHits));
+ protected Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher, Query query, int maxHits, Node node) {
+ return Optional.of(new RpcSearchInvoker(searcher, compressor, node, rpcResourcePool, maxHits));
}
@Override
@@ -37,7 +37,6 @@ public class RpcInvokerFactory extends InvokerFactory {
Query query = result.getQuery();
boolean summaryNeedsQuery = searcher.summaryNeedsQuery(query);
- return new RpcProtobufFillInvoker(rpcResourcePool, searcher.getDocumentDatabase(query), searcher.getServerId(), summaryNeedsQuery);
+ return new RpcProtobufFillInvoker(rpcResourcePool, compressor, searcher.getDocumentDatabase(query), searcher.getServerId(), summaryNeedsQuery);
}
-
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
index 44f0af2aca1..53dc54f7bc5 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
@@ -25,17 +25,19 @@ public class RpcPing implements Pinger, Client.ResponseReceiver {
private static final boolean triggeredClassLoading = ErrorMessage.createBackendCommunicationError("TriggerClassLoading") instanceof ErrorMessage;
private final Node node;
- private final RpcResourcePool resourcePool;
+ private final RpcConnectionPool connectionPool;
private final ClusterMonitor<Node> clusterMonitor;
private final long pingSequenceId;
private final PongHandler pongHandler;
+ private final Compressor compressor;
- public RpcPing(Node node, ClusterMonitor<Node> clusterMonitor, RpcResourcePool rpcResourcePool, PongHandler pongHandler) {
+ public RpcPing(Node node, ClusterMonitor<Node> clusterMonitor, RpcConnectionPool connectionPool, PongHandler pongHandler, Compressor compressor) {
this.node = node;
- this.resourcePool = rpcResourcePool;
+ this.connectionPool = connectionPool;
this.clusterMonitor = clusterMonitor;
- pingSequenceId = node.createPingSequenceId();
+ this.pingSequenceId = node.createPingSequenceId();
this.pongHandler = pongHandler;
+ this. compressor = compressor;
}
@Override
@@ -63,16 +65,16 @@ public class RpcPing implements Pinger, Client.ResponseReceiver {
}
private void sendPing() {
- var connection = resourcePool.getConnection(node.key());
+ var connection = connectionPool.getConnection(node.key());
var ping = SearchProtocol.MonitorRequest.newBuilder().build().toByteArray();
double timeoutSeconds = ((double) clusterMonitor.getConfiguration().getRequestTimeout()) / 1000.0;
- Compressor.Compression compressionResult = resourcePool.compressor().compress(PING_COMPRESSION, ping);
+ Compressor.Compression compressionResult = compressor.compress(PING_COMPRESSION, ping);
connection.request(RPC_METHOD, compressionResult.type(), ping.length, compressionResult.data(), this, timeoutSeconds);
}
private Pong decodeReply(ProtobufResponse response) throws InvalidProtocolBufferException {
CompressionType compression = CompressionType.valueOf(response.compression());
- byte[] responseBytes = resourcePool.compressor().decompress(response.compressedPayload(), compression, response.uncompressedSize());
+ byte[] responseBytes = compressor.decompress(response.compressedPayload(), compression, response.uncompressedSize());
var reply = SearchProtocol.MonitorReply.parseFrom(responseBytes);
if (reply.getDistributionKey() != node.key()) {
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java
index 2e690198c1c..31e152c6ff9 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java
@@ -1,6 +1,8 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch.rpc;
+import com.yahoo.compress.CompressionType;
+import com.yahoo.compress.Compressor;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.PingFactory;
@@ -9,15 +11,16 @@ import com.yahoo.search.dispatch.searchcluster.PongHandler;
public class RpcPingFactory implements PingFactory {
- private final RpcResourcePool rpcResourcePool;
+ private final RpcConnectionPool rpcResourcePool;
+ private final Compressor compressor = new Compressor(CompressionType.LZ4, 5, 0.95, 512);
- public RpcPingFactory(RpcResourcePool rpcResourcePool) {
+ public RpcPingFactory(RpcConnectionPool rpcResourcePool) {
this.rpcResourcePool = rpcResourcePool;
}
@Override
public Pinger createPinger(Node node, ClusterMonitor<Node> monitor, PongHandler pongHandler) {
- return new RpcPing(node, monitor, rpcResourcePool, pongHandler);
+ return new RpcPing(node, monitor, rpcResourcePool, pongHandler, compressor);
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
index c84795352f5..4e538fb54dc 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
@@ -5,7 +5,6 @@ import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol;
import com.google.protobuf.InvalidProtocolBufferException;
import com.yahoo.collections.ListMap;
import com.yahoo.collections.Pair;
-import com.yahoo.compress.CompressionType;
import com.yahoo.compress.Compressor;
import com.yahoo.container.protect.Error;
import com.yahoo.data.access.Inspector;
@@ -44,9 +43,10 @@ public class RpcProtobufFillInvoker extends FillInvoker {
private static final Logger log = Logger.getLogger(RpcProtobufFillInvoker.class.getName());
private final DocumentDatabase documentDb;
- private final RpcResourcePool resourcePool;
+ private final RpcConnectionPool resourcePool;
private final boolean summaryNeedsQuery;
private final String serverId;
+ private final CompressPayload compressor;
private BlockingQueue<Pair<Client.ResponseOrError<ProtobufResponse>, List<FastHit>>> responses;
@@ -56,11 +56,12 @@ public class RpcProtobufFillInvoker extends FillInvoker {
/** The number of responses we should receive (and process) before this is complete */
private int outstandingResponses;
- RpcProtobufFillInvoker(RpcResourcePool resourcePool, DocumentDatabase documentDb, String serverId, boolean summaryNeedsQuery) {
+ RpcProtobufFillInvoker(RpcConnectionPool resourcePool, CompressPayload compressor, DocumentDatabase documentDb, String serverId, boolean summaryNeedsQuery) {
this.documentDb = documentDb;
this.resourcePool = resourcePool;
this.serverId = serverId;
this.summaryNeedsQuery = summaryNeedsQuery;
+ this.compressor = compressor;
}
@Override
@@ -121,9 +122,8 @@ public class RpcProtobufFillInvoker extends FillInvoker {
ListMap<Integer, FastHit> hitsByNode = new ListMap<>();
for (Iterator<Hit> i = result.hits().unorderedDeepIterator(); i.hasNext();) {
Hit h = i.next();
- if (!(h instanceof FastHit))
+ if (!(h instanceof FastHit hit))
continue;
- FastHit hit = (FastHit) h;
hitsByNode.put(hit.getDistributionKey(), hit);
}
@@ -143,7 +143,7 @@ public class RpcProtobufFillInvoker extends FillInvoker {
}
Query query = result.getQuery();
- Compressor.Compression compressionResult = resourcePool.compress(query, payload);
+ Compressor.Compression compressionResult = compressor.compress(query, payload);
node.request(RPC_METHOD, compressionResult.type(), payload.length, compressionResult.data(),
roe -> receive(roe, hits), clientTimeout);
}
@@ -189,9 +189,7 @@ public class RpcProtobufFillInvoker extends FillInvoker {
hasReportedError = true;
} else {
Client.ProtobufResponse response = responseOrError.response().get();
- CompressionType compression = CompressionType.valueOf(response.compression());
- byte[] responseBytes = resourcePool.compressor().decompress(response.compressedPayload(), compression,
- response.uncompressedSize());
+ byte[] responseBytes = compressor.decompress(response);
return fill(result, hitsContext, summaryClass, responseBytes);
}
return 0;
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
index bbe60d0df23..71e8cc0baa8 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
@@ -2,11 +2,6 @@
package com.yahoo.search.dispatch.rpc;
import com.google.common.collect.ImmutableMap;
-import com.yahoo.compress.CompressionType;
-import com.yahoo.compress.Compressor;
-import com.yahoo.compress.Compressor.Compression;
-import com.yahoo.processing.request.CompoundName;
-import com.yahoo.search.Query;
import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.dispatch.rpc.Client.NodeConnection;
import com.yahoo.vespa.config.search.DispatchNodesConfig;
@@ -23,12 +18,7 @@ import java.util.concurrent.ThreadLocalRandom;
*
* @author ollivir
*/
-public class RpcResourcePool implements AutoCloseable {
-
- /** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */
- public final static CompoundName dispatchCompression = new CompoundName("dispatch.compression");
-
- private final Compressor compressor = new Compressor(CompressionType.LZ4, 5, 0.95, 32);
+public class RpcResourcePool implements RpcConnectionPool, AutoCloseable {
/** Connections to the search nodes this talks to, indexed by node id ("partid") */
private final ImmutableMap<Integer, NodeConnectionPool> nodeConnectionPools;
@@ -54,15 +44,7 @@ public class RpcResourcePool implements AutoCloseable {
this.nodeConnectionPools = builder.build();
}
- public Compressor compressor() {
- return compressor;
- }
-
- public Compression compress(Query query, byte[] payload) {
- CompressionType compression = CompressionType.valueOf(query.properties().getString(dispatchCompression, "LZ4").toUpperCase());
- return compressor.compress(compression, payload);
- }
-
+ @Override
public NodeConnection getConnection(int nodeId) {
var pool = nodeConnectionPools.get(nodeId);
if (pool == null) {
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
index 64e0dd666dd..31de0b147c9 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
@@ -1,7 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch.rpc;
-import com.yahoo.compress.CompressionType;
import com.yahoo.compress.Compressor;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.search.Query;
@@ -29,19 +28,21 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe
private final VespaBackEndSearcher searcher;
private final Node node;
- private final RpcResourcePool resourcePool;
+ private final RpcConnectionPool resourcePool;
private final BlockingQueue<Client.ResponseOrError<ProtobufResponse>> responses;
private final int maxHits;
+ private final CompressPayload compressor;
private Query query;
- RpcSearchInvoker(VespaBackEndSearcher searcher, Node node, RpcResourcePool resourcePool, int maxHits) {
+ RpcSearchInvoker(VespaBackEndSearcher searcher, CompressPayload compressor, Node node, RpcConnectionPool resourcePool, int maxHits) {
super(Optional.of(node));
this.searcher = searcher;
this.node = node;
this.resourcePool = resourcePool;
this.responses = new LinkedBlockingQueue<>(1);
this.maxHits = maxHits;
+ this.compressor = compressor;
}
@Override
@@ -78,7 +79,7 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe
if (incomingContext instanceof RpcContext)
return (RpcContext)incomingContext;
- return new RpcContext(resourcePool, query,
+ return new RpcContext(compressor, query,
ProtobufSerialization.serializeSearchRequest(query,
Math.min(query.getHits(), maxHits),
searcher.getServerId(), requestTimeout));
@@ -110,8 +111,7 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe
}
ProtobufResponse protobufResponse = response.response().get();
- CompressionType compression = CompressionType.valueOf(protobufResponse.compression());
- byte[] payload = resourcePool.compressor().decompress(protobufResponse.compressedPayload(), compression, protobufResponse.uncompressedSize());
+ byte[] payload = compressor.decompress(protobufResponse);
return ProtobufSerialization.deserializeToSearchResult(payload, query, searcher, node.pathIndex(), node.key());
}
@@ -133,8 +133,8 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe
final Compressor.Compression compressedPayload;
- RpcContext(RpcResourcePool resourcePool, Query query, byte[] payload) {
- compressedPayload = resourcePool.compress(query, payload);
+ RpcContext(CompressPayload compressor, Query query, byte[] payload) {
+ compressedPayload = compressor.compress(query, payload);
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
index 54a3e42b9ab..ca2fce0b32b 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
@@ -8,7 +8,6 @@ import com.yahoo.net.HostName;
import com.yahoo.prelude.Pong;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.cluster.NodeManager;
-import com.yahoo.search.dispatch.TopKEstimator;
import com.yahoo.vespa.config.search.DispatchConfig;
import com.yahoo.vespa.config.search.DispatchNodesConfig;
@@ -29,16 +28,14 @@ public class SearchCluster implements NodeManager<Node> {
private static final Logger log = Logger.getLogger(SearchCluster.class.getName());
- private final DispatchConfig dispatchConfig;
+ private final double minActivedocsPercentage;
private final String clusterId;
+ private final VipStatus vipStatus;
+ private final PingFactory pingFactory;
private final Map<Integer, Group> groups;
private final List<Group> orderedGroups;
private final List<Node> nodes;
- private final VipStatus vipStatus;
- private final PingFactory pingFactory;
- private final TopKEstimator hitEstimator;
private long nextLogTime = 0;
- private static final double SKEW_FACTOR = 0.05;
/**
* A search node on this local machine having the entire corpus, which we therefore
@@ -48,13 +45,13 @@ public class SearchCluster implements NodeManager<Node> {
* if it only queries this cluster when the local node cannot be used, to avoid unnecessary
* cross-node network traffic.
*/
- private final Optional<Node> localCorpusDispatchTarget;
+ private final Node localCorpusDispatchTarget;
public SearchCluster(String clusterId, DispatchConfig dispatchConfig,
DispatchNodesConfig nodesConfig,
VipStatus vipStatus, PingFactory pingFactory) {
this.clusterId = clusterId;
- this.dispatchConfig = dispatchConfig;
+ this.minActivedocsPercentage = dispatchConfig.minActivedocsPercentage();
this.vipStatus = vipStatus;
this.pingFactory = pingFactory;
@@ -71,7 +68,6 @@ public class SearchCluster implements NodeManager<Node> {
nodes.forEach(node -> groupIntroductionOrder.put(node.group(), groups.get(node.group())));
this.orderedGroups = List.copyOf(groupIntroductionOrder.values());
- hitEstimator = new TopKEstimator(30.0, dispatchConfig.topKProbability(), SKEW_FACTOR);
this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), nodes, groups);
}
@@ -85,7 +81,7 @@ public class SearchCluster implements NodeManager<Node> {
}
}
- private static Optional<Node> findLocalCorpusDispatchTarget(String selfHostname,
+ private static Node findLocalCorpusDispatchTarget(String selfHostname,
List<Node> nodes,
Map<Integer, Group> groups) {
// A search node in the search cluster in question is configured on the same host as the currently running container.
@@ -96,15 +92,15 @@ public class SearchCluster implements NodeManager<Node> {
.filter(node -> node.hostname().equals(selfHostname))
.toList();
// Only use direct dispatch if we have exactly 1 search node on the same machine:
- if (localSearchNodes.size() != 1) return Optional.empty();
+ if (localSearchNodes.size() != 1) return null;
Node localSearchNode = localSearchNodes.iterator().next();
Group localSearchGroup = groups.get(localSearchNode.group());
// Only use direct dispatch if the local search node has the entire corpus
- if (localSearchGroup.nodes().size() != 1) return Optional.empty();
+ if (localSearchGroup.nodes().size() != 1) return null;
- return Optional.of(localSearchNode);
+ return localSearchNode;
}
private static List<Node> toNodes(DispatchNodesConfig nodesConfig) {
@@ -113,13 +109,6 @@ public class SearchCluster implements NodeManager<Node> {
.toList();
}
- public DispatchConfig dispatchConfig() {
- return dispatchConfig;
- }
-
- /** Returns an immutable list of all nodes in this. */
- public List<Node> nodes() { return nodes; }
-
/** Returns the groups of this cluster as an immutable map indexed by group id */
public Map<Integer, Group> groups() { return groups; }
@@ -148,16 +137,16 @@ public class SearchCluster implements NodeManager<Node> {
* or empty if we should not dispatch directly.
*/
public Optional<Node> localCorpusDispatchTarget() {
- if ( localCorpusDispatchTarget.isEmpty()) return Optional.empty();
+ if ( localCorpusDispatchTarget == null) return Optional.empty();
// Only use direct dispatch if the local group has sufficient coverage
- Group localSearchGroup = groups().get(localCorpusDispatchTarget.get().group());
+ Group localSearchGroup = groups().get(localCorpusDispatchTarget.group());
if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty();
// Only use direct dispatch if the local search node is not down
- if ( localCorpusDispatchTarget.get().isWorking() == Boolean.FALSE) return Optional.empty();
+ if ( localCorpusDispatchTarget.isWorking() == Boolean.FALSE) return Optional.empty();
- return localCorpusDispatchTarget;
+ return Optional.of(localCorpusDispatchTarget);
}
private void updateWorkingState(Node node, boolean isWorking) {
@@ -185,7 +174,7 @@ public class SearchCluster implements NodeManager<Node> {
}
private void updateVipStatusOnNodeChange(Node node, boolean nodeIsWorking) {
- if (localCorpusDispatchTarget.isEmpty()) { // consider entire cluster
+ if (localCorpusDispatchTarget == null) { // consider entire cluster
if (hasInformationAboutAllNodes())
setInRotationOnlyIf(hasWorkingNodes());
}
@@ -198,7 +187,7 @@ public class SearchCluster implements NodeManager<Node> {
}
private void updateVipStatusOnCoverageChange(Group group, boolean sufficientCoverage) {
- if ( localCorpusDispatchTarget.isEmpty()) { // consider entire cluster
+ if ( localCorpusDispatchTarget == null) { // consider entire cluster
// VIP status does not depend on coverage
}
else if (usesLocalCorpusIn(group)) { // follow the status of this group
@@ -213,13 +202,6 @@ public class SearchCluster implements NodeManager<Node> {
vipStatus.removeFromRotation(clusterId);
}
- public int estimateHitsToFetch(int wantedHits, int numPartitions) {
- return hitEstimator.estimateK(wantedHits, numPartitions);
- }
- public int estimateHitsToFetch(int wantedHits, int numPartitions, double topKProbability) {
- return hitEstimator.estimateK(wantedHits, numPartitions, topKProbability);
- }
-
public boolean hasInformationAboutAllNodes() {
return nodes.stream().allMatch(node -> node.isWorking() != null);
}
@@ -229,11 +211,11 @@ public class SearchCluster implements NodeManager<Node> {
}
private boolean usesLocalCorpusIn(Node node) {
- return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().equals(node);
+ return node.equals(localCorpusDispatchTarget);
}
private boolean usesLocalCorpusIn(Group group) {
- return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().group() == group.id();
+ return (localCorpusDispatchTarget != null) && localCorpusDispatchTarget.group() == group.id();
}
/** Used by the cluster monitor to manage node status */
@@ -286,7 +268,7 @@ public class SearchCluster implements NodeManager<Node> {
private boolean isGroupCoverageSufficient(long activeDocuments, long medianDocuments) {
double documentCoverage = 100.0 * (double) activeDocuments / medianDocuments;
- if (medianDocuments > 0 && documentCoverage < dispatchConfig.minActivedocsPercentage())
+ if (medianDocuments > 0 && documentCoverage < minActivedocsPercentage)
return false;
return true;
}
diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java
index 974441fc5fc..10e188d092c 100644
--- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java
+++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java
@@ -33,7 +33,7 @@ class MockDispatcher extends Dispatcher {
}
private MockDispatcher(ClusterMonitor clusterMonitor, SearchCluster searchCluster, DispatchConfig dispatchConfig, RpcResourcePool rpcResourcePool) {
- this(clusterMonitor, searchCluster, dispatchConfig, new RpcInvokerFactory(rpcResourcePool, searchCluster));
+ this(clusterMonitor, searchCluster, dispatchConfig, new RpcInvokerFactory(rpcResourcePool, searchCluster, dispatchConfig));
}
private MockDispatcher(ClusterMonitor clusterMonitor, SearchCluster searchCluster, DispatchConfig dispatchConfig, RpcInvokerFactory invokerFactory) {
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java
index a6c07b74a92..bc92afdb8fc 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/DispatcherTest.java
@@ -10,6 +10,7 @@ import com.yahoo.search.dispatch.searchcluster.PingFactory;
import com.yahoo.search.dispatch.searchcluster.Pinger;
import com.yahoo.search.dispatch.searchcluster.PongHandler;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
+import com.yahoo.vespa.config.search.DispatchConfig;
import org.junit.jupiter.api.Test;
import java.util.List;
@@ -26,18 +27,19 @@ import static org.junit.jupiter.api.Assertions.fail;
* @author ollivir
*/
public class DispatcherTest {
+ private final DispatchConfig dispatchConfig = createDispatchConfig();
@Test
void requireThatDispatcherSupportsSearchPath() {
SearchCluster cl = new MockSearchCluster("1", 2, 2);
Query q = new Query();
q.getModel().setSearchPath("1/0"); // second node in first group
- MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, (nodes, a) -> {
+ MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, dispatchConfig, (nodes, a) -> {
assertEquals(1, nodes.size());
assertEquals(1, nodes.get(0).key());
return true;
});
- Dispatcher disp = new Dispatcher(new ClusterMonitor<>(cl, false), cl, createDispatchConfig(), invokerFactory);
+ Dispatcher disp = new Dispatcher(new ClusterMonitor<>(cl, false), cl, dispatchConfig, invokerFactory);
SearchInvoker invoker = disp.getSearchInvoker(q, null);
assertNotNull(invoker);
invokerFactory.verifyAllEventsProcessed();
@@ -52,8 +54,8 @@ public class DispatcherTest {
return Optional.of(new Node(1, "test", 1));
}
};
- MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, (n, a) -> true);
- Dispatcher disp = new Dispatcher(new ClusterMonitor<>(cl, false), cl, createDispatchConfig(), invokerFactory);
+ MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, dispatchConfig, (n, a) -> true);
+ Dispatcher disp = new Dispatcher(new ClusterMonitor<>(cl, false), cl, dispatchConfig, invokerFactory);
SearchInvoker invoker = disp.getSearchInvoker(new Query(), null);
assertNotNull(invoker);
invokerFactory.verifyAllEventsProcessed();
@@ -64,14 +66,14 @@ public class DispatcherTest {
void requireThatInvokerConstructionIsRetriedAndLastAcceptsAnyCoverage() {
SearchCluster cl = new MockSearchCluster("1", 2, 1);
- MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, (n, acceptIncompleteCoverage) -> {
+ MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, dispatchConfig, (n, acceptIncompleteCoverage) -> {
assertFalse(acceptIncompleteCoverage);
return false;
}, (n, acceptIncompleteCoverage) -> {
assertTrue(acceptIncompleteCoverage);
return true;
});
- Dispatcher disp = new Dispatcher(new ClusterMonitor<>(cl, false), cl, createDispatchConfig(), invokerFactory);
+ Dispatcher disp = new Dispatcher(new ClusterMonitor<>(cl, false), cl, dispatchConfig, invokerFactory);
SearchInvoker invoker = disp.getSearchInvoker(new Query(), null);
assertNotNull(invoker);
invokerFactory.verifyAllEventsProcessed();
@@ -83,8 +85,8 @@ public class DispatcherTest {
try {
SearchCluster cl = new MockSearchCluster("1", 2, 1);
- MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, (n, a) -> false, (n, a) -> false);
- Dispatcher disp = new Dispatcher(new ClusterMonitor<>(cl, false), cl, createDispatchConfig(), invokerFactory);
+ MockInvokerFactory invokerFactory = new MockInvokerFactory(cl, dispatchConfig, (n, a) -> false, (n, a) -> false);
+ Dispatcher disp = new Dispatcher(new ClusterMonitor<>(cl, false), cl, dispatchConfig, invokerFactory);
disp.getSearchInvoker(new Query(), null);
disp.deconstruct();
fail("Expected exception");
@@ -97,7 +99,7 @@ public class DispatcherTest {
@Test
void testGroup0IsSelected() {
SearchCluster cluster = new MockSearchCluster("1", 3, 1);
- Dispatcher dispatcher = new Dispatcher(new ClusterMonitor<>(cluster, false), cluster, createDispatchConfig(), new MockInvokerFactory(cluster, (n, a) -> true));
+ Dispatcher dispatcher = new Dispatcher(new ClusterMonitor<>(cluster, false), cluster, dispatchConfig, new MockInvokerFactory(cluster, dispatchConfig, (n, a) -> true));
cluster.pingIterationCompleted();
assertEquals(0,
dispatcher.getSearchInvoker(new Query(), null).distributionKey().get().longValue());
@@ -107,7 +109,7 @@ public class DispatcherTest {
@Test
void testGroup0IsSkippedWhenItIsBlockingFeed() {
SearchCluster cluster = new MockSearchCluster("1", 3, 1);
- Dispatcher dispatcher = new Dispatcher(new ClusterMonitor<>(cluster, false), cluster, createDispatchConfig(), new MockInvokerFactory(cluster, (n, a) -> true));
+ Dispatcher dispatcher = new Dispatcher(new ClusterMonitor<>(cluster, false), cluster, dispatchConfig, new MockInvokerFactory(cluster, dispatchConfig, (n, a) -> true));
cluster.group(0).get().nodes().get(0).setBlockingWrites(true);
cluster.pingIterationCompleted();
assertEquals(1,
@@ -119,7 +121,7 @@ public class DispatcherTest {
@Test
void testGroup0IsSelectedWhenMoreAreBlockingFeed() {
SearchCluster cluster = new MockSearchCluster("1", 3, 1);
- Dispatcher dispatcher = new Dispatcher(new ClusterMonitor<>(cluster, false), cluster, createDispatchConfig(), new MockInvokerFactory(cluster, (n, a) -> true));
+ Dispatcher dispatcher = new Dispatcher(new ClusterMonitor<>(cluster, false), cluster, dispatchConfig, new MockInvokerFactory(cluster, dispatchConfig, (n, a) -> true));
cluster.group(0).get().nodes().get(0).setBlockingWrites(true);
cluster.group(1).get().nodes().get(0).setBlockingWrites(true);
cluster.pingIterationCompleted();
@@ -132,7 +134,7 @@ public class DispatcherTest {
@Test
void testGroup0IsSelectedWhenItIsBlockingFeedWhenNoOthers() {
SearchCluster cluster = new MockSearchCluster("1", 1, 1);
- Dispatcher dispatcher = new Dispatcher(new ClusterMonitor<>(cluster, false), cluster, createDispatchConfig(), new MockInvokerFactory(cluster, (n, a) -> true));
+ Dispatcher dispatcher = new Dispatcher(new ClusterMonitor<>(cluster, false), cluster, dispatchConfig, new MockInvokerFactory(cluster, dispatchConfig, (n, a) -> true));
cluster.group(0).get().nodes().get(0).setBlockingWrites(true);
cluster.pingIterationCompleted();
assertEquals(0,
@@ -150,8 +152,8 @@ public class DispatcherTest {
private final FactoryStep[] events;
private int step = 0;
- public MockInvokerFactory(SearchCluster cl, FactoryStep... events) {
- super(cl);
+ public MockInvokerFactory(SearchCluster cl, DispatchConfig disptachConfig, FactoryStep... events) {
+ super(cl, disptachConfig);
this.events = events;
}
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/InterleavedSearchInvokerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/InterleavedSearchInvokerTest.java
index 15656ffb457..178d3383805 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/InterleavedSearchInvokerTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/InterleavedSearchInvokerTest.java
@@ -10,7 +10,6 @@ import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.searchcluster.Group;
import com.yahoo.search.dispatch.searchcluster.Node;
-import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import com.yahoo.search.result.Coverage;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.result.Hit;
@@ -22,7 +21,6 @@ import com.yahoo.searchlib.expression.IntegerResultNode;
import com.yahoo.searchlib.expression.StringResultNode;
import com.yahoo.test.ManualClock;
import com.yahoo.vespa.config.search.DispatchConfig;
-import com.yahoo.vespa.config.search.DispatchNodesConfig;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -40,8 +38,6 @@ import java.util.stream.StreamSupport;
import static com.yahoo.container.handler.Coverage.DEGRADED_BY_MATCH_PHASE;
import static com.yahoo.container.handler.Coverage.DEGRADED_BY_TIMEOUT;
-import static com.yahoo.search.dispatch.MockSearchCluster.createDispatchConfig;
-import static com.yahoo.search.dispatch.MockSearchCluster.createNodesConfig;
import static org.junit.jupiter.api.Assertions.*;
/**
@@ -53,11 +49,12 @@ public class InterleavedSearchInvokerTest {
private final Query query = new TestQuery();
private final LinkedList<Event> expectedEvents = new LinkedList<>();
private final List<SearchInvoker> invokers = new ArrayList<>();
+ DispatchConfig dispatchConfig = new DispatchConfig.Builder().build();
+ TopKEstimator hitEstimator = new TopKEstimator(30, dispatchConfig.topKProbability(), 0.05);
@Test
void requireThatAdaptiveTimeoutsAreNotUsedWithFullCoverageRequirement() throws IOException {
- SearchCluster cluster = new MockSearchCluster("!", createDispatchConfig(100.0), createNodesConfig(), 1, 3);
- try (SearchInvoker invoker = createInterleavedInvoker(cluster, new Group(0, List.of()), 3)) {
+ try (SearchInvoker invoker = createInterleavedInvoker(new Group(0, List.of()), 3)) {
expectedEvents.add(new Event(5000, 100, 0));
expectedEvents.add(new Event(4900, 100, 1));
@@ -71,8 +68,7 @@ public class InterleavedSearchInvokerTest {
@Test
void requireThatTimeoutsAreNotMarkedAsAdaptive() throws IOException {
- SearchCluster cluster = new MockSearchCluster("!", createDispatchConfig(100.0), createNodesConfig(), 1, 3);
- try (SearchInvoker invoker = createInterleavedInvoker(cluster, new Group(0, List.of()), 3)) {
+ try (SearchInvoker invoker = createInterleavedInvoker(new Group(0, List.of()), 3)) {
expectedEvents.add(new Event(5000, 300, 0));
expectedEvents.add(new Event(4700, 300, 1));
@@ -90,8 +86,7 @@ public class InterleavedSearchInvokerTest {
@Test
void requireThatAdaptiveTimeoutDecreasesTimeoutWhenCoverageIsReached() throws IOException {
- SearchCluster cluster = new MockSearchCluster("!", createDispatchConfig(50.0), createNodesConfig(), 1, 4);
- try (SearchInvoker invoker = createInterleavedInvoker(cluster, new Group(0, List.of()), 4)) {
+ try (SearchInvoker invoker = createInterleavedInvoker(hitEstimator, MockSearchCluster.createDispatchConfig(50.0), new Group(0, List.of()), 4)) {
expectedEvents.add(new Event(5000, 100, 0));
expectedEvents.add(new Event(4900, 100, 1));
@@ -110,10 +105,9 @@ public class InterleavedSearchInvokerTest {
@Test
void requireCorrectCoverageCalculationWhenAllNodesOk() throws IOException {
- SearchCluster cluster = new MockSearchCluster("!", 1, 2);
invokers.add(new MockInvoker(0, createCoverage(50155, 50155, 50155, 1, 1, 0)));
invokers.add(new MockInvoker(1, createCoverage(49845, 49845, 49845, 1, 1, 0)));
- try (SearchInvoker invoker = createInterleavedInvoker(cluster, new Group(0, List.of()), 0)) {
+ try (SearchInvoker invoker = createInterleavedInvoker(new Group(0, List.of()), 0)) {
expectedEvents.add(new Event(null, 100, 0));
expectedEvents.add(new Event(null, 200, 1));
@@ -132,10 +126,9 @@ public class InterleavedSearchInvokerTest {
@Test
void requireCorrectCoverageCalculationWhenResultsAreLimitedByMatchPhase() throws IOException {
- SearchCluster cluster = new MockSearchCluster("!", 1, 2);
invokers.add(new MockInvoker(0, createCoverage(10101, 50155, 50155, 1, 1, DEGRADED_BY_MATCH_PHASE)));
invokers.add(new MockInvoker(1, createCoverage(13319, 49845, 49845, 1, 1, DEGRADED_BY_MATCH_PHASE)));
- try (SearchInvoker invoker = createInterleavedInvoker(cluster, new Group(0, List.of()), 0)) {
+ try (SearchInvoker invoker = createInterleavedInvoker(new Group(0, List.of()), 0)) {
expectedEvents.add(new Event(null, 100, 0));
expectedEvents.add(new Event(null, 200, 1));
@@ -155,10 +148,9 @@ public class InterleavedSearchInvokerTest {
@Test
void requireCorrectCoverageCalculationWhenResultsAreLimitedBySoftTimeout() throws IOException {
- SearchCluster cluster = new MockSearchCluster("!", 1, 2);
invokers.add(new MockInvoker(0, createCoverage(5000, 50155, 50155, 1, 1, DEGRADED_BY_TIMEOUT)));
invokers.add(new MockInvoker(1, createCoverage(4900, 49845, 49845, 1, 1, DEGRADED_BY_TIMEOUT)));
- try (SearchInvoker invoker = createInterleavedInvoker(cluster, new Group(0, List.of()), 0)) {
+ try (SearchInvoker invoker = createInterleavedInvoker(new Group(0, List.of()), 0)) {
expectedEvents.add(new Event(null, 100, 0));
expectedEvents.add(new Event(null, 200, 1));
@@ -178,10 +170,9 @@ public class InterleavedSearchInvokerTest {
@Test
void requireCorrectCoverageCalculationWhenOneNodeIsUnexpectedlyDown() throws IOException {
- SearchCluster cluster = new MockSearchCluster("!", 1, 2);
invokers.add(new MockInvoker(0, createCoverage(50155, 50155, 50155, 1, 1, 0)));
invokers.add(new MockInvoker(1, createCoverage(49845, 49845, 49845, 1, 1, 0)));
- try (SearchInvoker invoker = createInterleavedInvoker(cluster, new Group(0, List.of()), 0)) {
+ try (SearchInvoker invoker = createInterleavedInvoker(new Group(0, List.of()), 0)) {
expectedEvents.add(new Event(null, 100, 0));
expectedEvents.add(null);
@@ -342,7 +333,6 @@ public class InterleavedSearchInvokerTest {
@Test
void requireThatGroupingsAreMerged() throws IOException {
- SearchCluster cluster = new MockSearchCluster("!", 1, 2);
List<SearchInvoker> invokers = new ArrayList<>();
Grouping grouping1 = new Grouping(0);
@@ -365,7 +355,7 @@ public class InterleavedSearchInvokerTest {
.addAggregationResult(new MinAggregationResult().setMin(new IntegerResultNode(6)).setTag(3))));
invokers.add(new MockInvoker(0).setHits(List.of(new GroupingListHit(List.of(grouping2)))));
- try (InterleavedSearchInvoker invoker = new InterleavedSearchInvoker(Timer.monotonic, invokers, cluster, new Group(0, List.of()), Collections.emptySet())) {
+ try (InterleavedSearchInvoker invoker = new InterleavedSearchInvoker(Timer.monotonic, invokers, hitEstimator, dispatchConfig, new Group(0, List.of()), Collections.emptySet())) {
invoker.responseAvailable(invokers.get(0));
invoker.responseAvailable(invokers.get(1));
Result result = invoker.search(query, null);
@@ -377,11 +367,12 @@ public class InterleavedSearchInvokerTest {
}
private static InterleavedSearchInvoker createInterLeavedTestInvoker(List<Double> a, List<Double> b, Group group) {
- SearchCluster cluster = new MockSearchCluster("!", 1, 2);
+ DispatchConfig dispatchConfig = new DispatchConfig.Builder().build();
+ TopKEstimator hitEstimator = new TopKEstimator(30, dispatchConfig.topKProbability(), 0.05);
List<SearchInvoker> invokers = new ArrayList<>();
invokers.add(createInvoker(a, 0));
invokers.add(createInvoker(b, 1));
- InterleavedSearchInvoker invoker = new InterleavedSearchInvoker(Timer.monotonic, invokers, cluster, group, Collections.emptySet());
+ InterleavedSearchInvoker invoker = new InterleavedSearchInvoker(Timer.monotonic, invokers, hitEstimator, dispatchConfig, group, Collections.emptySet());
invoker.responseAvailable(invokers.get(0));
invoker.responseAvailable(invokers.get(1));
return invoker;
@@ -402,13 +393,12 @@ public class InterleavedSearchInvokerTest {
return hits;
}
- void verifyCorrectCoverageCalculationWhenDegradedCoverageIsExpected(DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig, int expectedCoverage) throws IOException {
- SearchCluster cluster = new MockSearchCluster("!", dispatchConfig, nodesConfig, 1, 2);
+ void verifyCorrectCoverageCalculationWhenDegradedCoverageIsExpected(int expectedCoverage) throws IOException {
invokers.add(new MockInvoker(0, createCoverage(50155, 50155, 60000, 1, 1, 0)));
Coverage errorCoverage = new Coverage(0, 0, 0);
errorCoverage.setNodesTried(1);
invokers.add(new SearchErrorInvoker(ErrorMessage.createBackendCommunicationError("node is down"), errorCoverage));
- try (SearchInvoker invoker = createInterleavedInvoker(cluster, new Group(0, List.of()), 0)) {
+ try (SearchInvoker invoker = createInterleavedInvoker(new Group(0, List.of()), 0)) {
expectedEvents.add(new Event(null, 1, 1));
expectedEvents.add(new Event(null, 100, 0));
@@ -429,19 +419,20 @@ public class InterleavedSearchInvokerTest {
@Test
void requireCorrectCoverageCalculationWhenDegradedCoverageIsExpectedUsingTargetActiveDocs() throws IOException {
- verifyCorrectCoverageCalculationWhenDegradedCoverageIsExpected(MockSearchCluster.createDispatchConfigBuilder(100.0)
- .redundancy(1)
- .build(),
- MockSearchCluster.createNodesConfig(),
+ verifyCorrectCoverageCalculationWhenDegradedCoverageIsExpected(
42);
}
- private InterleavedSearchInvoker createInterleavedInvoker(SearchCluster searchCluster, Group group, int numInvokers) {
+ private InterleavedSearchInvoker createInterleavedInvoker(Group group, int numInvokers) {
+ return createInterleavedInvoker(hitEstimator, dispatchConfig, group, numInvokers);
+ }
+ private InterleavedSearchInvoker createInterleavedInvoker(TopKEstimator hitEstimator, DispatchConfig dispatchConfig,
+ Group group, int numInvokers) {
for (int i = 0; i < numInvokers; i++) {
invokers.add(new MockInvoker(i));
}
- return new InterleavedSearchInvoker(Timer.wrap(clock), invokers, searchCluster, group,null) {
+ return new InterleavedSearchInvoker(Timer.wrap(clock), invokers, hitEstimator, dispatchConfig, group,null) {
@Override
protected LinkedBlockingQueue<SearchInvoker> newQueue() {
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java
index c90153e8008..32ca63693b4 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/MockSearchCluster.java
@@ -62,9 +62,6 @@ public class MockSearchCluster extends SearchCluster {
}
@Override
- public List<Node> nodes() { return nodes; }
-
- @Override
public ImmutableMap<Integer, Group> groups() {
return groups;
}
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java
index 6789d0347d5..82b7845d63d 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/rpc/RpcSearchInvokerTest.java
@@ -5,7 +5,6 @@ package com.yahoo.search.dispatch.rpc;
import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol;
import com.google.common.collect.ImmutableMap;
import com.yahoo.compress.CompressionType;
-import com.yahoo.prelude.fastsearch.FastHit;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
@@ -14,7 +13,6 @@ import com.yahoo.search.searchchain.Execution;
import org.junit.jupiter.api.Test;
import java.io.IOException;
-import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -25,6 +23,7 @@ import static org.junit.jupiter.api.Assertions.*;
*/
public class RpcSearchInvokerTest {
+ private final CompressService compressor = new CompressService();
@Test
void testProtobufSerialization() throws IOException {
var compressionTypeHolder = new AtomicReference<CompressionType>();
@@ -32,21 +31,21 @@ public class RpcSearchInvokerTest {
var lengthHolder = new AtomicInteger();
var mockClient = parameterCollectorClient(compressionTypeHolder, payloadHolder, lengthHolder);
var mockPool = new RpcResourcePool(ImmutableMap.of(7, mockClient.createConnection("foo", 123)));
- var invoker = new RpcSearchInvoker(mockSearcher(), new Node(7, "seven", 1), mockPool, 1000);
+ var invoker = new RpcSearchInvoker(mockSearcher(), compressor, new Node(7, "seven", 1), mockPool, 1000);
Query q = new Query("search/?query=test&hits=10&offset=3");
RpcSearchInvoker.RpcContext context = (RpcSearchInvoker.RpcContext) invoker.sendSearchRequest(q, null);
assertEquals(lengthHolder.get(), context.compressedPayload.uncompressedSize());
assertSame(context.compressedPayload.data(), payloadHolder.get());
- var bytes = mockPool.compressor().decompress(payloadHolder.get(), compressionTypeHolder.get(), lengthHolder.get());
+ var bytes = compressor.compressor().decompress(payloadHolder.get(), compressionTypeHolder.get(), lengthHolder.get());
var request = SearchProtocol.SearchRequest.newBuilder().mergeFrom(bytes).build();
assertEquals(10, request.getHits());
assertEquals(3, request.getOffset());
assertTrue(request.getQueryTreeBlob().size() > 0);
- var invoker2 = new RpcSearchInvoker(mockSearcher(), new Node(8, "eight", 1), mockPool, 1000);
+ var invoker2 = new RpcSearchInvoker(mockSearcher(), compressor, new Node(8, "eight", 1), mockPool, 1000);
RpcSearchInvoker.RpcContext context2 = (RpcSearchInvoker.RpcContext) invoker2.sendSearchRequest(q, context);
assertSame(context, context2);
assertEquals(lengthHolder.get(), context.compressedPayload.uncompressedSize());
@@ -61,12 +60,12 @@ public class RpcSearchInvokerTest {
var lengthHolder = new AtomicInteger();
var mockClient = parameterCollectorClient(compressionTypeHolder, payloadHolder, lengthHolder);
var mockPool = new RpcResourcePool(ImmutableMap.of(7, mockClient.createConnection("foo", 123)));
- var invoker = new RpcSearchInvoker(mockSearcher(), new Node(7, "seven", 1), mockPool, maxHits);
+ var invoker = new RpcSearchInvoker(mockSearcher(), compressor, new Node(7, "seven", 1), mockPool, maxHits);
Query q = new Query("search/?query=test&hits=10&offset=3");
invoker.sendSearchRequest(q, null);
- var bytes = mockPool.compressor().decompress(payloadHolder.get(), compressionTypeHolder.get(), lengthHolder.get());
+ var bytes = compressor.compressor().decompress(payloadHolder.get(), compressionTypeHolder.get(), lengthHolder.get());
var request = SearchProtocol.SearchRequest.newBuilder().mergeFrom(bytes).build();
assertEquals(maxHits, request.getHits());