diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java | 29 |
1 files changed, 12 insertions, 17 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index eca0c8058a1..43d0e08886d 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -60,20 +60,19 @@ public class Dispatcher extends AbstractComponent { private final DispatchConfig dispatchConfig; private final RpcConnectionPool rpcResourcePool; private final SearchCluster searchCluster; + private final ClusterMonitor<Node> clusterMonitor; private volatile VolatileItems volatileItems; private static class VolatileItems { final LoadBalancer loadBalancer; final InvokerFactory invokerFactory; - final ClusterMonitor<Node> clusterMonitor; final AtomicInteger inflight = new AtomicInteger(1); // Initial reference. Runnable cleanup = () -> { }; - VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory, ClusterMonitor<Node> clusterMonitor) { + VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory) { this.loadBalancer = loadBalancer; this.invokerFactory = invokerFactory; - this.clusterMonitor = clusterMonitor; } private void countDown() { @@ -128,7 +127,7 @@ public class Dispatcher extends AbstractComponent { Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, SearchCluster searchCluster, InvokerFactoryFactory invokerFactories) { this(dispatchConfig, rpcConnectionPool, searchCluster, new ClusterMonitor<>(searchCluster, false), invokerFactories); - this.volatileItems.clusterMonitor.start(); // Populate nodes to monitor before starting it. + this.clusterMonitor.start(); // Populate nodes to monitor before starting it. } Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, @@ -137,7 +136,8 @@ public class Dispatcher extends AbstractComponent { this.rpcResourcePool = rpcConnectionPool; this.searchCluster = searchCluster; this.invokerFactories = invokerFactories; - this.volatileItems = update(clusterMonitor); + this.clusterMonitor = clusterMonitor; + this.volatileItems = update(); searchCluster.addMonitoring(clusterMonitor); } @@ -171,7 +171,7 @@ public class Dispatcher extends AbstractComponent { * 3. The load balancer is owned by the volatile snapshot, and is swapped atomically with it; * it is used internally by the dispatcher to select search nodes for queries, and is discarded with its snapshot. * 4. The cluster monitor is a subordinate to the search cluster, and does whatever that tells it to, at any time; - * it is technically owned by the volatile snapshot, but mostly to show it is swapped together with that. + * it is technically owned by the dispatcher, but in updated by the search cluster, when that is updated. * 5. The search cluster is owned by the dispatcher, and is updated on node set changes; * its responsibility is to keep track of the state of the backend, and to provide a view of it to the dispatcher, * as well as keep the container vip status updated accordingly; it should therefore preserve as much as possible @@ -192,21 +192,16 @@ public class Dispatcher extends AbstractComponent { }; // Update the nodes the search cluster keeps track of, and what nodes are monitored. - ClusterMonitor<Node> newMonitor = searchCluster.updateNodes(toNodes(searchCluster.name(), nodesConfig), dispatchConfig.minActivedocsPercentage()); + searchCluster.updateNodes(toNodes(searchCluster.name(), nodesConfig), clusterMonitor, dispatchConfig.minActivedocsPercentage()); // Update the snapshot to use the new nodes set in the search cluster; the RPC pool is ready for this. - this.volatileItems = update(newMonitor); - - // Wait for the old cluster monitor to die; it may be pinging nodes we want to shut down RPC connections to. - items.get().clusterMonitor.shutdown(); + this.volatileItems = update(); } // Close the old snapshot, which may trigger the RPC cleanup now, or when the last invoker is closed, by a search thread. } - private VolatileItems update(ClusterMonitor<Node> clusterMonitor) { - var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())), - invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig), - clusterMonitor); - return items; + private VolatileItems update() { + return new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())), + invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig)); } private void initialWarmup(double warmupTime) { @@ -255,7 +250,7 @@ public class Dispatcher extends AbstractComponent { @Override public void deconstruct() { // The clustermonitor must be shutdown first as it uses the invokerfactory through the searchCluster. - volatileItems.clusterMonitor.shutdown(); + clusterMonitor.shutdown(); if (rpcResourcePool != null) { rpcResourcePool.close(); } |