diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2017-09-04 09:24:28 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2017-09-04 09:24:28 +0200 |
commit | e930d8e8944ad0afdd69740e9130e1bdedf76956 (patch) | |
tree | ed2efc2ec80ed59342e9d7a6b9d9efe4ed2e33fc /clustercontroller-core/src | |
parent | 01fab1f2a7302d470c2a25e21c95dd5a5b766e1c (diff) |
Try to shut down fleetcontroller in a controlled manner without relying on the infamous thread.interrupt.
Diffstat (limited to 'clustercontroller-core/src')
-rw-r--r-- | clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java | 50 |
1 file changed, 29 insertions, 21 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java index 9329d78d453..759b15f92ee 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java @@ -24,6 +24,7 @@ import org.apache.commons.lang.exception.ExceptionUtils; import java.io.FileNotFoundException; import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.logging.Logger; public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAddedOrRemovedListener, SystemStateListener, @@ -46,7 +47,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd private final DatabaseHandler database; private final MasterElectionHandler masterElectionHandler; private Thread runner = null; - private boolean running = true; + private AtomicBoolean running = new AtomicBoolean(true); private FleetControllerOptions options; private FleetControllerOptions nextOptions; private final List<SystemStateListener> systemStateListeners = new LinkedList<>(); @@ -189,9 +190,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd public Object getMonitor() { return monitor; } public boolean isRunning() { - synchronized(monitor) { - return running; - } + return running.get(); } public boolean isMaster() { @@ -257,16 +256,10 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd public int getRpcPort() { return rpcServer.getPort(); } public void shutdown() throws InterruptedException, java.io.IOException { - boolean isStillRunning = false; - synchronized(monitor) { - if (running) { - isStillRunning = true; - } - } - if (runner != null && isStillRunning) { + if (runner != null && isRunning()) { 
log.log(LogLevel.INFO, "Joining event thread."); - running = false; - runner.interrupt(); + running.set(false); + synchronized(monitor) { monitor.notifyAll(); } runner.join(); } log.log(LogLevel.INFO, "Fleetcontroller done shutting down event thread."); @@ -512,43 +505,54 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd didWork |= handleLeadershipEdgeTransitions(); stateChangeHandler.setMaster(isMaster); + if ( ! isRunning()) { return; } // Process zero or more getNodeState responses that we have received. didWork |= stateGatherer.processResponses(this); + if ( ! isRunning()) { return; } + if (masterElectionHandler.isAmongNthFirst(options.stateGatherCount)) { didWork |= resyncLocallyCachedState(); } else { stepDownAsStateGatherer(); } + if ( ! isRunning()) { return; } didWork |= systemStateBroadcaster.processResponses(); + if ( ! isRunning()) { return; } if (masterElectionHandler.isMaster()) { didWork |= broadcastClusterStateToEligibleNodes(); } + if ( ! isRunning()) { return; } didWork |= processAnyPendingStatusPageRequest(); - + if ( ! isRunning()) { return; } if (rpcServer != null) { didWork |= rpcServer.handleRpcRequests(cluster, consolidatedClusterState(), this, this); } + if ( ! isRunning()) { return; } didWork |= processNextQueuedRemoteTask(); processingCycle = false; ++cycleCount; long tickStopTime = timer.getCurrentTimeInMillis(); - if (tickStopTime >= tickStartTime) + if (tickStopTime >= tickStartTime) { metricUpdater.addTickTime(tickStopTime - tickStartTime, didWork); - if ( ! didWork && ! waitingForCycle) + } + if ( ! didWork && ! waitingForCycle) { monitor.wait(options.cycleWaitTime); + } + if ( ! 
isRunning()) { return; } tickStartTime = timer.getCurrentTimeInMillis(); processingCycle = true; if (nextOptions != null) { // if reconfiguration has given us new options, propagate them switchToNewConfig(); } } - - propagateNewStatesToListeners(); + if (isRunning()) { + propagateNewStatesToListeners(); + } } private boolean updateMasterElectionState() throws InterruptedException { @@ -802,15 +806,19 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd controllerThreadId = Thread.currentThread().getId(); try { processingCycle = true; - while(running) + while( isRunning() ) { tick(); + } } catch (InterruptedException e) { log.log(LogLevel.DEBUG, "Event thread stopped by interrupt exception: " + e); } catch (Throwable t) { t.printStackTrace(); log.log(LogLevel.ERROR, "Fatal error killed fleet controller", t); - synchronized (monitor) { running = false; } + synchronized (monitor) { running.set(false); } System.exit(1); + } finally { + running.set(false); + synchronized (monitor) { monitor.notifyAll(); } } } @@ -834,7 +842,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd try{ while (cycleCount < wantedCycle) { if (System.currentTimeMillis() > endTime) throw new IllegalStateException("Timed out waiting for cycle to complete. Not completed after " + timeoutMS + " ms."); - if (!running) throw new IllegalStateException("Fleetcontroller not running. Will never complete cycles"); + if ( !isRunning() ) throw new IllegalStateException("Fleetcontroller not running. Will never complete cycles"); try{ monitor.wait(100); } catch (InterruptedException e) {} } } finally { |