diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java | 80 |
1 files changed, 66 insertions, 14 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java index 332bf4ea2c4..d81f9079a02 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java @@ -3,16 +3,22 @@ package com.yahoo.search.cluster; import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.search.result.ErrorMessage; +import com.yahoo.yolean.UncheckedInterruptedException; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; @@ -36,7 +42,16 @@ public class ClusterMonitor<T> { private final AtomicBoolean closed = new AtomicBoolean(false); /** A map from Node to corresponding MonitoredNode */ - private final Map<T, TrafficNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new java.util.LinkedHashMap<>()); + private final Map<T, TrafficNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new LinkedHashMap<>()); + + // Used during reconfiguration to ensure async RPC calls are complete. + private final Set<T> nodesToRemove = new LinkedHashSet<>(); + + // Used during reconfiguration to ensure all nodes have data. + private final Set<T> nodesToUpdate = new LinkedHashSet<>(); + + // Used for reconfiguration, and during shutdown. + private boolean skipNextWait = false; public ClusterMonitor(NodeManager<T> manager, boolean startPingThread) { nodeManager = manager; @@ -46,6 +61,22 @@ public class ClusterMonitor<T> { } } + /** Updates the monitored set of nodes, and waits for 1. data on new nodes, and 2. RPC completion of removed nodes. */ + public synchronized void reconfigure(Collection<T> nodes) { + if ( ! monitorThread.isAlive()) throw new IllegalStateException("monitor thread must be alive for reconfiguration"); + + nodesToUpdate.addAll(nodes); + nodesToRemove.addAll(nodeMonitors.keySet()); + nodesToRemove.removeAll(nodes); + for (T node : nodes) if ( ! nodeMonitors.containsKey(node)) add(node, true); + + synchronized (nodeManager) { skipNextWait = true; nodeManager.notifyAll(); } + try { while ( ! nodesToRemove.isEmpty() || ! nodesToUpdate.isEmpty()) wait(1); } + catch (InterruptedException e) { throw new UncheckedInterruptedException(e, true); } + + nodeManager.pingIterationCompleted(); + } + public void start() { if ( ! monitorThread.isAlive()) { monitorThread.start(); @@ -74,30 +105,48 @@ public class ClusterMonitor<T> { /** Called from ClusterSearcher/NodeManager when a node failed */ public synchronized void failed(T node, ErrorMessage error) { - if (closed.get()) return; // Do not touch state if close has started. - TrafficNodeMonitor<T> monitor = nodeMonitors.get(node); - Boolean wasWorking = monitor.isKnownWorking(); - monitor.failed(error); - if (wasWorking != monitor.isKnownWorking()) - nodeManager.failed(node); + updateMonitoredNode(node, monitor -> monitor.failed(error), nodeManager::failed); } /** Called when a node responded */ public synchronized void responded(T node) { - if (closed.get()) return; // Do not touch state if close has started. + updateMonitoredNode(node, TrafficNodeMonitor::responded, nodeManager::working); + } + + private void updateMonitoredNode(T node, Consumer<TrafficNodeMonitor<T>> monitorUpdate, Consumer<T> nodeUpdate) { TrafficNodeMonitor<T> monitor = nodeMonitors.get(node); - Boolean wasWorking = monitor.isKnownWorking(); - monitor.responded(); - if (wasWorking != monitor.isKnownWorking()) - nodeManager.working(node); + + // Don't touch state during shutdown. + if (closed.get()) monitor = null; + + // Node was removed during reconfiguration, and should no longer be monitored. + if (nodesToRemove.remove(node)) { + nodeMonitors.remove(node); + monitor = null; + } + + // Update monitor state only when it actually changes. + if (monitor != null) { + Boolean wasWorking = monitor.isKnownWorking(); + monitorUpdate.accept(monitor); + if (wasWorking != monitor.isKnownWorking()) + nodeUpdate.accept(node); + } + + // If the node was added in a recent reconfiguration, we now have its required data. + nodesToUpdate.remove(node); } /** * Ping all nodes which needs pinging to discover state changes */ - public void ping(Executor executor) { + public synchronized void ping(Executor executor) { for (var monitor : nodeMonitors()) { if (closed.get()) return; // Do nothing to change state if close has started. + if (nodesToRemove.remove(monitor.getNode())) { + nodeMonitors.remove(monitor.getNode()); + continue; + } nodeManager.ping(this, monitor.getNode(), executor); } nodeManager.pingIterationCompleted(); @@ -120,6 +169,7 @@ public class ClusterMonitor<T> { nodeMonitors.clear(); } synchronized (nodeManager) { + skipNextWait = true; nodeManager.notifyAll(); } try { @@ -148,7 +198,9 @@ public class ClusterMonitor<T> { log.finest("Activating ping"); ping(pingExecutor); synchronized (nodeManager) { - nodeManager.wait(configuration.getCheckInterval()); + if ( ! skipNextWait) + nodeManager.wait(configuration.getCheckInterval()); + skipNextWait = false; } } catch (Throwable e) { |