diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2019-12-18 18:59:23 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2019-12-18 21:09:04 +0000 |
commit | 18b31ea7d44d149ccd742999b5691402acb19a7b (patch) | |
tree | d14c1964563a5a73a3312b3a0f016ca54b5f571a /container-search | |
parent | 5763bd24da07a59f2653986e61d940b4d2a87e9a (diff) |
This is the correct way of shutting down the Dispatcher.
Avoid using Thread.interrupt for shutdown, handle it gracefully.
1 - Shut down the SearchCluster:
    1.1 - Set it to closed.
    1.2 - Synchronize clearing of monitored nodes.
    1.3 - Wake up the monitor thread if it is sleeping.
    1.4 - Join with the monitor thread.
2 - Release the invokerPool.
Diffstat (limited to 'container-search')
3 files changed, 25 insertions, 13 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java index a016f7d695c..fc186a95679 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java @@ -12,6 +12,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Level; import java.util.logging.Logger; @@ -32,7 +33,7 @@ public class ClusterMonitor<T> { private final MonitorThread monitorThread; - private volatile boolean shutdown = false; + private final AtomicBoolean closed = new AtomicBoolean(false); /** A map from Node to corresponding MonitoredNode */ private final Map<T, TrafficNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new java.util.LinkedHashMap<>()); @@ -70,6 +71,7 @@ public class ClusterMonitor<T> { /** Called from ClusterSearcher/NodeManager when a node failed */ public synchronized void failed(T node, ErrorMessage error) { + if (closed.get()) return; // Do not touch state if close has started. TrafficNodeMonitor<T> monitor = nodeMonitors.get(node); Boolean wasWorking = monitor.isKnownWorking(); monitor.failed(error); @@ -79,6 +81,7 @@ public class ClusterMonitor<T> { /** Called when a node responded */ public synchronized void responded(T node) { + if (closed.get()) return; // Do not touch state if close has started. 
TrafficNodeMonitor<T> monitor = nodeMonitors.get(node); Boolean wasWorking = monitor.isKnownWorking(); monitor.responded(); @@ -90,10 +93,11 @@ public class ClusterMonitor<T> { * Ping all nodes which needs pinging to discover state changes */ public void ping(Executor executor) { - for (Iterator<BaseNodeMonitor<T>> i = nodeMonitorIterator(); i.hasNext(); ) { + for (Iterator<BaseNodeMonitor<T>> i = nodeMonitorIterator(); i.hasNext() && !closed.get(); ) { BaseNodeMonitor<T> monitor= i.next(); nodeManager.ping(monitor.getNode(), executor); // Cause call to failed or responded } + if (closed.get()) return; // Do nothing to change state if close has started. nodeManager.pingIterationCompleted(); } @@ -104,15 +108,21 @@ public class ClusterMonitor<T> { /** Returns a thread-safe snapshot of the NodeMonitors of all added nodes */ public List<BaseNodeMonitor<T>> nodeMonitors() { - synchronized (nodeMonitors) { - return new java.util.ArrayList<>(nodeMonitors.values()); - } + return new java.util.ArrayList<>(nodeMonitors.values()); } /** Must be called when this goes out of use */ public void shutdown() { - shutdown = true; - monitorThread.interrupt(); + closed.set(true); + synchronized (this) { + nodeMonitors.clear(); + } + synchronized (nodeManager) { + nodeManager.notifyAll(); + } + try { + monitorThread.join(); + } catch (InterruptedException e) {} } private class MonitorThread extends Thread { @@ -128,14 +138,16 @@ public class ClusterMonitor<T> { // any thread local connections are reused) 2) a new thread will be started to execute // new pings when a ping is not responding ExecutorService pingExecutor=Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("search.ping")); - while (!isInterrupted()) { + while (!closed.get()) { try { - Thread.sleep(configuration.getCheckInterval()); + synchronized (nodeManager) { + nodeManager.wait(configuration.getCheckInterval()); + } log.finest("Activating ping"); ping(pingExecutor); } catch (Throwable e) { - if 
(shutdown && e instanceof InterruptedException) { + if (closed.get() && e instanceof InterruptedException) { break; } else if ( ! (e instanceof Exception) ) { log.log(Level.WARNING,"Error in monitor thread, will quit", e); diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index e078ffa685f..224facd0c5b 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -131,8 +131,9 @@ public class Dispatcher extends AbstractComponent { @Override public void deconstruct() { - invokerFactory.release(); + /* The seach cluster must be shutdown first as it uses the invokerfactory. */ searchCluster.shutDown(); + invokerFactory.release(); } public FillInvoker getFillInvoker(Result result, VespaBackEndSearcher searcher) { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 3595a24ca92..fb55e330ebe 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -408,8 +408,7 @@ public class SearchCluster implements NodeManager<Node> { activeDocuments += n.getActiveDocuments(); } long averageDocumentsInOtherGroups = sumOfActiveDocuments / otherGroups; - boolean sufficient = isGroupCoverageSufficient(nodes.size(), nodesInGroup, activeDocuments, averageDocumentsInOtherGroups); - return sufficient; + return isGroupCoverageSufficient(nodes.size(), nodesInGroup, activeDocuments, averageDocumentsInOtherGroups); } private void trackGroupCoverageChanges(int index, Group group, boolean fullCoverage, long averageDocuments) { |