diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-11-23 11:35:03 +0100 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-11-23 11:36:28 +0100 |
commit | 0a3d6beec5b28150694297a99716ea23732dc6c5 (patch) | |
tree | 5437b4e56ef6fe10084b99fe22966cd7a9d9deab /container-search | |
parent | ef637d4a7236d6570c748ba5782e0435f628bd9a (diff) |
Put the RpcClient back in the RpcResourcePool where it belongs.
Remove parts of schema test no longer valid.
Diffstat (limited to 'container-search')
5 files changed, 28 insertions, 33 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java index a862e6c2d98..0b627e91bc5 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java @@ -111,7 +111,7 @@ public class ClusterMonitor<T> { /** Returns a thread-safe snapshot of the NodeMonitors of all added nodes */ public List<BaseNodeMonitor<T>> nodeMonitors() { - return new java.util.ArrayList<>(nodeMonitors.values()); + return List.copyOf(nodeMonitors.values()); } /** Must be called when this goes out of use */ diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index b7a2d9f70ba..55073f25f0e 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -12,7 +12,6 @@ import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.dispatch.SearchPath.InvalidSearchPathException; -import com.yahoo.search.dispatch.rpc.RpcClient; import com.yahoo.search.dispatch.rpc.RpcInvokerFactory; import com.yahoo.search.dispatch.rpc.RpcPingFactory; import com.yahoo.search.dispatch.rpc.RpcResourcePool; @@ -58,7 +57,6 @@ public class Dispatcher extends AbstractComponent { private final ClusterMonitor<Node> clusterMonitor; private final LoadBalancer loadBalancer; private final InvokerFactory invokerFactory; - private final RpcClient rpcClient; private final int maxHitsPerNode; private final RpcResourcePool rpcResourcePool; @@ -80,29 +78,24 @@ public class Dispatcher extends AbstractComponent { DispatchNodesConfig nodesConfig, VipStatus vipStatus) { this(clusterId, dispatchConfig, nodesConfig, vipStatus, - new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads())); - } - private Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, - DispatchNodesConfig nodesConfig, VipStatus vipStatus, RpcClient rpcClient) { - this(clusterId, dispatchConfig, nodesConfig, vipStatus, rpcClient, - new RpcResourcePool(rpcClient, nodesConfig, dispatchConfig.numJrtConnectionsPerNode())); + new RpcResourcePool(dispatchConfig, nodesConfig)); } private Dispatcher(ComponentId clusterId, DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig, VipStatus vipStatus, - RpcClient rpcClient, RpcResourcePool resourcePool) { + RpcResourcePool resourcePool) { this(new SearchCluster(clusterId.stringValue(), dispatchConfig, nodesConfig, vipStatus, new RpcPingFactory(resourcePool)), - dispatchConfig, rpcClient, resourcePool); + dispatchConfig, resourcePool); } private Dispatcher(SearchCluster searchCluster, DispatchConfig dispatchConfig, - RpcClient rpcClient, RpcResourcePool rpcResourcePool) { + RpcResourcePool rpcResourcePool) { this(new ClusterMonitor<>(searchCluster, true), searchCluster, dispatchConfig, new RpcInvokerFactory(rpcResourcePool, searchCluster, dispatchConfig), - rpcClient, rpcResourcePool); + rpcResourcePool); } /* Protected for simple mocking in tests. Beware that searchCluster is shutdown on in deconstruct() */ @@ -110,21 +103,19 @@ public class Dispatcher extends AbstractComponent { SearchCluster searchCluster, DispatchConfig dispatchConfig, InvokerFactory invokerFactory) { - this(clusterMonitor, searchCluster, dispatchConfig, invokerFactory, null, null); + this(clusterMonitor, searchCluster, dispatchConfig, invokerFactory, null); } private Dispatcher(ClusterMonitor<Node> clusterMonitor, SearchCluster searchCluster, DispatchConfig dispatchConfig, InvokerFactory invokerFactory, - RpcClient rpcClient, RpcResourcePool rpcResourcePool) { this.searchCluster = searchCluster; this.clusterMonitor = clusterMonitor; this.loadBalancer = new LoadBalancer(searchCluster, toLoadBalancerPolicy(dispatchConfig.distributionPolicy())); this.invokerFactory = invokerFactory; - this.rpcClient = rpcClient; this.rpcResourcePool = rpcResourcePool; this.maxHitsPerNode = dispatchConfig.maxHitsPerNode(); searchCluster.addMonitoring(clusterMonitor); @@ -173,9 +164,6 @@ public class Dispatcher extends AbstractComponent { if (rpcResourcePool != null) { rpcResourcePool.close(); } - if (rpcClient != null) { - rpcClient.close(); - } } public FillInvoker getFillInvoker(Result result, VespaBackEndSearcher searcher) { @@ -183,7 +171,8 @@ public class Dispatcher extends AbstractComponent { } public SearchInvoker getSearchInvoker(Query query, VespaBackEndSearcher searcher) { - SearchInvoker invoker = getSearchPathInvoker(query, searcher).orElseGet(() -> getInternalInvoker(query, searcher)); + SearchCluster cluster = searchCluster; // Take a snapshot + SearchInvoker invoker = getSearchPathInvoker(query, searcher, cluster).orElseGet(() -> getInternalInvoker(query, searcher, cluster)); if (query.properties().getBoolean(com.yahoo.search.query.Model.ESTIMATE)) { query.setHits(0); @@ -193,12 +182,12 @@ public class Dispatcher extends AbstractComponent { } /** Builds an invoker based on searchpath */ - private Optional<SearchInvoker> getSearchPathInvoker(Query query, VespaBackEndSearcher searcher) { + private Optional<SearchInvoker> getSearchPathInvoker(Query query, VespaBackEndSearcher searcher, SearchCluster cluster) { String searchPath = query.getModel().getSearchPath(); if (searchPath == null) return Optional.empty(); try { - List<Node> nodes = SearchPath.selectNodes(searchPath, searchCluster); + List<Node> nodes = SearchPath.selectNodes(searchPath, cluster); if (nodes.isEmpty()) return Optional.empty(); query.trace(false, 2, "Dispatching with search path ", searchPath); @@ -212,8 +201,8 @@ public class Dispatcher extends AbstractComponent { } } - private SearchInvoker getInternalInvoker(Query query, VespaBackEndSearcher searcher) { - Optional<Node> directNode = searchCluster.localCorpusDispatchTarget(); + private SearchInvoker getInternalInvoker(Query query, VespaBackEndSearcher searcher, SearchCluster cluster) { + Optional<Node> directNode = cluster.localCorpusDispatchTarget(); if (directNode.isPresent()) { Node node = directNode.get(); query.trace(false, 2, "Dispatching to ", node); @@ -225,10 +214,10 @@ public class Dispatcher extends AbstractComponent { .orElseThrow(() -> new IllegalStateException("Could not dispatch directly to " + node)); } - int covered = searchCluster.groupsWithSufficientCoverage(); - int groups = searchCluster.orderedGroups().size(); + int covered = cluster.groupsWithSufficientCoverage(); + int groups = cluster.orderedGroups().size(); int max = Integer.min(Integer.min(covered + 1, groups), MAX_GROUP_SELECTION_ATTEMPTS); - Set<Integer> rejected = rejectGroupBlockingFeed(searchCluster.orderedGroups()); + Set<Integer> rejected = rejectGroupBlockingFeed(cluster.orderedGroups()); for (int i = 0; i < max; i++) { Optional<Group> groupInCluster = loadBalancer.takeGroup(rejected); if (groupInCluster.isEmpty()) break; // No groups available diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java index 71e8cc0baa8..db95921c47b 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcResourcePool.java @@ -4,6 +4,7 @@ package com.yahoo.search.dispatch.rpc; import com.google.common.collect.ImmutableMap; import com.yahoo.search.dispatch.FillInvoker; import com.yahoo.search.dispatch.rpc.Client.NodeConnection; +import com.yahoo.vespa.config.search.DispatchConfig; import com.yahoo.vespa.config.search.DispatchNodesConfig; import java.util.ArrayList; @@ -22,22 +23,26 @@ public class RpcResourcePool implements RpcConnectionPool, AutoCloseable { /** Connections to the search nodes this talks to, indexed by node id ("partid") */ private final ImmutableMap<Integer, NodeConnectionPool> nodeConnectionPools; + private final RpcClient rpcClient; RpcResourcePool(Map<Integer, NodeConnection> nodeConnections) { var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>(); nodeConnections.forEach((key, connection) -> builder.put(key, new NodeConnectionPool(Collections.singletonList(connection)))); this.nodeConnectionPools = builder.build(); + this.rpcClient = null; } - public RpcResourcePool(RpcClient client, DispatchNodesConfig nodesConfig, int numConnections) { + public RpcResourcePool(DispatchConfig dispatchConfig, DispatchNodesConfig nodesConfig) { super(); + rpcClient = new RpcClient("dispatch-client", dispatchConfig.numJrtTransportThreads()); // Create rpc node connection pools indexed by the node distribution key + int numConnections = dispatchConfig.numJrtConnectionsPerNode(); var builder = new ImmutableMap.Builder<Integer, NodeConnectionPool>(); for (var node : nodesConfig.node()) { var connections = new ArrayList<NodeConnection>(numConnections); for (int i = 0; i < numConnections; i++) { - connections.add(client.createConnection(node.host(), node.port())); + connections.add(rpcClient.createConnection(node.host(), node.port())); } builder.put(node.key(), new NodeConnectionPool(connections)); } @@ -57,6 +62,9 @@ public class RpcResourcePool implements RpcConnectionPool, AutoCloseable { @Override public void close() { nodeConnectionPools.values().forEach(NodeConnectionPool::release); + if (rpcClient != null) { + rpcClient.close(); + } } private static class NodeConnectionPool { diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java index be23c21eaf6..2f960add4a8 100644 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/FastSearcherTestCase.java @@ -13,7 +13,6 @@ import com.yahoo.prelude.fastsearch.SummaryParameters; import com.yahoo.search.Query; import com.yahoo.search.Result; import com.yahoo.search.Searcher; -import com.yahoo.search.dispatch.rpc.RpcClient; import com.yahoo.search.dispatch.rpc.RpcResourcePool; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.grouping.GroupingRequest; @@ -186,7 +185,7 @@ public class FastSearcherTestCase { b.searchcluster(searchClusterB); VipStatus vipStatus = new VipStatus(b.build()); List<Node> nodes_1 = ImmutableList.of(new Node(0, "host0", 0)); - RpcResourcePool rpcPool_1 = new RpcResourcePool(new RpcClient("rpcclient", 1), MockDispatcher.toNodesConfig(nodes_1), 2); + RpcResourcePool rpcPool_1 = new RpcResourcePool(MockDispatcher.toDispatchConfig(), MockDispatcher.toNodesConfig(nodes_1)); MockDispatcher dispatch_1 = MockDispatcher.create(nodes_1, rpcPool_1, vipStatus); dispatch_1.clusterMonitor.shutdown(); vipStatus.addToRotation(clusterName); diff --git a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java index 10e188d092c..a444159952d 100644 --- a/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java +++ b/container-search/src/test/java/com/yahoo/prelude/fastsearch/test/MockDispatcher.java @@ -4,7 +4,6 @@ package com.yahoo.prelude.fastsearch.test; import com.yahoo.container.handler.VipStatus; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.dispatch.Dispatcher; -import com.yahoo.search.dispatch.rpc.RpcClient; import com.yahoo.search.dispatch.rpc.RpcInvokerFactory; import com.yahoo.search.dispatch.rpc.RpcPingFactory; import com.yahoo.search.dispatch.rpc.RpcResourcePool; @@ -20,7 +19,7 @@ class MockDispatcher extends Dispatcher { public final ClusterMonitor clusterMonitor; public static MockDispatcher create(List<Node> nodes) { - var rpcResourcePool = new RpcResourcePool(new RpcClient("rpcclient", 1), toNodesConfig(nodes), 2); + var rpcResourcePool = new RpcResourcePool(toDispatchConfig(), toNodesConfig(nodes)); return create(nodes, rpcResourcePool, new VipStatus()); } |