diff options
author | jonmv <venstad@gmail.com> | 2023-09-05 10:47:23 +0200 |
---|---|---|
committer | jonmv <venstad@gmail.com> | 2023-09-05 11:01:15 +0200 |
commit | 5c5be982848fb8a3f1d84fa522f380b8706e6ddb (patch) | |
tree | 87879722d4763785b73648b6778bb3b428b5f555 /container-search/src/main/java/com/yahoo | |
parent | 932a5311bf7acfc9bad8e45be39cec5540b0a692 (diff) |
Keep and reconfigure ClusterMonitor when reconfiguring dispatcher
Diffstat (limited to 'container-search/src/main/java/com/yahoo')
3 files changed, 80 insertions, 40 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java index 332bf4ea2c4..d81f9079a02 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java @@ -3,16 +3,22 @@ package com.yahoo.search.cluster; import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.search.result.ErrorMessage; +import com.yahoo.yolean.UncheckedInterruptedException; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; @@ -36,7 +42,16 @@ public class ClusterMonitor<T> { private final AtomicBoolean closed = new AtomicBoolean(false); /** A map from Node to corresponding MonitoredNode */ - private final Map<T, TrafficNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new java.util.LinkedHashMap<>()); + private final Map<T, TrafficNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new LinkedHashMap<>()); + + // Used during reconfiguration to ensure async RPC calls are complete. + private final Set<T> nodesToRemove = new LinkedHashSet<>(); + + // Used during reconfiguration to ensure all nodes have data. + private final Set<T> nodesToUpdate = new LinkedHashSet<>(); + + // Used for reconfiguration, and during shutdown. + private boolean skipNextWait = false; public ClusterMonitor(NodeManager<T> manager, boolean startPingThread) { nodeManager = manager; @@ -46,6 +61,22 @@ public class ClusterMonitor<T> { } } + /** Updates the monitored set of nodes, and waits for 1. data on new nodes, and 2. RPC completion of removed nodes. */ + public synchronized void reconfigure(Collection<T> nodes) { + if ( ! monitorThread.isAlive()) throw new IllegalStateException("monitor thread must be alive for reconfiguration"); + + nodesToUpdate.addAll(nodes); + nodesToRemove.addAll(nodeMonitors.keySet()); + nodesToRemove.removeAll(nodes); + for (T node : nodes) if ( ! nodeMonitors.containsKey(node)) add(node, true); + + synchronized (nodeManager) { skipNextWait = true; nodeManager.notifyAll(); } + try { while ( ! nodesToRemove.isEmpty() || ! nodesToUpdate.isEmpty()) wait(1); } + catch (InterruptedException e) { throw new UncheckedInterruptedException(e, true); } + + nodeManager.pingIterationCompleted(); + } + public void start() { if ( ! monitorThread.isAlive()) { monitorThread.start(); @@ -74,30 +105,48 @@ public class ClusterMonitor<T> { /** Called from ClusterSearcher/NodeManager when a node failed */ public synchronized void failed(T node, ErrorMessage error) { - if (closed.get()) return; // Do not touch state if close has started. - TrafficNodeMonitor<T> monitor = nodeMonitors.get(node); - Boolean wasWorking = monitor.isKnownWorking(); - monitor.failed(error); - if (wasWorking != monitor.isKnownWorking()) - nodeManager.failed(node); + updateMonitoredNode(node, monitor -> monitor.failed(error), nodeManager::failed); } /** Called when a node responded */ public synchronized void responded(T node) { - if (closed.get()) return; // Do not touch state if close has started. + updateMonitoredNode(node, TrafficNodeMonitor::responded, nodeManager::working); + } + + private void updateMonitoredNode(T node, Consumer<TrafficNodeMonitor<T>> monitorUpdate, Consumer<T> nodeUpdate) { TrafficNodeMonitor<T> monitor = nodeMonitors.get(node); - Boolean wasWorking = monitor.isKnownWorking(); - monitor.responded(); - if (wasWorking != monitor.isKnownWorking()) - nodeManager.working(node); + + // Don't touch state during shutdown. + if (closed.get()) monitor = null; + + // Node was removed during reconfiguration, and should no longer be monitored. + if (nodesToRemove.remove(node)) { + nodeMonitors.remove(node); + monitor = null; + } + + // Update monitor state only when it actually changes. + if (monitor != null) { + Boolean wasWorking = monitor.isKnownWorking(); + monitorUpdate.accept(monitor); + if (wasWorking != monitor.isKnownWorking()) + nodeUpdate.accept(node); + } + + // If the node was added in a recent reconfiguration, we now have its required data. + nodesToUpdate.remove(node); } /** * Ping all nodes which needs pinging to discover state changes */ - public void ping(Executor executor) { + public synchronized void ping(Executor executor) { for (var monitor : nodeMonitors()) { if (closed.get()) return; // Do nothing to change state if close has started. + if (nodesToRemove.remove(monitor.getNode())) { + nodeMonitors.remove(monitor.getNode()); + continue; + } nodeManager.ping(this, monitor.getNode(), executor); } nodeManager.pingIterationCompleted(); @@ -120,6 +169,7 @@ public class ClusterMonitor<T> { nodeMonitors.clear(); } synchronized (nodeManager) { + skipNextWait = true; nodeManager.notifyAll(); } try { @@ -148,7 +198,9 @@ public class ClusterMonitor<T> { log.finest("Activating ping"); ping(pingExecutor); synchronized (nodeManager) { - nodeManager.wait(configuration.getCheckInterval()); + if ( ! skipNextWait) + nodeManager.wait(configuration.getCheckInterval()); + skipNextWait = false; } } catch (Throwable e) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index eca0c8058a1..43d0e08886d 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -60,20 +60,19 @@ public class Dispatcher extends AbstractComponent { private final DispatchConfig dispatchConfig; private final RpcConnectionPool rpcResourcePool; private final SearchCluster searchCluster; + private final ClusterMonitor<Node> clusterMonitor; private volatile VolatileItems volatileItems; private static class VolatileItems { final LoadBalancer loadBalancer; final InvokerFactory invokerFactory; - final ClusterMonitor<Node> clusterMonitor; final AtomicInteger inflight = new AtomicInteger(1); // Initial reference. Runnable cleanup = () -> { }; - VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory, ClusterMonitor<Node> clusterMonitor) { + VolatileItems(LoadBalancer loadBalancer, InvokerFactory invokerFactory) { this.loadBalancer = loadBalancer; this.invokerFactory = invokerFactory; - this.clusterMonitor = clusterMonitor; } private void countDown() { @@ -128,7 +127,7 @@ public class Dispatcher extends AbstractComponent { Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, SearchCluster searchCluster, InvokerFactoryFactory invokerFactories) { this(dispatchConfig, rpcConnectionPool, searchCluster, new ClusterMonitor<>(searchCluster, false), invokerFactories); - this.volatileItems.clusterMonitor.start(); // Populate nodes to monitor before starting it. + this.clusterMonitor.start(); // Populate nodes to monitor before starting it. } Dispatcher(DispatchConfig dispatchConfig, RpcConnectionPool rpcConnectionPool, @@ -137,7 +136,8 @@ public class Dispatcher extends AbstractComponent { this.rpcResourcePool = rpcConnectionPool; this.searchCluster = searchCluster; this.invokerFactories = invokerFactories; - this.volatileItems = update(clusterMonitor); + this.clusterMonitor = clusterMonitor; + this.volatileItems = update(); searchCluster.addMonitoring(clusterMonitor); } @@ -171,7 +171,7 @@ public class Dispatcher extends AbstractComponent { * 3. The load balancer is owned by the volatile snapshot, and is swapped atomically with it; * it is used internally by the dispatcher to select search nodes for queries, and is discarded with its snapshot. * 4. The cluster monitor is a subordinate to the search cluster, and does whatever that tells it to, at any time; - * it is technically owned by the volatile snapshot, but mostly to show it is swapped together with that. + * it is technically owned by the dispatcher, but in updated by the search cluster, when that is updated. * 5. The search cluster is owned by the dispatcher, and is updated on node set changes; * its responsibility is to keep track of the state of the backend, and to provide a view of it to the dispatcher, * as well as keep the container vip status updated accordingly; it should therefore preserve as much as possible @@ -192,21 +192,16 @@ public class Dispatcher extends AbstractComponent { }; // Update the nodes the search cluster keeps track of, and what nodes are monitored. - ClusterMonitor<Node> newMonitor = searchCluster.updateNodes(toNodes(searchCluster.name(), nodesConfig), dispatchConfig.minActivedocsPercentage()); + searchCluster.updateNodes(toNodes(searchCluster.name(), nodesConfig), clusterMonitor, dispatchConfig.minActivedocsPercentage()); // Update the snapshot to use the new nodes set in the search cluster; the RPC pool is ready for this. - this.volatileItems = update(newMonitor); - - // Wait for the old cluster monitor to die; it may be pinging nodes we want to shut down RPC connections to. - items.get().clusterMonitor.shutdown(); + this.volatileItems = update(); } // Close the old snapshot, which may trigger the RPC cleanup now, or when the last invoker is closed, by a search thread. } - private VolatileItems update(ClusterMonitor<Node> clusterMonitor) { - var items = new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())), - invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig), - clusterMonitor); - return items; + private VolatileItems update() { + return new VolatileItems(new LoadBalancer(searchCluster.groupList().groups(), toLoadBalancerPolicy(dispatchConfig.distributionPolicy())), + invokerFactories.create(rpcResourcePool, searchCluster.groupList(), dispatchConfig)); } private void initialWarmup(double warmupTime) { @@ -255,7 +250,7 @@ public class Dispatcher extends AbstractComponent { @Override public void deconstruct() { // The clustermonitor must be shutdown first as it uses the invokerfactory through the searchCluster. - volatileItems.clusterMonitor.shutdown(); + clusterMonitor.shutdown(); if (rpcResourcePool != null) { rpcResourcePool.close(); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 59b4637a627..f7a77ebf963 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -6,7 +6,6 @@ import com.yahoo.net.HostName; import com.yahoo.prelude.Pong; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.cluster.NodeManager; -import com.yahoo.yolean.UncheckedInterruptedException; import java.util.ArrayList; import java.util.Collection; @@ -62,7 +61,7 @@ public class SearchCluster implements NodeManager<Node> { public String name() { return clusterId; } /** Sets the new nodes to monitor to be the new nodes, but keep any existing node instances which equal the new ones. */ - public ClusterMonitor<Node> updateNodes(Collection<Node> newNodes, double minActivedocsPercentage) { + public void updateNodes(Collection<Node> newNodes, ClusterMonitor<Node> monitor, double minActivedocsPercentage) { List<Node> currentNodes = new ArrayList<>(newNodes); List<Node> addedNodes = new ArrayList<>(); Map<Node, Node> retainedNodes = groups.nodes().stream().collect(toMap(node -> node, node -> node)); @@ -72,15 +71,9 @@ public class SearchCluster implements NodeManager<Node> { else addedNodes.add(currentNodes.get(i)); } SearchGroupsImpl groups = toGroups(currentNodes, minActivedocsPercentage); - ClusterMonitor<Node> monitor = new ClusterMonitor<>(this, false); - for (Node node : groups.nodes()) monitor.add(node, true); - monitor.start(); - try { while (addedNodes.stream().anyMatch(node -> node.isWorking() == null)) { Thread.sleep(1); } } - catch (InterruptedException e) { throw new UncheckedInterruptedException(e, true); } - pingIterationCompleted(groups); this.localCorpusDispatchTarget = findLocalCorpusDispatchTarget(HostName.getLocalhost(), groups); + monitor.reconfigure(groups.nodes()); this.groups = groups; - return monitor; } public void addMonitoring(ClusterMonitor<Node> clusterMonitor) { |