about | summary | refs | log | tree | commit | diff | stats
path: root/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java')
-rw-r--r-- container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java 80
1 files changed, 66 insertions, 14 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
index 332bf4ea2c4..d81f9079a02 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
@@ -3,16 +3,22 @@ package com.yahoo.search.cluster;
import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.yolean.UncheckedInterruptedException;
+import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -36,7 +42,16 @@ public class ClusterMonitor<T> {
private final AtomicBoolean closed = new AtomicBoolean(false);
/** A map from Node to corresponding MonitoredNode */
- private final Map<T, TrafficNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new java.util.LinkedHashMap<>());
+ private final Map<T, TrafficNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new LinkedHashMap<>());
+
+ // Used during reconfiguration to ensure async RPC calls are complete.
+ private final Set<T> nodesToRemove = new LinkedHashSet<>();
+
+ // Used during reconfiguration to ensure all nodes have data.
+ private final Set<T> nodesToUpdate = new LinkedHashSet<>();
+
+ // Used for reconfiguration, and during shutdown.
+ private boolean skipNextWait = false;
public ClusterMonitor(NodeManager<T> manager, boolean startPingThread) {
nodeManager = manager;
@@ -46,6 +61,22 @@ public class ClusterMonitor<T> {
}
}
+ /** Updates the monitored set of nodes, and waits for 1. data on new nodes, and 2. RPC completion of removed nodes. */
+ public synchronized void reconfigure(Collection<T> nodes) {
+ if ( ! monitorThread.isAlive()) throw new IllegalStateException("monitor thread must be alive for reconfiguration");
+
+ nodesToUpdate.addAll(nodes);
+ nodesToRemove.addAll(nodeMonitors.keySet());
+ nodesToRemove.removeAll(nodes);
+ for (T node : nodes) if ( ! nodeMonitors.containsKey(node)) add(node, true);
+
+ synchronized (nodeManager) { skipNextWait = true; nodeManager.notifyAll(); }
+ try { while ( ! nodesToRemove.isEmpty() || ! nodesToUpdate.isEmpty()) wait(1); }
+ catch (InterruptedException e) { throw new UncheckedInterruptedException(e, true); }
+
+ nodeManager.pingIterationCompleted();
+ }
+
public void start() {
if ( ! monitorThread.isAlive()) {
monitorThread.start();
@@ -74,30 +105,48 @@ public class ClusterMonitor<T> {
/** Called from ClusterSearcher/NodeManager when a node failed */
public synchronized void failed(T node, ErrorMessage error) {
- if (closed.get()) return; // Do not touch state if close has started.
- TrafficNodeMonitor<T> monitor = nodeMonitors.get(node);
- Boolean wasWorking = monitor.isKnownWorking();
- monitor.failed(error);
- if (wasWorking != monitor.isKnownWorking())
- nodeManager.failed(node);
+ updateMonitoredNode(node, monitor -> monitor.failed(error), nodeManager::failed);
}
/** Called when a node responded */
public synchronized void responded(T node) {
- if (closed.get()) return; // Do not touch state if close has started.
+ updateMonitoredNode(node, TrafficNodeMonitor::responded, nodeManager::working);
+ }
+
+ private void updateMonitoredNode(T node, Consumer<TrafficNodeMonitor<T>> monitorUpdate, Consumer<T> nodeUpdate) {
TrafficNodeMonitor<T> monitor = nodeMonitors.get(node);
- Boolean wasWorking = monitor.isKnownWorking();
- monitor.responded();
- if (wasWorking != monitor.isKnownWorking())
- nodeManager.working(node);
+
+ // Don't touch state during shutdown.
+ if (closed.get()) monitor = null;
+
+ // Node was removed during reconfiguration, and should no longer be monitored.
+ if (nodesToRemove.remove(node)) {
+ nodeMonitors.remove(node);
+ monitor = null;
+ }
+
+ // Update monitor state only when it actually changes.
+ if (monitor != null) {
+ Boolean wasWorking = monitor.isKnownWorking();
+ monitorUpdate.accept(monitor);
+ if (wasWorking != monitor.isKnownWorking())
+ nodeUpdate.accept(node);
+ }
+
+ // If the node was added in a recent reconfiguration, we now have its required data.
+ nodesToUpdate.remove(node);
}
/**
* Ping all nodes which need pinging to discover state changes
*/
- public void ping(Executor executor) {
+ public synchronized void ping(Executor executor) {
for (var monitor : nodeMonitors()) {
if (closed.get()) return; // Do nothing to change state if close has started.
+ if (nodesToRemove.remove(monitor.getNode())) {
+ nodeMonitors.remove(monitor.getNode());
+ continue;
+ }
nodeManager.ping(this, monitor.getNode(), executor);
}
nodeManager.pingIterationCompleted();
@@ -120,6 +169,7 @@ public class ClusterMonitor<T> {
nodeMonitors.clear();
}
synchronized (nodeManager) {
+ skipNextWait = true;
nodeManager.notifyAll();
}
try {
@@ -148,7 +198,9 @@ public class ClusterMonitor<T> {
log.finest("Activating ping");
ping(pingExecutor);
synchronized (nodeManager) {
- nodeManager.wait(configuration.getCheckInterval());
+ if ( ! skipNextWait)
+ nodeManager.wait(configuration.getCheckInterval());
+ skipNextWait = false;
}
}
catch (Throwable e) {