aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java37
1 files changed, 16 insertions, 21 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index 6f6b0fc2b79..43d0e08886d 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -60,20 +60,19 @@ public class Dispatcher extends AbstractComponent {
private final DispatchConfig dispatchConfig;
private final RpcConnectionPool rpcResourcePool;
private final SearchCluster searchCluster;
+ private final ClusterMonitor<Node> clusterMonitor;
private volatile VolatileItems volatileItems;
private static class VolatileItems {
final LoadBalancer loadBalancer;
final InvokerFactory invokerFactory;
- final ClusterMonitor<Node> clusterMonitor;
final AtomicInteger inflight = new AtomicInteger(1); // Initial reference.
Runnable cleanup = () -> { };
- VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory, ClusterMonitor<Node> clusterMonitor) {
+ VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory) {
this.loadBalancer = loadBalancer;
this.invokerFactory = invokerFactory;
- this.clusterMonitor = clusterMonitor;
}
private void countDown() {
@@ -121,14 +120,14 @@ public class Dispatcher extends AbstractComponent {
DispatchNodesConfig nodesConfig, VipStatus vipStatus, InvokerFactoryFactory invokerFactories) {
this(dispatchConfig, rpcConnectionPool,
new SearchCluster(clusterId.stringValue(), dispatchConfig.minActivedocsPercentage(),
- toNodes(nodesConfig), vipStatus, new RpcPingFactory(rpcConnectionPool)),
+ toNodes(clusterId.stringValue(), nodesConfig), vipStatus, new RpcPingFactory(rpcConnectionPool)),
invokerFactories);
}
Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool,
SearchCluster searchCluster, InvokerFactoryFactory invokerFactories) {
this(dispatchConfig, rpcConnectionPool, searchCluster, new ClusterMonitor<>(searchCluster, false), invokerFactories);
- this.volatileItems.clusterMonitor.start(); // Populate nodes to monitor before starting it.
+ this.clusterMonitor.start(); // Populate nodes to monitor before starting it.
}
Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool,
@@ -137,7 +136,8 @@ public class Dispatcher extends AbstractComponent {
this.rpcResourcePool = rpcConnectionPool;
this.searchCluster = searchCluster;
this.invokerFactories = invokerFactories;
- this.volatileItems = update(clusterMonitor);
+ this.clusterMonitor = clusterMonitor;
+ this.volatileItems = update();
searchCluster.addMonitoring(clusterMonitor);
}
@@ -171,7 +171,7 @@ public class Dispatcher extends AbstractComponent {
* 3. The load balancer is owned by the volatile snapshot, and is swapped atomically with it;
* it is used internally by the dispatcher to select search nodes for queries, and is discarded with its snapshot.
* 4. The cluster monitor is a subordinate to the search cluster, and does whatever that tells it to, at any time;
- * it is technically owned by the volatile snapshot, but mostly to show it is swapped together with that.
+ * it is technically owned by the dispatcher, but in updated by the search cluster, when that is updated.
* 5. The search cluster is owned by the dispatcher, and is updated on node set changes;
* its responsibility is to keep track of the state of the backend, and to provide a view of it to the dispatcher,
* as well as keep the container vip status updated accordingly; it should therefore preserve as much as possible
@@ -180,7 +180,7 @@ public class Dispatcher extends AbstractComponent {
* under the assumption that this is the common case, i.e., new nodes have no documents yet.
*/
void updateWithNewConfig(DispatchNodesConfig nodesConfig) {
- try (var items = volatileItems()) { // Marking a reference to the old snapshot, which we want to have cleaned up.
+ try (var items = volatileItems()) { // Mark a reference to the old snapshot, which we want to have cleaned up.
items.get().countDown(); // Decrement for its initial creation reference, so it may reach 0.
// Let the RPC pool know about the new nodes, and set up the delayed cleanup that we need to do.
@@ -192,21 +192,16 @@ public class Dispatcher extends AbstractComponent {
};
// Update the nodes the search cluster keeps track of, and what nodes are monitored.
- ClusterMonitor<Node> newMonitor = searchCluster.updateNodes(toNodes(nodesConfig), dispatchConfig.minActivedocsPercentage());
+ searchCluster.updateNodes(toNodes(searchCluster.name(), nodesConfig), clusterMonitor, dispatchConfig.minActivedocsPercentage());
// Update the snapshot to use the new nodes set in the search cluster; the RPC pool is ready for this.
- this.volatileItems = update(newMonitor);
-
- // Wait for the old cluster monitor to die; it may be pinging nodes we want to shut down RPC connections to.
- items.get().clusterMonitor.shutdown();
+ this.volatileItems = update();
} // Close the old snapshot, which may trigger the RPC cleanup now, or when the last invoker is closed, by a search thread.
}
- private VolatileItems update(ClusterMonitor<Node> clusterMonitor) {
- var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())),
- invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig),
- clusterMonitor);
- return items;
+ private VolatileItems update() {
+ return new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())),
+ invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig));
}
private void initialWarmup(double warmupTime) {
@@ -234,9 +229,9 @@ public class Dispatcher extends AbstractComponent {
case LATENCY_AMORTIZED_OVER_TIME -> LoadBalancer.Policy.LATENCY_AMORTIZED_OVER_TIME;
};
}
- private static List<Node> toNodes(DispatchNodesConfig nodesConfig) {
+ private static List<Node> toNodes(String clusterName, DispatchNodesConfig nodesConfig) {
return nodesConfig.node().stream()
- .map(n -> new Node(n.key(), n.host(), n.group()))
+ .map(n -> new Node(clusterName, n.key(), n.host(), n.group()))
.toList();
}
@@ -255,7 +250,7 @@ public class Dispatcher extends AbstractComponent {
@Override
public void deconstruct() {
// The clustermonitor must be shutdown first as it uses the invokerfactory through the searchCluster.
- volatileItems.clusterMonitor.shutdown();
+ clusterMonitor.shutdown();
if (rpcResourcePool != null) {
rpcResourcePool.close();
}