// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;

import com.yahoo.vdslib.distribution.ConfiguredNode;
import com.yahoo.vdslib.state.ClusterState;
import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeState;
import com.yahoo.vdslib.state.NodeType;
import com.yahoo.vdslib.state.State;
import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler;
import com.yahoo.vespa.clustercontroller.core.listeners.NodeListener;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.logging.Level;
import java.util.logging.Logger;

import static com.yahoo.vdslib.state.State.DOWN;
import static com.yahoo.vdslib.state.State.INITIALIZING;
import static com.yahoo.vdslib.state.State.STOPPING;
import static java.util.logging.Level.FINE;
import static java.util.logging.Level.FINEST;

/**
 * This class gets node state updates and timer events and uses these to decide
 * whether a new cluster state should be generated.
 *
 * TODO refactor logic out into smaller, separate components. There is still state
 * duplication between ClusterStateGenerator and StateChangeHandler, especially for
 * temporal state transition configuration parameters.
 */
public class StateChangeHandler {

    private static final Logger log = Logger.getLogger(StateChangeHandler.class.getName());

    private final FleetControllerContext context;
    private final Timer timer;
    private final EventLogInterface eventLog;
    private boolean stateMayHaveChanged = false;
    private boolean isMaster = false;

    private Map<NodeType, Integer> maxTransitionTime = new TreeMap<>();
    private int maxInitProgressTime = 5000;
    private int maxPrematureCrashes = 4;
    private long stableStateTimePeriod = 60 * 60 * 1000;
    private int maxSlobrokDisconnectGracePeriod = 1000;
    private static final boolean disableUnstableNodes = true;

    public StateChangeHandler(FleetControllerContext context, Timer timer, EventLogInterface eventLog) {
        this.context = context;
        this.timer = timer;
        this.eventLog = eventLog;
        maxTransitionTime.put(NodeType.DISTRIBUTOR, 5000);
        maxTransitionTime.put(NodeType.STORAGE, 5000);
    }

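    /**
     * Invoked once all available distributors have acknowledged the current cluster state
     * version. Stores any newer start timestamps reported by the nodes and, if any timestamps
     * are pending a reset, persists them and flags that a new cluster state may be needed.
     */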
    public void handleAllDistributorsInSync(final ClusterState currentState,
                                            final Set<ConfiguredNode> nodes,
                                            final DatabaseHandler database,
                                            final DatabaseHandler.DatabaseContext dbContext) {
        int startTimestampsReset = 0;
        context.log(log, FINE, "handleAllDistributorsInSync invoked for state version %d", currentState.getVersion());
        for (NodeType nodeType : NodeType.getTypes()) {
            for (ConfiguredNode configuredNode : nodes) {
                final Node node = new Node(nodeType, configuredNode.index());
                final NodeInfo nodeInfo = dbContext.getCluster().getNodeInfo(node);
                final NodeState nodeState = currentState.getNodeState(node);
                if (nodeInfo != null && nodeState != null) {
                    if (nodeState.getStartTimestamp() > nodeInfo.getStartTimestamp()) {
                        log.log(FINE, () -> String.format("Storing away new start timestamp for node %s (%d)", node, nodeState.getStartTimestamp()));
                        nodeInfo.setStartTimestamp(nodeState.getStartTimestamp());
                    }
                    if (nodeState.getStartTimestamp() > 0) {
                        log.log(FINE, "Resetting timestamp in cluster state for node %s", node);
                        ++startTimestampsReset;
                    }
                } else if (log.isLoggable(FINE)) {
                    log.log(FINE, node + ": " +
                                            (nodeInfo == null ? "null" : nodeInfo.getStartTimestamp()) + ", " +
                                            (nodeState == null ? "null" : nodeState.getStartTimestamp()));
                }
            }
        }
        if (startTimestampsReset > 0) {
            eventLog.add(new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE, "Reset " + startTimestampsReset +
                    " start timestamps as all available distributors have seen newest cluster state.",
                    timer.getCurrentTimeInMillis()));
            stateMayHaveChanged = true;
            database.saveStartTimestamps(dbContext);
        } else {
            log.log(FINE, "Found no start timestamps to reset in cluster state.");
        }
    }

    public boolean stateMayHaveChanged() {
        return stateMayHaveChanged;
    }

    public void setStateChangedFlag() { stateMayHaveChanged = true; }
    public void unsetStateChangedFlag() {
        stateMayHaveChanged = false;
    }

    public void setMaster(boolean isMaster) {
        this.isMaster = isMaster;
    }

    public void setMaxTransitionTime(Map<NodeType, Integer> map) { maxTransitionTime = map; }
    public void setMaxInitProgressTime(int millisecs) { maxInitProgressTime = millisecs; }
    public void setMaxSlobrokDisconnectGracePeriod(int millisecs) {
        maxSlobrokDisconnectGracePeriod = millisecs;
    }
    public void setStableStateTimePeriod(long millisecs) { stableStateTimePeriod = millisecs; }
    public void setMaxPrematureCrashes(int count) { maxPrematureCrashes = count; }

    // TODO nodeListener is only used via updateNodeInfoFromReportedState -> handlePrematureCrash
    // TODO this will recursively invoke proposeNewNodeState, which will presumably (i.e. hopefully) be a no-op...
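    /**
     * Processes a node state reply received from a node. Logs the transition, updates the
     * node's tracked info (crash edges, init progress, min used bits and so on) and flags
     * that a new cluster state may have to be generated.
     */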
    public void handleNewReportedNodeState(ClusterState currentClusterState,
                                           NodeInfo node,
                                           NodeState reportedState,
                                           NodeListener nodeListener) {
        NodeState currentState = currentClusterState.getNodeState(node.getNode());
        Level level = (currentState.equals(reportedState) && node.getVersion() == 0) ? FINEST : FINE;
        log.log(level, () -> String.format("Got nodestate reply from %s: %s (Current state is %s)",
                                           node, node.getReportedState().getTextualDifference(reportedState), currentState.toString(true)));
        long currentTime = timer.getCurrentTimeInMillis();

        if (reportedState.getState().equals(DOWN)) {
            node.setTimeOfFirstFailingConnectionAttempt(currentTime);
        }

        // *** LOGGING ONLY
        if ( ! reportedState.similarTo(node.getReportedState())) {
            if (reportedState.getState().equals(DOWN)) {
                eventLog.addNodeOnlyEvent(NodeEvent.forBaseline(node, "Failed to get node state: " + reportedState.toString(true), NodeEvent.Type.REPORTED, currentTime), Level.INFO);
            } else {
                eventLog.addNodeOnlyEvent(NodeEvent.forBaseline(node, "Now reporting state " + reportedState.toString(true), NodeEvent.Type.REPORTED, currentTime), FINE);
            }
        }

        if (reportedState.equals(node.getReportedState()) &&  ! reportedState.getState().equals(INITIALIZING)) {
            return;
        }

        updateNodeInfoFromReportedState(node, currentState, reportedState, nodeListener);

        if (reportedState.getMinUsedBits() != currentState.getMinUsedBits()) {
            int oldCount = currentState.getMinUsedBits();
            int newCount = reportedState.getMinUsedBits();
            log.log(FINE,
                    () -> String.format("Altering node state to reflect that min distribution bit count has changed from %d to %d", oldCount, newCount));
            eventLog.add(NodeEvent.forBaseline(node, String.format("Altered min distribution bit count from %d to %d", oldCount, newCount),
                         NodeEvent.Type.CURRENT, currentTime), isMaster);
        } else {
            log.log(FINE, () -> String.format("Not altering state of %s in cluster state because new state is too similar: %s",
                                              node, currentState.getTextualDifference(reportedState)));
        }

        stateMayHaveChanged = true;
    }

    public void handleNewNode(NodeInfo node) {
        String message = "Found new node " + node + " in slobrok at " + node.getRpcAddress();
        eventLog.add(NodeEvent.forBaseline(node, message, NodeEvent.Type.REPORTED, timer.getCurrentTimeInMillis()), isMaster);
    }

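    /**
     * Invoked when a node is no longer present in slobrok. A node that was in the STOPPING
     * state is assumed to have shut down cleanly and is immediately reported as down;
     * otherwise we wait to see if it reappears in slobrok.
     */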
    public void handleMissingNode(ClusterState currentClusterState, NodeInfo node, NodeListener nodeListener) {
        long timeNow = timer.getCurrentTimeInMillis();

        if (node.getLatestNodeStateRequestTime() != null) {
            eventLog.add(NodeEvent.forBaseline(node, "Node is no longer in slobrok, but we still have a pending state request.", NodeEvent.Type.REPORTED, timeNow), isMaster);
        } else {
            eventLog.add(NodeEvent.forBaseline(node, "Node is no longer in slobrok. No pending state request to node.", NodeEvent.Type.REPORTED, timeNow), isMaster);
        }

        if (node.getReportedState().getState().equals(STOPPING)) {
            log.log(FINE, () -> "Node " + node.getNode() + " is no longer in slobrok. Was in stopping state, so assuming it has shut down normally. Setting node down");
            NodeState ns = node.getReportedState().clone();
            ns.setState(DOWN);
            handleNewReportedNodeState(currentClusterState, node, ns.clone(), nodeListener);
        } else {
            log.log(FINE, () -> "Node " + node.getNode() + " no longer in slobrok was in state " + node.getReportedState() + ". Waiting to see if it reappears in slobrok");
        }

        stateMayHaveChanged = true;
    }

    /**
     * Propose a new state for a node. This may happen due to an administrator action, orchestration, or
     * a configuration change.
     *
     * If the newly proposed state differs from the state the node currently has in the system,
     * a cluster state regeneration will be triggered.
     */
    public void proposeNewNodeState(ClusterState currentClusterState, NodeInfo node, NodeState proposedState) {
        NodeState currentState = currentClusterState.getNodeState(node.getNode());

        if (currentState.getState().equals(proposedState.getState()))
            return;

        stateMayHaveChanged = true;

        log.log(FINE, () -> String.format("Got new wanted nodestate for %s: %s", node, currentState.getTextualDifference(proposedState)));
        // Should have been checked earlier, before the state was set in the cluster
        assert(proposedState.getState().validWantedNodeState(node.getNode().getType()));
        long timeNow = timer.getCurrentTimeInMillis();
        NodeState currentReported = node.getReportedState();
        if (proposedState.above(currentReported)) {
            eventLog.add(NodeEvent.forBaseline(node, String.format("Wanted state %s, but we cannot force node into that " +
                    "state yet as it is currently in %s", proposedState, currentReported),
                    NodeEvent.Type.REPORTED, timeNow), isMaster);
            return;
        }
        if ( ! proposedState.similarTo(currentState)) {
            eventLog.add(NodeEvent.forBaseline(node, String.format("Node state set to %s.", proposedState),
                    NodeEvent.Type.WANTED, timeNow), isMaster);
        }
    }

    public void handleNewRpcAddress(NodeInfo node) {
        String message = "Node " + node + " has a new address in slobrok: " + node.getRpcAddress();
        eventLog.add(NodeEvent.forBaseline(node, message, NodeEvent.Type.REPORTED, timer.getCurrentTimeInMillis()), isMaster);
    }

    public void handleReturnedRpcAddress(NodeInfo node) {
        String message = "Node got back into slobrok with same address as before: " + node.getRpcAddress();
        eventLog.add(NodeEvent.forBaseline(node, message, NodeEvent.Type.REPORTED, timer.getCurrentTimeInMillis()), isMaster);
    }

    void reconfigureFromOptions(FleetControllerOptions options) {
        setMaxPrematureCrashes(options.maxPrematureCrashes());
        setStableStateTimePeriod(options.stableStateTimePeriod());
        setMaxInitProgressTime(options.maxInitProgressTime());
        setMaxSlobrokDisconnectGracePeriod(options.maxSlobrokDisconnectGracePeriod());
        setMaxTransitionTime(options.maxTransitionTime());
        setStateChangedFlag(); // Always trigger state recomputation after reconfig
    }

    // TODO too many hidden behavior dependencies between this and the actually
    // generated cluster state. Still a bit of a minefield...
    // TODO remove all node state mutation from this function entirely in favor of ClusterStateGenerator!
    //  `--> this will require adding more event edges and premature crash handling to it. Which is fine.
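    /**
     * Evaluates all time-dependent state transitions for the nodes in the cluster, such as
     * slobrok disconnect grace periods, transition time limits, init progress timeouts and
     * premature crash count resets.
     *
     * @return true if any timer triggered, in which case the cluster state may have changed
     */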
    public boolean watchTimers(ContentCluster cluster, ClusterState currentClusterState, NodeListener nodeListener) {
        boolean triggeredAnyTimers = false;
        long currentTime = timer.getCurrentTimeInMillis();

        for (NodeInfo node : cluster.getNodeInfos()) {
            triggeredAnyTimers |= handleTimeDependentOpsForNode(currentClusterState, nodeListener, currentTime, node);
        }

        if (triggeredAnyTimers) {
            stateMayHaveChanged = true;
        }
        return triggeredAnyTimers;
    }

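    // Runs the per-node timer checks in order: slobrok disconnect grace period, transition
    // time limit, init progress timeout, and stable-state crash counter resets.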
    private boolean handleTimeDependentOpsForNode(ClusterState currentClusterState,
                                                  NodeListener nodeListener,
                                                  long currentTime,
                                                  NodeInfo node) {
        NodeState currentStateInSystem = currentClusterState.getNodeState(node.getNode());
        NodeState lastReportedState = node.getReportedState();
        boolean triggeredAnyTimers =
                reportDownIfOutdatedSlobrokNode(currentClusterState, nodeListener, currentTime, node, lastReportedState);

        if (nodeStillUnavailableAfterTransitionTimeExceeded(currentTime, node, currentStateInSystem, lastReportedState))
            triggeredAnyTimers = true;

        if (nodeInitProgressHasTimedOut(currentTime, node, currentStateInSystem, lastReportedState)) {
            eventLog.add(NodeEvent.forBaseline(node, String.format(
                        "%d milliseconds without initialize progress. Marking node down. " +
                        "Premature crash count is now %d.",
                        currentTime - node.getInitProgressTime(),
                        node.getPrematureCrashCount() + 1),
                    NodeEvent.Type.CURRENT, currentTime), isMaster);
            handlePrematureCrash(node, nodeListener);
            triggeredAnyTimers = true;
        }

        if (mayResetCrashCounterOnStableUpNode(currentTime, node, lastReportedState)) {
            node.setPrematureCrashCount(0);
            log.log(FINE, () -> "Resetting premature crash count on node " + node + " as it has been up for a long time.");
            triggeredAnyTimers = true;
        } else if (mayResetCrashCounterOnStableDownNode(currentTime, node, lastReportedState)) {
            node.setPrematureCrashCount(0);
            log.log(FINE, () -> "Resetting premature crash count on node " + node + " as it has been down for a long time.");
            triggeredAnyTimers = true;
        }

        return triggeredAnyTimers;
    }

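    // A storage node that is wanted up and still reports INITIALIZING, but has made no init
    // progress within maxInitProgressTime, is considered to have hung during initialization.
    // A maxInitProgressTime of 0 disables this check; only storage nodes are affected.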
    private boolean nodeInitProgressHasTimedOut(long currentTime, NodeInfo node, NodeState currentStateInSystem, NodeState lastReportedState) {
        return !currentStateInSystem.getState().equals(DOWN)
            && node.getWantedState().above(new NodeState(node.getNode().getType(), DOWN))
            && lastReportedState.getState().equals(INITIALIZING)
            && maxInitProgressTime != 0
            && node.getInitProgressTime() + maxInitProgressTime <= currentTime
            && node.getNode().getType().equals(NodeType.STORAGE);
    }

    // TODO: Merge this and the below method
    private boolean mayResetCrashCounterOnStableDownNode(long currentTime, NodeInfo node, NodeState lastReportedState) {
        return node.getDownStableStateTime() + stableStateTimePeriod <= currentTime
            && lastReportedState.getState().equals(DOWN)
            && node.getPrematureCrashCount() <= maxPrematureCrashes
            && node.getPrematureCrashCount() != 0;
    }

    private boolean mayResetCrashCounterOnStableUpNode(long currentTime, NodeInfo node, NodeState lastReportedState) {
        return node.getUpStableStateTime() + stableStateTimePeriod <= currentTime
            && lastReportedState.getState().equals(State.UP)
            && node.getPrematureCrashCount() <= maxPrematureCrashes
            && node.getPrematureCrashCount() != 0;
    }

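    // A node in MAINTENANCE mode that is wanted up, but is still down or missing from
    // slobrok after the configured max transition time for its node type has passed.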
    private boolean nodeStillUnavailableAfterTransitionTimeExceeded(
            long currentTime,
            NodeInfo node,
            NodeState currentStateInSystem,
            NodeState lastReportedState)
    {
        return currentStateInSystem.getState().equals(State.MAINTENANCE)
            && node.getWantedState().above(new NodeState(node.getNode().getType(), DOWN))
            && (lastReportedState.getState().equals(DOWN) || node.isNotInSlobrok())
            && node.getTransitionTime() + maxTransitionTime.get(node.getNode().getType()) < currentTime;
    }

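    // Marks a node down once it has been out of slobrok for longer than the configured
    // grace period, aborting any pending state requests towards it.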
    private boolean reportDownIfOutdatedSlobrokNode(ClusterState currentClusterState,
                                                    NodeListener nodeListener,
                                                    long currentTime,
                                                    NodeInfo node,
                                                    NodeState lastReportedState)
    {
        if (node.isNotInSlobrok()
            && !lastReportedState.getState().equals(DOWN)
            && node.lastSeenInSlobrok() + maxSlobrokDisconnectGracePeriod <= currentTime)
        {
            final String desc = String.format(
                    "Set node down as it has been out of slobrok for %d ms which " +
                    "is more than the max limit of %d ms.",
                    currentTime - node.lastSeenInSlobrok(),
                    maxSlobrokDisconnectGracePeriod);
            node.abortCurrentNodeStateRequests();
            NodeState state = lastReportedState.clone();
            state.setState(DOWN);
            if (!state.hasDescription()) {
                state.setDescription(desc);
            }
            eventLog.add(NodeEvent.forBaseline(node, desc, NodeEvent.Type.CURRENT, currentTime), isMaster);
            handleNewReportedNodeState(currentClusterState, node, state.clone(), nodeListener);
            node.setReportedState(state, currentTime);
            return true;
        }
        return false;
    }

    private boolean isNotControlledShutdown(NodeState state) { return ! isControlledShutdown(state); }

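    // A STOPPING node whose state description matches one of the known controlled shutdown
    // messages is assumed to be shutting down deliberately rather than crashing.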
    private boolean isControlledShutdown(NodeState state) {
        return state.getState() == State.STOPPING
                && List.of("Received signal 15 (SIGTERM - Termination signal)", "controlled shutdown")
                       .contains(state.getDescription());
    }

    /**
     * Modify a node's cross-state information in the cluster based on a newly arrived reported state.
     *
     * @param  node the node we are computing the state of
     * @param  currentState the current state of the node
     * @param  reportedState the new state reported by the node (or, in the case of down, inferred)
     * @param  nodeListener this listener is notified of some of the node state changes made by this method
     */
    private void updateNodeInfoFromReportedState(final NodeInfo node,
                                                 final NodeState currentState,
                                                 final NodeState reportedState,
                                                 final NodeListener nodeListener) {
        final long timeNow = timer.getCurrentTimeInMillis();
        log.log(FINE, () -> String.format("Finding new cluster state entry for %s switching state %s", node, currentState.getTextualDifference(reportedState)));

        if (handleReportedNodeCrashEdge(node, currentState, reportedState, nodeListener, timeNow)) {
            return;
        }
        if (initializationProgressHasIncreased(currentState, reportedState)) {
            node.setInitProgressTime(timeNow);
            log.log(FINEST, () -> "Reset initialize timer on " + node + " to " + node.getInitProgressTime());
        }
        if (handleImplicitCrashEdgeFromReverseInitProgress(node, currentState, reportedState, nodeListener, timeNow)) {
            return;
        }
        markNodeUnstableIfDownEdgeDuringInit(node, currentState, reportedState, nodeListener, timeNow);
    }

    // If we go down while initializing, mark node unstable, such that we don't mark it initializing again before it is up.
    private void markNodeUnstableIfDownEdgeDuringInit(final NodeInfo node,
                                                      final NodeState currentState,
                                                      final NodeState reportedState,
                                                      final NodeListener nodeListener,
                                                      final long timeNow) {
        if (currentState.getState().equals(INITIALIZING)
                && reportedState.getState().oneOf("ds")
                && isNotControlledShutdown(reportedState))
        {
            eventLog.add(NodeEvent.forBaseline(node, String.format("Stop or crash during initialization. " +
                    "Premature crash count is now %d.", node.getPrematureCrashCount() + 1),
                    NodeEvent.Type.CURRENT, timeNow), isMaster);
            handlePrematureCrash(node, nodeListener);
        }
    }

    // TODO do we need this when we have startup timestamps? At least it's unit tested.
    // TODO this seems fairly contrived...
    // If initialization progress moves backwards, mark the node unstable so that we don't mark it initializing again before it is up.
    private boolean handleImplicitCrashEdgeFromReverseInitProgress(final NodeInfo node,
                                                                   final NodeState currentState,
                                                                   final NodeState reportedState,
                                                                   final NodeListener nodeListener,
                                                                   final long timeNow) {
        if (currentState.getState().equals(INITIALIZING) &&
            (reportedState.getState().equals(INITIALIZING) && reportedState.getInitProgress() < currentState.getInitProgress()))
        {
            eventLog.add(NodeEvent.forBaseline(node, String.format(
                        "Stop or crash during initialization detected from reverse initializing progress." +
                        " Progress was %g but is now %g. Premature crash count is now %d.",
                        currentState.getInitProgress(), reportedState.getInitProgress(),
                        node.getPrematureCrashCount() + 1),
                    NodeEvent.Type.CURRENT, timeNow), isMaster);
            node.setRecentlyObservedUnstableDuringInit(true);
            handlePrematureCrash(node, nodeListener);
            return true;
        }
        return false;
    }

    private boolean handleReportedNodeCrashEdge(NodeInfo node, NodeState currentState,
                                                NodeState reportedState, NodeListener nodeListener,
                                                long timeNow) {
        if (nodeUpToDownEdge(node, currentState, reportedState)) {
            node.setTransitionTime(timeNow);
            if (node.getUpStableStateTime() + stableStateTimePeriod > timeNow && isNotControlledShutdown(reportedState)) {
                log.log(FINE, () -> "Stable state: " + node.getUpStableStateTime() + " + " + stableStateTimePeriod + " > " + timeNow);
                eventLog.add(NodeEvent.forBaseline(node,
                        String.format("Stopped or possibly crashed after %d ms, which is before " +
                                      "stable state time period. Premature crash count is now %d.",
                                timeNow - node.getUpStableStateTime(), node.getPrematureCrashCount() + 1),
                        NodeEvent.Type.CURRENT,
                        timeNow), isMaster);
                return handlePrematureCrash(node, nodeListener);
            }
        }
        return false;
    }

    private boolean initializationProgressHasIncreased(NodeState currentState, NodeState reportedState) {
        return reportedState.getState().equals(INITIALIZING) &&
            (!currentState.getState().equals(INITIALIZING) ||
             reportedState.getInitProgress() > currentState.getInitProgress());
    }

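    // State letter codes used by oneOf(): 'u' = up, 'r' = retired, 'd' = down,
    // 'i' = initializing, 's' = stopping.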
    private boolean nodeUpToDownEdge(NodeInfo node, NodeState currentState, NodeState reportedState) {
        return currentState.getState().oneOf("ur") && reportedState.getState().oneOf("dis")
            && (node.getWantedState().getState().equals(State.RETIRED) || !reportedState.getState().equals(INITIALIZING));
    }

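    // Increments the node's premature crash count. Once the count exceeds maxPrematureCrashes,
    // the node is disabled by setting its wanted state to DOWN, and the listener is notified.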
    private boolean handlePrematureCrash(NodeInfo node, NodeListener changeListener) {
        node.setPrematureCrashCount(node.getPrematureCrashCount() + 1);
        if (disableUnstableNodes && node.getPrematureCrashCount() > maxPrematureCrashes) {
            NodeState wantedState = new NodeState(node.getNode().getType(), DOWN)
                    .setDescription("Disabled by fleet controller as it prematurely shut down " + node.getPrematureCrashCount() + " times in a row");
            NodeState oldState = node.getWantedState();
            node.setWantedState(wantedState);
            if ( ! oldState.equals(wantedState)) {
                changeListener.handleNewWantedNodeState(node, wantedState);
            }
            return true;
        }
        return false;
    }

}