From cf687abd43e57e52afe0a56df727bc0a95621da1 Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Wed, 5 Oct 2016 11:30:50 +0200 Subject: Rewrite and refactor core cluster controller state generation logic Cluster controller will now generate the new cluster state on-demand in a "pure functional" way instead of conditionally patching a working state over time. This makes understanding (and changing) the state generation logic vastly easier than it previously was. --- .../core/AnnotatedClusterState.java | 69 ++ .../core/ClusterStateGenerator.java | 345 ++++++++ .../core/ClusterStateHistoryEntry.java | 46 + .../clustercontroller/core/ClusterStateReason.java | 15 + .../clustercontroller/core/ClusterStateView.java | 4 + .../core/EventDiffCalculator.java | 143 ++++ .../clustercontroller/core/FleetController.java | 197 +++-- .../core/GroupAvailabilityCalculator.java | 4 + .../core/MasterElectionHandler.java | 4 +- .../vespa/clustercontroller/core/NodeEvent.java | 4 + .../vespa/clustercontroller/core/NodeInfo.java | 32 +- .../clustercontroller/core/NodeStateReason.java | 10 + .../clustercontroller/core/StateChangeHandler.java | 530 ++++++++++++ .../core/StateVersionTracker.java | 140 +++ .../core/SystemStateGenerator.java | 941 --------------------- .../core/database/DatabaseHandler.java | 2 + .../core/database/MasterDataGatherer.java | 4 +- .../core/status/ClusterStateRequestHandler.java | 10 +- .../core/status/LegacyIndexPageRequestHandler.java | 25 +- .../core/status/StaticResourceRequestHandler.java | 66 -- 20 files changed, 1509 insertions(+), 1082 deletions(-) create mode 100644 clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java create mode 100644 clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateGenerator.java create mode 100644 clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateHistoryEntry.java create mode 100644 clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateReason.java create mode 100644 clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/EventDiffCalculator.java create mode 100644 clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeStateReason.java create mode 100644 clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java create mode 100644 clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java delete mode 100644 clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateGenerator.java delete mode 100644 clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/StaticResourceRequestHandler.java (limited to 'clustercontroller-core/src/main/java') diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java new file mode 100644 index 00000000000..05a66ddbf2b --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java @@ -0,0 +1,69 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vdslib.state.ClusterState; +import com.yahoo.vdslib.state.Node; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +public class AnnotatedClusterState { + private final ClusterState clusterState; + private final Map nodeStateReasons; + private final Optional clusterStateReason; + + public AnnotatedClusterState(ClusterState clusterState, + Optional clusterStateReason, + Map nodeStateReasons) + { + this.clusterState = clusterState; + this.clusterStateReason = clusterStateReason; + this.nodeStateReasons = nodeStateReasons; + } + + public static AnnotatedClusterState emptyState() { + return new AnnotatedClusterState(ClusterState.emptyState(), Optional.empty(), emptyNodeStateReasons()); + } + + static Map emptyNodeStateReasons() { + return Collections.emptyMap(); + } + + public ClusterState getClusterState() { + return clusterState; + } + + public Map getNodeStateReasons() { + return Collections.unmodifiableMap(nodeStateReasons); + } + + public Optional getClusterStateReason() { + return clusterStateReason; + } + + @Override + public String toString() { + return clusterState.toString(); + } + + public String toString(boolean verbose) { + return clusterState.toString(verbose); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AnnotatedClusterState that = (AnnotatedClusterState) o; + return Objects.equals(clusterState, that.clusterState) && + Objects.equals(nodeStateReasons, that.nodeStateReasons) && + Objects.equals(clusterStateReason, that.clusterStateReason); + } + + @Override + public int hashCode() { + return Objects.hash(clusterState, nodeStateReasons, clusterStateReason); + } +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateGenerator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateGenerator.java new file mode 100644 index 00000000000..e6fbed71153 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateGenerator.java @@ -0,0 +1,345 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vdslib.state.ClusterState; +import com.yahoo.vdslib.state.Node; +import com.yahoo.vdslib.state.NodeState; +import com.yahoo.vdslib.state.NodeType; +import com.yahoo.vdslib.state.State; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; + +/** + * Pure functional cluster state generator which deterministically constructs a full + * cluster state given the state of the content cluster, a set of cluster controller + * configuration parameters and the current time. + * + * State version tracking is considered orthogonal to state generation. Therefore, + * cluster state version is _not_ set here; its incrementing must be handled by the + * caller. 
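+ *
+ * Minimal caller-side sketch (illustrative only; the options, cluster and timer instances
+ * are assumed to be supplied by the surrounding controller code):
+ *
+ *   ClusterStateGenerator.Params params = ClusterStateGenerator.Params.fromOptions(options)
+ *           .cluster(contentCluster)
+ *           .currentTimeInMilllis(timer.getCurrentTimeInMillis());
+ *   AnnotatedClusterState annotated = ClusterStateGenerator.generatedStateFrom(params);
+ *   // Version assignment is left to the caller, e.g. via a StateVersionTracker.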
+ */ +public class ClusterStateGenerator { + + static class Params { + public ContentCluster cluster; + public Map transitionTimes; + public long currentTimeInMillis = 0; + public int maxPrematureCrashes = 0; + public int minStorageNodesUp = 1; + public int minDistributorNodesUp = 1; + public double minRatioOfStorageNodesUp = 0.0; + public double minRatioOfDistributorNodesUp = 0.0; + public double minNodeRatioPerGroup = 0.0; + public int idealDistributionBits = 16; + public int highestObservedDistributionBitCount = 16; + public int lowestObservedDistributionBitCount = 16; + public int maxInitProgressTimeMs = 5000; + + Params() { + this.transitionTimes = buildTransitionTimeMap(0, 0); + } + + // FIXME de-dupe + static Map buildTransitionTimeMap(int distributorTransitionTimeMs, int storageTransitionTimeMs) { + Map maxTransitionTime = new TreeMap<>(); + maxTransitionTime.put(com.yahoo.vdslib.state.NodeType.DISTRIBUTOR, distributorTransitionTimeMs); + maxTransitionTime.put(com.yahoo.vdslib.state.NodeType.STORAGE, storageTransitionTimeMs); + return maxTransitionTime; + } + + Params cluster(ContentCluster cluster) { + this.cluster = cluster; + return this; + } + Params maxInitProgressTime(int maxTimeMs) { + this.maxInitProgressTimeMs = maxTimeMs; + return this; + } + Params transitionTimes(int timeMs) { + this.transitionTimes = buildTransitionTimeMap(timeMs, timeMs); + return this; + } + Params transitionTimes(Map timesMs) { + this.transitionTimes = timesMs; + return this; + } + Params currentTimeInMilllis(long currentTimeMs) { + this.currentTimeInMillis = currentTimeMs; + return this; + } + Params maxPrematureCrashes(int count) { + this.maxPrematureCrashes = count; + return this; + } + Params minStorageNodesUp(int nodes) { + this.minStorageNodesUp = nodes; + return this; + } + Params minDistributorNodesUp(int nodes) { + this.minDistributorNodesUp = nodes; + return this; + } + Params minRatioOfStorageNodesUp(double minRatio) { + this.minRatioOfStorageNodesUp = minRatio; + return this; + } + Params minRatioOfDistributorNodesUp(double minRatio) { + this.minRatioOfDistributorNodesUp = minRatio; + return this; + } + Params minNodeRatioPerGroup(double minRatio) { + this.minNodeRatioPerGroup = minRatio; + return this; + } + Params idealDistributionBits(int distributionBits) { + this.idealDistributionBits = distributionBits; + return this; + } + Params highestObservedDistributionBitCount(int bitCount) { + this.highestObservedDistributionBitCount = bitCount; + return this; + } + Params lowestObservedDistributionBitCount(int bitCount) { + this.lowestObservedDistributionBitCount = bitCount; + return this; + } + + /** + * Infer parameters from controller options. Important: does _not_ set cluster; + * it must be explicitly set afterwards on the returned parameter object before + * being used to compute states. 
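+ * For example (sketch): Params.fromOptions(options).cluster(cluster), where both the
+ * options and the cluster instance are assumed to be available at the call site.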
+ */ + static Params fromOptions(FleetControllerOptions opts) { + return new Params() + .maxPrematureCrashes(opts.maxPrematureCrashes) + .minStorageNodesUp(opts.minStorageNodesUp) + .minDistributorNodesUp(opts.minDistributorNodesUp) + .minRatioOfStorageNodesUp(opts.minRatioOfStorageNodesUp) + .minRatioOfDistributorNodesUp(opts.minRatioOfDistributorNodesUp) + .minNodeRatioPerGroup(opts.minNodeRatioPerGroup) + .idealDistributionBits(opts.distributionBits) + .transitionTimes(opts.maxTransitionTime); + } + } + + static AnnotatedClusterState generatedStateFrom(final Params params) { + final ContentCluster cluster = params.cluster; + final ClusterState workingState = ClusterState.emptyState(); + final Map nodeStateReasons = new HashMap<>(); + + for (final NodeInfo nodeInfo : cluster.getNodeInfo()) { + final NodeState nodeState = computeEffectiveNodeState(nodeInfo, params); + workingState.setNodeState(nodeInfo.getNode(), nodeState); + } + + takeDownGroupsWithTooLowAvailability(workingState, nodeStateReasons, params); + + final Optional reasonToBeDown = clusterDownReason(workingState, params); + if (reasonToBeDown.isPresent()) { + workingState.setClusterState(State.DOWN); + } + workingState.setDistributionBits(inferDistributionBitCount(cluster, workingState, params)); + + return new AnnotatedClusterState(workingState, reasonToBeDown, nodeStateReasons); + } + + private static boolean nodeIsConsideredTooUnstable(final NodeInfo nodeInfo, final Params params) { + return (params.maxPrematureCrashes != 0 + && nodeInfo.getPrematureCrashCount() > params.maxPrematureCrashes); + } + + private static void applyWantedStateToBaselineState(final NodeState baseline, final NodeState wanted) { + // Only copy state and description from Wanted state; this preserves auxiliary + // information such as disk states and startup timestamp. + baseline.setState(wanted.getState()); + baseline.setDescription(wanted.getDescription()); + } + + private static NodeState computeEffectiveNodeState(final NodeInfo nodeInfo, final Params params) { + final NodeState reported = nodeInfo.getReportedState(); + final NodeState wanted = nodeInfo.getWantedState(); + final NodeState baseline = reported.clone(); + + if (nodeIsConsideredTooUnstable(nodeInfo, params)) { + baseline.setState(State.DOWN); + } + if (startupTimestampAlreadyObservedByAllNodes(nodeInfo, baseline)) { + baseline.setStartTimestamp(0); + } + if (nodeInfo.isStorage()) { + applyStorageSpecificStateTransforms(nodeInfo, params, reported, wanted, baseline); + } + if (baseline.above(wanted)) { + applyWantedStateToBaselineState(baseline, wanted); + } + + return baseline; + } + + private static void applyStorageSpecificStateTransforms(NodeInfo nodeInfo, Params params, NodeState reported, + NodeState wanted, NodeState baseline) + { + if (reported.getState() == State.INITIALIZING) { + if (timedOutWithoutNewInitProgress(reported, nodeInfo, params) + || shouldForceInitToDown(reported) + || nodeInfo.recentlyObservedUnstableDuringInit()) + { + baseline.setState(State.DOWN); + } + if (shouldForceInitToMaintenance(reported, wanted)) { + baseline.setState(State.MAINTENANCE); + } + } + // TODO ensure that maintenance cannot override Down for any other cases + if (withinTemporalMaintenancePeriod(nodeInfo, baseline, params) && wanted.getState() != State.DOWN) { + baseline.setState(State.MAINTENANCE); + } + } + + // TODO remove notion of init timeout progress? 
Seems redundant when we've already got RPC timeouts + private static boolean timedOutWithoutNewInitProgress(final NodeState reported, final NodeInfo nodeInfo, final Params params) { + if (reported.getState() != State.INITIALIZING) { + return false; + } + if (params.maxInitProgressTimeMs <= 0) { + return false; // No upper bound for max init time; auto-down for all intents and purposes disabled. + } + return nodeInfo.getInitProgressTime() + params.maxInitProgressTimeMs <= params.currentTimeInMillis; + } + + // Init while listing buckets should be treated as Down, as distributors expect a storage node + // in Init mode to have a bucket set readily available. Clients also expect a node in Init to + // be able to receive operations. + // Precondition: reported.getState() == State.INITIALIZING + private static boolean shouldForceInitToDown(final NodeState reported) { + return reported.getInitProgress() <= NodeState.getListingBucketsInitProgressLimit() + 0.00001; + } + + // Special case: since each node is published with a single state, if we let a Retired node + // be published with Initializing, it'd start receiving feed and merges. Avoid this by + // having it be in maintenance instead for the duration of the init period. + private static boolean shouldForceInitToMaintenance(final NodeState reported, final NodeState wanted) { + return reported.getState() == State.INITIALIZING && wanted.getState() == State.RETIRED; + } + + private static boolean startupTimestampAlreadyObservedByAllNodes(final NodeInfo nodeInfo, final NodeState baseline) { + return baseline.getStartTimestamp() == nodeInfo.getStartTimestamp(); // TODO rename NodeInfo getter/setter + } + + /** + * Determines whether a given storage node should be implicitly set as being + * in a maintenance state despite its reported state being Down. This is + * predominantly a case when contact has just been lost with a node, but we + * do not want to immediately set it to Down just yet (where "yet" is a configurable + * amount of time; see params.transitionTime). This is to prevent common node + * restart/upgrade scenarios from triggering redistribution and data replication + * that would be useless work if the node comes back up immediately afterwards. + * + * Only makes sense to call for storage nodes, since distributors don't support + * being in maintenance mode. 
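+ *
+ * Worked example (illustrative numbers): with a storage transition time of 5000 ms and a
+ * node observed going down at t=10000 ms, the node is published as Maintenance while the
+ * current time is below t=15000 ms, and as Down from that point on.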
+ */ + private static boolean withinTemporalMaintenancePeriod(final NodeInfo nodeInfo, + final NodeState baseline, + final Params params) + { + final Integer transitionTime = params.transitionTimes.get(nodeInfo.getNode().getType()); + if (transitionTime == 0 || !baseline.getState().oneOf("sd")) { + return false; + } + return nodeInfo.getTransitionTime() + transitionTime > params.currentTimeInMillis; + } + + private static void takeDownGroupsWithTooLowAvailability(final ClusterState workingState, + Map nodeStateReasons, + final Params params) + { + final GroupAvailabilityCalculator calc = new GroupAvailabilityCalculator.Builder() + .withMinNodeRatioPerGroup(params.minNodeRatioPerGroup) + .withDistribution(params.cluster.getDistribution()) + .build(); + final Set nodesToTakeDown = calc.nodesThatShouldBeDown(workingState); + + for (Integer idx : nodesToTakeDown) { + final Node node = storageNode(idx); + final NodeState newState = new NodeState(NodeType.STORAGE, State.DOWN); + newState.setDescription("group node availability below configured threshold"); + workingState.setNodeState(node, newState); + nodeStateReasons.put(node, NodeStateReason.GROUP_IS_DOWN); + } + } + + private static Node storageNode(int index) { + return new Node(NodeType.STORAGE, index); + } + + // TODO we'll want to explicitly persist a bit lower bound in ZooKeeper and ensure we + // never go below it (this is _not_ the case today). Nodes that have min bits lower than + // this will just have to start splitting out in the background before being allowed + // to join the cluster. + + private static int inferDistributionBitCount(final ContentCluster cluster, + final ClusterState state, + final Params params) + { + int bitCount = params.idealDistributionBits; + final Optional minBits = cluster.getConfiguredNodes().values().stream() + .map(configuredNode -> cluster.getNodeInfo(storageNode(configuredNode.index()))) + .filter(node -> state.getNodeState(node.getNode()).getState().oneOf("iur")) + .map(nodeInfo -> nodeInfo.getReportedState().getMinUsedBits()) + .min(Integer::compare); + + if (minBits.isPresent() && minBits.get() < bitCount) { + bitCount = minBits.get(); + } + if (bitCount > params.lowestObservedDistributionBitCount && bitCount < params.idealDistributionBits) { + bitCount = params.lowestObservedDistributionBitCount; + } + + return bitCount; + } + + private static boolean nodeStateIsConsideredAvailable(final NodeState ns) { + return (ns.getState() == State.UP + || ns.getState() == State.RETIRED + || ns.getState() == State.INITIALIZING); + } + + private static long countAvailableNodesOfType(final NodeType type, + final ContentCluster cluster, + final ClusterState state) + { + return cluster.getConfiguredNodes().values().stream() + .map(node -> state.getNodeState(new Node(type, node.index()))) + .filter(ClusterStateGenerator::nodeStateIsConsideredAvailable) + .count(); + } + + private static Optional clusterDownReason(final ClusterState state, final Params params) { + final ContentCluster cluster = params.cluster; + + final long upStorageCount = countAvailableNodesOfType(NodeType.STORAGE, cluster, state); + final long upDistributorCount = countAvailableNodesOfType(NodeType.DISTRIBUTOR, cluster, state); + // There's a 1-1 relationship between distributors and storage nodes, so don't need to + // keep track of separate node counts for computing availability ratios. 
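+ // Example (illustrative numbers): with 10 configured nodes and minRatioOfStorageNodesUp = 0.5,
+ // the cluster is taken down once fewer than 5 storage nodes are counted as available.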
+ final long nodeCount = cluster.getConfiguredNodes().size(); + + if (upStorageCount < params.minStorageNodesUp) { + return Optional.of(ClusterStateReason.TOO_FEW_STORAGE_NODES_AVAILABLE); + } + if (upDistributorCount < params.minDistributorNodesUp) { + return Optional.of(ClusterStateReason.TOO_FEW_DISTRIBUTOR_NODES_AVAILABLE); + } + if (params.minRatioOfStorageNodesUp * nodeCount > upStorageCount) { + return Optional.of(ClusterStateReason.TOO_LOW_AVAILABLE_STORAGE_NODE_RATIO); + } + if (params.minRatioOfDistributorNodesUp * nodeCount > upDistributorCount) { + return Optional.of(ClusterStateReason.TOO_LOW_AVAILABLE_DISTRIBUTOR_NODE_RATIO); + } + return Optional.empty(); + } + +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateHistoryEntry.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateHistoryEntry.java new file mode 100644 index 00000000000..3963fcaa45b --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateHistoryEntry.java @@ -0,0 +1,46 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vdslib.state.ClusterState; + +import java.util.Objects; + +public class ClusterStateHistoryEntry { + + private final ClusterState state; + private final long time; + + ClusterStateHistoryEntry(final ClusterState state, final long time) { + this.state = state; + this.time = time; + } + + public ClusterState state() { + return state; + } + + public long time() { + return time; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ClusterStateHistoryEntry that = (ClusterStateHistoryEntry) o; + return time == that.time && + Objects.equals(state, that.state); + } + + @Override + public int hashCode() { + return Objects.hash(state, time); + } + + // String representation only used for test expectation failures and debugging output. + // Actual status page history entry rendering emits formatted date/time. + public String toString() { + return String.format("state '%s' at time %d", state, time); + } + +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateReason.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateReason.java new file mode 100644 index 00000000000..3557ed1ceb8 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateReason.java @@ -0,0 +1,15 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +/** + * Explicit reasons for why a cluster has been assigned a particular global state. + * This only includes reasons that aren't directly possible to infer from diffing + * two cluster states; i.e. distribution bit changes aren't listed here because + * they are obvious from direct inspection. 
+ */ +public enum ClusterStateReason { + TOO_FEW_STORAGE_NODES_AVAILABLE, + TOO_FEW_DISTRIBUTOR_NODES_AVAILABLE, + TOO_LOW_AVAILABLE_STORAGE_NODE_RATIO, + TOO_LOW_AVAILABLE_DISTRIBUTOR_NODE_RATIO, +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateView.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateView.java index 328acfb4dbe..644d6b28b05 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateView.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateView.java @@ -41,6 +41,10 @@ public class ClusterStateView { return new ClusterStateView(clusterState, createNewAggregator(clusterState, metricUpdater), metricUpdater); } + public static ClusterStateView create(final ClusterState clusterState, final MetricUpdater metricUpdater) { + return new ClusterStateView(clusterState, createNewAggregator(clusterState, metricUpdater), metricUpdater); + } + private static ClusterStatsAggregator createNewAggregator(ClusterState clusterState, MetricUpdater metricUpdater) { Set upDistributors = getIndicesOfUpNodes(clusterState, NodeType.DISTRIBUTOR); Set upStorageNodes = getIndicesOfUpNodes(clusterState, NodeType.STORAGE); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/EventDiffCalculator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/EventDiffCalculator.java new file mode 100644 index 00000000000..2e5d99f2e67 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/EventDiffCalculator.java @@ -0,0 +1,143 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vdslib.distribution.ConfiguredNode; +import com.yahoo.vdslib.state.ClusterState; +import com.yahoo.vdslib.state.Node; +import com.yahoo.vdslib.state.NodeState; +import com.yahoo.vdslib.state.NodeType; +import com.yahoo.vdslib.state.State; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +/** + * Responsible for inferring the difference between two cluster states and their + * state annotations and producing a set of events that describe the changes between + * the two. Diffing the states directly provides a clear picture of _what_ has changed, + * while the annotations are generally required to explain _why_ the changes happened + * in the first place. + * + * Events are primarily used for administrative/user visibility into what's happening + * in the cluster and are output to the Vespa log as well as kept in a circular history + * buffer per node and for the cluster as a whole. 
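+ *
+ * Usage sketch (mirrors the call site added to FleetController in this change; the
+ * surrounding variables are placeholders):
+ *
+ *   List<Event> events = EventDiffCalculator.computeEventDiff(EventDiffCalculator.params()
+ *           .cluster(cluster)
+ *           .fromState(previousAnnotatedState)
+ *           .toState(currentAnnotatedState)
+ *           .currentTimeMs(timeNowMs));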
+ */ +public class EventDiffCalculator { + + static class Params { + ContentCluster cluster; + AnnotatedClusterState fromState; + AnnotatedClusterState toState; + long currentTime; + + public Params cluster(ContentCluster cluster) { + this.cluster = cluster; + return this; + } + public Params fromState(AnnotatedClusterState clusterState) { + this.fromState = clusterState; + return this; + } + public Params toState(AnnotatedClusterState clusterState) { + this.toState = clusterState; + return this; + } + public Params currentTimeMs(long time) { + this.currentTime = time; + return this; + } + } + + public static List computeEventDiff(final Params params) { + final List events = new ArrayList<>(); + + emitPerNodeDiffEvents(params, events); + emitWholeClusterDiffEvent(params, events); + return events; + } + + private static ClusterEvent createClusterEvent(String description, Params params) { + return new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE, description, params.currentTime); + } + + private static boolean clusterDownBecause(final Params params, ClusterStateReason wantedReason) { + final Optional actualReason = params.toState.getClusterStateReason(); + return actualReason.isPresent() && actualReason.get().equals(wantedReason); + } + + private static void emitWholeClusterDiffEvent(final Params params, final List events) { + final ClusterState fromState = params.fromState.getClusterState(); + final ClusterState toState = params.toState.getClusterState(); + + if (clusterHasTransitionedToUpState(fromState, toState)) { + events.add(createClusterEvent("Enough nodes available for system to become up", params)); + } else if (clusterHasTransitionedToDownState(fromState, toState)) { + if (clusterDownBecause(params, ClusterStateReason.TOO_FEW_STORAGE_NODES_AVAILABLE)) { + events.add(createClusterEvent("Too few storage nodes available in cluster. Setting cluster state down", params)); + } else if (clusterDownBecause(params, ClusterStateReason.TOO_FEW_DISTRIBUTOR_NODES_AVAILABLE)) { + events.add(createClusterEvent("Too few distributor nodes available in cluster. Setting cluster state down", params)); + } else if (clusterDownBecause(params, ClusterStateReason.TOO_LOW_AVAILABLE_STORAGE_NODE_RATIO)) { + events.add(createClusterEvent("Too low ratio of available storage nodes. Setting cluster state down", params)); + } else if (clusterDownBecause(params, ClusterStateReason.TOO_LOW_AVAILABLE_DISTRIBUTOR_NODE_RATIO)) { + events.add(createClusterEvent("Too low ratio of available distributor nodes. 
Setting cluster state down", params)); + } else { + events.add(createClusterEvent("Cluster is down", params)); + } + } + } + + private static NodeEvent createNodeEvent(NodeInfo nodeInfo, String description, Params params) { + return new NodeEvent(nodeInfo, description, NodeEvent.Type.CURRENT, params.currentTime); + } + + private static void emitPerNodeDiffEvents(final Params params, final List events) { + final ContentCluster cluster = params.cluster; + final ClusterState fromState = params.fromState.getClusterState(); + final ClusterState toState = params.toState.getClusterState(); + + for (ConfiguredNode node : cluster.getConfiguredNodes().values()) { + for (NodeType nodeType : NodeType.getTypes()) { + final Node n = new Node(nodeType, node.index()); + emitSingleNodeEvents(params, events, cluster, fromState, toState, n); + } + } + } + + private static void emitSingleNodeEvents(Params params, List events, ContentCluster cluster, ClusterState fromState, ClusterState toState, Node n) { + final NodeState nodeFrom = fromState.getNodeState(n); + final NodeState nodeTo = toState.getNodeState(n); + if (!nodeTo.equals(nodeFrom)) { + final NodeInfo info = cluster.getNodeInfo(n); + events.add(createNodeEvent(info, String.format("Altered node state in cluster state from '%s' to '%s'", + nodeFrom.toString(true), nodeTo.toString(true)), params)); + + NodeStateReason prevReason = params.fromState.getNodeStateReasons().get(n); + NodeStateReason currReason = params.toState.getNodeStateReasons().get(n); + if (isGroupDownEdge(prevReason, currReason)) { + events.add(createNodeEvent(info, "Group node availability is below configured threshold", params)); + } else if (isGroupUpEdge(prevReason, currReason)) { + events.add(createNodeEvent(info, "Group node availability has been restored", params)); + } + } + } + + private static boolean isGroupUpEdge(NodeStateReason prevReason, NodeStateReason currReason) { + return prevReason == NodeStateReason.GROUP_IS_DOWN && currReason != NodeStateReason.GROUP_IS_DOWN; + } + + private static boolean isGroupDownEdge(NodeStateReason prevReason, NodeStateReason currReason) { + return prevReason != NodeStateReason.GROUP_IS_DOWN && currReason == NodeStateReason.GROUP_IS_DOWN; + } + + private static boolean clusterHasTransitionedToUpState(ClusterState prevState, ClusterState currentState) { + return prevState.getClusterState() != State.UP && currentState.getClusterState() == State.UP; + } + + private static boolean clusterHasTransitionedToDownState(ClusterState prevState, ClusterState currentState) { + return prevState.getClusterState() != State.DOWN && currentState.getClusterState() == State.DOWN; + } + + public static Params params() { return new Params(); } + +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java index ceeeddf49fa..b21cae4ed71 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java @@ -7,6 +7,7 @@ import com.yahoo.vdslib.distribution.ConfiguredNode; import com.yahoo.vdslib.state.ClusterState; import com.yahoo.vdslib.state.Node; import com.yahoo.vdslib.state.NodeState; +import com.yahoo.vdslib.state.State; import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler; import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo; import 
com.yahoo.vespa.clustercontroller.core.listeners.*; @@ -37,8 +38,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd private final ContentCluster cluster; private final Communicator communicator; private final NodeStateGatherer stateGatherer; - private final SystemStateGenerator systemStateGenerator; + private final StateChangeHandler stateChangeHandler; private final SystemStateBroadcaster systemStateBroadcaster; + private final StateVersionTracker stateVersionTracker; private final StatusPageServerInterface statusPageServer; private final RpcServer rpcServer; private final DatabaseHandler database; @@ -59,7 +61,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd private final List newStates = new ArrayList<>(); private long configGeneration = -1; private long nextConfigGeneration = -1; - private List remoteTasks = new ArrayList<>(); + private Queue remoteTasks = new LinkedList<>(); private final MetricUpdater metricUpdater; private boolean isMaster = false; @@ -69,7 +71,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd private final RunDataExtractor dataExtractor = new RunDataExtractor() { @Override - public com.yahoo.vdslib.state.ClusterState getLatestClusterState() { return systemStateGenerator.getClusterState(); } + public com.yahoo.vdslib.state.ClusterState getLatestClusterState() { return stateVersionTracker.getVersionedClusterState(); } @Override public FleetControllerOptions getOptions() { return options; } @Override @@ -87,7 +89,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd RpcServer server, NodeLookup nodeLookup, DatabaseHandler database, - SystemStateGenerator systemStateGenerator, + StateChangeHandler stateChangeHandler, SystemStateBroadcaster systemStateBroadcaster, MasterElectionHandler masterElectionHandler, MetricUpdater metricUpdater, @@ -103,8 +105,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd this.communicator = communicator; this.database = database; this.stateGatherer = nodeStateGatherer; - this.systemStateGenerator = systemStateGenerator; + this.stateChangeHandler = stateChangeHandler; this.systemStateBroadcaster = systemStateBroadcaster; + this.stateVersionTracker = new StateVersionTracker(metricUpdater); this.metricUpdater = metricUpdater; this.statusPageServer = statusPage; @@ -120,12 +123,12 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd new NodeHealthRequestHandler(dataExtractor)); this.statusRequestRouter.addHandler( "^/clusterstate", - new ClusterStateRequestHandler(systemStateGenerator)); + new ClusterStateRequestHandler(stateVersionTracker)); this.statusRequestRouter.addHandler( "^/$", new LegacyIndexPageRequestHandler( timer, options.showLocalSystemStatesInEventLog, cluster, - masterElectionHandler, systemStateGenerator, + masterElectionHandler, stateVersionTracker, eventLog, timer.getCurrentTimeInMillis(), dataExtractor)); propagateOptions(); @@ -169,7 +172,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd options.nodeStateRequestRoundTripTimeMaxSeconds); DatabaseHandler database = new DatabaseHandler(timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer); NodeLookup lookUp = new SlobrokClient(timer); - SystemStateGenerator stateGenerator = new SystemStateGenerator(timer, log, metricUpdater); + StateChangeHandler stateGenerator = new StateChangeHandler(timer, log, metricUpdater); 
SystemStateBroadcaster stateBroadcaster = new SystemStateBroadcaster(timer, timer); MasterElectionHandler masterElectionHandler = new MasterElectionHandler(options.fleetControllerIndex, options.fleetControllerCount, timer, timer); FleetController controller = new FleetController( @@ -246,7 +249,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd public com.yahoo.vdslib.state.ClusterState getSystemState() { synchronized(monitor) { - return systemStateGenerator.getClusterState(); + return stateVersionTracker.getVersionedClusterState(); } } @@ -299,41 +302,41 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd @Override public void handleNewNodeState(NodeInfo node, NodeState newState) { verifyInControllerThread(); - systemStateGenerator.handleNewReportedNodeState(node, newState, this); + stateChangeHandler.handleNewReportedNodeState(stateVersionTracker.getVersionedClusterState(), node, newState, this); } @Override public void handleNewWantedNodeState(NodeInfo node, NodeState newState) { verifyInControllerThread(); wantedStateChanged = true; - systemStateGenerator.proposeNewNodeState(node, newState); + stateChangeHandler.proposeNewNodeState(stateVersionTracker.getVersionedClusterState(), node, newState); } @Override public void handleUpdatedHostInfo(NodeInfo nodeInfo, HostInfo newHostInfo) { verifyInControllerThread(); - systemStateGenerator.handleUpdatedHostInfo(nodeInfo, newHostInfo); + stateVersionTracker.handleUpdatedHostInfo(stateChangeHandler.getHostnames(), nodeInfo, newHostInfo); } @Override public void handleNewNode(NodeInfo node) { verifyInControllerThread(); - systemStateGenerator.handleNewNode(node); + stateChangeHandler.handleNewNode(node); } @Override public void handleMissingNode(NodeInfo node) { verifyInControllerThread(); - systemStateGenerator.handleMissingNode(node, this); + stateChangeHandler.handleMissingNode(stateVersionTracker.getVersionedClusterState(), node, this); } @Override public void handleNewRpcAddress(NodeInfo node) { verifyInControllerThread(); - systemStateGenerator.handleNewRpcAddress(node); + stateChangeHandler.handleNewRpcAddress(node); } @Override public void handleReturnedRpcAddress(NodeInfo node) { verifyInControllerThread(); - systemStateGenerator.handleReturnedRpcAddress(node); + stateChangeHandler.handleReturnedRpcAddress(node); } public void handleNewSystemState(com.yahoo.vdslib.state.ClusterState state) { @@ -370,7 +373,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd /** Called when all distributors have acked newest cluster state version. 
*/ public void handleAllDistributorsInSync(DatabaseHandler database, DatabaseHandler.Context context) throws InterruptedException { - systemStateGenerator.handleAllDistributorsInSync(database, context); + Set nodes = new HashSet<>(cluster.clusterInfo().getConfiguredNodes().values()); + stateChangeHandler.handleAllDistributorsInSync( + stateVersionTracker.getVersionedClusterState(), nodes, database, context); } private boolean changesConfiguredNodeSet(Collection newNodes) { @@ -409,17 +414,11 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd database.setZooKeeperSessionTimeout(options.zooKeeperSessionTimeout); stateGatherer.setMaxSlobrokDisconnectGracePeriod(options.maxSlobrokDisconnectGracePeriod); stateGatherer.setNodeStateRequestTimeout(options.nodeStateRequestTimeoutMS); - systemStateGenerator.setNodes(cluster.clusterInfo()); - systemStateGenerator.setMaxTransitionTime(options.maxTransitionTime); - systemStateGenerator.setMaxInitProgressTime(options.maxInitProgressTime); - systemStateGenerator.setMaxPrematureCrashes(options.maxPrematureCrashes); - systemStateGenerator.setStableStateTimePeriod(options.stableStateTimePeriod); - systemStateGenerator.setMinNodesUp(options.minDistributorNodesUp, options.minStorageNodesUp, - options.minRatioOfDistributorNodesUp, options.minRatioOfStorageNodesUp); - systemStateGenerator.setMinNodeRatioPerGroup(options.minNodeRatioPerGroup); - systemStateGenerator.setMaxSlobrokDisconnectGracePeriod(options.maxSlobrokDisconnectGracePeriod); - systemStateGenerator.setDistributionBits(options.distributionBits); - systemStateGenerator.setDistribution(options.storageDistribution); + + // TODO: remove as many temporal parameter dependencies as possible here. Currently duplication of state. + stateChangeHandler.reconfigureFromOptions(options); + stateChangeHandler.setStateChangedFlag(); // Always trigger state recomputation after reconfig + masterElectionHandler.setFleetControllerCount(options.fleetControllerCount); masterElectionHandler.setMasterZooKeeperCooldownPeriod(options.masterZooKeeperCooldownPeriod); @@ -491,7 +490,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd didWork = database.doNextZooKeeperTask(databaseContext); didWork |= updateMasterElectionState(); didWork |= handleLeadershipEdgeTransitions(); - systemStateGenerator.setMaster(isMaster); + stateChangeHandler.setMaster(isMaster); // Process zero or more getNodeState responses that we have received. didWork |= stateGatherer.processResponses(this); @@ -510,10 +509,10 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd didWork |= processAnyPendingStatusPageRequest(); if (rpcServer != null) { - didWork |= rpcServer.handleRpcRequests(cluster, systemStateGenerator.getClusterState(), this, this); + didWork |= rpcServer.handleRpcRequests(cluster, consolidatedClusterState(), this, this); } - processAllQueuedRemoteTasks(); + didWork |= processNextQueuedRemoteTask(); processingCycle = false; ++cycleCount; @@ -606,25 +605,52 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd } } - private void processAllQueuedRemoteTasks() { + private boolean processNextQueuedRemoteTask() { if ( ! 
remoteTasks.isEmpty()) { - RemoteClusterControllerTask.Context context = new RemoteClusterControllerTask.Context(); - context.cluster = cluster; - context.currentState = systemStateGenerator.getConsolidatedClusterState(); - context.masterInfo = masterElectionHandler; - context.nodeStateOrHostInfoChangeHandler = this; - context.nodeAddedOrRemovedListener = this; - for (RemoteClusterControllerTask task : remoteTasks) { - log.finest("Processing remote task " + task.getClass().getName()); - task.doRemoteFleetControllerTask(context); - task.notifyCompleted(); - log.finest("Done processing remote task " + task.getClass().getName()); - } - log.fine("Completed processing remote tasks"); - remoteTasks.clear(); + final RemoteClusterControllerTask.Context context = createRemoteTaskProcessingContext(); + final RemoteClusterControllerTask task = remoteTasks.poll(); + log.finest("Processing remote task " + task.getClass().getName()); + task.doRemoteFleetControllerTask(context); + task.notifyCompleted(); + log.finest("Done processing remote task " + task.getClass().getName()); + return true; } + return false; } + private RemoteClusterControllerTask.Context createRemoteTaskProcessingContext() { + final RemoteClusterControllerTask.Context context = new RemoteClusterControllerTask.Context(); + context.cluster = cluster; + context.currentState = consolidatedClusterState(); + context.masterInfo = masterElectionHandler; + context.nodeStateOrHostInfoChangeHandler = this; + context.nodeAddedOrRemovedListener = this; + return context; + } + + /** + * A "consolidated" cluster state is guaranteed to have up-to-date information on which nodes are + * up or down even when the whole cluster is down. The regular, published cluster state is not + * normally updated to reflect node events when the cluster is down. + */ + ClusterState consolidatedClusterState() { + final ClusterState publishedState = stateVersionTracker.getVersionedClusterState(); + if (publishedState.getClusterState() == State.UP) { + return publishedState; // Short-circuit; already represents latest node state + } + // Latest candidate state contains the most up to date state information, even if it may not + // have been published yet. + final ClusterState current = stateVersionTracker.getLatestCandidateState().getClusterState().clone(); + current.setVersion(publishedState.getVersion()); + return current; + } + + /* + System test observations: + - a node that stops normally (U -> S) then goes down erroneously triggers premature crash handling + - long time before content node state convergence (though this seems to be the case for legacy impl as well) + */ + private boolean resyncLocallyCachedState() throws InterruptedException { boolean didWork = false; // Let non-master state gatherers update wanted states once in a while, so states generated and shown are close to valid. @@ -637,31 +663,99 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd // Send getNodeState requests to zero or more nodes. didWork |= stateGatherer.sendMessages(cluster, communicator, this); - didWork |= systemStateGenerator.watchTimers(cluster, this); - didWork |= systemStateGenerator.notifyIfNewSystemState(cluster, this); + // Important: timer events must use a consolidated state, or they might trigger edge events multiple times. + didWork |= stateChangeHandler.watchTimers(cluster, consolidatedClusterState(), this); + + didWork |= recomputeClusterStateIfRequired(); if ( ! isStateGatherer) { if ( ! 
isMaster) { eventLog.add(new ClusterEvent(ClusterEvent.Type.MASTER_ELECTION, "This node just became node state gatherer as we are fleetcontroller master candidate.", timer.getCurrentTimeInMillis())); // Update versions to use so what is shown is closer to what is reality on the master - systemStateGenerator.setLatestSystemStateVersion(database.getLatestSystemStateVersion()); + stateVersionTracker.setVersionRetrievedFromZooKeeper(database.getLatestSystemStateVersion()); } } isStateGatherer = true; return didWork; } + private boolean recomputeClusterStateIfRequired() { + if (mustRecomputeCandidateClusterState()) { + stateChangeHandler.unsetStateChangedFlag(); + final AnnotatedClusterState candidate = computeCurrentAnnotatedState(); + stateVersionTracker.updateLatestCandidateState(candidate); + + if (stateVersionTracker.candidateChangedEnoughFromCurrentToWarrantPublish() + || stateVersionTracker.hasReceivedNewVersionFromZooKeeper()) + { + final long timeNowMs = timer.getCurrentTimeInMillis(); + final AnnotatedClusterState before = stateVersionTracker.getAnnotatedVersionedClusterState(); + + stateVersionTracker.promoteCandidateToVersionedState(timeNowMs); + emitEventsForAlteredStateEdges(before, stateVersionTracker.getAnnotatedVersionedClusterState(), timeNowMs); + handleNewSystemState(stateVersionTracker.getVersionedClusterState()); + return true; + } + } + return false; + } + + private AnnotatedClusterState computeCurrentAnnotatedState() { + ClusterStateGenerator.Params params = ClusterStateGenerator.Params.fromOptions(options); + params.currentTimeInMilllis(timer.getCurrentTimeInMillis()) + .cluster(cluster) + .lowestObservedDistributionBitCount(stateVersionTracker.getLowestObservedDistributionBits()); + return ClusterStateGenerator.generatedStateFrom(params); + } + + private void emitEventsForAlteredStateEdges(final AnnotatedClusterState fromState, + final AnnotatedClusterState toState, + final long timeNowMs) { + final List deltaEvents = EventDiffCalculator.computeEventDiff( + EventDiffCalculator.params() + .cluster(cluster) + .fromState(fromState) + .toState(toState) + .currentTimeMs(timeNowMs)); + for (Event event : deltaEvents) { + eventLog.add(event, isMaster); + } + + emitStateAppliedEvents(timeNowMs, fromState.getClusterState(), toState.getClusterState()); + } + + private void emitStateAppliedEvents(long timeNowMs, ClusterState fromClusterState, ClusterState toClusterState) { + eventLog.add(new ClusterEvent( + ClusterEvent.Type.SYSTEMSTATE, + "New cluster state version " + toClusterState.getVersion() + ". Change from last: " + + fromClusterState.getTextualDifference(toClusterState), + timeNowMs), isMaster); + + if (toClusterState.getDistributionBitCount() != fromClusterState.getDistributionBitCount()) { + eventLog.add(new ClusterEvent( + ClusterEvent.Type.SYSTEMSTATE, + "Altering distribution bits in system from " + + fromClusterState.getDistributionBitCount() + " to " + + toClusterState.getDistributionBitCount(), + timeNowMs), isMaster); + } + } + + private boolean mustRecomputeCandidateClusterState() { + return stateChangeHandler.stateMayHaveChanged() || stateVersionTracker.hasReceivedNewVersionFromZooKeeper(); + } + private boolean handleLeadershipEdgeTransitions() throws InterruptedException { boolean didWork = false; if (masterElectionHandler.isMaster()) { if ( ! 
isMaster) { metricUpdater.becameMaster(); // If we just became master, restore wanted states from database - systemStateGenerator.setLatestSystemStateVersion(database.getLatestSystemStateVersion()); + stateVersionTracker.setVersionRetrievedFromZooKeeper(database.getLatestSystemStateVersion()); didWork = database.loadStartTimestamps(cluster); didWork |= database.loadWantedStates(databaseContext); eventLog.add(new ClusterEvent(ClusterEvent.Type.MASTER_ELECTION, "This node just became fleetcontroller master. Bumped version to " - + systemStateGenerator.getClusterState().getVersion() + " to be in line.", timer.getCurrentTimeInMillis())); + + stateVersionTracker.getCurrentVersion() + " to be in line.", timer.getCurrentTimeInMillis())); long currentTime = timer.getCurrentTimeInMillis(); firstAllowedStateBroadcast = currentTime + options.minTimeBeforeFirstSystemStateBroadcast; log.log(LogLevel.DEBUG, "At time " + currentTime + " we set first system state broadcast time to be " @@ -693,6 +787,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd } catch (InterruptedException e) { log.log(LogLevel.DEBUG, "Event thread stopped by interrupt exception: " + e); } catch (Throwable t) { + t.printStackTrace(); log.log(LogLevel.ERROR, "Fatal error killed fleet controller", t); synchronized (monitor) { running = false; } System.exit(1); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/GroupAvailabilityCalculator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/GroupAvailabilityCalculator.java index 74b15b61ac3..e24e5f6914e 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/GroupAvailabilityCalculator.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/GroupAvailabilityCalculator.java @@ -10,6 +10,7 @@ import com.yahoo.vdslib.state.NodeState; import com.yahoo.vdslib.state.NodeType; import com.yahoo.vdslib.state.State; +import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.stream.Stream; @@ -105,6 +106,9 @@ class GroupAvailabilityCalculator { } public Set nodesThatShouldBeDown(ClusterState state) { + if (distribution == null) { // FIXME: for tests that don't set distribution properly! + return Collections.emptySet(); + } if (isFlatCluster(distribution.getRootGroup())) { // Implicit group takedown only applies to hierarchic cluster setups. return new HashSet<>(); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java index 6c48bdf12d0..1a48b088ca3 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java @@ -240,7 +240,7 @@ public class MasterElectionHandler implements MasterInterface { .append(".

"); } else if (masterGoneFromZooKeeperTime + masterZooKeeperCooldownPeriod > timer.getCurrentTimeInMillis()) { long time = timer.getCurrentTimeInMillis() - masterGoneFromZooKeeperTime; - sb.append("

There is currently no master. Only " + (time / 1000) + " seconds have past since") + sb.append("

There is currently no master. Only " + (time / 1000) + " seconds have passed since") .append(" old master disappeared. At least " + (masterZooKeeperCooldownPeriod / 1000) + " must pass") .append(" before electing new master unless all possible master candidates are online.

"); } @@ -249,7 +249,7 @@ public class MasterElectionHandler implements MasterInterface { sb.append("

As we are number ").append(nextInLineCount) .append(" in line for taking over as master, we're gathering state from nodes.

"); sb.append("

As we are not the master, we don't know about nodes current system state" - + " or wanted states, so some statistics below are a bit incorrect. Look at status page on master " + + " or wanted states, so some statistics below may be stale. Look at status page on master " + "for updated data.

"); } if (index * 2 > totalCount) { diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeEvent.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeEvent.java index d9d83c705b1..944cbd02082 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeEvent.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeEvent.java @@ -45,4 +45,8 @@ public class NodeEvent implements Event { public String getCategory() { return type.toString(); } + + public Type getType() { + return type; + } } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java index c261a4bb194..87a32e1e088 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java @@ -35,6 +35,18 @@ abstract public class NodeInfo implements Comparable { /** Whether this node has been configured to be retired and should therefore always return retired as its wanted state */ private boolean configuredRetired; + /** + * Node has been observed transitioning from Init to Down at least once during the last "premature crash count" + * period. Gets reset whenever the crash count is reset to zero after a period of stability. + * + * Flag can also be explicitly toggled by external code, such as if a reported node state + * handler discovers "reverse" init progress. This indicates a "silent" down edge and should be + * handled as such. + * + * It is an explicit choice that we only do this on an edge to Down (and not Stopping). Stopping implies + * an administrative action, not that the node itself is unstable. + */ + private boolean recentlyObservedUnstableDuringInit; /** The time we set the current state last. */ private long nextAttemptTime; @@ -97,6 +109,7 @@ abstract public class NodeInfo implements Comparable { this.version = getLatestVersion(); this.connectionVersion = getLatestVersion(); this.configuredRetired = configuredRetired; + this.recentlyObservedUnstableDuringInit = false; this.rpcAddress = rpcAddress; this.lastSeenInSlobrok = null; this.nextAttemptTime = 0; @@ -132,7 +145,17 @@ abstract public class NodeInfo implements Comparable { public int getConnectionAttemptCount() { return connectionAttemptCount; } + public boolean recentlyObservedUnstableDuringInit() { + return recentlyObservedUnstableDuringInit; + } + public void setRecentlyObservedUnstableDuringInit(boolean unstable) { + recentlyObservedUnstableDuringInit = unstable; + } + public void setPrematureCrashCount(int count) { + if (count == 0) { + recentlyObservedUnstableDuringInit = false; + } if (prematureCrashCount != count) { prematureCrashCount = count; log.log(LogLevel.DEBUG, "Premature crash count on " + toString() + " set to " + count); @@ -213,6 +236,7 @@ abstract public class NodeInfo implements Comparable { public ContentCluster getCluster() { return cluster; } /** Returns true if the node is currentl registered in slobrok */ + // FIXME why is this called "isRpcAddressOutdated" then??? 
public boolean isRpcAddressOutdated() { return lastSeenInSlobrok != null; } public Long getRpcAddressOutdatedTimestamp() { return lastSeenInSlobrok; } @@ -277,8 +301,10 @@ abstract public class NodeInfo implements Comparable { if (state.getState().equals(State.DOWN) && !reportedState.getState().oneOf("d")) { downStableStateTime = time; log.log(LogLevel.DEBUG, "Down stable state on " + toString() + " altered to " + time); - } - else if (state.getState().equals(State.UP) && !reportedState.getState().oneOf("u")) { + if (reportedState.getState() == State.INITIALIZING) { + recentlyObservedUnstableDuringInit = true; + } + } else if (state.getState().equals(State.UP) && !reportedState.getState().oneOf("u")) { upStableStateTime = time; log.log(LogLevel.DEBUG, "Up stable state on " + toString() + " altered to " + time); } @@ -403,7 +429,7 @@ abstract public class NodeInfo implements Comparable { public void setSystemStateVersionSent(ClusterState state) { if (state == null) throw new Error("Should not clear info for last version sent"); if (systemStateVersionSent.containsKey(state.getVersion())) { - throw new IllegalStateException("We have already sent cluster state version " + version + " to " + node); + throw new IllegalStateException("We have already sent cluster state version " + state.getVersion() + " to " + node); } systemStateVersionSent.put(state.getVersion(), state); } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeStateReason.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeStateReason.java new file mode 100644 index 00000000000..da338626d5d --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeStateReason.java @@ -0,0 +1,10 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +public enum NodeStateReason { + // FIXME some of these reasons may be unnecessary as they are reported implicitly by reported/wanted state changes + NODE_TOO_UNSTABLE, + WITHIN_MAINTENANCE_GRACE_PERIOD, + FORCED_INTO_MAINTENANCE, + GROUP_IS_DOWN +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java new file mode 100644 index 00000000000..83ba274c422 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java @@ -0,0 +1,530 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.jrt.Spec; +import com.yahoo.log.LogLevel; +import com.yahoo.vdslib.distribution.ConfiguredNode; +import com.yahoo.vdslib.state.*; +import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler; +import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler; + +import java.util.*; +import java.util.logging.Logger; + +/** + * This class gets node state updates and timer events and uses these to decide + * whether a new cluster state should be generated. + * + * TODO refactor logic out into smaller, separate components. Still state duplication + * between ClusterStateGenerator and StateChangeHandler, especially for temporal + * state transition configuration parameters. 
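+ *
+ * Interplay sketch (simplified from the FleetController wiring in this change): node state
+ * and timer events set an internal "state may have changed" flag, which the controller polls:
+ *
+ *   if (stateChangeHandler.stateMayHaveChanged()) {
+ *       stateChangeHandler.unsetStateChangedFlag();
+ *       AnnotatedClusterState candidate = ClusterStateGenerator.generatedStateFrom(params);
+ *       // candidate is then offered to the StateVersionTracker for possible publishing
+ *   }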
+ */ +public class StateChangeHandler { + + private static Logger log = Logger.getLogger(StateChangeHandler.class.getName()); + + private final Timer timer; + private final EventLogInterface eventLog; + private boolean stateMayHaveChanged = false; + private boolean isMaster = false; + + private Map maxTransitionTime = new TreeMap<>(); + private int maxInitProgressTime = 5000; + private int maxPrematureCrashes = 4; + private long stableStateTimePeriod = 60 * 60 * 1000; + private Map hostnames = new HashMap<>(); + private int maxSlobrokDisconnectGracePeriod = 1000; + private static final boolean disableUnstableNodes = true; + + /** + * @param metricUpdater may be null, in which case no metrics will be recorded. + */ + public StateChangeHandler(Timer timer, EventLogInterface eventLog, MetricUpdater metricUpdater) { + this.timer = timer; + this.eventLog = eventLog; + maxTransitionTime.put(NodeType.DISTRIBUTOR, 5000); + maxTransitionTime.put(NodeType.STORAGE, 5000); + } + + public void handleAllDistributorsInSync(final ClusterState currentState, + final Set nodes, + final DatabaseHandler database, + final DatabaseHandler.Context dbContext) throws InterruptedException { + int startTimestampsReset = 0; + log.log(LogLevel.DEBUG, String.format("handleAllDistributorsInSync invoked for state version %d", currentState.getVersion())); + for (NodeType nodeType : NodeType.getTypes()) { + for (ConfiguredNode configuredNode : nodes) { + final Node node = new Node(nodeType, configuredNode.index()); + final NodeInfo nodeInfo = dbContext.getCluster().getNodeInfo(node); + final NodeState nodeState = currentState.getNodeState(node); + if (nodeInfo != null && nodeState != null) { + if (nodeState.getStartTimestamp() > nodeInfo.getStartTimestamp()) { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, String.format("Storing away new start timestamp for node %s (%d)", + node, nodeState.getStartTimestamp())); + } + nodeInfo.setStartTimestamp(nodeState.getStartTimestamp()); + } + if (nodeState.getStartTimestamp() > 0) { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, String.format("Resetting timestamp in cluster state for node %s", node)); + } + ++startTimestampsReset; + } + } else if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, node + ": " + + (nodeInfo == null ? "null" : nodeInfo.getStartTimestamp()) + ", " + + (nodeState == null ? 
"null" : nodeState.getStartTimestamp())); + } + } + } + if (startTimestampsReset > 0) { + eventLog.add(new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE, "Reset " + startTimestampsReset + + " start timestamps as all available distributors have seen newest cluster state.", + timer.getCurrentTimeInMillis())); + stateMayHaveChanged = true; + database.saveStartTimestamps(dbContext); + } else { + log.log(LogLevel.DEBUG, "Found no start timestamps to reset in cluster state."); + } + } + + public boolean stateMayHaveChanged() { + return stateMayHaveChanged; + } + + public void setStateChangedFlag() { stateMayHaveChanged = true; } + public void unsetStateChangedFlag() { + stateMayHaveChanged = false; + } + + public void setMaster(boolean isMaster) { + this.isMaster = isMaster; + } + + public void setMaxTransitionTime(Map map) { maxTransitionTime = map; } + public void setMaxInitProgressTime(int millisecs) { maxInitProgressTime = millisecs; } + public void setMaxSlobrokDisconnectGracePeriod(int millisecs) { + maxSlobrokDisconnectGracePeriod = millisecs; + } + public void setStableStateTimePeriod(long millisecs) { stableStateTimePeriod = millisecs; } + public void setMaxPrematureCrashes(int count) { maxPrematureCrashes = count; } + + // TODO nodeListener is only used via updateNodeInfoFromReportedState -> handlePrematureCrash + // TODO this will recursively invoke proposeNewNodeState, which will presumably (i.e. hopefully) be a no-op... + public void handleNewReportedNodeState(final ClusterState currentClusterState, + final NodeInfo node, + final NodeState reportedState, + final NodeStateOrHostInfoChangeHandler nodeListener) + { + final NodeState currentState = currentClusterState.getNodeState(node.getNode()); + final LogLevel level = (currentState.equals(reportedState) && node.getVersion() == 0) ? LogLevel.SPAM : LogLevel.DEBUG; + if (log.isLoggable(level)) { + log.log(level, String.format("Got nodestate reply from %s: %s (Current state is %s)", + node, node.getReportedState().getTextualDifference(reportedState), currentState.toString(true))); + } + final long currentTime = timer.getCurrentTimeInMillis(); + + if (reportedState.getState().equals(State.DOWN)) { + node.setTimeOfFirstFailingConnectionAttempt(currentTime); + } + + // *** LOGGING ONLY + if ( ! reportedState.similarTo(node.getReportedState())) { + if (reportedState.getState().equals(State.DOWN)) { + eventLog.addNodeOnlyEvent(new NodeEvent(node, "Failed to get node state: " + reportedState.toString(true), NodeEvent.Type.REPORTED, currentTime), LogLevel.INFO); + } else { + eventLog.addNodeOnlyEvent(new NodeEvent(node, "Now reporting state " + reportedState.toString(true), NodeEvent.Type.REPORTED, currentTime), LogLevel.DEBUG); + } + } + + if (reportedState.equals(node.getReportedState()) && ! 
reportedState.getState().equals(State.INITIALIZING)) { + return; + } + + updateNodeInfoFromReportedState(node, currentState, reportedState, nodeListener); + + if (reportedState.getMinUsedBits() != currentState.getMinUsedBits()) { + final int oldCount = currentState.getMinUsedBits(); + final int newCount = reportedState.getMinUsedBits(); + log.log(LogLevel.DEBUG, + String.format("Altering node state to reflect that min distribution bit count has changed from %d to %d", + oldCount, newCount)); + eventLog.add(new NodeEvent(node, String.format("Altered min distribution bit count from %d to %d", oldCount, newCount), + NodeEvent.Type.CURRENT, currentTime), isMaster); + } else if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, String.format("Not altering state of %s in cluster state because new state is too similar: %s", + node, currentState.getTextualDifference(reportedState))); + } + + stateMayHaveChanged = true; + } + + public void handleNewNode(NodeInfo node) { + setHostName(node); + String message = "Found new node " + node + " in slobrok at " + node.getRpcAddress(); + eventLog.add(new NodeEvent(node, message, NodeEvent.Type.REPORTED, timer.getCurrentTimeInMillis()), isMaster); + } + + public void handleMissingNode(final ClusterState currentClusterState, + final NodeInfo node, + final NodeStateOrHostInfoChangeHandler nodeListener) + { + removeHostName(node); + + final long timeNow = timer.getCurrentTimeInMillis(); + + if (node.getLatestNodeStateRequestTime() != null) { + eventLog.add(new NodeEvent(node, "Node is no longer in slobrok, but we still have a pending state request.", NodeEvent.Type.REPORTED, timeNow), isMaster); + } else { + eventLog.add(new NodeEvent(node, "Node is no longer in slobrok. No pending state request to node.", NodeEvent.Type.REPORTED, timeNow), isMaster); + } + + if (node.getReportedState().getState().equals(State.STOPPING)) { + log.log(LogLevel.DEBUG, "Node " + node.getNode() + " is no longer in slobrok. Was in stopping state, so assuming it has shut down normally. Setting node down"); + NodeState ns = node.getReportedState().clone(); + ns.setState(State.DOWN); + handleNewReportedNodeState(currentClusterState, node, ns.clone(), nodeListener); + } else { + log.log(LogLevel.DEBUG, "Node " + node.getNode() + " no longer in slobrok was in state " + node.getReportedState() + ". Waiting to see if it reappears in slobrok"); + } + + stateMayHaveChanged = true; + } + + /** + * Propose a new state for a node. This may happen due to an administrator action, orchestration, or + * a configuration change. + * + * If the newly proposed state differs from the state the node currently has in the system, + * a cluster state regeneration will be triggered. 
+ */ + public void proposeNewNodeState(final ClusterState currentClusterState, final NodeInfo node, final NodeState proposedState) { + final NodeState currentState = currentClusterState.getNodeState(node.getNode()); + final NodeState currentReported = node.getReportedState(); + + if (currentState.getState().equals(proposedState.getState())) { + return; + } + stateMayHaveChanged = true; + + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, String.format("Got new wanted nodestate for %s: %s", node, currentState.getTextualDifference(proposedState))); + } + // Should be checked earlier before state was set in cluster + assert(proposedState.getState().validWantedNodeState(node.getNode().getType())); + long timeNow = timer.getCurrentTimeInMillis(); + if (proposedState.above(currentReported)) { + eventLog.add(new NodeEvent(node, String.format("Wanted state %s, but we cannot force node into that " + + "state yet as it is currently in %s", proposedState, currentReported), + NodeEvent.Type.REPORTED, timeNow), isMaster); + return; + } + if ( ! proposedState.similarTo(currentState)) { + eventLog.add(new NodeEvent(node, String.format("Node state set to %s.", proposedState), + NodeEvent.Type.WANTED, timeNow), isMaster); + } + } + + public void handleNewRpcAddress(NodeInfo node) { + setHostName(node); + String message = "Node " + node + " has a new address in slobrok: " + node.getRpcAddress(); + eventLog.add(new NodeEvent(node, message, NodeEvent.Type.REPORTED, timer.getCurrentTimeInMillis()), isMaster); + } + + public void handleReturnedRpcAddress(NodeInfo node) { + setHostName(node); + String message = "Node got back into slobrok with same address as before: " + node.getRpcAddress(); + eventLog.add(new NodeEvent(node, message, NodeEvent.Type.REPORTED, timer.getCurrentTimeInMillis()), isMaster); + } + + private void setHostName(NodeInfo node) { + String rpcAddress = node.getRpcAddress(); + if (rpcAddress == null) { + // This may happen if we haven't seen the node in Slobrok yet. + return; + } + + Spec address = new Spec(rpcAddress); + if (address.malformed()) { + return; + } + + hostnames.put(node.getNodeIndex(), address.host()); + } + + void reconfigureFromOptions(FleetControllerOptions options) { + setMaxPrematureCrashes(options.maxPrematureCrashes); + setStableStateTimePeriod(options.stableStateTimePeriod); + setMaxInitProgressTime(options.maxInitProgressTime); + setMaxSlobrokDisconnectGracePeriod(options.maxSlobrokDisconnectGracePeriod); + setMaxTransitionTime(options.maxTransitionTime); + } + + private void removeHostName(NodeInfo node) { + hostnames.remove(node.getNodeIndex()); + } + + Map getHostnames() { + return Collections.unmodifiableMap(hostnames); + } + + // TODO too many hidden behavior dependencies between this and the actually + // generated cluster state. Still a bit of a mine field... + // TODO remove all node state mutation from this function entirely in favor of ClusterStateGenerator! + // `--> this will require adding more event edges and premature crash handling to it. Which is fine. 
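None of the handlers above publish a cluster state themselves; they only raise stateMayHaveChanged, and watchTimers below follows the same pattern. A rough sketch of how a caller (for instance the fleet controller's tick loop) could tie this handler together with the StateVersionTracker introduced further down in this patch; computeCandidate() is a placeholder for the ClusterStateGenerator entry point, which is not shown in this hunk:

    // Sketch of one state-recomputation step in the caller; not the actual FleetController code.
    stateChangeHandler.watchTimers(cluster, stateVersionTracker.getVersionedClusterState(), nodeListener);
    if (stateChangeHandler.stateMayHaveChanged()) {
        AnnotatedClusterState candidate = computeCandidate();  // delegate to the pure-functional ClusterStateGenerator
        stateVersionTracker.updateLatestCandidateState(candidate);
        if (stateVersionTracker.candidateChangedEnoughFromCurrentToWarrantPublish()) {
            stateVersionTracker.promoteCandidateToVersionedState(timer.getCurrentTimeInMillis());
            // publish stateVersionTracker.getVersionedClusterState() to the content nodes
        }
        stateChangeHandler.unsetStateChangedFlag();
    }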
+ public boolean watchTimers(final ContentCluster cluster, + final ClusterState currentClusterState, + final NodeStateOrHostInfoChangeHandler nodeListener) + { + boolean triggeredAnyTimers = false; + final long currentTime = timer.getCurrentTimeInMillis(); + + for(NodeInfo node : cluster.getNodeInfo()) { + triggeredAnyTimers |= handleTimeDependentOpsForNode(currentClusterState, nodeListener, currentTime, node); + } + + if (triggeredAnyTimers) { + stateMayHaveChanged = true; + } + return triggeredAnyTimers; + } + + private boolean handleTimeDependentOpsForNode(final ClusterState currentClusterState, + final NodeStateOrHostInfoChangeHandler nodeListener, + final long currentTime, + final NodeInfo node) + { + final NodeState currentStateInSystem = currentClusterState.getNodeState(node.getNode()); + final NodeState lastReportedState = node.getReportedState(); + boolean triggeredAnyTimers = false; + + triggeredAnyTimers = reportDownIfOutdatedSlobrokNode( + currentClusterState, nodeListener, currentTime, node, lastReportedState); + + if (nodeStillUnavailableAfterTransitionTimeExceeded( + currentTime, node, currentStateInSystem, lastReportedState)) + { + eventLog.add(new NodeEvent(node, String.format( + "%d milliseconds without contact. Marking node down.", + currentTime - node.getTransitionTime()), + NodeEvent.Type.CURRENT, currentTime), isMaster); + triggeredAnyTimers = true; + } + + if (nodeInitProgressHasTimedOut(currentTime, node, currentStateInSystem, lastReportedState)) { + eventLog.add(new NodeEvent(node, String.format( + "%d milliseconds without initialize progress. Marking node down. " + + "Premature crash count is now %d.", + currentTime - node.getInitProgressTime(), + node.getPrematureCrashCount() + 1), + NodeEvent.Type.CURRENT, currentTime), isMaster); + handlePrematureCrash(node, nodeListener); + triggeredAnyTimers = true; + } + + if (mayResetCrashCounterOnStableUpNode(currentTime, node, lastReportedState)) { + node.setPrematureCrashCount(0); + log.log(LogLevel.DEBUG, "Resetting premature crash count on node " + node + " as it has been up for a long time."); + triggeredAnyTimers = true; + } else if (mayResetCrashCounterOnStableDownNode(currentTime, node, lastReportedState)) { + node.setPrematureCrashCount(0); + log.log(LogLevel.DEBUG, "Resetting premature crash count on node " + node + " as it has been down for a long time."); + triggeredAnyTimers = true; + } + + return triggeredAnyTimers; + } + + private boolean nodeInitProgressHasTimedOut(long currentTime, NodeInfo node, NodeState currentStateInSystem, NodeState lastReportedState) { + return !currentStateInSystem.getState().equals(State.DOWN) + && node.getWantedState().above(new NodeState(node.getNode().getType(), State.DOWN)) + && lastReportedState.getState().equals(State.INITIALIZING) + && maxInitProgressTime != 0 + && node.getInitProgressTime() + maxInitProgressTime <= currentTime + && node.getNode().getType().equals(NodeType.STORAGE); + } + + private boolean mayResetCrashCounterOnStableDownNode(long currentTime, NodeInfo node, NodeState lastReportedState) { + return node.getDownStableStateTime() + stableStateTimePeriod <= currentTime + && lastReportedState.getState().equals(State.DOWN) + && node.getPrematureCrashCount() <= maxPrematureCrashes + && node.getPrematureCrashCount() != 0; + } + + private boolean mayResetCrashCounterOnStableUpNode(long currentTime, NodeInfo node, NodeState lastReportedState) { + return node.getUpStableStateTime() + stableStateTimePeriod <= currentTime + && 
lastReportedState.getState().equals(State.UP) + && node.getPrematureCrashCount() <= maxPrematureCrashes + && node.getPrematureCrashCount() != 0; + } + + private boolean nodeStillUnavailableAfterTransitionTimeExceeded( + long currentTime, + NodeInfo node, + NodeState currentStateInSystem, + NodeState lastReportedState) + { + return currentStateInSystem.getState().equals(State.MAINTENANCE) + && node.getWantedState().above(new NodeState(node.getNode().getType(), State.DOWN)) + && (lastReportedState.getState().equals(State.DOWN) || node.isRpcAddressOutdated()) + && node.getTransitionTime() + maxTransitionTime.get(node.getNode().getType()) < currentTime; + } + + private boolean reportDownIfOutdatedSlobrokNode(ClusterState currentClusterState, + NodeStateOrHostInfoChangeHandler nodeListener, + long currentTime, + NodeInfo node, + NodeState lastReportedState) + { + if (node.isRpcAddressOutdated() + && !lastReportedState.getState().equals(State.DOWN) + && node.getRpcAddressOutdatedTimestamp() + maxSlobrokDisconnectGracePeriod <= currentTime) + { + final String desc = String.format( + "Set node down as it has been out of slobrok for %d ms which " + + "is more than the max limit of %d ms.", + currentTime - node.getRpcAddressOutdatedTimestamp(), + maxSlobrokDisconnectGracePeriod); + node.abortCurrentNodeStateRequests(); + NodeState state = lastReportedState.clone(); + state.setState(State.DOWN); + if (!state.hasDescription()) { + state.setDescription(desc); + } + eventLog.add(new NodeEvent(node, desc, NodeEvent.Type.CURRENT, currentTime), isMaster); + handleNewReportedNodeState(currentClusterState, node, state.clone(), nodeListener); + node.setReportedState(state, currentTime); + return true; + } + return false; + } + + private boolean isControlledShutdown(NodeState state) { + return (state.getState() == State.STOPPING + && (state.getDescription().contains("Received signal 15 (SIGTERM - Termination signal)") + || state.getDescription().contains("controlled shutdown"))); + } + + /** + * Modify a node's cross-state information in the cluster based on a newly arrived reported state. 
+ * + * @param node the node we are computing the state of + * @param currentState the current state of the node + * @param reportedState the new state reported by (or, in the case of down - inferred from) the node + * @param nodeListener this listener is notified for some of the system state changes that this will return + */ + private void updateNodeInfoFromReportedState(final NodeInfo node, + final NodeState currentState, + final NodeState reportedState, + final NodeStateOrHostInfoChangeHandler nodeListener) { + final long timeNow = timer.getCurrentTimeInMillis(); + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, String.format("Finding new cluster state entry for %s switching state %s", + node, currentState.getTextualDifference(reportedState))); + } + + if (handleReportedNodeCrashEdge(node, currentState, reportedState, nodeListener, timeNow)) { + return; + } + if (initializationProgressHasIncreased(currentState, reportedState)) { + node.setInitProgressTime(timeNow); + if (log.isLoggable(LogLevel.SPAM)) { + log.log(LogLevel.SPAM, "Reset initialize timer on " + node + " to " + node.getInitProgressTime()); + } + } + if (handleImplicitCrashEdgeFromReverseInitProgress(node, currentState, reportedState, nodeListener, timeNow)) { + return; + } + markNodeUnstableIfDownEdgeDuringInit(node, currentState, reportedState, nodeListener, timeNow); + } + + // If we go down while initializing, mark node unstable, such that we don't mark it initializing again before it is up. + private void markNodeUnstableIfDownEdgeDuringInit(final NodeInfo node, + final NodeState currentState, + final NodeState reportedState, + final NodeStateOrHostInfoChangeHandler nodeListener, + final long timeNow) { + if (currentState.getState().equals(State.INITIALIZING) + && reportedState.getState().oneOf("ds") + && !isControlledShutdown(reportedState)) + { + eventLog.add(new NodeEvent(node, String.format("Stop or crash during initialization. " + + "Premature crash count is now %d.", node.getPrematureCrashCount() + 1), + NodeEvent.Type.CURRENT, timeNow), isMaster); + handlePrematureCrash(node, nodeListener); + } + } + + // TODO do we need this when we have startup timestamps? at least it's unit tested. + // TODO this seems fairly contrived... + // If we get reverse initialize progress, mark node unstable, such that we don't mark it initializing again before it is up. + private boolean handleImplicitCrashEdgeFromReverseInitProgress(final NodeInfo node, + final NodeState currentState, + final NodeState reportedState, + final NodeStateOrHostInfoChangeHandler nodeListener, + final long timeNow) { + if (currentState.getState().equals(State.INITIALIZING) && + (reportedState.getState().equals(State.INITIALIZING) && reportedState.getInitProgress() < currentState.getInitProgress())) + { + eventLog.add(new NodeEvent(node, String.format( + "Stop or crash during initialization detected from reverse initializing progress." + + " Progress was %g but is now %g. 
Premature crash count is now %d.", + currentState.getInitProgress(), reportedState.getInitProgress(), + node.getPrematureCrashCount() + 1), + NodeEvent.Type.CURRENT, timeNow), isMaster); + node.setRecentlyObservedUnstableDuringInit(true); + handlePrematureCrash(node, nodeListener); + return true; + } + return false; + } + + private boolean handleReportedNodeCrashEdge(NodeInfo node, NodeState currentState, + NodeState reportedState, NodeStateOrHostInfoChangeHandler nodeListener, + long timeNow) { + if (nodeUpToDownEdge(node, currentState, reportedState)) { + node.setTransitionTime(timeNow); + if (node.getUpStableStateTime() + stableStateTimePeriod > timeNow && !isControlledShutdown(reportedState)) { + log.log(LogLevel.DEBUG, "Stable state: " + node.getUpStableStateTime() + " + " + stableStateTimePeriod + " > " + timeNow); + eventLog.add(new NodeEvent(node, + String.format("Stopped or possibly crashed after %d ms, which is before " + + "stable state time period. Premature crash count is now %d.", + timeNow - node.getUpStableStateTime(), node.getPrematureCrashCount() + 1), + NodeEvent.Type.CURRENT, + timeNow), isMaster); + if (handlePrematureCrash(node, nodeListener)) { + return true; + } + } + } + return false; + } + + private boolean initializationProgressHasIncreased(NodeState currentState, NodeState reportedState) { + return reportedState.getState().equals(State.INITIALIZING) && + (!currentState.getState().equals(State.INITIALIZING) || + reportedState.getInitProgress() > currentState.getInitProgress()); + } + + private boolean nodeUpToDownEdge(NodeInfo node, NodeState currentState, NodeState reportedState) { + return currentState.getState().oneOf("ur") && reportedState.getState().oneOf("dis") + && (node.getWantedState().getState().equals(State.RETIRED) || !reportedState.getState().equals(State.INITIALIZING)); + } + + private boolean handlePrematureCrash(NodeInfo node, NodeStateOrHostInfoChangeHandler changeListener) { + node.setPrematureCrashCount(node.getPrematureCrashCount() + 1); + if (disableUnstableNodes && node.getPrematureCrashCount() > maxPrematureCrashes) { + NodeState wantedState = new NodeState(node.getNode().getType(), State.DOWN) + .setDescription("Disabled by fleet controller as it prematurely shut down " + node.getPrematureCrashCount() + " times in a row"); + NodeState oldState = node.getWantedState(); + node.setWantedState(wantedState); + if ( ! oldState.equals(wantedState)) { + changeListener.handleNewWantedNodeState(node, wantedState); + } + return true; + } + return false; + } + +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java new file mode 100644 index 00000000000..f5a67ca9434 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java @@ -0,0 +1,140 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vdslib.state.ClusterState; +import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo; + +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +/** + * Keeps track of the active cluster state and handles the transition edges between + * one state to the next. In particular, it ensures that states have strictly increasing + * version numbers. 
+ * + * Wraps ClusterStateView to ensure its knowledge of available nodes stays up to date. + */ +public class StateVersionTracker { + + // We always increment the version _before_ publishing, so the effective first cluster + // state version when starting from 1 will be 2. This matches legacy behavior and a bunch + // of existing tests expect it. + private int currentVersion = 1; + private int lastZooKeeperVersion = 0; + + // The lowest published distribution bit count for the lifetime of this controller. + // TODO this mirrors legacy behavior, but should be moved into stable ZK state. + private int lowestObservedDistributionBits = 16; + + private ClusterState currentUnversionedState = ClusterState.emptyState(); + private AnnotatedClusterState latestCandidateState = AnnotatedClusterState.emptyState(); + private AnnotatedClusterState currentClusterState = latestCandidateState; + + private final MetricUpdater metricUpdater; + private ClusterStateView clusterStateView; + + private final LinkedList clusterStateHistory = new LinkedList<>(); + private int maxHistoryEntryCount = 50; + + StateVersionTracker(final MetricUpdater metricUpdater) { + this.metricUpdater = metricUpdater; + clusterStateView = ClusterStateView.create(currentUnversionedState, metricUpdater); + } + + void setVersionRetrievedFromZooKeeper(final int version) { + this.currentVersion = Math.max(1, version); + this.lastZooKeeperVersion = this.currentVersion; + } + + /** + * Sets limit on how many cluster states can be kept in the in-memory queue. Once + * the list exceeds this limit, the oldest state is repeatedly removed until the limit + * is no longer exceeded. + * + * Takes effect upon the next invocation of promoteCandidateToVersionedState(). + */ + void setMaxHistoryEntryCount(final int maxHistoryEntryCount) { + this.maxHistoryEntryCount = maxHistoryEntryCount; + } + + int getCurrentVersion() { + return this.currentVersion; + } + + boolean hasReceivedNewVersionFromZooKeeper() { + return currentVersion <= lastZooKeeperVersion; + } + + int getLowestObservedDistributionBits() { + return lowestObservedDistributionBits; + } + + AnnotatedClusterState getAnnotatedVersionedClusterState() { + return currentClusterState; + } + + public ClusterState getVersionedClusterState() { + return currentClusterState.getClusterState(); + } + + public void updateLatestCandidateState(final AnnotatedClusterState candidate) { + assert(latestCandidateState.getClusterState().getVersion() == 0); + latestCandidateState = candidate; + } + + /** + * Returns the last state provided to updateLatestCandidateState, which _may or may not_ be + * a published state. Primary use case for this function is a caller which is interested in + * changes that may not be reflected in the published state. The best example of this would + * be node state changes when a cluster is marked as Down. 
+ */ + public AnnotatedClusterState getLatestCandidateState() { + return latestCandidateState; + } + + public List getClusterStateHistory() { + return Collections.unmodifiableList(clusterStateHistory); + } + + boolean candidateChangedEnoughFromCurrentToWarrantPublish() { + return !currentUnversionedState.similarToIgnoringInitProgress(latestCandidateState.getClusterState()); + } + + void promoteCandidateToVersionedState(final long currentTimeMs) { + final int newVersion = currentVersion + 1; + updateStatesForNewVersion(latestCandidateState, newVersion); + currentVersion = newVersion; + + recordCurrentStateInHistoryAtTime(currentTimeMs); + } + + private void updateStatesForNewVersion(final AnnotatedClusterState newState, final int newVersion) { + currentClusterState = new AnnotatedClusterState( + newState.getClusterState().clone(), // Because we mutate version below + newState.getClusterStateReason(), + newState.getNodeStateReasons()); + currentClusterState.getClusterState().setVersion(newVersion); + currentUnversionedState = newState.getClusterState().clone(); + lowestObservedDistributionBits = Math.min( + lowestObservedDistributionBits, + newState.getClusterState().getDistributionBitCount()); + // TODO should this take place in updateLatestCandidateState instead? I.e. does it require a consolidated state? + clusterStateView = ClusterStateView.create(currentClusterState.getClusterState(), metricUpdater); + } + + private void recordCurrentStateInHistoryAtTime(final long currentTimeMs) { + clusterStateHistory.addFirst(new ClusterStateHistoryEntry( + currentClusterState.getClusterState(), currentTimeMs)); + while (clusterStateHistory.size() > maxHistoryEntryCount) { + clusterStateHistory.removeLast(); + } + } + + void handleUpdatedHostInfo(final Map hostnames, final NodeInfo node, final HostInfo hostInfo) { + // TODO the wiring here isn't unit tested. Need mockable integration points. + clusterStateView.handleUpdatedHostInfo(hostnames, node, hostInfo); + } + +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateGenerator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateGenerator.java deleted file mode 100644 index 7edff399633..00000000000 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateGenerator.java +++ /dev/null @@ -1,941 +0,0 @@ -// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.clustercontroller.core; - -import com.yahoo.jrt.Spec; -import com.yahoo.log.LogLevel; -import com.yahoo.vdslib.distribution.ConfiguredNode; -import com.yahoo.vdslib.distribution.Distribution; -import com.yahoo.vdslib.state.*; -import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler; -import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo; -import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler; -import com.yahoo.vespa.clustercontroller.core.listeners.SystemStateListener; - -import java.util.*; -import java.util.logging.Logger; -import java.text.ParseException; -import java.util.stream.Collectors; - -/** - * This class get node state updates and uses them to decide the cluster state. - */ -// TODO: Remove all current state from this and make it rely on state from ClusterInfo instead -// TODO: Do this ASAP! SystemStateGenerator should ideally behave as a pure function! 
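Where the SystemStateGenerator being deleted below bumps versions inline (see setLatestSystemStateVersion and the setVersion(... + 1) call in notifyIfNewSystemState), the StateVersionTracker above owns that responsibility. A small sketch of the intended version progression, following the comment about incrementing before publishing; candidate, metricUpdater and the timestamps are stand-ins:

    // Sketch only: strictly increasing versions, starting at 2 for the first published state.
    StateVersionTracker tracker = new StateVersionTracker(metricUpdater);  // currentVersion starts at 1
    tracker.updateLatestCandidateState(candidate);                         // the candidate itself is unversioned
    tracker.promoteCandidateToVersionedState(nowMs);                       // published as version 2
    tracker.promoteCandidateToVersionedState(laterMs);                     // next publish becomes version 3
    int version = tracker.getCurrentVersion();                             // 3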
-public class SystemStateGenerator { - - private static Logger log = Logger.getLogger(SystemStateGenerator.class.getName()); - - private final Timer timer; - private final EventLogInterface eventLog; - private ClusterStateView currentClusterStateView; - private ClusterStateView nextClusterStateView; - private Distribution distribution; - private boolean nextStateViewChanged = false; - private boolean isMaster = false; - - private Map maxTransitionTime = new TreeMap<>(); - private int maxInitProgressTime = 5000; - private int maxPrematureCrashes = 4; - private long stableStateTimePeriod = 60 * 60 * 1000; - private static final int maxHistorySize = 50; - private Set nodes; - private Map hostnames = new HashMap<>(); - private int minDistributorNodesUp = 1; - private int minStorageNodesUp = 1; - private double minRatioOfDistributorNodesUp = 0.50; - private double minRatioOfStorageNodesUp = 0.50; - private double minNodeRatioPerGroup = 0.0; - private int maxSlobrokDisconnectGracePeriod = 1000; - private int idealDistributionBits = 16; - private static final boolean disableUnstableNodes = true; - - private final LinkedList systemStateHistory = new LinkedList<>(); - - /** - * @param metricUpdater may be null, in which case no metrics will be recorded. - */ - public SystemStateGenerator(Timer timer, EventLogInterface eventLog, MetricUpdater metricUpdater) { - try { - currentClusterStateView = ClusterStateView.create("", metricUpdater); - nextClusterStateView = ClusterStateView.create("", metricUpdater); - } catch (ParseException e) { - throw new RuntimeException("Parsing empty string should always work"); - } - this.timer = timer; - this.eventLog = eventLog; - maxTransitionTime.put(NodeType.DISTRIBUTOR, 5000); - maxTransitionTime.put(NodeType.STORAGE, 5000); - } - - public void handleAllDistributorsInSync(DatabaseHandler database, - DatabaseHandler.Context dbContext) throws InterruptedException { - int startTimestampsReset = 0; - for (NodeType nodeType : NodeType.getTypes()) { - for (ConfiguredNode configuredNode : nodes) { - Node node = new Node(nodeType, configuredNode.index()); - NodeInfo nodeInfo = dbContext.getCluster().getNodeInfo(node); - NodeState nodeState = nextClusterStateView.getClusterState().getNodeState(node); - if (nodeInfo != null && nodeState != null) { - if (nodeState.getStartTimestamp() > nodeInfo.getStartTimestamp()) { - log.log(LogLevel.DEBUG, "Storing away new start timestamp for node " + node); - nodeInfo.setStartTimestamp(nodeState.getStartTimestamp()); - } - if (nodeState.getStartTimestamp() > 0) { - log.log(LogLevel.DEBUG, "Resetting timestamp in cluster state for node " + node); - nodeState.setStartTimestamp(0); - nextClusterStateView.getClusterState().setNodeState(node, nodeState); - ++startTimestampsReset; - } - } else { - log.log(LogLevel.DEBUG, node + ": " + - (nodeInfo == null ? "null" : nodeInfo.getStartTimestamp()) + ", " + - (nodeState == null ? 
"null" : nodeState.getStartTimestamp())); - } - } - } - if (startTimestampsReset > 0) { - eventLog.add(new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE, "Reset " + startTimestampsReset + - " start timestamps as all available distributors have seen newest cluster state.", timer.getCurrentTimeInMillis())); - nextStateViewChanged = true; - database.saveStartTimestamps(dbContext); - } else { - log.log(LogLevel.DEBUG, "Found no start timestamps to reset in cluster state."); - } - } - - public void setMaxTransitionTime(Map map) { maxTransitionTime = map; } - public void setMaxInitProgressTime(int millisecs) { maxInitProgressTime = millisecs; } - public void setMaxPrematureCrashes(int count) { maxPrematureCrashes = count; } - public void setStableStateTimePeriod(long millisecs) { stableStateTimePeriod = millisecs; } - - public ClusterStateView currentClusterStateView() { return currentClusterStateView; } - - /** Returns an immutable list of the historical states this has generated */ - public List systemStateHistory() { - return Collections.unmodifiableList(systemStateHistory); - } - - public void setMinNodesUp(int minDistNodes, int minStorNodes, double minDistRatio, double minStorRatio) { - minDistributorNodesUp = minDistNodes; - minStorageNodesUp = minStorNodes; - minRatioOfDistributorNodesUp = minDistRatio; - minRatioOfStorageNodesUp = minStorRatio; - nextStateViewChanged = true; - } - - public void setMinNodeRatioPerGroup(double upRatio) { - this.minNodeRatioPerGroup = upRatio; - nextStateViewChanged = true; - } - - /** Sets the nodes of this and attempts to keep the node state in sync */ - public void setNodes(ClusterInfo newClusterInfo) { - this.nodes = new HashSet<>(newClusterInfo.getConfiguredNodes().values()); - - for (ConfiguredNode node : this.nodes) { - NodeInfo newNodeInfo = newClusterInfo.getStorageNodeInfo(node.index()); - NodeState currentState = currentClusterStateView.getClusterState().getNodeState(new Node(NodeType.STORAGE, node.index())); - if (currentState.getState() == State.RETIRED || currentState.getState() == State.UP) { // then correct to configured state - proposeNewNodeState(newNodeInfo, new NodeState(NodeType.STORAGE, node.retired() ? State.RETIRED : State.UP)); - } - } - - // Ensure that any nodes that have been removed from the config are also - // promptly removed from the next (and subsequent) generated cluster states. 
- pruneAllNodesNotContainedInConfig(); - - nextStateViewChanged = true; - } - - private void pruneAllNodesNotContainedInConfig() { - Set configuredIndices = this.nodes.stream().map(ConfiguredNode::index).collect(Collectors.toSet()); - final ClusterState candidateNextState = nextClusterStateView.getClusterState(); - pruneNodesNotContainedInConfig(candidateNextState, configuredIndices, NodeType.DISTRIBUTOR); - pruneNodesNotContainedInConfig(candidateNextState, configuredIndices, NodeType.STORAGE); - } - - public void setDistribution(Distribution distribution) { - this.distribution = distribution; - nextStateViewChanged = true; - } - - public void setMaster(boolean isMaster) { - this.isMaster = isMaster; - } - public void setMaxSlobrokDisconnectGracePeriod(int millisecs) { maxSlobrokDisconnectGracePeriod = millisecs; } - - public void setDistributionBits(int bits) { - if (bits == idealDistributionBits) return; - idealDistributionBits = bits; - int currentDistributionBits = calculateMinDistributionBitCount(); - if (currentDistributionBits != nextClusterStateView.getClusterState().getDistributionBitCount()) { - nextClusterStateView.getClusterState().setDistributionBits(currentDistributionBits); - nextStateViewChanged = true; - } - } - - public int getDistributionBits() { return idealDistributionBits; } - - public int calculateMinDistributionBitCount() { - int currentDistributionBits = idealDistributionBits; - int minNode = -1; - for (ConfiguredNode node : nodes) { - NodeState ns = nextClusterStateView.getClusterState().getNodeState(new Node(NodeType.STORAGE, node.index())); - if (ns.getState().oneOf("iur")) { - if (ns.getMinUsedBits() < currentDistributionBits) { - currentDistributionBits = ns.getMinUsedBits(); - minNode = node.index(); - } - } - } - if (minNode == -1) { - log.log(LogLevel.DEBUG, "Distribution bit count should still be default as all available nodes have at least split to " + idealDistributionBits + " bits"); - } else { - log.log(LogLevel.DEBUG, "Distribution bit count is limited to " + currentDistributionBits + " due to storage node " + minNode); - } - return currentDistributionBits; - } - - public ClusterState getClusterState() { return currentClusterStateView.getClusterState(); } - - /** - * Return the current cluster state, but if the cluster is down, modify the node states with the - * actual node states from the temporary next state. 
- */ - public ClusterState getConsolidatedClusterState() { - ClusterState currentState = currentClusterStateView.getClusterState(); - if (currentState.getClusterState().equals(State.UP)) { - return currentState; - } - - ClusterState nextState = nextClusterStateView.getClusterState(); - if (!currentState.getClusterState().equals(nextState.getClusterState())) { - log.warning("Expected current cluster state object to have same global state as the under creation instance."); - } - ClusterState state = nextState.clone(); - state.setVersion(currentState.getVersion()); - state.setOfficial(false); - return state; - } - - private Optional getDownDueToTooFewNodesEvent(ClusterState nextClusterState) { - int upStorageCount = 0, upDistributorCount = 0; - int dcount = nodes.size(); - int scount = nodes.size(); - for (NodeType type : NodeType.getTypes()) { - for (ConfiguredNode node : nodes) { - NodeState ns = nextClusterState.getNodeState(new Node(type, node.index())); - if (ns.getState() == State.UP || ns.getState() == State.RETIRED || ns.getState() == State.INITIALIZING) { - if (type.equals(NodeType.STORAGE)) - ++upStorageCount; - else - ++upDistributorCount; - } - } - } - - long timeNow = timer.getCurrentTimeInMillis(); - if (upStorageCount < minStorageNodesUp) { - return Optional.of(new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE, - "Less than " + minStorageNodesUp + " storage nodes available (" + upStorageCount + "). Setting cluster state down.", - timeNow)); - } - if (upDistributorCount < minDistributorNodesUp) { - return Optional.of(new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE, - "Less than " + minDistributorNodesUp + " distributor nodes available (" + upDistributorCount + "). Setting cluster state down.", - timeNow)); - } - if (minRatioOfStorageNodesUp * scount > upStorageCount) { - return Optional.of(new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE, - "Less than " + (100 * minRatioOfStorageNodesUp) + " % of storage nodes are available (" - + upStorageCount + "/" + scount + "). Setting cluster state down.", - timeNow)); - } - if (minRatioOfDistributorNodesUp * dcount > upDistributorCount) { - return Optional.of(new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE, - "Less than " + (100 * minRatioOfDistributorNodesUp) + " % of distributor nodes are available (" - + upDistributorCount + "/" + dcount + "). Setting cluster state down.", - timeNow)); - } - return Optional.empty(); - } - - private static Node storageNode(int index) { - return new Node(NodeType.STORAGE, index); - } - - private void performImplicitStorageNodeStateTransitions(ClusterState candidateState, ContentCluster cluster) { - if (distribution == null) { - return; // FIXME due to tests that don't bother setting distr config! Never happens in prod. - } - // First clear the states of any nodes that according to reported/wanted state alone - // should have their states cleared. We might still take these down again based on the - // decisions of the group availability calculator, but this way we ensure that groups - // that no longer should be down will have their nodes implicitly made available again. - // TODO this will be void once SystemStateGenerator has been rewritten to be stateless. 
- final Set clearedNodes = clearDownStateForStorageNodesThatCanBeUp(candidateState, cluster); - - final GroupAvailabilityCalculator calc = new GroupAvailabilityCalculator.Builder() - .withMinNodeRatioPerGroup(minNodeRatioPerGroup) - .withDistribution(distribution) - .build(); - final Set nodesToTakeDown = calc.nodesThatShouldBeDown(candidateState); - markNodesAsDownDueToGroupUnavailability(cluster, candidateState, nodesToTakeDown, clearedNodes); - - clearedNodes.removeAll(nodesToTakeDown); - logEventsForNodesThatWereTakenUp(clearedNodes, cluster); - } - - private void logEventsForNodesThatWereTakenUp(Set newlyUpNodes, ContentCluster cluster) { - newlyUpNodes.forEach(i -> { - final NodeInfo info = cluster.getNodeInfo(storageNode(i)); // Should always be non-null here. - // TODO the fact that this only happens for group up events is implementation specific - // should generalize this if we get other such events. - eventLog.addNodeOnlyEvent(new NodeEvent(info, - "Group availability restored; taking node back up", - NodeEvent.Type.CURRENT, timer.getCurrentTimeInMillis()), LogLevel.INFO); - }); - } - - private void markNodesAsDownDueToGroupUnavailability(ContentCluster cluster, - ClusterState candidateState, - Set nodesToTakeDown, - Set clearedNodes) - { - for (Integer idx : nodesToTakeDown) { - final Node node = storageNode(idx); - NodeState newState = new NodeState(NodeType.STORAGE, State.DOWN); - newState.setDescription("group node availability below configured threshold"); - candidateState.setNodeState(node, newState); - - logNodeGroupDownEdgeEventOnce(clearedNodes, node, cluster); - } - } - - private void logNodeGroupDownEdgeEventOnce(Set clearedNodes, Node node, ContentCluster cluster) { - final NodeInfo nodeInfo = cluster.getNodeInfo(node); - // If clearedNodes contains the index it means we're just re-downing a node - // that was previously down. If this is the case, we'd cause a duplicate - // event if we logged it now as well. - if (nodeInfo != null && !clearedNodes.contains(node.getIndex())) { - eventLog.addNodeOnlyEvent(new NodeEvent(nodeInfo, - "Setting node down as the total availability of its group is " + - "below the configured threshold", - NodeEvent.Type.CURRENT, timer.getCurrentTimeInMillis()), LogLevel.INFO); - } - } - - private NodeState baselineNodeState(NodeInfo info) { - NodeState reported = info.getReportedState(); - NodeState wanted = info.getWantedState(); - - final NodeState baseline = reported.clone(); - if (wanted.getState() != State.UP) { - baseline.setDescription(wanted.getDescription()); - if (reported.above(wanted)) { - baseline.setState(wanted.getState()); - } - } - // Don't reintroduce start timestamp to the node's state if it has already been - // observed by all distributors. This matches how handleNewReportedNodeState() sets timestamps. - // TODO make timestamp semantics clearer. Non-obvious what the two different timestamp stores imply. - // For posterity: reported.getStartTimestamp() is the start timestamp the node itself has stated. - // info.getStartTimestamp() is the timestamp written as having been observed by all distributors - // (which is done in handleAllDistributorsInSync()). 
- if (reported.getStartTimestamp() <= info.getStartTimestamp()) { - baseline.setStartTimestamp(0); - } - - return baseline; - } - - // Returns set of nodes whose state was cleared - private Set clearDownStateForStorageNodesThatCanBeUp( - ClusterState candidateState, ContentCluster cluster) - { - final int nodeCount = candidateState.getNodeCount(NodeType.STORAGE); - final Set clearedNodes = new HashSet<>(); - for (int i = 0; i < nodeCount; ++i) { - final Node node = storageNode(i); - final NodeInfo info = cluster.getNodeInfo(node); - final NodeState currentState = candidateState.getNodeState(node); - if (mayClearCurrentNodeState(currentState, info)) { - candidateState.setNodeState(node, baselineNodeState(info)); - clearedNodes.add(i); - } - } - return clearedNodes; - } - - private boolean mayClearCurrentNodeState(NodeState currentState, NodeInfo info) { - if (currentState.getState() != State.DOWN) { - return false; - } - if (info == null) { - // Nothing known about node in cluster info; we definitely don't want it - // to be taken up at this point. - return false; - } - // There exists an edge in watchTimers where a node in Maintenance is implicitly - // transitioned into Down without being Down in either reported or wanted states - // iff isRpcAddressOutdated() is true. To avoid getting into an edge where we - // inadvertently clear this state because its reported/wanted states seem fine, - // we must also check if that particular edge could have happened. I.e. whether - // the node's RPC address is marked as outdated. - // It also makes sense in general to not allow taking a node back up automatically - // if its RPC connectivity appears to be bad. - if (info.isRpcAddressOutdated()) { - return false; - } - // Rationale: we can only enter this statement if the _current_ (generated) state - // of the node is Down. Aside from the group take-down logic, there should not exist - // any other edges in the cluster controller state transition logic where a node - // may be set Down while both its reported state and wanted state imply that a better - // state should already have been chosen. Consequently we allow the node to have its - // Down-state cleared. 
- return (info.getReportedState().getState() != State.DOWN - && !info.getWantedState().getState().oneOf("d")); - } - - private ClusterStateView createNextVersionOfClusterStateView(ContentCluster cluster) { - // If you change this method, see *) in notifyIfNewSystemState - ClusterStateView candidateClusterStateView = nextClusterStateView.cloneForNewState(); - ClusterState candidateClusterState = candidateClusterStateView.getClusterState(); - - int currentDistributionBits = calculateMinDistributionBitCount(); - if (currentDistributionBits != nextClusterStateView.getClusterState().getDistributionBitCount()) { - candidateClusterState.setDistributionBits(currentDistributionBits); - } - performImplicitStorageNodeStateTransitions(candidateClusterState, cluster); - - return candidateClusterStateView; - } - - private void pruneNodesNotContainedInConfig(ClusterState candidateClusterState, - Set configuredIndices, - NodeType nodeType) - { - final int nodeCount = candidateClusterState.getNodeCount(nodeType); - for (int i = 0; i < nodeCount; ++i) { - final Node node = new Node(nodeType, i); - final NodeState currentState = candidateClusterState.getNodeState(node); - if (!configuredIndices.contains(i) && !currentState.getState().equals(State.DOWN)) { - log.log(LogLevel.INFO, "Removing node " + node + " from state as it is no longer present in config"); - candidateClusterState.setNodeState(node, new NodeState(nodeType, State.DOWN)); - } - } - } - - private void recordNewClusterStateHasBeenChosen( - ClusterState currentClusterState, ClusterState newClusterState, Event clusterEvent) { - long timeNow = timer.getCurrentTimeInMillis(); - - if (!currentClusterState.getClusterState().equals(State.UP) && - newClusterState.getClusterState().equals(State.UP)) { - eventLog.add(new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE, - "Enough nodes available for system to become up.", timeNow), isMaster); - } else if (currentClusterState.getClusterState().equals(State.UP) && - ! newClusterState.getClusterState().equals(State.UP)) { - assert(clusterEvent != null); - eventLog.add(clusterEvent, isMaster); - } - - if (newClusterState.getDistributionBitCount() != currentClusterState.getDistributionBitCount()) { - eventLog.add(new ClusterEvent( - ClusterEvent.Type.SYSTEMSTATE, - "Altering distribution bits in system from " - + currentClusterState.getDistributionBitCount() + " to " + - currentClusterState.getDistributionBitCount(), - timeNow), isMaster); - } - - eventLog.add(new ClusterEvent( - ClusterEvent.Type.SYSTEMSTATE, - "New cluster state version " + newClusterState.getVersion() + ". 
Change from last: " + - currentClusterState.getTextualDifference(newClusterState), - timeNow), isMaster); - - log.log(LogLevel.DEBUG, "Created new cluster state version: " + newClusterState.toString(true)); - systemStateHistory.addFirst(new SystemStateHistoryEntry(newClusterState, timeNow)); - if (systemStateHistory.size() > maxHistorySize) { - systemStateHistory.removeLast(); - } - } - - private void mergeIntoNextClusterState(ClusterState sourceState) { - final ClusterState nextState = nextClusterStateView.getClusterState(); - final int nodeCount = sourceState.getNodeCount(NodeType.STORAGE); - for (int i = 0; i < nodeCount; ++i) { - final Node node = storageNode(i); - final NodeState stateInSource = sourceState.getNodeState(node); - final NodeState stateInTarget = nextState.getNodeState(node); - if (stateInSource.getState() != stateInTarget.getState()) { - nextState.setNodeState(node, stateInSource); - } - } - } - - public boolean notifyIfNewSystemState(ContentCluster cluster, SystemStateListener stateListener) { - if ( ! nextStateViewChanged) return false; - - ClusterStateView newClusterStateView = createNextVersionOfClusterStateView(cluster); - - ClusterState newClusterState = newClusterStateView.getClusterState(); - // Creating the next version of the state may implicitly take down nodes, so our checks - // for taking the entire cluster down must happen _after_ this - Optional clusterDown = getDownDueToTooFewNodesEvent(newClusterState); - newClusterState.setClusterState(clusterDown.isPresent() ? State.DOWN : State.UP); - - if (newClusterState.similarTo(currentClusterStateView.getClusterState())) { - log.log(LogLevel.DEBUG, - "State hasn't changed enough to warrant new cluster state. Not creating new state: " + - currentClusterStateView.getClusterState().getTextualDifference(newClusterState)); - return false; - } - - // Update the version of newClusterState now. This cannot be done prior to similarTo(), - // since it makes the cluster states different. From now on, the new cluster state is immutable. - newClusterState.setVersion(currentClusterStateView.getClusterState().getVersion() + 1); - - recordNewClusterStateHasBeenChosen(currentClusterStateView.getClusterState(), - newClusterStateView.getClusterState(), clusterDown.orElse(null)); - - // *) Ensure next state is still up to date. - // This should make nextClusterStateView a deep-copy of currentClusterStateView. - // If more than the distribution bits and state are deep-copied in - // createNextVersionOfClusterStateView(), we need to add corresponding statements here. - // This seems like a hack... 
- nextClusterStateView.getClusterState().setDistributionBits(newClusterState.getDistributionBitCount()); - nextClusterStateView.getClusterState().setClusterState(newClusterState.getClusterState()); - mergeIntoNextClusterState(newClusterState); - - currentClusterStateView = newClusterStateView; - nextStateViewChanged = false; - - stateListener.handleNewSystemState(currentClusterStateView.getClusterState()); - - return true; - } - - public void setLatestSystemStateVersion(int version) { - currentClusterStateView.getClusterState().setVersion(Math.max(1, version)); - nextStateViewChanged = true; - } - - private void setNodeState(NodeInfo node, NodeState newState) { - NodeState oldState = nextClusterStateView.getClusterState().getNodeState(node.getNode()); - - // Correct UP to RETIRED if the node wants to be retired - if (newState.above(node.getWantedState())) - newState.setState(node.getWantedState().getState()); - - // Keep old description if a new one is not set and we're not going up or in initializing mode - if ( ! newState.getState().oneOf("ui") && oldState.hasDescription()) { - newState.setDescription(oldState.getDescription()); - } - - // Keep disk information if not set in new state - if (newState.getDiskCount() == 0 && oldState.getDiskCount() != 0) { - newState.setDiskCount(oldState.getDiskCount()); - for (int i=0; i node.getStartTimestamp()) { - alteredState.setStartTimestamp(reportedState.getStartTimestamp()); - } else { - alteredState.setStartTimestamp(0); - } - if (!alteredState.similarTo(currentState)) { - setNodeState(node, alteredState); - } else if (!alteredState.equals(currentState)) { - if (currentState.getState().equals(State.INITIALIZING) && alteredState.getState().equals(State.INITIALIZING) && - Math.abs(currentState.getInitProgress() - alteredState.getInitProgress()) > 0.000000001) - { - log.log(LogLevel.DEBUG, "Only silently updating init progress for " + node + " in cluster state because new " - + "state is too similar to tag new version: " + currentState.getTextualDifference(alteredState)); - currentState.setInitProgress(alteredState.getInitProgress()); - nextState.setNodeState(node.getNode(), currentState); - - NodeState currentNodeState = clusterState.getNodeState(node.getNode()); - if (currentNodeState.getState().equals(State.INITIALIZING)) { - currentNodeState.setInitProgress(alteredState.getInitProgress()); - clusterState.setNodeState(node.getNode(), currentNodeState); - } - } else if (alteredState.getMinUsedBits() != currentState.getMinUsedBits()) { - log.log(LogLevel.DEBUG, "Altering node state to reflect that min distribution bit count have changed from " - + currentState.getMinUsedBits() + " to " + alteredState.getMinUsedBits()); - int oldCount = currentState.getMinUsedBits(); - currentState.setMinUsedBits(alteredState.getMinUsedBits()); - nextState.setNodeState(node.getNode(), currentState); - int minDistBits = calculateMinDistributionBitCount(); - if (minDistBits < nextState.getDistributionBitCount() - || (nextState.getDistributionBitCount() < this.idealDistributionBits && minDistBits >= this.idealDistributionBits)) - { - // If this will actually affect global cluster state. - eventLog.add(new NodeEvent(node, "Altered min distribution bit count from " + oldCount - + " to " + currentState.getMinUsedBits() + ". Updated cluster state.", NodeEvent.Type.CURRENT, currentTime), isMaster); - nextStateViewChanged = true; - } else { - log.log(LogLevel.DEBUG, "Altered min distribution bit count from " + oldCount - + " to " + currentState.getMinUsedBits() + ". 
No effect for cluster state with ideal " + this.idealDistributionBits - + ", new " + minDistBits + ", old " + nextState.getDistributionBitCount() + " though."); - clusterState.setNodeState(node.getNode(), currentState); - } - } else { - log.log(LogLevel.DEBUG, "Not altering state of " + node + " in cluster state because new state is too similar: " - + currentState.getTextualDifference(alteredState)); - } - } else if (alteredState.getDescription().contains("Listing buckets")) { - currentState.setDescription(alteredState.getDescription()); - nextState.setNodeState(node.getNode(), currentState); - NodeState currentNodeState = clusterState.getNodeState(node.getNode()); - currentNodeState.setDescription(alteredState.getDescription()); - clusterState.setNodeState(node.getNode(), currentNodeState); - } - } - } - - public void handleNewNode(NodeInfo node) { - setHostName(node); - String message = "Found new node " + node + " in slobrok at " + node.getRpcAddress(); - eventLog.add(new NodeEvent(node, message, NodeEvent.Type.REPORTED, timer.getCurrentTimeInMillis()), isMaster); - } - - public void handleMissingNode(NodeInfo node, NodeStateOrHostInfoChangeHandler nodeListener) { - removeHostName(node); - - long timeNow = timer.getCurrentTimeInMillis(); - - if (node.getLatestNodeStateRequestTime() != null) { - eventLog.add(new NodeEvent(node, "Node is no longer in slobrok, but we still have a pending state request.", NodeEvent.Type.REPORTED, timeNow), isMaster); - } else { - eventLog.add(new NodeEvent(node, "Node is no longer in slobrok. No pending state request to node.", NodeEvent.Type.REPORTED, timeNow), isMaster); - } - if (node.getReportedState().getState().equals(State.STOPPING)) { - log.log(LogLevel.DEBUG, "Node " + node.getNode() + " is no longer in slobrok. Was in stopping state, so assuming it has shut down normally. Setting node down"); - NodeState ns = node.getReportedState().clone(); - ns.setState(State.DOWN); - handleNewReportedNodeState(node, ns.clone(), nodeListener); - node.setReportedState(ns, timer.getCurrentTimeInMillis()); // Must reset it to null to get connection attempts counted - } else { - log.log(LogLevel.DEBUG, "Node " + node.getNode() + " no longer in slobrok was in state " + node.getReportedState() + ". Waiting to see if it reappears in slobrok"); - } - } - - /** - * Propose a new state for a node. This may happen due to an administrator action, orchestration, or - * a configuration change. - */ - public void proposeNewNodeState(NodeInfo node, NodeState proposedState) { - NodeState currentState = nextClusterStateView.getClusterState().getNodeState(node.getNode()); - NodeState currentReported = node.getReportedState(); // TODO: Is there a reason to have both of this and the above? 
- - NodeState newCurrentState = currentReported.clone(); - - newCurrentState.setState(proposedState.getState()).setDescription(proposedState.getDescription()); - - if (currentState.getState().equals(newCurrentState.getState())) return; - - log.log(LogLevel.DEBUG, "Got new wanted nodestate for " + node + ": " + currentState.getTextualDifference(proposedState)); - // Should be checked earlier before state was set in cluster - assert(newCurrentState.getState().validWantedNodeState(node.getNode().getType())); - long timeNow = timer.getCurrentTimeInMillis(); - if (newCurrentState.above(currentReported)) { - eventLog.add(new NodeEvent(node, "Wanted state " + newCurrentState + ", but we cannot force node into that state yet as it is currently in " + currentReported, NodeEvent.Type.REPORTED, timeNow), isMaster); - return; - } - if ( ! newCurrentState.similarTo(currentState)) { - eventLog.add(new NodeEvent(node, "Node state set to " + newCurrentState + ".", NodeEvent.Type.WANTED, timeNow), isMaster); - } - setNodeState(node, newCurrentState); - } - - public void handleNewRpcAddress(NodeInfo node) { - setHostName(node); - String message = "Node " + node + " has a new address in slobrok: " + node.getRpcAddress(); - eventLog.add(new NodeEvent(node, message, NodeEvent.Type.REPORTED, timer.getCurrentTimeInMillis()), isMaster); - } - - public void handleReturnedRpcAddress(NodeInfo node) { - setHostName(node); - String message = "Node got back into slobrok with same address as before: " + node.getRpcAddress(); - eventLog.add(new NodeEvent(node, message, NodeEvent.Type.REPORTED, timer.getCurrentTimeInMillis()), isMaster); - } - - private void setHostName(NodeInfo node) { - String rpcAddress = node.getRpcAddress(); - if (rpcAddress == null) { - // This may happen if we haven't seen the node in Slobrok yet. - return; - } - - Spec address = new Spec(rpcAddress); - if (address.malformed()) { - return; - } - - hostnames.put(node.getNodeIndex(), address.host()); - } - - private void removeHostName(NodeInfo node) { - hostnames.remove(node.getNodeIndex()); - } - - public boolean watchTimers(ContentCluster cluster, NodeStateOrHostInfoChangeHandler nodeListener) { - boolean triggeredAnyTimers = false; - long currentTime = timer.getCurrentTimeInMillis(); - for(NodeInfo node : cluster.getNodeInfo()) { - NodeState currentStateInSystem = nextClusterStateView.getClusterState().getNodeState(node.getNode()); - NodeState lastReportedState = node.getReportedState(); - - // If we haven't had slobrok contact in a given amount of time and node is still not considered down, - // mark it down. 
- if (node.isRpcAddressOutdated() - && !lastReportedState.getState().equals(State.DOWN) - && node.getRpcAddressOutdatedTimestamp() + maxSlobrokDisconnectGracePeriod <= currentTime) - { - StringBuilder sb = new StringBuilder().append("Set node down as it has been out of slobrok for ") - .append(currentTime - node.getRpcAddressOutdatedTimestamp()).append(" ms which is more than the max limit of ") - .append(maxSlobrokDisconnectGracePeriod).append(" ms."); - node.abortCurrentNodeStateRequests(); - NodeState state = lastReportedState.clone(); - state.setState(State.DOWN); - if (!state.hasDescription()) state.setDescription(sb.toString()); - eventLog.add(new NodeEvent(node, sb.toString(), NodeEvent.Type.CURRENT, currentTime), isMaster); - handleNewReportedNodeState(node, state.clone(), nodeListener); - node.setReportedState(state, currentTime); - triggeredAnyTimers = true; - } - - // If node is still unavailable after transition time, mark it down - if (currentStateInSystem.getState().equals(State.MAINTENANCE) - && ( ! nextStateViewChanged || ! this.nextClusterStateView.getClusterState().getNodeState(node.getNode()).getState().equals(State.DOWN)) - && node.getWantedState().above(new NodeState(node.getNode().getType(), State.DOWN)) - && (lastReportedState.getState().equals(State.DOWN) || node.isRpcAddressOutdated()) - && node.getTransitionTime() + maxTransitionTime.get(node.getNode().getType()) < currentTime) - { - eventLog.add(new NodeEvent(node, (currentTime - node.getTransitionTime()) - + " milliseconds without contact. Marking node down.", NodeEvent.Type.CURRENT, currentTime), isMaster); - NodeState newState = new NodeState(node.getNode().getType(), State.DOWN).setDescription( - (currentTime - node.getTransitionTime()) + " ms without contact. Too long to keep in maintenance. Marking node down"); - // Keep old description if there is one as it is likely closer to the cause of the problem - if (currentStateInSystem.hasDescription()) newState.setDescription(currentStateInSystem.getDescription()); - setNodeState(node, newState); - triggeredAnyTimers = true; - } - - // If node hasn't increased its initializing progress within initprogresstime, mark it down. - if (!currentStateInSystem.getState().equals(State.DOWN) - && node.getWantedState().above(new NodeState(node.getNode().getType(), State.DOWN)) - && lastReportedState.getState().equals(State.INITIALIZING) - && maxInitProgressTime != 0 - && node.getInitProgressTime() + maxInitProgressTime <= currentTime - && node.getNode().getType().equals(NodeType.STORAGE)) - { - eventLog.add(new NodeEvent(node, (currentTime - node.getInitProgressTime()) + " milliseconds " - + "without initialize progress. Marking node down." - + " Premature crash count is now " + (node.getPrematureCrashCount() + 1) + ".", NodeEvent.Type.CURRENT, currentTime), isMaster); - NodeState newState = new NodeState(node.getNode().getType(), State.DOWN).setDescription( - (currentTime - node.getInitProgressTime()) + " ms without initialize progress. 
Assuming node has deadlocked."); - setNodeState(node, newState); - handlePrematureCrash(node, nodeListener); - triggeredAnyTimers = true; - } - if (node.getUpStableStateTime() + stableStateTimePeriod <= currentTime - && lastReportedState.getState().equals(State.UP) - && node.getPrematureCrashCount() <= maxPrematureCrashes - && node.getPrematureCrashCount() != 0) - { - node.setPrematureCrashCount(0); - log.log(LogLevel.DEBUG, "Resetting premature crash count on node " + node + " as it has been up for a long time."); - triggeredAnyTimers = true; - } else if (node.getDownStableStateTime() + stableStateTimePeriod <= currentTime - && lastReportedState.getState().equals(State.DOWN) - && node.getPrematureCrashCount() <= maxPrematureCrashes - && node.getPrematureCrashCount() != 0) - { - node.setPrematureCrashCount(0); - log.log(LogLevel.DEBUG, "Resetting premature crash count on node " + node + " as it has been down for a long time."); - triggeredAnyTimers = true; - } - } - return triggeredAnyTimers; - } - - private boolean isControlledShutdown(NodeState state) { - return (state.getState() == State.STOPPING && (state.getDescription().contains("Received signal 15 (SIGTERM - Termination signal)") - || state.getDescription().contains("controlled shutdown"))); - } - - /** - * Decide the state assigned to a new node given the state it reported - * - * @param node the node we are computing the state of - * @param currentState the current state of the node - * @param reportedState the new state reported by (or, in the case of down - inferred from) the node - * @param nodeListener this listener is notified for some of the system state changes that this will return - * @return the node node state, or null to keep the nodes current state - */ - private NodeState decideNodeStateGivenReportedState(NodeInfo node, NodeState currentState, NodeState reportedState, - NodeStateOrHostInfoChangeHandler nodeListener) { - long timeNow = timer.getCurrentTimeInMillis(); - - log.log(LogLevel.DEBUG, "Finding new cluster state entry for " + node + " switching state " + currentState.getTextualDifference(reportedState)); - - // Set nodes in maintenance if 1) down, or 2) initializing but set retired, to avoid migrating data - // to the retired node while it is initializing - if (currentState.getState().oneOf("ur") && reportedState.getState().oneOf("dis") - && (node.getWantedState().getState().equals(State.RETIRED) || !reportedState.getState().equals(State.INITIALIZING))) - { - long currentTime = timer.getCurrentTimeInMillis(); - node.setTransitionTime(currentTime); - if (node.getUpStableStateTime() + stableStateTimePeriod > currentTime && !isControlledShutdown(reportedState)) { - log.log(LogLevel.DEBUG, "Stable state: " + node.getUpStableStateTime() + " + " + stableStateTimePeriod + " > " + currentTime); - eventLog.add(new NodeEvent(node, - "Stopped or possibly crashed after " + (currentTime - node.getUpStableStateTime()) - + " ms, which is before stable state time period." 
- + " Premature crash count is now " + (node.getPrematureCrashCount() + 1) + ".", - NodeEvent.Type.CURRENT, - timeNow), isMaster); - if (handlePrematureCrash(node, nodeListener)) return null; - } - if (maxTransitionTime.get(node.getNode().getType()) != 0) { - return new NodeState(node.getNode().getType(), State.MAINTENANCE).setDescription(reportedState.getDescription()); - } - } - - // If we got increasing initialization progress, reset initialize timer - if (reportedState.getState().equals(State.INITIALIZING) && - (!currentState.getState().equals(State.INITIALIZING) || - reportedState.getInitProgress() > currentState.getInitProgress())) - { - node.setInitProgressTime(timer.getCurrentTimeInMillis()); - log.log(LogLevel.DEBUG, "Reset initialize timer on " + node + " to " + node.getInitProgressTime()); - } - - // If we get reverse initialize progress, mark node unstable, such that we don't mark it initializing again before it is up. - if (currentState.getState().equals(State.INITIALIZING) && - (reportedState.getState().equals(State.INITIALIZING) && reportedState.getInitProgress() < currentState.getInitProgress())) - { - eventLog.add(new NodeEvent(node, "Stop or crash during initialization detected from reverse initializing progress." - + " Progress was " + currentState.getInitProgress() + " but is now " + reportedState.getInitProgress() + "." - + " Premature crash count is now " + (node.getPrematureCrashCount() + 1) + ".", - NodeEvent.Type.CURRENT, timeNow), isMaster); - return (handlePrematureCrash(node, nodeListener) ? null : new NodeState(node.getNode().getType(), State.DOWN).setDescription( - "Got reverse intialize progress. Assuming node have prematurely crashed")); - } - - // If we go down while initializing, mark node unstable, such that we don't mark it initializing again before it is up. - if (currentState.getState().equals(State.INITIALIZING) && reportedState.getState().oneOf("ds") && !isControlledShutdown(reportedState)) - { - eventLog.add(new NodeEvent(node, "Stop or crash during initialization." - + " Premature crash count is now " + (node.getPrematureCrashCount() + 1) + ".", - NodeEvent.Type.CURRENT, timeNow), isMaster); - return (handlePrematureCrash(node, nodeListener) ? null : new NodeState(node.getNode().getType(), State.DOWN).setDescription(reportedState.getDescription())); - } - - // Ignore further unavailable states when node is set in maintenance - if (currentState.getState().equals(State.MAINTENANCE) && reportedState.getState().oneOf("dis")) - { - if (node.getWantedState().getState().equals(State.RETIRED) || !reportedState.getState().equals(State.INITIALIZING) - || reportedState.getInitProgress() <= NodeState.getListingBucketsInitProgressLimit() + 0.00001) { - log.log(LogLevel.DEBUG, "Ignoring down and initializing reports while in maintenance mode on " + node + "."); - return null; - } - } - - // Hide initializing state if node has been unstable. 
(Not for distributors as these own buckets while initializing) - if ((currentState.getState().equals(State.DOWN) || currentState.getState().equals(State.UP)) && - reportedState.getState().equals(State.INITIALIZING) && node.getPrematureCrashCount() > 0 && - !node.isDistributor()) - { - log.log(LogLevel.DEBUG, "Not setting " + node + " initializing again as it crashed prematurely earlier."); - return new NodeState(node.getNode().getType(), State.DOWN).setDescription("Not setting node back up as it failed prematurely at last attempt"); - } - // Hide initializing state in cluster state if initialize progress is so low that we haven't listed buckets yet - if (!node.isDistributor() && reportedState.getState().equals(State.INITIALIZING) && - reportedState.getInitProgress() <= NodeState.getListingBucketsInitProgressLimit() + 0.00001) - { - log.log(LogLevel.DEBUG, "Not setting " + node + " initializing in cluster state quite yet, as initializing progress still indicate it is listing buckets."); - return new NodeState(node.getNode().getType(), State.DOWN).setDescription("Listing buckets. Progress " + (100 * reportedState.getInitProgress()) + " %."); - } - return reportedState.clone(); - } - - public boolean handlePrematureCrash(NodeInfo node, NodeStateOrHostInfoChangeHandler changeListener) { - node.setPrematureCrashCount(node.getPrematureCrashCount() + 1); - if (disableUnstableNodes && node.getPrematureCrashCount() > maxPrematureCrashes) { - NodeState wantedState = new NodeState(node.getNode().getType(), State.DOWN) - .setDescription("Disabled by fleet controller as it prematurely shut down " + node.getPrematureCrashCount() + " times in a row"); - NodeState oldState = node.getWantedState(); - node.setWantedState(wantedState); - if ( ! oldState.equals(wantedState)) { - changeListener.handleNewWantedNodeState(node, wantedState); - } - return true; - } - return false; - } - - public void handleUpdatedHostInfo(NodeInfo nodeInfo, HostInfo hostInfo) { - // Only pass the host info to the latest cluster state view. - currentClusterStateView.handleUpdatedHostInfo(hostnames, nodeInfo, hostInfo); - } - - public class SystemStateHistoryEntry { - - private final ClusterState state; - private final long time; - - SystemStateHistoryEntry(ClusterState state, long time) { - this.state = state; - this.time = time; - } - - public ClusterState state() { return state; } - - public long time() { return time; } - - } - -} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java index a21ed994d5d..c4e7c6897e1 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java @@ -248,6 +248,8 @@ public class DatabaseHandler { log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Attempting to store last system state version " + pendingStore.lastSystemStateVersion + " into zookeeper."); + // TODO guard version write with a CaS predicated on the version we last read/wrote. + // TODO Drop leadership status if there is a mismatch, as it implies we're racing with another leader. 
if (database.storeLatestSystemStateVersion(pendingStore.lastSystemStateVersion)) { currentlyStored.lastSystemStateVersion = pendingStore.lastSystemStateVersion; pendingStore.lastSystemStateVersion = null; diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/MasterDataGatherer.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/MasterDataGatherer.java index cd9c66d18f0..f952f842151 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/MasterDataGatherer.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/MasterDataGatherer.java @@ -51,7 +51,7 @@ public class MasterDataGatherer { public void process(WatchedEvent watchedEvent) { switch (watchedEvent.getType()) { case NodeChildrenChanged: // Fleetcontrollers have either connected or disconnected to ZooKeeper - log.log(LogLevel.INFO, "Fleetcontroller " + nodeIndex + ": A change occured in the list of registered fleetcontrollers. Requesting new information"); + log.log(LogLevel.INFO, "Fleetcontroller " + nodeIndex + ": A change occurred in the list of registered fleetcontrollers. Requesting new information"); session.getChildren(zooKeeperRoot + "indexes", this, childListener, null); break; case NodeDataChanged: // A fleetcontroller has changed what node it is voting for @@ -160,7 +160,7 @@ public class MasterDataGatherer { } } - /** Calling restart, ignores what we currently know and starts another circly. Typically called after reconnecting to ZooKeeperServer. */ + /** Calling restart, ignores what we currently know and starts another cycle. Typically called after reconnecting to ZooKeeperServer. */ public void restart() { synchronized (nextMasterData) { masterData = new TreeMap(); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/ClusterStateRequestHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/ClusterStateRequestHandler.java index 6de9205bbe3..9428370faf5 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/ClusterStateRequestHandler.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/ClusterStateRequestHandler.java @@ -2,19 +2,19 @@ package com.yahoo.vespa.clustercontroller.core.status; import com.yahoo.vdslib.state.ClusterState; +import com.yahoo.vespa.clustercontroller.core.StateVersionTracker; import com.yahoo.vespa.clustercontroller.core.status.statuspage.StatusPageResponse; import com.yahoo.vespa.clustercontroller.core.status.statuspage.StatusPageServer; -import com.yahoo.vespa.clustercontroller.core.SystemStateGenerator; public class ClusterStateRequestHandler implements StatusPageServer.RequestHandler { - private final SystemStateGenerator systemStateGenerator; + private final StateVersionTracker stateVersionTracker; - public ClusterStateRequestHandler(SystemStateGenerator systemStateGenerator) { - this.systemStateGenerator = systemStateGenerator; + public ClusterStateRequestHandler(StateVersionTracker stateVersionTracker) { + this.stateVersionTracker = stateVersionTracker; } @Override public StatusPageResponse handle(StatusPageServer.HttpRequest request) { - ClusterState cs = systemStateGenerator.getClusterState(); + ClusterState cs = stateVersionTracker.getVersionedClusterState(); StatusPageResponse response = new StatusPageResponse(); response.setContentType("text/plain"); diff --git 
a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java index 85db0ac0ef9..ec75ba3532d 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java @@ -17,21 +17,22 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa private final Timer timer; private final ContentCluster cluster; private final MasterElectionHandler masterElectionHandler; - private final SystemStateGenerator systemStateGenerator; + private final StateVersionTracker stateVersionTracker; private final EventLog eventLog; private final long startedTime; private final RunDataExtractor data; private boolean showLocalSystemStatesInLog = true; public LegacyIndexPageRequestHandler(Timer timer, boolean showLocalSystemStatesInLog, ContentCluster cluster, - MasterElectionHandler masterElectionHandler, SystemStateGenerator systemStateGenerator, + MasterElectionHandler masterElectionHandler, + StateVersionTracker stateVersionTracker, EventLog eventLog, long startedTime, RunDataExtractor data) { this.timer = timer; this.showLocalSystemStatesInLog = showLocalSystemStatesInLog; this.cluster = cluster; this.masterElectionHandler = masterElectionHandler; - this.systemStateGenerator = systemStateGenerator; + this.stateVersionTracker = stateVersionTracker; this.eventLog = eventLog; this.startedTime = startedTime; this.data = data; @@ -63,7 +64,7 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa new VdsClusterHtmlRendrer(), content, timer, - systemStateGenerator.getClusterState(), + stateVersionTracker.getVersionedClusterState(), data.getOptions().storageDistribution, data.getOptions(), eventLog, @@ -71,7 +72,7 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa // Overview of current config data.getOptions().writeHtmlState(content, request); // Current cluster state and cluster state history - writeHtmlState(systemStateGenerator, content, request); + writeHtmlState(stateVersionTracker, content, request); } else { // Overview of current config data.getOptions().writeHtmlState(content, request); @@ -84,7 +85,7 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa return response; } - public void writeHtmlState(SystemStateGenerator systemStateGenerator, StringBuilder sb, StatusPageServer.HttpRequest request) { + public void writeHtmlState(StateVersionTracker stateVersionTracker, StringBuilder sb, StatusPageServer.HttpRequest request) { boolean showLocal = showLocalSystemStatesInLog; if (request.hasQueryParameter("showlocal")) { showLocal = true; @@ -93,9 +94,9 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa } sb.append("

<h2>Cluster states</h2>\n")
-          .append("<h3>Current cluster state:</h3><code>").append(systemStateGenerator.currentClusterStateView().toString()).append("</code>\n");
+          .append("<h3>Current cluster state:</h3><code>").append(stateVersionTracker.getVersionedClusterState().toString()).append("</code>\n");
 
-        if ( ! systemStateGenerator.systemStateHistory().isEmpty()) {
+        if ( ! stateVersionTracker.getClusterStateHistory().isEmpty()) {
             TimeZone tz = TimeZone.getTimeZone("UTC");
             sb.append("<h3>Cluster state history</h3>
\n"); if (showLocal) { @@ -106,10 +107,10 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa .append(" Cluster state\n") .append("\n"); // Write cluster state history in reverse order (newest on top) - Iterator stateIterator = systemStateGenerator.systemStateHistory().iterator(); - SystemStateGenerator.SystemStateHistoryEntry current = null; + Iterator stateIterator = stateVersionTracker.getClusterStateHistory().iterator(); + ClusterStateHistoryEntry current = null; while (stateIterator.hasNext()) { - SystemStateGenerator.SystemStateHistoryEntry nextEntry = stateIterator.next(); + ClusterStateHistoryEntry nextEntry = stateIterator.next(); if (nextEntry.state().isOfficial() || showLocal) { if (current != null) writeClusterStateEntry(current, nextEntry, sb, tz); current = nextEntry; @@ -120,7 +121,7 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa } } - private void writeClusterStateEntry(SystemStateGenerator.SystemStateHistoryEntry entry, SystemStateGenerator.SystemStateHistoryEntry last, StringBuilder sb, TimeZone tz) { + private void writeClusterStateEntry(ClusterStateHistoryEntry entry, ClusterStateHistoryEntry last, StringBuilder sb, TimeZone tz) { sb.append("").append(RealTimer.printDate(entry.time(), tz)) .append("").append(entry.state().isOfficial() ? "" : ""); sb.append(entry.state()); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/StaticResourceRequestHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/StaticResourceRequestHandler.java deleted file mode 100644 index fa8128753f6..00000000000 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/StaticResourceRequestHandler.java +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.clustercontroller.core.status; - -import com.yahoo.vespa.clustercontroller.core.status.statuspage.StatusPageResponse; -import com.yahoo.vespa.clustercontroller.core.status.statuspage.StatusPageServer; - -import java.io.ByteArrayOutputStream; -import java.io.InputStream; -import java.io.IOException; - -/** - * HTTP request handler for serving a single JAR resource as if it were - * a regular file hosted on the server. Always serves the content verbatim - * (i.e. as a byte stream), specifying a Content-Type provided when creating - * the handler. 
- * - * @author Tor Brede Vekterli - * @since 5.28 - */ -public class StaticResourceRequestHandler implements StatusPageServer.RequestHandler { - private final byte[] resourceData; - private final String contentType; - - public StaticResourceRequestHandler(String resourcePath, - String contentType) - throws IOException - { - this.resourceData = loadResource(resourcePath); - this.contentType = contentType; - } - - private byte[] loadResource(String resourcePath) throws IOException { - InputStream resourceStream = getClass().getClassLoader().getResourceAsStream(resourcePath); - if (resourceStream == null) { - throw new IOException("No resource with path '" + resourcePath + "' could be found"); - } - return readStreamData(resourceStream); - } - - @Override - public StatusPageResponse handle(StatusPageServer.HttpRequest request) { - final StatusPageResponse response = new StatusPageResponse(); - response.setClientCachingEnabled(true); - response.setContentType(contentType); - try { - response.getOutputStream().write(resourceData); - } catch (IOException e) { - response.setResponseCode(StatusPageResponse.ResponseCode.INTERNAL_SERVER_ERROR); - } - return response; - } - - private byte[] readStreamData(InputStream resourceStream) throws IOException { - final byte[] buf = new byte[4096]; - final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - while (true) { - int read = resourceStream.read(buf); - if (read < 0) { - break; - } - outputStream.write(buf, 0, read); - } - outputStream.close(); - return outputStream.toByteArray(); - } -} -- cgit v1.2.3
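Side note on the CaS TODO added to DatabaseHandler.storeLatestSystemStateVersion above: the sketch below illustrates, outside this patch, what a compare-and-set-guarded version write against ZooKeeper could look like. Only the ZooKeeper getData/setData expected-version semantics are real API; the class, field names, znode path and the leadership reaction are illustrative assumptions, not the controller's actual Database implementation.

// Illustrative sketch only (assumed names; not part of this patch):
// the expected-version argument to ZooKeeper.setData provides the compare-and-set,
// as suggested by the TODO in DatabaseHandler.
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.nio.charset.StandardCharsets;

class CasGuardedVersionWriter {

    private final ZooKeeper zk;
    private final String versionNodePath;   // assumed znode path for the stored state version
    private int lastSeenZkNodeVersion = -1; // znode version observed at our last read/write

    CasGuardedVersionWriter(ZooKeeper zk, String versionNodePath) {
        this.zk = zk;
        this.versionNodePath = versionNodePath;
    }

    /** Reads the stored cluster state version and remembers the znode version for later CaS writes. */
    long readLatestSystemStateVersion() throws KeeperException, InterruptedException {
        Stat stat = new Stat();
        byte[] data = zk.getData(versionNodePath, false, stat);
        lastSeenZkNodeVersion = stat.getVersion();
        return Long.parseLong(new String(data, StandardCharsets.UTF_8));
    }

    /**
     * Writes a new cluster state version only if the znode is unchanged since our last
     * read/write. A false return means another writer got there first; per the TODO, the
     * caller should then assume it is racing with another leader and drop leadership.
     */
    boolean storeLatestSystemStateVersion(long version) throws KeeperException, InterruptedException {
        byte[] data = Long.toString(version).getBytes(StandardCharsets.UTF_8);
        try {
            Stat stat = zk.setData(versionNodePath, data, lastSeenZkNodeVersion);
            lastSeenZkNodeVersion = stat.getVersion();
            return true;
        } catch (KeeperException.BadVersionException e) {
            return false; // Concurrent write detected by ZooKeeper's CaS check.
        }
    }
}

In this sketch a mismatch is only reported back to the caller; whether to merely log it or actually relinquish mastership would be a decision for the election logic.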