summaryrefslogtreecommitdiffstats
path: root/clustercontroller-core
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2017-09-04 09:24:28 +0200
committerHenning Baldersheim <balder@yahoo-inc.com>2017-09-04 09:24:28 +0200
commite930d8e8944ad0afdd69740e9130e1bdedf76956 (patch)
treeed2efc2ec80ed59342e9d7a6b9d9efe4ed2e33fc /clustercontroller-core
parent01fab1f2a7302d470c2a25e21c95dd5a5b766e1c (diff)
Try to shut down fleetcontroller in a controlled manner without relying on the infamous thread.interrupt.
Diffstat (limited to 'clustercontroller-core')
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java50
1 files changed, 29 insertions, 21 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index 9329d78d453..759b15f92ee 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang.exception.ExceptionUtils;
import java.io.FileNotFoundException;
import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;
public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAddedOrRemovedListener, SystemStateListener,
@@ -46,7 +47,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
private final DatabaseHandler database;
private final MasterElectionHandler masterElectionHandler;
private Thread runner = null;
- private boolean running = true;
+ private AtomicBoolean running = new AtomicBoolean(true);
private FleetControllerOptions options;
private FleetControllerOptions nextOptions;
private final List<SystemStateListener> systemStateListeners = new LinkedList<>();
@@ -189,9 +190,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
public Object getMonitor() { return monitor; }
public boolean isRunning() {
- synchronized(monitor) {
- return running;
- }
+ return running.get();
}
public boolean isMaster() {
@@ -257,16 +256,10 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
public int getRpcPort() { return rpcServer.getPort(); }
public void shutdown() throws InterruptedException, java.io.IOException {
- boolean isStillRunning = false;
- synchronized(monitor) {
- if (running) {
- isStillRunning = true;
- }
- }
- if (runner != null && isStillRunning) {
+ if (runner != null && isRunning()) {
log.log(LogLevel.INFO, "Joining event thread.");
- running = false;
- runner.interrupt();
+ running.set(false);
+ synchronized(monitor) { monitor.notifyAll(); }
runner.join();
}
log.log(LogLevel.INFO, "Fleetcontroller done shutting down event thread.");
@@ -512,43 +505,54 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
didWork |= handleLeadershipEdgeTransitions();
stateChangeHandler.setMaster(isMaster);
+ if ( ! isRunning()) { return; }
// Process zero or more getNodeState responses that we have received.
didWork |= stateGatherer.processResponses(this);
+ if ( ! isRunning()) { return; }
+
if (masterElectionHandler.isAmongNthFirst(options.stateGatherCount)) {
didWork |= resyncLocallyCachedState();
} else {
stepDownAsStateGatherer();
}
+ if ( ! isRunning()) { return; }
didWork |= systemStateBroadcaster.processResponses();
+ if ( ! isRunning()) { return; }
if (masterElectionHandler.isMaster()) {
didWork |= broadcastClusterStateToEligibleNodes();
}
+ if ( ! isRunning()) { return; }
didWork |= processAnyPendingStatusPageRequest();
-
+ if ( ! isRunning()) { return; }
if (rpcServer != null) {
didWork |= rpcServer.handleRpcRequests(cluster, consolidatedClusterState(), this, this);
}
+ if ( ! isRunning()) { return; }
didWork |= processNextQueuedRemoteTask();
processingCycle = false;
++cycleCount;
long tickStopTime = timer.getCurrentTimeInMillis();
- if (tickStopTime >= tickStartTime)
+ if (tickStopTime >= tickStartTime) {
metricUpdater.addTickTime(tickStopTime - tickStartTime, didWork);
- if ( ! didWork && ! waitingForCycle)
+ }
+ if ( ! didWork && ! waitingForCycle) {
monitor.wait(options.cycleWaitTime);
+ }
+ if ( ! isRunning()) { return; }
tickStartTime = timer.getCurrentTimeInMillis();
processingCycle = true;
if (nextOptions != null) { // if reconfiguration has given us new options, propagate them
switchToNewConfig();
}
}
-
- propagateNewStatesToListeners();
+ if (isRunning()) {
+ propagateNewStatesToListeners();
+ }
}
private boolean updateMasterElectionState() throws InterruptedException {
@@ -802,15 +806,19 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
controllerThreadId = Thread.currentThread().getId();
try {
processingCycle = true;
- while(running)
+ while( isRunning() ) {
tick();
+ }
} catch (InterruptedException e) {
log.log(LogLevel.DEBUG, "Event thread stopped by interrupt exception: " + e);
} catch (Throwable t) {
t.printStackTrace();
log.log(LogLevel.ERROR, "Fatal error killed fleet controller", t);
- synchronized (monitor) { running = false; }
+ synchronized (monitor) { running.set(false); }
System.exit(1);
+ } finally {
+ running.set(false);
+ synchronized (monitor) { monitor.notifyAll(); }
}
}
@@ -834,7 +842,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
try{
while (cycleCount < wantedCycle) {
if (System.currentTimeMillis() > endTime) throw new IllegalStateException("Timed out waiting for cycle to complete. Not completed after " + timeoutMS + " ms.");
- if (!running) throw new IllegalStateException("Fleetcontroller not running. Will never complete cycles");
+ if ( !isRunning() ) throw new IllegalStateException("Fleetcontroller not running. Will never complete cycles");
try{ monitor.wait(100); } catch (InterruptedException e) {}
}
} finally {