summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-11-23 12:01:38 +0100
committerGitHub <noreply@github.com>2022-11-23 12:01:38 +0100
commit02dbfdece2c0bc6a1e2352c737da9c2ba7ae3d82 (patch)
tree317255e68f5f644ad9ee48d2eaee245cdacde5b0 /container-search/src/main/java/com
parent43342ee8c758981e79b72f8e9ab8992d4e7e26db (diff)
parent0a3d6beec5b28150694297a99716ea23732dc6c5 (diff)
Merge pull request #24964 from vespa-engine/balder/do-not-inject-rpc-resource-pool-and-decouple
Balder/do not inject rpc resource pool and decouple [run-systemtest]
Diffstat (limited to 'container-search/src/main/java/com')
-rw-r--r--container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java73
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java27
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java12
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressPayload.java15
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressService.java32
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java11
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java19
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java16
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java9
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java16
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java42
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java16
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java54
16 files changed, 203 insertions, 145 deletions
diff --git a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
index 34ddd8e16cc..a2155cc8649 100644
--- a/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
+++ b/container-search/src/main/java/com/yahoo/prelude/fastsearch/FastSearcher.java
@@ -84,7 +84,7 @@ public class FastSearcher extends VespaBackEndSearcher {
@Override
public Result doSearch2(Query query, Execution execution) {
- if (dispatcher.searchCluster().allGroupsHaveSize1())
+ if (dispatcher.allGroupsHaveSize1())
forceSinglePassGrouping(query);
try (SearchInvoker invoker = getSearchInvoker(query)) {
Result result = invoker.search(query, execution);
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
index a862e6c2d98..0b627e91bc5 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
@@ -111,7 +111,7 @@ public class ClusterMonitor<T> {
/** Returns a thread-safe snapshot of the NodeMonitors of all added nodes */
public List<BaseNodeMonitor<T>> nodeMonitors() {
- return new java.util.ArrayList<>(nodeMonitors.values());
+ return List.copyOf(nodeMonitors.values());
}
/** Must be called when this goes out of use */
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index e4147f6ba14..55073f25f0e 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -58,6 +58,7 @@ public class Dispatcher extends AbstractComponent {
private final LoadBalancer loadBalancer;
private final InvokerFactory invokerFactory;
private final int maxHitsPerNode;
+ private final RpcResourcePool rpcResourcePool;
private static final QueryProfileType argumentType;
@@ -72,28 +73,29 @@ public class Dispatcher extends AbstractComponent {
public static QueryProfileType getArgumentType() { return argumentType; }
@Inject
- public Dispatcher(RpcResourcePool resourcePool,
- ComponentId clusterId,
+ public Dispatcher(ComponentId clusterId,
DispatchConfig dispatchConfig,
DispatchNodesConfig nodesConfig,
VipStatus vipStatus) {
- this(resourcePool, new SearchCluster(clusterId.stringValue(), dispatchConfig, nodesConfig,
- vipStatus, new RpcPingFactory(resourcePool)),
- dispatchConfig);
+ this(clusterId, dispatchConfig, nodesConfig, vipStatus,
+ new RpcResourcePool(dispatchConfig, nodesConfig));
}
+ private Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig,
+ DispatchNodesConfig nodesConfig, VipStatus vipStatus,
+ RpcResourcePool resourcePool) {
+ this(new SearchCluster(clusterId.stringValue(), dispatchConfig, nodesConfig,
+ vipStatus, new RpcPingFactory(resourcePool)),
+ dispatchConfig, resourcePool);
- private Dispatcher(RpcResourcePool resourcePool, SearchCluster searchCluster, DispatchConfig dispatchConfig) {
- this(new ClusterMonitor<>(searchCluster, true), searchCluster, dispatchConfig, new RpcInvokerFactory(resourcePool, searchCluster));
}
- private static LoadBalancer.Policy toLoadBalancerPolicy(DispatchConfig.DistributionPolicy.Enum policy) {
- return switch (policy) {
- case ROUNDROBIN: yield LoadBalancer.Policy.ROUNDROBIN;
- case BEST_OF_RANDOM_2: yield LoadBalancer.Policy.BEST_OF_RANDOM_2;
- case ADAPTIVE,LATENCY_AMORTIZED_OVER_REQUESTS: yield LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_REQUESTS;
- case LATENCY_AMORTIZED_OVER_TIME: yield LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_TIME;
- };
+ private Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig,
+ RpcResourcePool rpcResourcePool) {
+ this(new ClusterMonitor<>(searchCluster, true),
+ searchCluster, dispatchConfig,
+ new RpcInvokerFactory(rpcResourcePool, searchCluster, dispatchConfig),
+ rpcResourcePool);
}
/* Protected for simple mocking in tests. Beware that searchCluster is shutdown on in deconstruct() */
@@ -101,11 +103,20 @@ public class Dispatcher extends AbstractComponent {
SearchCluster searchCluster,
DispatchConfig dispatchConfig,
InvokerFactory invokerFactory) {
+ this(clusterMonitor, searchCluster, dispatchConfig, invokerFactory, null);
+ }
+
+ private Dispatcher(ClusterMonitor<Node> clusterMonitor,
+ SearchCluster searchCluster,
+ DispatchConfig dispatchConfig,
+ InvokerFactory invokerFactory,
+ RpcResourcePool rpcResourcePool) {
this.searchCluster = searchCluster;
this.clusterMonitor = clusterMonitor;
this.loadBalancer = new LoadBalancer(searchCluster, toLoadBalancerPolicy(dispatchConfig.distributionPolicy()));
this.invokerFactory = invokerFactory;
+ this.rpcResourcePool = rpcResourcePool;
this.maxHitsPerNode = dispatchConfig.maxHitsPerNode();
searchCluster.addMonitoring(clusterMonitor);
Thread warmup = new Thread(() -> warmup(dispatchConfig.warmuptime()));
@@ -124,6 +135,15 @@ public class Dispatcher extends AbstractComponent {
searchCluster.pingIterationCompleted();
}
+ private static LoadBalancer.Policy toLoadBalancerPolicy(DispatchConfig.DistributionPolicy.Enum policy) {
+ return switch (policy) {
+ case ROUNDROBIN: yield LoadBalancer.Policy.ROUNDROBIN;
+ case BEST_OF_RANDOM_2: yield LoadBalancer.Policy.BEST_OF_RANDOM_2;
+ case ADAPTIVE,LATENCY_AMORTIZED_OVER_REQUESTS: yield LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_REQUESTS;
+ case LATENCY_AMORTIZED_OVER_TIME: yield LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_TIME;
+ };
+ }
+
/**
* Will run important code in order to trigger JIT compilation and avoid cold start issues.
* Currently warms up lz4 compression code.
@@ -132,9 +152,8 @@ public class Dispatcher extends AbstractComponent {
new Compressor().warmup(seconds);
}
- /** Returns the search cluster this dispatches to */
- public SearchCluster searchCluster() {
- return searchCluster;
+ public boolean allGroupsHaveSize1() {
+ return searchCluster.allGroupsHaveSize1();
}
@Override
@@ -142,6 +161,9 @@ public class Dispatcher extends AbstractComponent {
// The clustermonitor must be shutdown first as it uses the invokerfactory through the searchCluster.
clusterMonitor.shutdown();
invokerFactory.release();
+ if (rpcResourcePool != null) {
+ rpcResourcePool.close();
+ }
}
public FillInvoker getFillInvoker(Result result, VespaBackEndSearcher searcher) {
@@ -149,7 +171,8 @@ public class Dispatcher extends AbstractComponent {
}
public SearchInvoker getSearchInvoker(Query query, VespaBackEndSearcher searcher) {
- SearchInvoker invoker = getSearchPathInvoker(query, searcher).orElseGet(() -> getInternalInvoker(query, searcher));
+ SearchCluster cluster = searchCluster; // Take a snapshot
+ SearchInvoker invoker = getSearchPathInvoker(query, searcher, cluster).orElseGet(() -> getInternalInvoker(query, searcher, cluster));
if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) {
query.setHits(0);
@@ -159,12 +182,12 @@ public class Dispatcher extends AbstractComponent {
}
/** Builds an invoker based on searchpath */
- private Optional<SearchInvoker> getSearchPathInvoker(Query query, VespaBackEndSearcher searcher) {
+ private Optional<SearchInvoker> getSearchPathInvoker(Query query, VespaBackEndSearcher searcher, SearchCluster cluster) {
String searchPath = query.getModel().getSearchPath();
if (searchPath == null) return Optional.empty();
try {
- List<Node> nodes = SearchPath.selectNodes(searchPath, searchCluster);
+ List<Node> nodes = SearchPath.selectNodes(searchPath, cluster);
if (nodes.isEmpty()) return Optional.empty();
query.trace(false, 2, "Dispatching with search path ", searchPath);
@@ -178,8 +201,8 @@ public class Dispatcher extends AbstractComponent {
}
}
- private SearchInvoker getInternalInvoker(Query query, VespaBackEndSearcher searcher) {
- Optional<Node> directNode = searchCluster.localCorpusDispatchTarget();
+ private SearchInvoker getInternalInvoker(Query query, VespaBackEndSearcher searcher, SearchCluster cluster) {
+ Optional<Node> directNode = cluster.localCorpusDispatchTarget();
if (directNode.isPresent()) {
Node node = directNode.get();
query.trace(false, 2, "Dispatching to ", node);
@@ -191,10 +214,10 @@ public class Dispatcher extends AbstractComponent {
.orElseThrow(() -> new IllegalStateException("Could not dispatch directly to " + node));
}
- int covered = searchCluster.groupsWithSufficientCoverage();
- int groups = searchCluster.orderedGroups().size();
+ int covered = cluster.groupsWithSufficientCoverage();
+ int groups = cluster.orderedGroups().size();
int max = Integer.min(Integer.min(covered + 1, groups), MAX_GROUP_SELECTION_ATTEMPTS);
- Set<Integer> rejected = rejectGroupBlockingFeed(searchCluster.orderedGroups());
+ Set<Integer> rejected = rejectGroupBlockingFeed(cluster.orderedGroups());
for (int i = 0; i < max; i++) {
Optional<Group> groupInCluster = loadBalancer.takeGroup(rejected);
if (groupInCluster.isEmpty()) break; // No groups available
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
index 39be91cf3e8..8751a969fe0 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/InterleavedSearchInvoker.java
@@ -6,7 +6,6 @@ import com.yahoo.prelude.fastsearch.GroupingListHit;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.dispatch.searchcluster.Group;
-import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.search.result.Hit;
import com.yahoo.search.searchchain.Execution;
@@ -38,27 +37,37 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
private final Timer timer;
private final Set<SearchInvoker> invokers;
- private final SearchCluster searchCluster;
+ private final DispatchConfig dispatchConfig;
private final Group group;
private final LinkedBlockingQueue<SearchInvoker> availableForProcessing;
private final Set<Integer> alreadyFailedNodes;
private final CoverageAggregator coverageAggregator;
+ private final TopKEstimator hitEstimator;
private Query query;
private TimeoutHandler timeoutHandler;
public InterleavedSearchInvoker(Timer timer, Collection<SearchInvoker> invokers,
- SearchCluster searchCluster,
+ TopKEstimator hitEstimator,
+ DispatchConfig dispatchConfig,
Group group,
Set<Integer> alreadyFailedNodes) {
super(Optional.empty());
this.timer = timer;
this.invokers = Collections.newSetFromMap(new IdentityHashMap<>());
this.invokers.addAll(invokers);
- this.searchCluster = searchCluster;
+ this.dispatchConfig = dispatchConfig;
this.group = group;
this.availableForProcessing = newQueue();
this.alreadyFailedNodes = alreadyFailedNodes;
- coverageAggregator = new CoverageAggregator(invokers.size());
+ this.coverageAggregator = new CoverageAggregator(invokers.size());
+ this.hitEstimator = hitEstimator;
+ }
+
+ private int estimateHitsToFetch(int wantedHits, int numPartitions) {
+ return hitEstimator.estimateK(wantedHits, numPartitions);
+ }
+ private int estimateHitsToFetch(int wantedHits, int numPartitions, double topKProbability) {
+ return hitEstimator.estimateK(wantedHits, numPartitions, topKProbability);
}
private TimeoutHandler createTimeoutHandler(DispatchConfig config, int askedNodes, Query query) {
@@ -84,8 +93,8 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
if (group.isBalanced() && !group.isSparse()) {
Double topkProbabilityOverrride = query.properties().getDouble(Dispatcher.topKProbability);
q = (topkProbabilityOverrride != null)
- ? searchCluster.estimateHitsToFetch(neededHits, invokers.size(), topkProbabilityOverrride)
- : searchCluster.estimateHitsToFetch(neededHits, invokers.size());
+ ? estimateHitsToFetch(neededHits, invokers.size(), topkProbabilityOverrride)
+ : estimateHitsToFetch(neededHits, invokers.size());
}
query.setHits(q);
query.setOffset(0);
@@ -94,7 +103,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
for (SearchInvoker invoker : invokers) {
context = invoker.sendSearchRequest(query, context);
}
- timeoutHandler = createTimeoutHandler(searchCluster.dispatchConfig(), invokers.size(), query);
+ timeoutHandler = createTimeoutHandler(dispatchConfig, invokers.size(), query);
query.setHits(originalHits);
query.setOffset(originalOffset);
@@ -127,7 +136,7 @@ public class InterleavedSearchInvoker extends SearchInvoker implements ResponseM
groupingResultAggregator.toAggregatedHit().ifPresent(h -> result.getResult().hits().add(h));
insertNetworkErrors(result.getResult());
- CoverageAggregator adjusted = coverageAggregator.adjustedDegradedCoverage((int)searchCluster.dispatchConfig().redundancy(), timeoutHandler);
+ CoverageAggregator adjusted = coverageAggregator.adjustedDegradedCoverage((int)dispatchConfig.redundancy(), timeoutHandler);
result.getResult().setCoverage(adjusted.createCoverage(timeoutHandler));
int needed = query.getOffset() + query.getHits();
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
index 02cf11c9fe7..caeae9c2c1d 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/InvokerFactory.java
@@ -10,6 +10,7 @@ import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
import com.yahoo.search.result.Coverage;
import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.vespa.config.search.DispatchConfig;
import java.util.ArrayList;
import java.util.HashSet;
@@ -21,11 +22,16 @@ import java.util.Set;
* @author ollivir
*/
public abstract class InvokerFactory {
+ private static final double SKEW_FACTOR = 0.05;
- protected final SearchCluster searchCluster;
+ private final SearchCluster searchCluster;
+ private final DispatchConfig dispatchConfig;
+ private final TopKEstimator hitEstimator;
- public InvokerFactory(SearchCluster searchCluster) {
+ public InvokerFactory(SearchCluster searchCluster, DispatchConfig dispatchConfig) {
this.searchCluster = searchCluster;
+ this.dispatchConfig = dispatchConfig;
+ this.hitEstimator = new TopKEstimator(30.0, dispatchConfig.topKProbability(), SKEW_FACTOR);
}
protected abstract Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher,
@@ -90,7 +96,7 @@ public abstract class InvokerFactory {
if (invokers.size() == 1 && failed == null) {
return Optional.of(invokers.get(0));
} else {
- return Optional.of(new InterleavedSearchInvoker(Timer.monotonic, invokers, searchCluster, group, failed));
+ return Optional.of(new InterleavedSearchInvoker(Timer.monotonic, invokers, hitEstimator, dispatchConfig, group, failed));
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressPayload.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressPayload.java
new file mode 100644
index 00000000000..786b804edaa
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressPayload.java
@@ -0,0 +1,15 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch.rpc;
+
+import com.yahoo.compress.Compressor;
+import com.yahoo.search.Query;
+
+/**
+ * Interface for compressing and decompressing request/response
+ *
+ * @author baldersheim
+ */
+public interface CompressPayload {
+ Compressor.Compression compress(Query query, byte[] payload);
+ byte[] decompress(Client.ProtobufResponse response);
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressService.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressService.java
new file mode 100644
index 00000000000..9e7fc9b5b29
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/CompressService.java
@@ -0,0 +1,32 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch.rpc;
+
+import com.yahoo.compress.CompressionType;
+import com.yahoo.compress.Compressor;
+import com.yahoo.processing.request.CompoundName;
+import com.yahoo.search.Query;
+
+/**
+ * Implement interface to compress/decompress request/response
+ *
+ * @author baldersheim
+ */
+public class CompressService implements CompressPayload {
+ /** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */
+ public final static CompoundName dispatchCompression = new CompoundName("dispatch.compression");
+ private final Compressor compressor = new Compressor(CompressionType.LZ4, 5, 0.95, 256);
+
+
+ @Override
+ public Compressor.Compression compress(Query query, byte[] payload) {
+ CompressionType compression = CompressionType.valueOf(query.properties().getString(dispatchCompression, "LZ4").toUpperCase());
+ return compressor.compress(compression, payload);
+ }
+
+ @Override
+ public byte[] decompress(Client.ProtobufResponse response) {
+ CompressionType compression = CompressionType.valueOf(response.compression());
+ return compressor.decompress(response.compressedPayload(), compression, response.uncompressedSize());
+ }
+ Compressor compressor() { return compressor; }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
index 073d10b8f49..762438aa489 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcClient.java
@@ -18,7 +18,7 @@ import com.yahoo.jrt.Values;
*
* @author bratseth
*/
-class RpcClient implements Client {
+public class RpcClient implements Client {
private final Supervisor supervisor;
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java
new file mode 100644
index 00000000000..fd8e0e4f81a
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcConnectionPool.java
@@ -0,0 +1,11 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.dispatch.rpc;
+
+/**
+ * Interface for getting a connection given a node id.
+ *
+ * @author balderersheim
+ */
+public interface RpcConnectionPool {
+ Client.NodeConnection getConnection(int nodeId);
+}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
index a1ebea3b695..d5b1b876540 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java
@@ -9,6 +9,7 @@ import com.yahoo.search.dispatch.InvokerFactory;
import com.yahoo.search.dispatch.SearchInvoker;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.SearchCluster;
+import com.yahoo.vespa.config.search.DispatchConfig;
import java.util.Optional;
@@ -17,19 +18,18 @@ import java.util.Optional;
*/
public class RpcInvokerFactory extends InvokerFactory {
- private final RpcResourcePool rpcResourcePool;
+ private final RpcConnectionPool rpcResourcePool;
+ private final CompressPayload compressor;
- public RpcInvokerFactory(RpcResourcePool rpcResourcePool, SearchCluster searchCluster) {
- super(searchCluster);
+ public RpcInvokerFactory(RpcConnectionPool rpcResourcePool, SearchCluster searchCluster, DispatchConfig dispatchConfig) {
+ super(searchCluster, dispatchConfig);
this.rpcResourcePool = rpcResourcePool;
+ this.compressor = new CompressService();
}
@Override
- protected Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher,
- Query query,
- int maxHits,
- Node node) {
- return Optional.of(new RpcSearchInvoker(searcher, node, rpcResourcePool, maxHits));
+ protected Optional<SearchInvoker> createNodeSearchInvoker(VespaBackEndSearcher searcher, Query query, int maxHits, Node node) {
+ return Optional.of(new RpcSearchInvoker(searcher, compressor, node, rpcResourcePool, maxHits));
}
@Override
@@ -37,7 +37,6 @@ public class RpcInvokerFactory extends InvokerFactory {
Query query = result.getQuery();
boolean summaryNeedsQuery = searcher.summaryNeedsQuery(query);
- return new RpcProtobufFillInvoker(rpcResourcePool, searcher.getDocumentDatabase(query), searcher.getServerId(), summaryNeedsQuery);
+ return new RpcProtobufFillInvoker(rpcResourcePool, compressor, searcher.getDocumentDatabase(query), searcher.getServerId(), summaryNeedsQuery);
}
-
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
index 44f0af2aca1..53dc54f7bc5 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
@@ -25,17 +25,19 @@ public class RpcPing implements Pinger, Client.ResponseReceiver {
private static final boolean triggeredClassLoading = ErrorMessage.createBackendCommunicationError("TriggerClassLoading") instanceof ErrorMessage;
private final Node node;
- private final RpcResourcePool resourcePool;
+ private final RpcConnectionPool connectionPool;
private final ClusterMonitor<Node> clusterMonitor;
private final long pingSequenceId;
private final PongHandler pongHandler;
+ private final Compressor compressor;
- public RpcPing(Node node, ClusterMonitor<Node> clusterMonitor, RpcResourcePool rpcResourcePool, PongHandler pongHandler) {
+ public RpcPing(Node node, ClusterMonitor<Node> clusterMonitor, RpcConnectionPool connectionPool, PongHandler pongHandler, Compressor compressor) {
this.node = node;
- this.resourcePool = rpcResourcePool;
+ this.connectionPool = connectionPool;
this.clusterMonitor = clusterMonitor;
- pingSequenceId = node.createPingSequenceId();
+ this.pingSequenceId = node.createPingSequenceId();
this.pongHandler = pongHandler;
+ this. compressor = compressor;
}
@Override
@@ -63,16 +65,16 @@ public class RpcPing implements Pinger, Client.ResponseReceiver {
}
private void sendPing() {
- var connection = resourcePool.getConnection(node.key());
+ var connection = connectionPool.getConnection(node.key());
var ping = SearchProtocol.MonitorRequest.newBuilder().build().toByteArray();
double timeoutSeconds = ((double) clusterMonitor.getConfiguration().getRequestTimeout()) / 1000.0;
- Compressor.Compression compressionResult = resourcePool.compressor().compress(PING_COMPRESSION, ping);
+ Compressor.Compression compressionResult = compressor.compress(PING_COMPRESSION, ping);
connection.request(RPC_METHOD, compressionResult.type(), ping.length, compressionResult.data(), this, timeoutSeconds);
}
private Pong decodeReply(ProtobufResponse response) throws InvalidProtocolBufferException {
CompressionType compression = CompressionType.valueOf(response.compression());
- byte[] responseBytes = resourcePool.compressor().decompress(response.compressedPayload(), compression, response.uncompressedSize());
+ byte[] responseBytes = compressor.decompress(response.compressedPayload(), compression, response.uncompressedSize());
var reply = SearchProtocol.MonitorReply.parseFrom(responseBytes);
if (reply.getDistributionKey() != node.key()) {
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java
index 2e690198c1c..31e152c6ff9 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPingFactory.java
@@ -1,6 +1,8 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch.rpc;
+import com.yahoo.compress.CompressionType;
+import com.yahoo.compress.Compressor;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.dispatch.searchcluster.Node;
import com.yahoo.search.dispatch.searchcluster.PingFactory;
@@ -9,15 +11,16 @@ import com.yahoo.search.dispatch.searchcluster.PongHandler;
public class RpcPingFactory implements PingFactory {
- private final RpcResourcePool rpcResourcePool;
+ private final RpcConnectionPool rpcResourcePool;
+ private final Compressor compressor = new Compressor(CompressionType.LZ4, 5, 0.95, 512);
- public RpcPingFactory(RpcResourcePool rpcResourcePool) {
+ public RpcPingFactory(RpcConnectionPool rpcResourcePool) {
this.rpcResourcePool = rpcResourcePool;
}
@Override
public Pinger createPinger(Node node, ClusterMonitor<Node> monitor, PongHandler pongHandler) {
- return new RpcPing(node, monitor, rpcResourcePool, pongHandler);
+ return new RpcPing(node, monitor, rpcResourcePool, pongHandler, compressor);
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
index c84795352f5..4e538fb54dc 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcProtobufFillInvoker.java
@@ -5,7 +5,6 @@ import ai.vespa.searchlib.searchprotocol.protobuf.SearchProtocol;
import com.google.protobuf.InvalidProtocolBufferException;
import com.yahoo.collections.ListMap;
import com.yahoo.collections.Pair;
-import com.yahoo.compress.CompressionType;
import com.yahoo.compress.Compressor;
import com.yahoo.container.protect.Error;
import com.yahoo.data.access.Inspector;
@@ -44,9 +43,10 @@ public class RpcProtobufFillInvoker extends FillInvoker {
private static final Logger log = Logger.getLogger(RpcProtobufFillInvoker.class.getName());
private final DocumentDatabase documentDb;
- private final RpcResourcePool resourcePool;
+ private final RpcConnectionPool resourcePool;
private final boolean summaryNeedsQuery;
private final String serverId;
+ private final CompressPayload compressor;
private BlockingQueue<Pair<Client.ResponseOrError<ProtobufResponse>, List<FastHit>>> responses;
@@ -56,11 +56,12 @@ public class RpcProtobufFillInvoker extends FillInvoker {
/** The number of responses we should receive (and process) before this is complete */
private int outstandingResponses;
- RpcProtobufFillInvoker(RpcResourcePool resourcePool, DocumentDatabase documentDb, String serverId, boolean summaryNeedsQuery) {
+ RpcProtobufFillInvoker(RpcConnectionPool resourcePool, CompressPayload compressor, DocumentDatabase documentDb, String serverId, boolean summaryNeedsQuery) {
this.documentDb = documentDb;
this.resourcePool = resourcePool;
this.serverId = serverId;
this.summaryNeedsQuery = summaryNeedsQuery;
+ this.compressor = compressor;
}
@Override
@@ -121,9 +122,8 @@ public class RpcProtobufFillInvoker extends FillInvoker {
ListMap<Integer, FastHit> hitsByNode = new ListMap<>();
for (Iterator<Hit> i = result.hits().unorderedDeepIterator(); i.hasNext();) {
Hit h = i.next();
- if (!(h instanceof FastHit))
+ if (!(h instanceof FastHit hit))
continue;
- FastHit hit = (FastHit) h;
hitsByNode.put(hit.getDistributionKey(), hit);
}
@@ -143,7 +143,7 @@ public class RpcProtobufFillInvoker extends FillInvoker {
}
Query query = result.getQuery();
- Compressor.Compression compressionResult = resourcePool.compress(query, payload);
+ Compressor.Compression compressionResult = compressor.compress(query, payload);
node.request(RPC_METHOD, compressionResult.type(), payload.length, compressionResult.data(),
roe -> receive(roe, hits), clientTimeout);
}
@@ -189,9 +189,7 @@ public class RpcProtobufFillInvoker extends FillInvoker {
hasReportedError = true;
} else {
Client.ProtobufResponse response = responseOrError.response().get();
- CompressionType compression = CompressionType.valueOf(response.compression());
- byte[] responseBytes = resourcePool.compressor().decompress(response.compressedPayload(), compression,
- response.uncompressedSize());
+ byte[] responseBytes = compressor.decompress(response);
return fill(result, hitsContext, summaryClass, responseBytes);
}
return 0;
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
index 7ecdb24c211..db95921c47b 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
@@ -2,13 +2,6 @@
package com.yahoo.search.dispatch.rpc;
import com.google.common.collect.ImmutableMap;
-import com.yahoo.component.annotation.Inject;
-import com.yahoo.component.AbstractComponent;
-import com.yahoo.compress.CompressionType;
-import com.yahoo.compress.Compressor;
-import com.yahoo.compress.Compressor.Compression;
-import com.yahoo.processing.request.CompoundName;
-import com.yahoo.search.Query;
import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.dispatch.rpc.Client.NodeConnection;
import com.yahoo.vespa.config.search.DispatchConfig;
@@ -26,51 +19,37 @@ import java.util.concurrent.ThreadLocalRandom;
*
* @author ollivir
*/
-public class RpcResourcePool extends AbstractComponent {
-
- /** The compression method which will be used with rpc dispatch. "lz4" (default) and "none" is supported. */
- public final static CompoundName dispatchCompression = new CompoundName("dispatch.compression");
-
- private final Compressor compressor = new Compressor(CompressionType.LZ4, 5, 0.95, 32);
+public class RpcResourcePool implements RpcConnectionPool, AutoCloseable {
/** Connections to the search nodes this talks to, indexed by node id ("partid") */
private final ImmutableMap<Integer, NodeConnectionPool> nodeConnectionPools;
- private final RpcClient client;
+ private final RpcClient rpcClient;
RpcResourcePool(Map<Integer, NodeConnection> nodeConnections) {
var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>();
nodeConnections.forEach((key, connection) -> builder.put(key, new NodeConnectionPool(Collections.singletonList(connection))));
this.nodeConnectionPools = builder.build();
- client = null;
+ this.rpcClient = null;
}
- @Inject
public RpcResourcePool(DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig) {
super();
- client = new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads());
+ rpcClient = new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads());
// Create rpc node connection pools indexed by the node distribution key
+ int numConnections = dispatchConfig.numJrtConnectionsPerNode();
var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>();
- var numConnections = dispatchConfig.numJrtConnectionsPerNode();
for (var node : nodesConfig.node()) {
var connections = new ArrayList<NodeConnection>(numConnections);
for (int i = 0; i < numConnections; i++) {
- connections.add(client.createConnection(node.host(), node.port()));
+ connections.add(rpcClient.createConnection(node.host(), node.port()));
}
builder.put(node.key(), new NodeConnectionPool(connections));
}
this.nodeConnectionPools = builder.build();
}
- public Compressor compressor() {
- return compressor;
- }
-
- public Compression compress(Query query, byte[] payload) {
- CompressionType compression = CompressionType.valueOf(query.properties().getString(dispatchCompression, "LZ4").toUpperCase());
- return compressor.compress(compression, payload);
- }
-
+ @Override
public NodeConnection getConnection(int nodeId) {
var pool = nodeConnectionPools.get(nodeId);
if (pool == null) {
@@ -81,11 +60,10 @@ public class RpcResourcePool extends AbstractComponent {
}
@Override
- public void deconstruct() {
- super.deconstruct();
+ public void close() {
nodeConnectionPools.values().forEach(NodeConnectionPool::release);
- if (client != null) {
- client.close();
+ if (rpcClient != null) {
+ rpcClient.close();
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
index 64e0dd666dd..31de0b147c9 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcSearchInvoker.java
@@ -1,7 +1,6 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch.rpc;
-import com.yahoo.compress.CompressionType;
import com.yahoo.compress.Compressor;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.search.Query;
@@ -29,19 +28,21 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe
private final VespaBackEndSearcher searcher;
private final Node node;
- private final RpcResourcePool resourcePool;
+ private final RpcConnectionPool resourcePool;
private final BlockingQueue<Client.ResponseOrError<ProtobufResponse>> responses;
private final int maxHits;
+ private final CompressPayload compressor;
private Query query;
- RpcSearchInvoker(VespaBackEndSearcher searcher, Node node, RpcResourcePool resourcePool, int maxHits) {
+ RpcSearchInvoker(VespaBackEndSearcher searcher, CompressPayload compressor, Node node, RpcConnectionPool resourcePool, int maxHits) {
super(Optional.of(node));
this.searcher = searcher;
this.node = node;
this.resourcePool = resourcePool;
this.responses = new LinkedBlockingQueue<>(1);
this.maxHits = maxHits;
+ this.compressor = compressor;
}
@Override
@@ -78,7 +79,7 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe
if (incomingContext instanceof RpcContext)
return (RpcContext)incomingContext;
- return new RpcContext(resourcePool, query,
+ return new RpcContext(compressor, query,
ProtobufSerialization.serializeSearchRequest(query,
Math.min(query.getHits(), maxHits),
searcher.getServerId(), requestTimeout));
@@ -110,8 +111,7 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe
}
ProtobufResponse protobufResponse = response.response().get();
- CompressionType compression = CompressionType.valueOf(protobufResponse.compression());
- byte[] payload = resourcePool.compressor().decompress(protobufResponse.compressedPayload(), compression, protobufResponse.uncompressedSize());
+ byte[] payload = compressor.decompress(protobufResponse);
return ProtobufSerialization.deserializeToSearchResult(payload, query, searcher, node.pathIndex(), node.key());
}
@@ -133,8 +133,8 @@ public class RpcSearchInvoker extends SearchInvoker implements Client.ResponseRe
final Compressor.Compression compressedPayload;
- RpcContext(RpcResourcePool resourcePool, Query query, byte[] payload) {
- compressedPayload = resourcePool.compress(query, payload);
+ RpcContext(CompressPayload compressor, Query query, byte[] payload) {
+ compressedPayload = compressor.compress(query, payload);
}
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
index 54a3e42b9ab..ca2fce0b32b 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
@@ -8,7 +8,6 @@ import com.yahoo.net.HostName;
import com.yahoo.prelude.Pong;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.cluster.NodeManager;
-import com.yahoo.search.dispatch.TopKEstimator;
import com.yahoo.vespa.config.search.DispatchConfig;
import com.yahoo.vespa.config.search.DispatchNodesConfig;
@@ -29,16 +28,14 @@ public class SearchCluster implements NodeManager<Node> {
private static final Logger log = Logger.getLogger(SearchCluster.class.getName());
- private final DispatchConfig dispatchConfig;
+ private final double minActivedocsPercentage;
private final String clusterId;
+ private final VipStatus vipStatus;
+ private final PingFactory pingFactory;
private final Map<Integer, Group> groups;
private final List<Group> orderedGroups;
private final List<Node> nodes;
- private final VipStatus vipStatus;
- private final PingFactory pingFactory;
- private final TopKEstimator hitEstimator;
private long nextLogTime = 0;
- private static final double SKEW_FACTOR = 0.05;
/**
* A search node on this local machine having the entire corpus, which we therefore
@@ -48,13 +45,13 @@ public class SearchCluster implements NodeManager<Node> {
* if it only queries this cluster when the local node cannot be used, to avoid unnecessary
* cross-node network traffic.
*/
- private final Optional<Node> localCorpusDispatchTarget;
+ private final Node localCorpusDispatchTarget;
public SearchCluster(String clusterId, DispatchConfig dispatchConfig,
DispatchNodesConfig nodesConfig,
VipStatus vipStatus, PingFactory pingFactory) {
this.clusterId = clusterId;
- this.dispatchConfig = dispatchConfig;
+ this.minActivedocsPercentage = dispatchConfig.minActivedocsPercentage();
this.vipStatus = vipStatus;
this.pingFactory = pingFactory;
@@ -71,7 +68,6 @@ public class SearchCluster implements NodeManager<Node> {
nodes.forEach(node -> groupIntroductionOrder.put(node.group(), groups.get(node.group())));
this.orderedGroups = List.copyOf(groupIntroductionOrder.values());
- hitEstimator = new TopKEstimator(30.0, dispatchConfig.topKProbability(), SKEW_FACTOR);
this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), nodes, groups);
}
@@ -85,7 +81,7 @@ public class SearchCluster implements NodeManager<Node> {
}
}
- private static Optional<Node> findLocalCorpusDispatchTarget(String selfHostname,
+ private static Node findLocalCorpusDispatchTarget(String selfHostname,
List<Node> nodes,
Map<Integer, Group> groups) {
// A search node in the search cluster in question is configured on the same host as the currently running container.
@@ -96,15 +92,15 @@ public class SearchCluster implements NodeManager<Node> {
.filter(node -> node.hostname().equals(selfHostname))
.toList();
// Only use direct dispatch if we have exactly 1 search node on the same machine:
- if (localSearchNodes.size() != 1) return Optional.empty();
+ if (localSearchNodes.size() != 1) return null;
Node localSearchNode = localSearchNodes.iterator().next();
Group localSearchGroup = groups.get(localSearchNode.group());
// Only use direct dispatch if the local search node has the entire corpus
- if (localSearchGroup.nodes().size() != 1) return Optional.empty();
+ if (localSearchGroup.nodes().size() != 1) return null;
- return Optional.of(localSearchNode);
+ return localSearchNode;
}
private static List<Node> toNodes(DispatchNodesConfig nodesConfig) {
@@ -113,13 +109,6 @@ public class SearchCluster implements NodeManager<Node> {
.toList();
}
- public DispatchConfig dispatchConfig() {
- return dispatchConfig;
- }
-
- /** Returns an immutable list of all nodes in this. */
- public List<Node> nodes() { return nodes; }
-
/** Returns the groups of this cluster as an immutable map indexed by group id */
public Map<Integer, Group> groups() { return groups; }
@@ -148,16 +137,16 @@ public class SearchCluster implements NodeManager<Node> {
* or empty if we should not dispatch directly.
*/
public Optional<Node> localCorpusDispatchTarget() {
- if ( localCorpusDispatchTarget.isEmpty()) return Optional.empty();
+ if ( localCorpusDispatchTarget == null) return Optional.empty();
// Only use direct dispatch if the local group has sufficient coverage
- Group localSearchGroup = groups().get(localCorpusDispatchTarget.get().group());
+ Group localSearchGroup = groups().get(localCorpusDispatchTarget.group());
if ( ! localSearchGroup.hasSufficientCoverage()) return Optional.empty();
// Only use direct dispatch if the local search node is not down
- if ( localCorpusDispatchTarget.get().isWorking() == Boolean.FALSE) return Optional.empty();
+ if ( localCorpusDispatchTarget.isWorking() == Boolean.FALSE) return Optional.empty();
- return localCorpusDispatchTarget;
+ return Optional.of(localCorpusDispatchTarget);
}
private void updateWorkingState(Node node, boolean isWorking) {
@@ -185,7 +174,7 @@ public class SearchCluster implements NodeManager<Node> {
}
private void updateVipStatusOnNodeChange(Node node, boolean nodeIsWorking) {
- if (localCorpusDispatchTarget.isEmpty()) { // consider entire cluster
+ if (localCorpusDispatchTarget == null) { // consider entire cluster
if (hasInformationAboutAllNodes())
setInRotationOnlyIf(hasWorkingNodes());
}
@@ -198,7 +187,7 @@ public class SearchCluster implements NodeManager<Node> {
}
private void updateVipStatusOnCoverageChange(Group group, boolean sufficientCoverage) {
- if ( localCorpusDispatchTarget.isEmpty()) { // consider entire cluster
+ if ( localCorpusDispatchTarget == null) { // consider entire cluster
// VIP status does not depend on coverage
}
else if (usesLocalCorpusIn(group)) { // follow the status of this group
@@ -213,13 +202,6 @@ public class SearchCluster implements NodeManager<Node> {
vipStatus.removeFromRotation(clusterId);
}
- public int estimateHitsToFetch(int wantedHits, int numPartitions) {
- return hitEstimator.estimateK(wantedHits, numPartitions);
- }
- public int estimateHitsToFetch(int wantedHits, int numPartitions, double topKProbability) {
- return hitEstimator.estimateK(wantedHits, numPartitions, topKProbability);
- }
-
public boolean hasInformationAboutAllNodes() {
return nodes.stream().allMatch(node -> node.isWorking() != null);
}
@@ -229,11 +211,11 @@ public class SearchCluster implements NodeManager<Node> {
}
private boolean usesLocalCorpusIn(Node node) {
- return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().equals(node);
+ return node.equals(localCorpusDispatchTarget);
}
private boolean usesLocalCorpusIn(Group group) {
- return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().group() == group.id();
+ return (localCorpusDispatchTarget != null) && localCorpusDispatchTarget.group() == group.id();
}
/** Used by the cluster monitor to manage node status */
@@ -286,7 +268,7 @@ public class SearchCluster implements NodeManager<Node> {
private boolean isGroupCoverageSufficient(long activeDocuments, long medianDocuments) {
double documentCoverage = 100.0 * (double) activeDocuments / medianDocuments;
- if (medianDocuments > 0 && documentCoverage < dispatchConfig.minActivedocsPercentage())
+ if (medianDocuments > 0 && documentCoverage < minActivedocsPercentage)
return false;
return true;
}