summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java29
1 files changed, 12 insertions, 17 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index eca0c8058a1..43d0e08886d 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -60,20 +60,19 @@ public class Dispatcher extends AbstractComponent {
private final DispatchConfig dispatchConfig;
private final RpcConnectionPool rpcResourcePool;
private final SearchCluster searchCluster;
+ private final ClusterMonitor<Node> clusterMonitor;
private volatile VolatileItems volatileItems;
private static class VolatileItems {
final LoadBalancer loadBalancer;
final InvokerFactory invokerFactory;
- final ClusterMonitor<Node> clusterMonitor;
final AtomicInteger inflight = new AtomicInteger(1); // Initial reference.
Runnable cleanup = () -> { };
- VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory, ClusterMonitor<Node> clusterMonitor) {
+ VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory) {
this.loadBalancer = loadBalancer;
this.invokerFactory = invokerFactory;
- this.clusterMonitor = clusterMonitor;
}
private void countDown() {
@@ -128,7 +127,7 @@ public class Dispatcher extends AbstractComponent {
Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool,
SearchCluster searchCluster, InvokerFactoryFactory invokerFactories) {
this(dispatchConfig, rpcConnectionPool, searchCluster, new ClusterMonitor<>(searchCluster, false), invokerFactories);
- this.volatileItems.clusterMonitor.start(); // Populate nodes to monitor before starting it.
+ this.clusterMonitor.start(); // Populate nodes to monitor before starting it.
}
Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool,
@@ -137,7 +136,8 @@ public class Dispatcher extends AbstractComponent {
this.rpcResourcePool = rpcConnectionPool;
this.searchCluster = searchCluster;
this.invokerFactories = invokerFactories;
- this.volatileItems = update(clusterMonitor);
+ this.clusterMonitor = clusterMonitor;
+ this.volatileItems = update();
searchCluster.addMonitoring(clusterMonitor);
}
@@ -171,7 +171,7 @@ public class Dispatcher extends AbstractComponent {
* 3. The load balancer is owned by the volatile snapshot, and is swapped atomically with it;
* it is used internally by the dispatcher to select search nodes for queries, and is discarded with its snapshot.
* 4. The cluster monitor is a subordinate to the search cluster, and does whatever that tells it to, at any time;
- * it is technically owned by the volatile snapshot, but mostly to show it is swapped together with that.
+ * it is technically owned by the dispatcher, but in updated by the search cluster, when that is updated.
* 5. The search cluster is owned by the dispatcher, and is updated on node set changes;
* its responsibility is to keep track of the state of the backend, and to provide a view of it to the dispatcher,
* as well as keep the container vip status updated accordingly; it should therefore preserve as much as possible
@@ -192,21 +192,16 @@ public class Dispatcher extends AbstractComponent {
};
// Update the nodes the search cluster keeps track of, and what nodes are monitored.
- ClusterMonitor<Node> newMonitor = searchCluster.updateNodes(toNodes(searchCluster.name(), nodesConfig), dispatchConfig.minActivedocsPercentage());
+ searchCluster.updateNodes(toNodes(searchCluster.name(), nodesConfig), clusterMonitor, dispatchConfig.minActivedocsPercentage());
// Update the snapshot to use the new nodes set in the search cluster; the RPC pool is ready for this.
- this.volatileItems = update(newMonitor);
-
- // Wait for the old cluster monitor to die; it may be pinging nodes we want to shut down RPC connections to.
- items.get().clusterMonitor.shutdown();
+ this.volatileItems = update();
} // Close the old snapshot, which may trigger the RPC cleanup now, or when the last invoker is closed, by a search thread.
}
- private VolatileItems update(ClusterMonitor<Node> clusterMonitor) {
- var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())),
- invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig),
- clusterMonitor);
- return items;
+ private VolatileItems update() {
+ return new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())),
+ invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig));
}
private void initialWarmup(double warmupTime) {
@@ -255,7 +250,7 @@ public class Dispatcher extends AbstractComponent {
@Override
public void deconstruct() {
// The clustermonitor must be shutdown first as it uses the invokerfactory through the searchCluster.
- volatileItems.clusterMonitor.shutdown();
+ clusterMonitor.shutdown();
if (rpcResourcePool != null) {
rpcResourcePool.close();
}