diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java | 37 |
1 files changed, 16 insertions, 21 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index 6f6b0fc2b79..43d0e08886d 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -60,20 +60,19 @@ public class Dispatcher extends AbstractComponent { private final DispatchConfig dispatchConfig; private final RpcConnectionPool rpcResourcePool; private final SearchCluster searchCluster; + private final ClusterMonitor<Node> clusterMonitor; private volatile VolatileItems volatileItems; private static class VolatileItems { final LoadBalancer loadBalancer; final InvokerFactory invokerFactory; - final ClusterMonitor<Node> clusterMonitor; final AtomicInteger inflight = new AtomicInteger(1); // Initial reference. Runnable cleanup = () -> { }; - VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory, ClusterMonitor<Node> clusterMonitor) { + VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory) { this.loadBalancer = loadBalancer; this.invokerFactory = invokerFactory; - this.clusterMonitor = clusterMonitor; } private void countDown() { @@ -121,14 +120,14 @@ public class Dispatcher extends AbstractComponent { DispatchNodesConfig nodesConfig, VipStatus vipStatus, InvokerFactoryFactory invokerFactories) { this(dispatchConfig, rpcConnectionPool, new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(), - toNodes(nodesConfig), vipStatus, new RpcPingFactory(rpcConnectionPool)), + toNodes(clusterId.stringValue(), nodesConfig), vipStatus, new RpcPingFactory(rpcConnectionPool)), invokerFactories); } Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, SearchCluster searchCluster, InvokerFactoryFactory invokerFactories) { this(dispatchConfig, rpcConnectionPool, searchCluster, new ClusterMonitor<>(searchCluster, false), invokerFactories); - this.volatileItems.clusterMonitor.start(); // Populate nodes to monitor before starting it. + this.clusterMonitor.start(); // Populate nodes to monitor before starting it. } Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, @@ -137,7 +136,8 @@ public class Dispatcher extends AbstractComponent { this.rpcResourcePool = rpcConnectionPool; this.searchCluster = searchCluster; this.invokerFactories = invokerFactories; - this.volatileItems = update(clusterMonitor); + this.clusterMonitor = clusterMonitor; + this.volatileItems = update(); searchCluster.addMonitoring(clusterMonitor); } @@ -171,7 +171,7 @@ public class Dispatcher extends AbstractComponent { * 3. The load balancer is owned by the volatile snapshot, and is swapped atomically with it; * it is used internally by the dispatcher to select search nodes for queries, and is discarded with its snapshot. * 4. The cluster monitor is a subordinate to the search cluster, and does whatever that tells it to, at any time; - * it is technically owned by the volatile snapshot, but mostly to show it is swapped together with that. + * it is technically owned by the dispatcher, but in updated by the search cluster, when that is updated. * 5. The search cluster is owned by the dispatcher, and is updated on node set changes; * its responsibility is to keep track of the state of the backend, and to provide a view of it to the dispatcher, * as well as keep the container vip status updated accordingly; it should therefore preserve as much as possible @@ -180,7 +180,7 @@ public class Dispatcher extends AbstractComponent { * under the assumption that this is the common case, i.e., new nodes have no documents yet. */ void updateWithNewConfig(DispatchNodesConfig nodesConfig) { - try (var items = volatileItems()) { // Marking a reference to the old snapshot, which we want to have cleaned up. + try (var items = volatileItems()) { // Mark a reference to the old snapshot, which we want to have cleaned up. items.get().countDown(); // Decrement for its initial creation reference, so it may reach 0. // Let the RPC pool know about the new nodes, and set up the delayed cleanup that we need to do. @@ -192,21 +192,16 @@ public class Dispatcher extends AbstractComponent { }; // Update the nodes the search cluster keeps track of, and what nodes are monitored. - ClusterMonitor<Node> newMonitor = searchCluster.updateNodes(toNodes(nodesConfig), dispatchConfig.minActivedocsPercentage()); + searchCluster.updateNodes(toNodes(searchCluster.name(), nodesConfig), clusterMonitor, dispatchConfig.minActivedocsPercentage()); // Update the snapshot to use the new nodes set in the search cluster; the RPC pool is ready for this. - this.volatileItems = update(newMonitor); - - // Wait for the old cluster monitor to die; it may be pinging nodes we want to shut down RPC connections to. - items.get().clusterMonitor.shutdown(); + this.volatileItems = update(); } // Close the old snapshot, which may trigger the RPC cleanup now, or when the last invoker is closed, by a search thread. } - private VolatileItems update(ClusterMonitor<Node> clusterMonitor) { - var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())), - invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig), - clusterMonitor); - return items; + private VolatileItems update() { + return new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())), + invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig)); } private void initialWarmup(double warmupTime) { @@ -234,9 +229,9 @@ public class Dispatcher extends AbstractComponent { case LATENCY_AMORTIZED_OVER_TIME -> LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_TIME; }; } - private static List<Node> toNodes(DispatchNodesConfig nodesConfig) { + private static List<Node> toNodes(String clusterName, DispatchNodesConfig nodesConfig) { return nodesConfig.node().stream() - .map(n -> new Node(n.key(), n.host(), n.group())) + .map(n -> new Node(clusterName, n.key(), n.host(), n.group())) .toList(); } @@ -255,7 +250,7 @@ public class Dispatcher extends AbstractComponent { @Override public void deconstruct() { // The clustermonitor must be shutdown first as it uses the invokerfactory through the searchCluster. - volatileItems.clusterMonitor.shutdown(); + clusterMonitor.shutdown(); if (rpcResourcePool != null) { rpcResourcePool.close(); } |