diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java | 42 |
1 files changed, 31 insertions, 11 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java index a016f7d695c..d4b6279be89 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java @@ -12,6 +12,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; @@ -32,15 +33,21 @@ public class ClusterMonitor<T> { private final MonitorThread monitorThread; - private volatile boolean shutdown = false; + private final AtomicBoolean closed = new AtomicBoolean(false); /** A map from Node to corresponding MonitoredNode */ private final Map<T, TrafficNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new java.util.LinkedHashMap<>()); public ClusterMonitor(NodeManager<T> manager) { + this(manager, true); + } + + public ClusterMonitor(NodeManager<T> manager, boolean startPingThread) { nodeManager = manager; monitorThread = new MonitorThread("search.clustermonitor"); - monitorThread.start(); + if (startPingThread) { + monitorThread.start(); + } } /** Returns the configuration of this cluster monitor */ @@ -70,6 +77,7 @@ public class ClusterMonitor<T> { /** Called from ClusterSearcher/NodeManager when a node failed */ public synchronized void failed(T node, ErrorMessage error) { + if (closed.get()) return; // Do not touch state if close has started. TrafficNodeMonitor<T> monitor = nodeMonitors.get(node); Boolean wasWorking = monitor.isKnownWorking(); monitor.failed(error); @@ -79,6 +87,7 @@ public class ClusterMonitor<T> { /** Called when a node responded */ public synchronized void responded(T node) { + if (closed.get()) return; // Do not touch state if close has started. TrafficNodeMonitor<T> monitor = nodeMonitors.get(node); Boolean wasWorking = monitor.isKnownWorking(); monitor.responded(); @@ -90,10 +99,11 @@ public class ClusterMonitor<T> { * Ping all nodes which needs pinging to discover state changes */ public void ping(Executor executor) { - for (Iterator<BaseNodeMonitor<T>> i = nodeMonitorIterator(); i.hasNext(); ) { + for (Iterator<BaseNodeMonitor<T>> i = nodeMonitorIterator(); i.hasNext() && !closed.get(); ) { BaseNodeMonitor<T> monitor= i.next(); nodeManager.ping(monitor.getNode(), executor); // Cause call to failed or responded } + if (closed.get()) return; // Do nothing to change state if close has started. nodeManager.pingIterationCompleted(); } @@ -104,15 +114,23 @@ public class ClusterMonitor<T> { /** Returns a thread-safe snapshot of the NodeMonitors of all added nodes */ public List<BaseNodeMonitor<T>> nodeMonitors() { - synchronized (nodeMonitors) { - return new java.util.ArrayList<>(nodeMonitors.values()); - } + return new java.util.ArrayList<>(nodeMonitors.values()); } /** Must be called when this goes out of use */ public void shutdown() { - shutdown = true; - monitorThread.interrupt(); + closed.set(true); + synchronized (this) { + nodeMonitors.clear(); + } + synchronized (nodeManager) { + nodeManager.notifyAll(); + } + try { + if (monitorThread.isAlive()) { + monitorThread.join(); + } + } catch (InterruptedException e) {} } private class MonitorThread extends Thread { @@ -128,14 +146,16 @@ public class ClusterMonitor<T> { // any thread local connections are reused) 2) a new thread will be started to execute // new pings when a ping is not responding ExecutorService pingExecutor=Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("search.ping")); - while (!isInterrupted()) { + while (!closed.get()) { try { - Thread.sleep(configuration.getCheckInterval()); log.finest("Activating ping"); ping(pingExecutor); + synchronized (nodeManager) { + nodeManager.wait(configuration.getCheckInterval()); + } } catch (Throwable e) { - if (shutdown && e instanceof InterruptedException) { + if (closed.get() && e instanceof InterruptedException) { break; } else if ( ! (e instanceof Exception) ) { log.log(Level.WARNING,"Error in monitor thread, will quit", e); |