aboutsummaryrefslogtreecommitdiffstats
path: root/container-search
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2019-12-18 18:59:23 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2019-12-18 21:09:04 +0000
commit18b31ea7d44d149ccd742999b5691402acb19a7b (patch)
treed14c1964563a5a73a3312b3a0f016ca54b5f571a /container-search
parent5763bd24da07a59f2653986e61d940b4d2a87e9a (diff)
This is the correct way of shutting down the Dispatcher.
Avoid using Thread.interrupt for shutdown; handle it gracefully:
1 - Shut down the SearchCluster:
    1 - Set it to closed.
    2 - Synchronize clearing of monitored nodes.
    3 - Wake up the monitor thread if it is sleeping.
    4 - Join with the monitor thread.
2 - Release the invokerPool.
Diffstat (limited to 'container-search')
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java32
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java3
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java3
3 files changed, 25 insertions, 13 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
index a016f7d695c..fc186a95679 100644
--- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
+++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
@@ -12,6 +12,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -32,7 +33,7 @@ public class ClusterMonitor<T> {
private final MonitorThread monitorThread;
- private volatile boolean shutdown = false;
+ private final AtomicBoolean closed = new AtomicBoolean(false);
/** A map from Node to corresponding MonitoredNode */
private final Map<T, TrafficNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new java.util.LinkedHashMap<>());
@@ -70,6 +71,7 @@ public class ClusterMonitor<T> {
/** Called from ClusterSearcher/NodeManager when a node failed */
public synchronized void failed(T node, ErrorMessage error) {
+ if (closed.get()) return; // Do not touch state if close has started.
TrafficNodeMonitor<T> monitor = nodeMonitors.get(node);
Boolean wasWorking = monitor.isKnownWorking();
monitor.failed(error);
@@ -79,6 +81,7 @@ public class ClusterMonitor<T> {
/** Called when a node responded */
public synchronized void responded(T node) {
+ if (closed.get()) return; // Do not touch state if close has started.
TrafficNodeMonitor<T> monitor = nodeMonitors.get(node);
Boolean wasWorking = monitor.isKnownWorking();
monitor.responded();
@@ -90,10 +93,11 @@ public class ClusterMonitor<T> {
* Ping all nodes which needs pinging to discover state changes
*/
public void ping(Executor executor) {
- for (Iterator<BaseNodeMonitor<T>> i = nodeMonitorIterator(); i.hasNext(); ) {
+ for (Iterator<BaseNodeMonitor<T>> i = nodeMonitorIterator(); i.hasNext() && !closed.get(); ) {
BaseNodeMonitor<T> monitor= i.next();
nodeManager.ping(monitor.getNode(), executor); // Cause call to failed or responded
}
+ if (closed.get()) return; // Do nothing to change state if close has started.
nodeManager.pingIterationCompleted();
}
@@ -104,15 +108,21 @@ public class ClusterMonitor<T> {
/** Returns a thread-safe snapshot of the NodeMonitors of all added nodes */
public List<BaseNodeMonitor<T>> nodeMonitors() {
- synchronized (nodeMonitors) {
- return new java.util.ArrayList<>(nodeMonitors.values());
- }
+ return new java.util.ArrayList<>(nodeMonitors.values());
}
/** Must be called when this goes out of use */
public void shutdown() {
- shutdown = true;
- monitorThread.interrupt();
+ closed.set(true);
+ synchronized (this) {
+ nodeMonitors.clear();
+ }
+ synchronized (nodeManager) {
+ nodeManager.notifyAll();
+ }
+ try {
+ monitorThread.join();
+ } catch (InterruptedException e) {}
}
private class MonitorThread extends Thread {
@@ -128,14 +138,16 @@ public class ClusterMonitor<T> {
// any thread local connections are reused) 2) a new thread will be started to execute
// new pings when a ping is not responding
ExecutorService pingExecutor=Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("search.ping"));
- while (!isInterrupted()) {
+ while (!closed.get()) {
try {
- Thread.sleep(configuration.getCheckInterval());
+ synchronized (nodeManager) {
+ nodeManager.wait(configuration.getCheckInterval());
+ }
log.finest("Activating ping");
ping(pingExecutor);
}
catch (Throwable e) {
- if (shutdown && e instanceof InterruptedException) {
+ if (closed.get() && e instanceof InterruptedException) {
break;
} else if ( ! (e instanceof Exception) ) {
log.log(Level.WARNING,"Error in monitor thread, will quit", e);
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index e078ffa685f..224facd0c5b 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -131,8 +131,9 @@ public class Dispatcher extends AbstractComponent {
@Override
public void deconstruct() {
- invokerFactory.release();
+ /* The search cluster must be shut down first as it uses the invokerFactory. */
searchCluster.shutDown();
+ invokerFactory.release();
}
public FillInvoker getFillInvoker(Result result, VespaBackEndSearcher searcher) {
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
index 3595a24ca92..fb55e330ebe 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
@@ -408,8 +408,7 @@ public class SearchCluster implements NodeManager<Node> {
activeDocuments += n.getActiveDocuments();
}
long averageDocumentsInOtherGroups = sumOfActiveDocuments / otherGroups;
- boolean sufficient = isGroupCoverageSufficient(nodes.size(), nodesInGroup, activeDocuments, averageDocumentsInOtherGroups);
- return sufficient;
+ return isGroupCoverageSufficient(nodes.size(), nodesInGroup, activeDocuments, averageDocumentsInOtherGroups);
}
private void trackGroupCoverageChanges(int index, Group group, boolean fullCoverage, long averageDocuments) {