summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-11-23 11:35:03 +0100
committerHenning Baldersheim <balder@yahoo-inc.com>2022-11-23 11:36:28 +0100
commit0a3d6beec5b28150694297a99716ea23732dc6c5 (patch)
tree5437b4e56ef6fe10084b99fe22966cd7a9d9deab /container-search/src/main/java/com/yahoo/search
parentef637d4a7236d6570c748ba5782e0435f628bd9a (diff)
Put the RpcClient back in the RpcResourcePool where it belongs.
Remove parts of schema test no longer valid.
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search')
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java2
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java41
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java12
3 files changed, 26 insertions, 29 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
index a862e6c2d98..0b627e91bc5 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
@@ -111,7 +111,7 @@ public class ClusterMonitor<T> {
/** Returns a thread-safe snapshot of the NodeMonitors of all added nodes */
public List<BaseNodeMonitor<T>> nodeMonitors() {
- return new java.util.ArrayList<>(nodeMonitors.values());
+ return List.copyOf(nodeMonitors.values());
}
/** Must be called when this goes out of use */
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index b7a2d9f70ba..55073f25f0e 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -12,7 +12,6 @@ import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException;
-import com.yahoo.search.dispatch.rpc.RpcClient;
import com.yahoo.search.dispatch.rpc.RpcInvokerFactory;
import com.yahoo.search.dispatch.rpc.RpcPingFactory;
import com.yahoo.search.dispatch.rpc.RpcResourcePool;
@@ -58,7 +57,6 @@ public class Dispatcher extends AbstractComponent {
private final ClusterMonitor<Node> clusterMonitor;
private final LoadBalancer loadBalancer;
private final InvokerFactory invokerFactory;
- private final RpcClient rpcClient;
private final int maxHitsPerNode;
private final RpcResourcePool rpcResourcePool;
@@ -80,29 +78,24 @@ public class Dispatcher extends AbstractComponent {
DispatchNodesConfig nodesConfig,
VipStatus vipStatus) {
this(clusterId, dispatchConfig, nodesConfig, vipStatus,
- new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads()));
- }
- private Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig,
- DispatchNodesConfig nodesConfig, VipStatus vipStatus, RpcClient rpcClient) {
- this(clusterId, dispatchConfig, nodesConfig, vipStatus, rpcClient,
- new RpcResourcePool(rpcClient, nodesConfig, dispatchConfig.numJrtConnectionsPerNode()));
+ new RpcResourcePool(dispatchConfig, nodesConfig));
}
private Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig,
DispatchNodesConfig nodesConfig, VipStatus vipStatus,
- RpcClient rpcClient, RpcResourcePool resourcePool) {
+ RpcResourcePool resourcePool) {
this(new SearchCluster(clusterId.stringValue(), dispatchConfig, nodesConfig,
vipStatus, new RpcPingFactory(resourcePool)),
- dispatchConfig, rpcClient, resourcePool);
+ dispatchConfig, resourcePool);
}
private Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig,
- RpcClient rpcClient, RpcResourcePool rpcResourcePool) {
+ RpcResourcePool rpcResourcePool) {
this(new ClusterMonitor<>(searchCluster, true),
searchCluster, dispatchConfig,
new RpcInvokerFactory(rpcResourcePool, searchCluster, dispatchConfig),
- rpcClient, rpcResourcePool);
+ rpcResourcePool);
}
/* Protected for simple mocking in tests. Beware that searchCluster is shutdown on in deconstruct() */
@@ -110,21 +103,19 @@ public class Dispatcher extends AbstractComponent {
SearchCluster searchCluster,
DispatchConfig dispatchConfig,
InvokerFactory invokerFactory) {
- this(clusterMonitor, searchCluster, dispatchConfig, invokerFactory, null, null);
+ this(clusterMonitor, searchCluster, dispatchConfig, invokerFactory, null);
}
private Dispatcher(ClusterMonitor<Node> clusterMonitor,
SearchCluster searchCluster,
DispatchConfig dispatchConfig,
InvokerFactory invokerFactory,
- RpcClient rpcClient,
RpcResourcePool rpcResourcePool) {
this.searchCluster = searchCluster;
this.clusterMonitor = clusterMonitor;
this.loadBalancer = new LoadBalancer(searchCluster, toLoadBalancerPolicy(dispatchConfig.distributionPolicy()));
this.invokerFactory = invokerFactory;
- this.rpcClient = rpcClient;
this.rpcResourcePool = rpcResourcePool;
this.maxHitsPerNode = dispatchConfig.maxHitsPerNode();
searchCluster.addMonitoring(clusterMonitor);
@@ -173,9 +164,6 @@ public class Dispatcher extends AbstractComponent {
if (rpcResourcePool != null) {
rpcResourcePool.close();
}
- if (rpcClient != null) {
- rpcClient.close();
- }
}
public FillInvoker getFillInvoker(Result result, VespaBackEndSearcher searcher) {
@@ -183,7 +171,8 @@ public class Dispatcher extends AbstractComponent {
}
public SearchInvoker getSearchInvoker(Query query, VespaBackEndSearcher searcher) {
- SearchInvoker invoker = getSearchPathInvoker(query, searcher).orElseGet(() -> getInternalInvoker(query, searcher));
+ SearchCluster cluster = searchCluster; // Take a snapshot
+ SearchInvoker invoker = getSearchPathInvoker(query, searcher, cluster).orElseGet(() -> getInternalInvoker(query, searcher, cluster));
if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) {
query.setHits(0);
@@ -193,12 +182,12 @@ public class Dispatcher extends AbstractComponent {
}
/** Builds an invoker based on searchpath */
- private Optional<SearchInvoker> getSearchPathInvoker(Query query, VespaBackEndSearcher searcher) {
+ private Optional<SearchInvoker> getSearchPathInvoker(Query query, VespaBackEndSearcher searcher, SearchCluster cluster) {
String searchPath = query.getModel().getSearchPath();
if (searchPath == null) return Optional.empty();
try {
- List<Node> nodes = SearchPath.selectNodes(searchPath, searchCluster);
+ List<Node> nodes = SearchPath.selectNodes(searchPath, cluster);
if (nodes.isEmpty()) return Optional.empty();
query.trace(false, 2, "Dispatching with search path ", searchPath);
@@ -212,8 +201,8 @@ public class Dispatcher extends AbstractComponent {
}
}
- private SearchInvoker getInternalInvoker(Query query, VespaBackEndSearcher searcher) {
- Optional<Node> directNode = searchCluster.localCorpusDispatchTarget();
+ private SearchInvoker getInternalInvoker(Query query, VespaBackEndSearcher searcher, SearchCluster cluster) {
+ Optional<Node> directNode = cluster.localCorpusDispatchTarget();
if (directNode.isPresent()) {
Node node = directNode.get();
query.trace(false, 2, "Dispatching to ", node);
@@ -225,10 +214,10 @@ public class Dispatcher extends AbstractComponent {
.orElseThrow(() -> new IllegalStateException("Could not dispatch directly to " + node));
}
- int covered = searchCluster.groupsWithSufficientCoverage();
- int groups = searchCluster.orderedGroups().size();
+ int covered = cluster.groupsWithSufficientCoverage();
+ int groups = cluster.orderedGroups().size();
int max = Integer.min(Integer.min(covered + 1, groups), MAX_GROUP_SELECTION_ATTEMPTS);
- Set<Integer> rejected = rejectGroupBlockingFeed(searchCluster.orderedGroups());
+ Set<Integer> rejected = rejectGroupBlockingFeed(cluster.orderedGroups());
for (int i = 0; i < max; i++) {
Optional<Group> groupInCluster = loadBalancer.takeGroup(rejected);
if (groupInCluster.isEmpty()) break; // No groups available
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
index 71e8cc0baa8..db95921c47b 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java
@@ -4,6 +4,7 @@ package com.yahoo.search.dispatch.rpc;
import com.google.common.collect.ImmutableMap;
import com.yahoo.search.dispatch.FillInvoker;
import com.yahoo.search.dispatch.rpc.Client.NodeConnection;
+import com.yahoo.vespa.config.search.DispatchConfig;
import com.yahoo.vespa.config.search.DispatchNodesConfig;
import java.util.ArrayList;
@@ -22,22 +23,26 @@ public class RpcResourcePool implements RpcConnectionPool, AutoCloseable {
/** Connections to the search nodes this talks to, indexed by node id ("partid") */
private final ImmutableMap<Integer, NodeConnectionPool> nodeConnectionPools;
+ private final RpcClient rpcClient;
RpcResourcePool(Map<Integer, NodeConnection> nodeConnections) {
var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>();
nodeConnections.forEach((key, connection) -> builder.put(key, new NodeConnectionPool(Collections.singletonList(connection))));
this.nodeConnectionPools = builder.build();
+ this.rpcClient = null;
}
- public RpcResourcePool(RpcClient client, DispatchNodesConfig nodesConfig, int numConnections) {
+ public RpcResourcePool(DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig) {
super();
+ rpcClient = new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads());
// Create rpc node connection pools indexed by the node distribution key
+ int numConnections = dispatchConfig.numJrtConnectionsPerNode();
var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>();
for (var node : nodesConfig.node()) {
var connections = new ArrayList<NodeConnection>(numConnections);
for (int i = 0; i < numConnections; i++) {
- connections.add(client.createConnection(node.host(), node.port()));
+ connections.add(rpcClient.createConnection(node.host(), node.port()));
}
builder.put(node.key(), new NodeConnectionPool(connections));
}
@@ -57,6 +62,9 @@ public class RpcResourcePool implements RpcConnectionPool, AutoCloseable {
@Override
public void close() {
nodeConnectionPools.values().forEach(NodeConnectionPool::release);
+ if (rpcClient != null) {
+ rpcClient.close();
+ }
}
private static class NodeConnectionPool {