summary refs log tree commit diff stats
path: root/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
diff options
context:
space:
mode:
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java')
-rw-r--r-- container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java 42
1 files changed, 31 insertions, 11 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
index a016f7d695c..d4b6279be89 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
@@ -12,6 +12,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -32,15 +33,21 @@ public class ClusterMonitor<T> {
private final MonitorThread monitorThread;
- private volatile boolean shutdown = false;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
/** A map from Node to corresponding MonitoredNode */
private final Map<T, TrafficNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new java.util.LinkedHashMap<>());
public ClusterMonitor(NodeManager<T> manager) {
+ this(manager, true);
+ }
+
+ public ClusterMonitor(NodeManager<T> manager, boolean startPingThread) {
nodeManager = manager;
monitorThread = new MonitorThread("search.clustermonitor");
- monitorThread.start();
+ if (startPingThread) {
+ monitorThread.start();
+ }
}
/** Returns the configuration of this cluster monitor */
@@ -70,6 +77,7 @@ public class ClusterMonitor<T> {
/** Called from ClusterSearcher/NodeManager when a node failed */
public synchronized void failed(T node, ErrorMessage error) {
+ if (closed.get()) return; // Do not touch state if close has started.
TrafficNodeMonitor<T> monitor = nodeMonitors.get(node);
Boolean wasWorking = monitor.isKnownWorking();
monitor.failed(error);
@@ -79,6 +87,7 @@ public class ClusterMonitor<T> {
/** Called when a node responded */
public synchronized void responded(T node) {
+ if (closed.get()) return; // Do not touch state if close has started.
TrafficNodeMonitor<T> monitor = nodeMonitors.get(node);
Boolean wasWorking = monitor.isKnownWorking();
monitor.responded();
@@ -90,10 +99,11 @@ public class ClusterMonitor<T> {
* Ping all nodes which needs pinging to discover state changes
*/
public void ping(Executor executor) {
- for (Iterator<BaseNodeMonitor<T>> i = nodeMonitorIterator(); i.hasNext(); ) {
+ for (Iterator<BaseNodeMonitor<T>> i = nodeMonitorIterator(); i.hasNext() && !closed.get(); ) {
BaseNodeMonitor<T> monitor= i.next();
nodeManager.ping(monitor.getNode(), executor); // Cause call to failed or responded
}
+ if (closed.get()) return; // Do nothing to change state if close has started.
nodeManager.pingIterationCompleted();
}
@@ -104,15 +114,23 @@ public class ClusterMonitor<T> {
/** Returns a thread-safe snapshot of the NodeMonitors of all added nodes */
public List<BaseNodeMonitor<T>> nodeMonitors() {
- synchronized (nodeMonitors) {
- return new java.util.ArrayList<>(nodeMonitors.values());
- }
+ return new java.util.ArrayList<>(nodeMonitors.values());
}
/** Must be called when this goes out of use */
public void shutdown() {
- shutdown = true;
- monitorThread.interrupt();
+ closed.set(true);
+ synchronized (this) {
+ nodeMonitors.clear();
+ }
+ synchronized (nodeManager) {
+ nodeManager.notifyAll();
+ }
+ try {
+ if (monitorThread.isAlive()) {
+ monitorThread.join();
+ }
+ } catch (InterruptedException e) {}
}
private class MonitorThread extends Thread {
@@ -128,14 +146,16 @@ public class ClusterMonitor<T> {
// any thread local connections are reused) 2) a new thread will be started to execute
// new pings when a ping is not responding
ExecutorService pingExecutor=Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("search.ping"));
- while (!isInterrupted()) {
+ while (!closed.get()) {
try {
- Thread.sleep(configuration.getCheckInterval());
log.finest("Activating ping");
ping(pingExecutor);
+ synchronized (nodeManager) {
+ nodeManager.wait(configuration.getCheckInterval());
+ }
}
catch (Throwable e) {
- if (shutdown && e instanceof InterruptedException) {
+ if (closed.get() && e instanceof InterruptedException) {
break;
} else if ( ! (e instanceof Exception) ) {
log.log(Level.WARNING,"Error in monitor thread, will quit", e);