diff options
42 files changed, 3865 insertions, 1396 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java new file mode 100644 index 00000000000..05a66ddbf2b --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java @@ -0,0 +1,69 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vdslib.state.ClusterState; +import com.yahoo.vdslib.state.Node; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; + +public class AnnotatedClusterState { + private final ClusterState clusterState; + private final Map<Node, NodeStateReason> nodeStateReasons; + private final Optional<ClusterStateReason> clusterStateReason; + + public AnnotatedClusterState(ClusterState clusterState, + Optional<ClusterStateReason> clusterStateReason, + Map<Node, NodeStateReason> nodeStateReasons) + { + this.clusterState = clusterState; + this.clusterStateReason = clusterStateReason; + this.nodeStateReasons = nodeStateReasons; + } + + public static AnnotatedClusterState emptyState() { + return new AnnotatedClusterState(ClusterState.emptyState(), Optional.empty(), emptyNodeStateReasons()); + } + + static Map<Node, NodeStateReason> emptyNodeStateReasons() { + return Collections.emptyMap(); + } + + public ClusterState getClusterState() { + return clusterState; + } + + public Map<Node, NodeStateReason> getNodeStateReasons() { + return Collections.unmodifiableMap(nodeStateReasons); + } + + public Optional<ClusterStateReason> getClusterStateReason() { + return clusterStateReason; + } + + @Override + public String toString() { + return clusterState.toString(); + } + + public String toString(boolean verbose) { + return 
clusterState.toString(verbose); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AnnotatedClusterState that = (AnnotatedClusterState) o; + return Objects.equals(clusterState, that.clusterState) && + Objects.equals(nodeStateReasons, that.nodeStateReasons) && + Objects.equals(clusterStateReason, that.clusterStateReason); + } + + @Override + public int hashCode() { + return Objects.hash(clusterState, nodeStateReasons, clusterStateReason); + } +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateGenerator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateGenerator.java new file mode 100644 index 00000000000..e6fbed71153 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateGenerator.java @@ -0,0 +1,345 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vdslib.state.ClusterState; +import com.yahoo.vdslib.state.Node; +import com.yahoo.vdslib.state.NodeState; +import com.yahoo.vdslib.state.NodeType; +import com.yahoo.vdslib.state.State; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeMap; + +/** + * Pure functional cluster state generator which deterministically constructs a full + * cluster state given the state of the content cluster, a set of cluster controller + * configuration parameters and the current time. + * + * State version tracking is considered orthogonal to state generation. Therefore, + * cluster state version is _not_ set here; its incrementing must be handled by the + * caller. 
+ */ +public class ClusterStateGenerator { + + static class Params { + public ContentCluster cluster; + public Map<NodeType, Integer> transitionTimes; + public long currentTimeInMillis = 0; + public int maxPrematureCrashes = 0; + public int minStorageNodesUp = 1; + public int minDistributorNodesUp = 1; + public double minRatioOfStorageNodesUp = 0.0; + public double minRatioOfDistributorNodesUp = 0.0; + public double minNodeRatioPerGroup = 0.0; + public int idealDistributionBits = 16; + public int highestObservedDistributionBitCount = 16; + public int lowestObservedDistributionBitCount = 16; + public int maxInitProgressTimeMs = 5000; + + Params() { + this.transitionTimes = buildTransitionTimeMap(0, 0); + } + + // FIXME de-dupe + static Map<NodeType, Integer> buildTransitionTimeMap(int distributorTransitionTimeMs, int storageTransitionTimeMs) { + Map<com.yahoo.vdslib.state.NodeType, java.lang.Integer> maxTransitionTime = new TreeMap<>(); + maxTransitionTime.put(com.yahoo.vdslib.state.NodeType.DISTRIBUTOR, distributorTransitionTimeMs); + maxTransitionTime.put(com.yahoo.vdslib.state.NodeType.STORAGE, storageTransitionTimeMs); + return maxTransitionTime; + } + + Params cluster(ContentCluster cluster) { + this.cluster = cluster; + return this; + } + Params maxInitProgressTime(int maxTimeMs) { + this.maxInitProgressTimeMs = maxTimeMs; + return this; + } + Params transitionTimes(int timeMs) { + this.transitionTimes = buildTransitionTimeMap(timeMs, timeMs); + return this; + } + Params transitionTimes(Map<NodeType, Integer> timesMs) { + this.transitionTimes = timesMs; + return this; + } + Params currentTimeInMilllis(long currentTimeMs) { + this.currentTimeInMillis = currentTimeMs; + return this; + } + Params maxPrematureCrashes(int count) { + this.maxPrematureCrashes = count; + return this; + } + Params minStorageNodesUp(int nodes) { + this.minStorageNodesUp = nodes; + return this; + } + Params minDistributorNodesUp(int nodes) { + this.minDistributorNodesUp = nodes; + return 
this; + } + Params minRatioOfStorageNodesUp(double minRatio) { + this.minRatioOfStorageNodesUp = minRatio; + return this; + } + Params minRatioOfDistributorNodesUp(double minRatio) { + this.minRatioOfDistributorNodesUp = minRatio; + return this; + } + Params minNodeRatioPerGroup(double minRatio) { + this.minNodeRatioPerGroup = minRatio; + return this; + } + Params idealDistributionBits(int distributionBits) { + this.idealDistributionBits = distributionBits; + return this; + } + Params highestObservedDistributionBitCount(int bitCount) { + this.highestObservedDistributionBitCount = bitCount; + return this; + } + Params lowestObservedDistributionBitCount(int bitCount) { + this.lowestObservedDistributionBitCount = bitCount; + return this; + } + + /** + * Infer parameters from controller options. Important: does _not_ set cluster; + * it must be explicitly set afterwards on the returned parameter object before + * being used to compute states. + */ + static Params fromOptions(FleetControllerOptions opts) { + return new Params() + .maxPrematureCrashes(opts.maxPrematureCrashes) + .minStorageNodesUp(opts.minStorageNodesUp) + .minDistributorNodesUp(opts.minDistributorNodesUp) + .minRatioOfStorageNodesUp(opts.minRatioOfStorageNodesUp) + .minRatioOfDistributorNodesUp(opts.minRatioOfDistributorNodesUp) + .minNodeRatioPerGroup(opts.minNodeRatioPerGroup) + .idealDistributionBits(opts.distributionBits) + .transitionTimes(opts.maxTransitionTime); + } + } + + static AnnotatedClusterState generatedStateFrom(final Params params) { + final ContentCluster cluster = params.cluster; + final ClusterState workingState = ClusterState.emptyState(); + final Map<Node, NodeStateReason> nodeStateReasons = new HashMap<>(); + + for (final NodeInfo nodeInfo : cluster.getNodeInfo()) { + final NodeState nodeState = computeEffectiveNodeState(nodeInfo, params); + workingState.setNodeState(nodeInfo.getNode(), nodeState); + } + + takeDownGroupsWithTooLowAvailability(workingState, nodeStateReasons, 
params); + + final Optional<ClusterStateReason> reasonToBeDown = clusterDownReason(workingState, params); + if (reasonToBeDown.isPresent()) { + workingState.setClusterState(State.DOWN); + } + workingState.setDistributionBits(inferDistributionBitCount(cluster, workingState, params)); + + return new AnnotatedClusterState(workingState, reasonToBeDown, nodeStateReasons); + } + + private static boolean nodeIsConsideredTooUnstable(final NodeInfo nodeInfo, final Params params) { + return (params.maxPrematureCrashes != 0 + && nodeInfo.getPrematureCrashCount() > params.maxPrematureCrashes); + } + + private static void applyWantedStateToBaselineState(final NodeState baseline, final NodeState wanted) { + // Only copy state and description from Wanted state; this preserves auxiliary + // information such as disk states and startup timestamp. + baseline.setState(wanted.getState()); + baseline.setDescription(wanted.getDescription()); + } + + private static NodeState computeEffectiveNodeState(final NodeInfo nodeInfo, final Params params) { + final NodeState reported = nodeInfo.getReportedState(); + final NodeState wanted = nodeInfo.getWantedState(); + final NodeState baseline = reported.clone(); + + if (nodeIsConsideredTooUnstable(nodeInfo, params)) { + baseline.setState(State.DOWN); + } + if (startupTimestampAlreadyObservedByAllNodes(nodeInfo, baseline)) { + baseline.setStartTimestamp(0); + } + if (nodeInfo.isStorage()) { + applyStorageSpecificStateTransforms(nodeInfo, params, reported, wanted, baseline); + } + if (baseline.above(wanted)) { + applyWantedStateToBaselineState(baseline, wanted); + } + + return baseline; + } + + private static void applyStorageSpecificStateTransforms(NodeInfo nodeInfo, Params params, NodeState reported, + NodeState wanted, NodeState baseline) + { + if (reported.getState() == State.INITIALIZING) { + if (timedOutWithoutNewInitProgress(reported, nodeInfo, params) + || shouldForceInitToDown(reported) + || nodeInfo.recentlyObservedUnstableDuringInit()) + 
{ + baseline.setState(State.DOWN); + } + if (shouldForceInitToMaintenance(reported, wanted)) { + baseline.setState(State.MAINTENANCE); + } + } + // TODO ensure that maintenance cannot override Down for any other cases + if (withinTemporalMaintenancePeriod(nodeInfo, baseline, params) && wanted.getState() != State.DOWN) { + baseline.setState(State.MAINTENANCE); + } + } + + // TODO remove notion of init timeout progress? Seems redundant when we've already got RPC timeouts + private static boolean timedOutWithoutNewInitProgress(final NodeState reported, final NodeInfo nodeInfo, final Params params) { + if (reported.getState() != State.INITIALIZING) { + return false; + } + if (params.maxInitProgressTimeMs <= 0) { + return false; // No upper bound for max init time; auto-down for all intents and purposes disabled. + } + return nodeInfo.getInitProgressTime() + params.maxInitProgressTimeMs <= params.currentTimeInMillis; + } + + // Init while listing buckets should be treated as Down, as distributors expect a storage node + // in Init mode to have a bucket set readily available. Clients also expect a node in Init to + // be able to receive operations. + // Precondition: reported.getState() == State.INITIALIZING + private static boolean shouldForceInitToDown(final NodeState reported) { + return reported.getInitProgress() <= NodeState.getListingBucketsInitProgressLimit() + 0.00001; + } + + // Special case: since each node is published with a single state, if we let a Retired node + // be published with Initializing, it'd start receiving feed and merges. Avoid this by + // having it be in maintenance instead for the duration of the init period. 
+ private static boolean shouldForceInitToMaintenance(final NodeState reported, final NodeState wanted) { + return reported.getState() == State.INITIALIZING && wanted.getState() == State.RETIRED; + } + + private static boolean startupTimestampAlreadyObservedByAllNodes(final NodeInfo nodeInfo, final NodeState baseline) { + return baseline.getStartTimestamp() == nodeInfo.getStartTimestamp(); // TODO rename NodeInfo getter/setter + } + + /** + * Determines whether a given storage node should be implicitly set as being + * in a maintenance state despite its reported state being Down. This is + * predominantly a case when contact has just been lost with a node, but we + * do not want to immediately set it to Down just yet (where "yet" is a configurable + * amount of time; see params.transitionTime). This is to prevent common node + * restart/upgrade scenarios from triggering redistribution and data replication + * that would be useless work if the node comes back up immediately afterwards. + * + * Only makes sense to call for storage nodes, since distributors don't support + * being in maintenance mode. 
+ */ + private static boolean withinTemporalMaintenancePeriod(final NodeInfo nodeInfo, + final NodeState baseline, + final Params params) + { + final Integer transitionTime = params.transitionTimes.get(nodeInfo.getNode().getType()); + if (transitionTime == 0 || !baseline.getState().oneOf("sd")) { + return false; + } + return nodeInfo.getTransitionTime() + transitionTime > params.currentTimeInMillis; + } + + private static void takeDownGroupsWithTooLowAvailability(final ClusterState workingState, + Map<Node, NodeStateReason> nodeStateReasons, + final Params params) + { + final GroupAvailabilityCalculator calc = new GroupAvailabilityCalculator.Builder() + .withMinNodeRatioPerGroup(params.minNodeRatioPerGroup) + .withDistribution(params.cluster.getDistribution()) + .build(); + final Set<Integer> nodesToTakeDown = calc.nodesThatShouldBeDown(workingState); + + for (Integer idx : nodesToTakeDown) { + final Node node = storageNode(idx); + final NodeState newState = new NodeState(NodeType.STORAGE, State.DOWN); + newState.setDescription("group node availability below configured threshold"); + workingState.setNodeState(node, newState); + nodeStateReasons.put(node, NodeStateReason.GROUP_IS_DOWN); + } + } + + private static Node storageNode(int index) { + return new Node(NodeType.STORAGE, index); + } + + // TODO we'll want to explicitly persist a bit lower bound in ZooKeeper and ensure we + // never go below it (this is _not_ the case today). Nodes that have min bits lower than + // this will just have to start splitting out in the background before being allowed + // to join the cluster. 
+ + private static int inferDistributionBitCount(final ContentCluster cluster, + final ClusterState state, + final Params params) + { + int bitCount = params.idealDistributionBits; + final Optional<Integer> minBits = cluster.getConfiguredNodes().values().stream() + .map(configuredNode -> cluster.getNodeInfo(storageNode(configuredNode.index()))) + .filter(node -> state.getNodeState(node.getNode()).getState().oneOf("iur")) + .map(nodeInfo -> nodeInfo.getReportedState().getMinUsedBits()) + .min(Integer::compare); + + if (minBits.isPresent() && minBits.get() < bitCount) { + bitCount = minBits.get(); + } + if (bitCount > params.lowestObservedDistributionBitCount && bitCount < params.idealDistributionBits) { + bitCount = params.lowestObservedDistributionBitCount; + } + + return bitCount; + } + + private static boolean nodeStateIsConsideredAvailable(final NodeState ns) { + return (ns.getState() == State.UP + || ns.getState() == State.RETIRED + || ns.getState() == State.INITIALIZING); + } + + private static long countAvailableNodesOfType(final NodeType type, + final ContentCluster cluster, + final ClusterState state) + { + return cluster.getConfiguredNodes().values().stream() + .map(node -> state.getNodeState(new Node(type, node.index()))) + .filter(ClusterStateGenerator::nodeStateIsConsideredAvailable) + .count(); + } + + private static Optional<ClusterStateReason> clusterDownReason(final ClusterState state, final Params params) { + final ContentCluster cluster = params.cluster; + + final long upStorageCount = countAvailableNodesOfType(NodeType.STORAGE, cluster, state); + final long upDistributorCount = countAvailableNodesOfType(NodeType.DISTRIBUTOR, cluster, state); + // There's a 1-1 relationship between distributors and storage nodes, so don't need to + // keep track of separate node counts for computing availability ratios. 
+ final long nodeCount = cluster.getConfiguredNodes().size(); + + if (upStorageCount < params.minStorageNodesUp) { + return Optional.of(ClusterStateReason.TOO_FEW_STORAGE_NODES_AVAILABLE); + } + if (upDistributorCount < params.minDistributorNodesUp) { + return Optional.of(ClusterStateReason.TOO_FEW_DISTRIBUTOR_NODES_AVAILABLE); + } + if (params.minRatioOfStorageNodesUp * nodeCount > upStorageCount) { + return Optional.of(ClusterStateReason.TOO_LOW_AVAILABLE_STORAGE_NODE_RATIO); + } + if (params.minRatioOfDistributorNodesUp * nodeCount > upDistributorCount) { + return Optional.of(ClusterStateReason.TOO_LOW_AVAILABLE_DISTRIBUTOR_NODE_RATIO); + } + return Optional.empty(); + } + +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateHistoryEntry.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateHistoryEntry.java new file mode 100644 index 00000000000..3963fcaa45b --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateHistoryEntry.java @@ -0,0 +1,46 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vdslib.state.ClusterState; + +import java.util.Objects; + +public class ClusterStateHistoryEntry { + + private final ClusterState state; + private final long time; + + ClusterStateHistoryEntry(final ClusterState state, final long time) { + this.state = state; + this.time = time; + } + + public ClusterState state() { + return state; + } + + public long time() { + return time; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ClusterStateHistoryEntry that = (ClusterStateHistoryEntry) o; + return time == that.time && + Objects.equals(state, that.state); + } + + @Override + public int hashCode() { + return Objects.hash(state, time); + } + + // String representation only used for test expectation failures and debugging output. + // Actual status page history entry rendering emits formatted date/time. + public String toString() { + return String.format("state '%s' at time %d", state, time); + } + +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateReason.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateReason.java new file mode 100644 index 00000000000..3557ed1ceb8 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateReason.java @@ -0,0 +1,15 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +/** + * Explicit reasons for why a cluster has been assigned a particular global state. + * This only includes reasons that aren't directly possible to infer from diffing + * two cluster states; i.e. distribution bit changes aren't listed here because + * they are obvious from direct inspection. 
+ */ +public enum ClusterStateReason { + TOO_FEW_STORAGE_NODES_AVAILABLE, + TOO_FEW_DISTRIBUTOR_NODES_AVAILABLE, + TOO_LOW_AVAILABLE_STORAGE_NODE_RATIO, + TOO_LOW_AVAILABLE_DISTRIBUTOR_NODE_RATIO, +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateView.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateView.java index 328acfb4dbe..644d6b28b05 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateView.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateView.java @@ -41,6 +41,10 @@ public class ClusterStateView { return new ClusterStateView(clusterState, createNewAggregator(clusterState, metricUpdater), metricUpdater); } + public static ClusterStateView create(final ClusterState clusterState, final MetricUpdater metricUpdater) { + return new ClusterStateView(clusterState, createNewAggregator(clusterState, metricUpdater), metricUpdater); + } + private static ClusterStatsAggregator createNewAggregator(ClusterState clusterState, MetricUpdater metricUpdater) { Set<Integer> upDistributors = getIndicesOfUpNodes(clusterState, NodeType.DISTRIBUTOR); Set<Integer> upStorageNodes = getIndicesOfUpNodes(clusterState, NodeType.STORAGE); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/EventDiffCalculator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/EventDiffCalculator.java new file mode 100644 index 00000000000..2e5d99f2e67 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/EventDiffCalculator.java @@ -0,0 +1,143 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vdslib.distribution.ConfiguredNode; +import com.yahoo.vdslib.state.ClusterState; +import com.yahoo.vdslib.state.Node; +import com.yahoo.vdslib.state.NodeState; +import com.yahoo.vdslib.state.NodeType; +import com.yahoo.vdslib.state.State; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +/** + * Responsible for inferring the difference between two cluster states and their + * state annotations and producing a set of events that describe the changes between + * the two. Diffing the states directly provides a clear picture of _what_ has changed, + * while the annotations are generally required to explain _why_ the changes happened + * in the first place. + * + * Events are primarily used for administrative/user visibility into what's happening + * in the cluster and are output to the Vespa log as well as kept in a circular history + * buffer per node and for the cluster as a whole. + */ +public class EventDiffCalculator { + + static class Params { + ContentCluster cluster; + AnnotatedClusterState fromState; + AnnotatedClusterState toState; + long currentTime; + + public Params cluster(ContentCluster cluster) { + this.cluster = cluster; + return this; + } + public Params fromState(AnnotatedClusterState clusterState) { + this.fromState = clusterState; + return this; + } + public Params toState(AnnotatedClusterState clusterState) { + this.toState = clusterState; + return this; + } + public Params currentTimeMs(long time) { + this.currentTime = time; + return this; + } + } + + public static List<Event> computeEventDiff(final Params params) { + final List<Event> events = new ArrayList<>(); + + emitPerNodeDiffEvents(params, events); + emitWholeClusterDiffEvent(params, events); + return events; + } + + private static ClusterEvent createClusterEvent(String description, Params params) { + return new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE, description, 
params.currentTime); + } + + private static boolean clusterDownBecause(final Params params, ClusterStateReason wantedReason) { + final Optional<ClusterStateReason> actualReason = params.toState.getClusterStateReason(); + return actualReason.isPresent() && actualReason.get().equals(wantedReason); + } + + private static void emitWholeClusterDiffEvent(final Params params, final List<Event> events) { + final ClusterState fromState = params.fromState.getClusterState(); + final ClusterState toState = params.toState.getClusterState(); + + if (clusterHasTransitionedToUpState(fromState, toState)) { + events.add(createClusterEvent("Enough nodes available for system to become up", params)); + } else if (clusterHasTransitionedToDownState(fromState, toState)) { + if (clusterDownBecause(params, ClusterStateReason.TOO_FEW_STORAGE_NODES_AVAILABLE)) { + events.add(createClusterEvent("Too few storage nodes available in cluster. Setting cluster state down", params)); + } else if (clusterDownBecause(params, ClusterStateReason.TOO_FEW_DISTRIBUTOR_NODES_AVAILABLE)) { + events.add(createClusterEvent("Too few distributor nodes available in cluster. Setting cluster state down", params)); + } else if (clusterDownBecause(params, ClusterStateReason.TOO_LOW_AVAILABLE_STORAGE_NODE_RATIO)) { + events.add(createClusterEvent("Too low ratio of available storage nodes. Setting cluster state down", params)); + } else if (clusterDownBecause(params, ClusterStateReason.TOO_LOW_AVAILABLE_DISTRIBUTOR_NODE_RATIO)) { + events.add(createClusterEvent("Too low ratio of available distributor nodes. 
Setting cluster state down", params)); + } else { + events.add(createClusterEvent("Cluster is down", params)); + } + } + } + + private static NodeEvent createNodeEvent(NodeInfo nodeInfo, String description, Params params) { + return new NodeEvent(nodeInfo, description, NodeEvent.Type.CURRENT, params.currentTime); + } + + private static void emitPerNodeDiffEvents(final Params params, final List<Event> events) { + final ContentCluster cluster = params.cluster; + final ClusterState fromState = params.fromState.getClusterState(); + final ClusterState toState = params.toState.getClusterState(); + + for (ConfiguredNode node : cluster.getConfiguredNodes().values()) { + for (NodeType nodeType : NodeType.getTypes()) { + final Node n = new Node(nodeType, node.index()); + emitSingleNodeEvents(params, events, cluster, fromState, toState, n); + } + } + } + + private static void emitSingleNodeEvents(Params params, List<Event> events, ContentCluster cluster, ClusterState fromState, ClusterState toState, Node n) { + final NodeState nodeFrom = fromState.getNodeState(n); + final NodeState nodeTo = toState.getNodeState(n); + if (!nodeTo.equals(nodeFrom)) { + final NodeInfo info = cluster.getNodeInfo(n); + events.add(createNodeEvent(info, String.format("Altered node state in cluster state from '%s' to '%s'", + nodeFrom.toString(true), nodeTo.toString(true)), params)); + + NodeStateReason prevReason = params.fromState.getNodeStateReasons().get(n); + NodeStateReason currReason = params.toState.getNodeStateReasons().get(n); + if (isGroupDownEdge(prevReason, currReason)) { + events.add(createNodeEvent(info, "Group node availability is below configured threshold", params)); + } else if (isGroupUpEdge(prevReason, currReason)) { + events.add(createNodeEvent(info, "Group node availability has been restored", params)); + } + } + } + + private static boolean isGroupUpEdge(NodeStateReason prevReason, NodeStateReason currReason) { + return prevReason == NodeStateReason.GROUP_IS_DOWN && currReason 
!= NodeStateReason.GROUP_IS_DOWN; + } + + private static boolean isGroupDownEdge(NodeStateReason prevReason, NodeStateReason currReason) { + return prevReason != NodeStateReason.GROUP_IS_DOWN && currReason == NodeStateReason.GROUP_IS_DOWN; + } + + private static boolean clusterHasTransitionedToUpState(ClusterState prevState, ClusterState currentState) { + return prevState.getClusterState() != State.UP && currentState.getClusterState() == State.UP; + } + + private static boolean clusterHasTransitionedToDownState(ClusterState prevState, ClusterState currentState) { + return prevState.getClusterState() != State.DOWN && currentState.getClusterState() == State.DOWN; + } + + public static Params params() { return new Params(); } + +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java index ceeeddf49fa..b21cae4ed71 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java @@ -7,6 +7,7 @@ import com.yahoo.vdslib.distribution.ConfiguredNode; import com.yahoo.vdslib.state.ClusterState; import com.yahoo.vdslib.state.Node; import com.yahoo.vdslib.state.NodeState; +import com.yahoo.vdslib.state.State; import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler; import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo; import com.yahoo.vespa.clustercontroller.core.listeners.*; @@ -37,8 +38,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd private final ContentCluster cluster; private final Communicator communicator; private final NodeStateGatherer stateGatherer; - private final SystemStateGenerator systemStateGenerator; + private final StateChangeHandler stateChangeHandler; private final SystemStateBroadcaster 
systemStateBroadcaster; + private final StateVersionTracker stateVersionTracker; private final StatusPageServerInterface statusPageServer; private final RpcServer rpcServer; private final DatabaseHandler database; @@ -59,7 +61,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd private final List<com.yahoo.vdslib.state.ClusterState> newStates = new ArrayList<>(); private long configGeneration = -1; private long nextConfigGeneration = -1; - private List<RemoteClusterControllerTask> remoteTasks = new ArrayList<>(); + private Queue<RemoteClusterControllerTask> remoteTasks = new LinkedList<>(); private final MetricUpdater metricUpdater; private boolean isMaster = false; @@ -69,7 +71,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd private final RunDataExtractor dataExtractor = new RunDataExtractor() { @Override - public com.yahoo.vdslib.state.ClusterState getLatestClusterState() { return systemStateGenerator.getClusterState(); } + public com.yahoo.vdslib.state.ClusterState getLatestClusterState() { return stateVersionTracker.getVersionedClusterState(); } @Override public FleetControllerOptions getOptions() { return options; } @Override @@ -87,7 +89,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd RpcServer server, NodeLookup nodeLookup, DatabaseHandler database, - SystemStateGenerator systemStateGenerator, + StateChangeHandler stateChangeHandler, SystemStateBroadcaster systemStateBroadcaster, MasterElectionHandler masterElectionHandler, MetricUpdater metricUpdater, @@ -103,8 +105,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd this.communicator = communicator; this.database = database; this.stateGatherer = nodeStateGatherer; - this.systemStateGenerator = systemStateGenerator; + this.stateChangeHandler = stateChangeHandler; this.systemStateBroadcaster = systemStateBroadcaster; + this.stateVersionTracker = new 
StateVersionTracker(metricUpdater); this.metricUpdater = metricUpdater; this.statusPageServer = statusPage; @@ -120,12 +123,12 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd new NodeHealthRequestHandler(dataExtractor)); this.statusRequestRouter.addHandler( "^/clusterstate", - new ClusterStateRequestHandler(systemStateGenerator)); + new ClusterStateRequestHandler(stateVersionTracker)); this.statusRequestRouter.addHandler( "^/$", new LegacyIndexPageRequestHandler( timer, options.showLocalSystemStatesInEventLog, cluster, - masterElectionHandler, systemStateGenerator, + masterElectionHandler, stateVersionTracker, eventLog, timer.getCurrentTimeInMillis(), dataExtractor)); propagateOptions(); @@ -169,7 +172,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd options.nodeStateRequestRoundTripTimeMaxSeconds); DatabaseHandler database = new DatabaseHandler(timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer); NodeLookup lookUp = new SlobrokClient(timer); - SystemStateGenerator stateGenerator = new SystemStateGenerator(timer, log, metricUpdater); + StateChangeHandler stateGenerator = new StateChangeHandler(timer, log, metricUpdater); SystemStateBroadcaster stateBroadcaster = new SystemStateBroadcaster(timer, timer); MasterElectionHandler masterElectionHandler = new MasterElectionHandler(options.fleetControllerIndex, options.fleetControllerCount, timer, timer); FleetController controller = new FleetController( @@ -246,7 +249,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd public com.yahoo.vdslib.state.ClusterState getSystemState() { synchronized(monitor) { - return systemStateGenerator.getClusterState(); + return stateVersionTracker.getVersionedClusterState(); } } @@ -299,41 +302,41 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd @Override public void handleNewNodeState(NodeInfo node, NodeState newState) { 
verifyInControllerThread(); - systemStateGenerator.handleNewReportedNodeState(node, newState, this); + stateChangeHandler.handleNewReportedNodeState(stateVersionTracker.getVersionedClusterState(), node, newState, this); } @Override public void handleNewWantedNodeState(NodeInfo node, NodeState newState) { verifyInControllerThread(); wantedStateChanged = true; - systemStateGenerator.proposeNewNodeState(node, newState); + stateChangeHandler.proposeNewNodeState(stateVersionTracker.getVersionedClusterState(), node, newState); } @Override public void handleUpdatedHostInfo(NodeInfo nodeInfo, HostInfo newHostInfo) { verifyInControllerThread(); - systemStateGenerator.handleUpdatedHostInfo(nodeInfo, newHostInfo); + stateVersionTracker.handleUpdatedHostInfo(stateChangeHandler.getHostnames(), nodeInfo, newHostInfo); } @Override public void handleNewNode(NodeInfo node) { verifyInControllerThread(); - systemStateGenerator.handleNewNode(node); + stateChangeHandler.handleNewNode(node); } @Override public void handleMissingNode(NodeInfo node) { verifyInControllerThread(); - systemStateGenerator.handleMissingNode(node, this); + stateChangeHandler.handleMissingNode(stateVersionTracker.getVersionedClusterState(), node, this); } @Override public void handleNewRpcAddress(NodeInfo node) { verifyInControllerThread(); - systemStateGenerator.handleNewRpcAddress(node); + stateChangeHandler.handleNewRpcAddress(node); } @Override public void handleReturnedRpcAddress(NodeInfo node) { verifyInControllerThread(); - systemStateGenerator.handleReturnedRpcAddress(node); + stateChangeHandler.handleReturnedRpcAddress(node); } public void handleNewSystemState(com.yahoo.vdslib.state.ClusterState state) { @@ -370,7 +373,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd /** Called when all distributors have acked newest cluster state version. 
*/ public void handleAllDistributorsInSync(DatabaseHandler database, DatabaseHandler.Context context) throws InterruptedException { - systemStateGenerator.handleAllDistributorsInSync(database, context); + Set<ConfiguredNode> nodes = new HashSet<>(cluster.clusterInfo().getConfiguredNodes().values()); + stateChangeHandler.handleAllDistributorsInSync( + stateVersionTracker.getVersionedClusterState(), nodes, database, context); } private boolean changesConfiguredNodeSet(Collection<ConfiguredNode> newNodes) { @@ -409,17 +414,11 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd database.setZooKeeperSessionTimeout(options.zooKeeperSessionTimeout); stateGatherer.setMaxSlobrokDisconnectGracePeriod(options.maxSlobrokDisconnectGracePeriod); stateGatherer.setNodeStateRequestTimeout(options.nodeStateRequestTimeoutMS); - systemStateGenerator.setNodes(cluster.clusterInfo()); - systemStateGenerator.setMaxTransitionTime(options.maxTransitionTime); - systemStateGenerator.setMaxInitProgressTime(options.maxInitProgressTime); - systemStateGenerator.setMaxPrematureCrashes(options.maxPrematureCrashes); - systemStateGenerator.setStableStateTimePeriod(options.stableStateTimePeriod); - systemStateGenerator.setMinNodesUp(options.minDistributorNodesUp, options.minStorageNodesUp, - options.minRatioOfDistributorNodesUp, options.minRatioOfStorageNodesUp); - systemStateGenerator.setMinNodeRatioPerGroup(options.minNodeRatioPerGroup); - systemStateGenerator.setMaxSlobrokDisconnectGracePeriod(options.maxSlobrokDisconnectGracePeriod); - systemStateGenerator.setDistributionBits(options.distributionBits); - systemStateGenerator.setDistribution(options.storageDistribution); + + // TODO: remove as many temporal parameter dependencies as possible here. Currently duplication of state. 
+ stateChangeHandler.reconfigureFromOptions(options); + stateChangeHandler.setStateChangedFlag(); // Always trigger state recomputation after reconfig + masterElectionHandler.setFleetControllerCount(options.fleetControllerCount); masterElectionHandler.setMasterZooKeeperCooldownPeriod(options.masterZooKeeperCooldownPeriod); @@ -491,7 +490,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd didWork = database.doNextZooKeeperTask(databaseContext); didWork |= updateMasterElectionState(); didWork |= handleLeadershipEdgeTransitions(); - systemStateGenerator.setMaster(isMaster); + stateChangeHandler.setMaster(isMaster); // Process zero or more getNodeState responses that we have received. didWork |= stateGatherer.processResponses(this); @@ -510,10 +509,10 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd didWork |= processAnyPendingStatusPageRequest(); if (rpcServer != null) { - didWork |= rpcServer.handleRpcRequests(cluster, systemStateGenerator.getClusterState(), this, this); + didWork |= rpcServer.handleRpcRequests(cluster, consolidatedClusterState(), this, this); } - processAllQueuedRemoteTasks(); + didWork |= processNextQueuedRemoteTask(); processingCycle = false; ++cycleCount; @@ -606,25 +605,52 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd } } - private void processAllQueuedRemoteTasks() { + private boolean processNextQueuedRemoteTask() { if ( ! 
remoteTasks.isEmpty()) { - RemoteClusterControllerTask.Context context = new RemoteClusterControllerTask.Context(); - context.cluster = cluster; - context.currentState = systemStateGenerator.getConsolidatedClusterState(); - context.masterInfo = masterElectionHandler; - context.nodeStateOrHostInfoChangeHandler = this; - context.nodeAddedOrRemovedListener = this; - for (RemoteClusterControllerTask task : remoteTasks) { - log.finest("Processing remote task " + task.getClass().getName()); - task.doRemoteFleetControllerTask(context); - task.notifyCompleted(); - log.finest("Done processing remote task " + task.getClass().getName()); - } - log.fine("Completed processing remote tasks"); - remoteTasks.clear(); + final RemoteClusterControllerTask.Context context = createRemoteTaskProcessingContext(); + final RemoteClusterControllerTask task = remoteTasks.poll(); + log.finest("Processing remote task " + task.getClass().getName()); + task.doRemoteFleetControllerTask(context); + task.notifyCompleted(); + log.finest("Done processing remote task " + task.getClass().getName()); + return true; } + return false; } + private RemoteClusterControllerTask.Context createRemoteTaskProcessingContext() { + final RemoteClusterControllerTask.Context context = new RemoteClusterControllerTask.Context(); + context.cluster = cluster; + context.currentState = consolidatedClusterState(); + context.masterInfo = masterElectionHandler; + context.nodeStateOrHostInfoChangeHandler = this; + context.nodeAddedOrRemovedListener = this; + return context; + } + + /** + * A "consolidated" cluster state is guaranteed to have up-to-date information on which nodes are + * up or down even when the whole cluster is down. The regular, published cluster state is not + * normally updated to reflect node events when the cluster is down. 
+ */ + ClusterState consolidatedClusterState() { + final ClusterState publishedState = stateVersionTracker.getVersionedClusterState(); + if (publishedState.getClusterState() == State.UP) { + return publishedState; // Short-circuit; already represents latest node state + } + // Latest candidate state contains the most up to date state information, even if it may not + // have been published yet. + final ClusterState current = stateVersionTracker.getLatestCandidateState().getClusterState().clone(); + current.setVersion(publishedState.getVersion()); + return current; + } + + /* + System test observations: + - a node that stops normally (U -> S) then goes down erroneously triggers premature crash handling + - long time before content node state convergence (though this seems to be the case for legacy impl as well) + */ + private boolean resyncLocallyCachedState() throws InterruptedException { boolean didWork = false; // Let non-master state gatherers update wanted states once in a while, so states generated and shown are close to valid. @@ -637,31 +663,99 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd // Send getNodeState requests to zero or more nodes. didWork |= stateGatherer.sendMessages(cluster, communicator, this); - didWork |= systemStateGenerator.watchTimers(cluster, this); - didWork |= systemStateGenerator.notifyIfNewSystemState(cluster, this); + // Important: timer events must use a consolidated state, or they might trigger edge events multiple times. + didWork |= stateChangeHandler.watchTimers(cluster, consolidatedClusterState(), this); + + didWork |= recomputeClusterStateIfRequired(); if ( ! isStateGatherer) { if ( ! 
isMaster) { eventLog.add(new ClusterEvent(ClusterEvent.Type.MASTER_ELECTION, "This node just became node state gatherer as we are fleetcontroller master candidate.", timer.getCurrentTimeInMillis())); // Update versions to use so what is shown is closer to what is reality on the master - systemStateGenerator.setLatestSystemStateVersion(database.getLatestSystemStateVersion()); + stateVersionTracker.setVersionRetrievedFromZooKeeper(database.getLatestSystemStateVersion()); } } isStateGatherer = true; return didWork; } + private boolean recomputeClusterStateIfRequired() { + if (mustRecomputeCandidateClusterState()) { + stateChangeHandler.unsetStateChangedFlag(); + final AnnotatedClusterState candidate = computeCurrentAnnotatedState(); + stateVersionTracker.updateLatestCandidateState(candidate); + + if (stateVersionTracker.candidateChangedEnoughFromCurrentToWarrantPublish() + || stateVersionTracker.hasReceivedNewVersionFromZooKeeper()) + { + final long timeNowMs = timer.getCurrentTimeInMillis(); + final AnnotatedClusterState before = stateVersionTracker.getAnnotatedVersionedClusterState(); + + stateVersionTracker.promoteCandidateToVersionedState(timeNowMs); + emitEventsForAlteredStateEdges(before, stateVersionTracker.getAnnotatedVersionedClusterState(), timeNowMs); + handleNewSystemState(stateVersionTracker.getVersionedClusterState()); + return true; + } + } + return false; + } + + private AnnotatedClusterState computeCurrentAnnotatedState() { + ClusterStateGenerator.Params params = ClusterStateGenerator.Params.fromOptions(options); + params.currentTimeInMilllis(timer.getCurrentTimeInMillis()) + .cluster(cluster) + .lowestObservedDistributionBitCount(stateVersionTracker.getLowestObservedDistributionBits()); + return ClusterStateGenerator.generatedStateFrom(params); + } + + private void emitEventsForAlteredStateEdges(final AnnotatedClusterState fromState, + final AnnotatedClusterState toState, + final long timeNowMs) { + final List<Event> deltaEvents = 
EventDiffCalculator.computeEventDiff( + EventDiffCalculator.params() + .cluster(cluster) + .fromState(fromState) + .toState(toState) + .currentTimeMs(timeNowMs)); + for (Event event : deltaEvents) { + eventLog.add(event, isMaster); + } + + emitStateAppliedEvents(timeNowMs, fromState.getClusterState(), toState.getClusterState()); + } + + private void emitStateAppliedEvents(long timeNowMs, ClusterState fromClusterState, ClusterState toClusterState) { + eventLog.add(new ClusterEvent( + ClusterEvent.Type.SYSTEMSTATE, + "New cluster state version " + toClusterState.getVersion() + ". Change from last: " + + fromClusterState.getTextualDifference(toClusterState), + timeNowMs), isMaster); + + if (toClusterState.getDistributionBitCount() != fromClusterState.getDistributionBitCount()) { + eventLog.add(new ClusterEvent( + ClusterEvent.Type.SYSTEMSTATE, + "Altering distribution bits in system from " + + fromClusterState.getDistributionBitCount() + " to " + + toClusterState.getDistributionBitCount(), + timeNowMs), isMaster); + } + } + + private boolean mustRecomputeCandidateClusterState() { + return stateChangeHandler.stateMayHaveChanged() || stateVersionTracker.hasReceivedNewVersionFromZooKeeper(); + } + private boolean handleLeadershipEdgeTransitions() throws InterruptedException { boolean didWork = false; if (masterElectionHandler.isMaster()) { if ( ! isMaster) { metricUpdater.becameMaster(); // If we just became master, restore wanted states from database - systemStateGenerator.setLatestSystemStateVersion(database.getLatestSystemStateVersion()); + stateVersionTracker.setVersionRetrievedFromZooKeeper(database.getLatestSystemStateVersion()); didWork = database.loadStartTimestamps(cluster); didWork |= database.loadWantedStates(databaseContext); eventLog.add(new ClusterEvent(ClusterEvent.Type.MASTER_ELECTION, "This node just became fleetcontroller master. 
Bumped version to " - + systemStateGenerator.getClusterState().getVersion() + " to be in line.", timer.getCurrentTimeInMillis())); + + stateVersionTracker.getCurrentVersion() + " to be in line.", timer.getCurrentTimeInMillis())); long currentTime = timer.getCurrentTimeInMillis(); firstAllowedStateBroadcast = currentTime + options.minTimeBeforeFirstSystemStateBroadcast; log.log(LogLevel.DEBUG, "At time " + currentTime + " we set first system state broadcast time to be " @@ -693,6 +787,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd } catch (InterruptedException e) { log.log(LogLevel.DEBUG, "Event thread stopped by interrupt exception: " + e); } catch (Throwable t) { + t.printStackTrace(); log.log(LogLevel.ERROR, "Fatal error killed fleet controller", t); synchronized (monitor) { running = false; } System.exit(1); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/GroupAvailabilityCalculator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/GroupAvailabilityCalculator.java index 74b15b61ac3..e24e5f6914e 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/GroupAvailabilityCalculator.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/GroupAvailabilityCalculator.java @@ -10,6 +10,7 @@ import com.yahoo.vdslib.state.NodeState; import com.yahoo.vdslib.state.NodeType; import com.yahoo.vdslib.state.State; +import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.stream.Stream; @@ -105,6 +106,9 @@ class GroupAvailabilityCalculator { } public Set<Integer> nodesThatShouldBeDown(ClusterState state) { + if (distribution == null) { // FIXME: for tests that don't set distribution properly! + return Collections.emptySet(); + } if (isFlatCluster(distribution.getRootGroup())) { // Implicit group takedown only applies to hierarchic cluster setups. 
return new HashSet<>(); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java index 6c48bdf12d0..1a48b088ca3 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java @@ -240,7 +240,7 @@ public class MasterElectionHandler implements MasterInterface { .append(".</p>"); } else if (masterGoneFromZooKeeperTime + masterZooKeeperCooldownPeriod > timer.getCurrentTimeInMillis()) { long time = timer.getCurrentTimeInMillis() - masterGoneFromZooKeeperTime; - sb.append("<p>There is currently no master. Only " + (time / 1000) + " seconds have past since") + sb.append("<p>There is currently no master. Only " + (time / 1000) + " seconds have passed since") .append(" old master disappeared. At least " + (masterZooKeeperCooldownPeriod / 1000) + " must pass") .append(" before electing new master unless all possible master candidates are online.</p>"); } @@ -249,7 +249,7 @@ public class MasterElectionHandler implements MasterInterface { sb.append("<p>As we are number ").append(nextInLineCount) .append(" in line for taking over as master, we're gathering state from nodes.</p>"); sb.append("<p><font color=\"red\">As we are not the master, we don't know about nodes current system state" - + " or wanted states, so some statistics below are a bit incorrect. Look at status page on master " + + " or wanted states, so some statistics below may be stale. 
Look at status page on master " + "for updated data.</font></p>"); } if (index * 2 > totalCount) { diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeEvent.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeEvent.java index d9d83c705b1..944cbd02082 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeEvent.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeEvent.java @@ -45,4 +45,8 @@ public class NodeEvent implements Event { public String getCategory() { return type.toString(); } + + public Type getType() { + return type; + } } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java index c261a4bb194..87a32e1e088 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java @@ -35,6 +35,18 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { /** Whether this node has been configured to be retired and should therefore always return retired as its wanted state */ private boolean configuredRetired; + /** + * Node has been observed transitioning from Init to Down at least once during the last "premature crash count" + * period. Gets reset whenever the crash count is reset to zero after a period of stability. + * + * Flag can also be explicitly toggled by external code, such as if a reported node state + * handler discovers "reverse" init progress. This indicates a "silent" down edge and should be + * handled as such. + * + * It is an explicit choice that we only do this on an edge to Down (and not Stopping). Stopping implies + * an administrative action, not that the node itself is unstable. 
+ */ + private boolean recentlyObservedUnstableDuringInit; /** The time we set the current state last. */ private long nextAttemptTime; @@ -97,6 +109,7 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { this.version = getLatestVersion(); this.connectionVersion = getLatestVersion(); this.configuredRetired = configuredRetired; + this.recentlyObservedUnstableDuringInit = false; this.rpcAddress = rpcAddress; this.lastSeenInSlobrok = null; this.nextAttemptTime = 0; @@ -132,7 +145,17 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { public int getConnectionAttemptCount() { return connectionAttemptCount; } + public boolean recentlyObservedUnstableDuringInit() { + return recentlyObservedUnstableDuringInit; + } + public void setRecentlyObservedUnstableDuringInit(boolean unstable) { + recentlyObservedUnstableDuringInit = unstable; + } + public void setPrematureCrashCount(int count) { + if (count == 0) { + recentlyObservedUnstableDuringInit = false; + } if (prematureCrashCount != count) { prematureCrashCount = count; log.log(LogLevel.DEBUG, "Premature crash count on " + toString() + " set to " + count); @@ -213,6 +236,7 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { public ContentCluster getCluster() { return cluster; } /** Returns true if the node is currentl registered in slobrok */ + // FIXME why is this called "isRpcAddressOutdated" then??? 
public boolean isRpcAddressOutdated() { return lastSeenInSlobrok != null; } public Long getRpcAddressOutdatedTimestamp() { return lastSeenInSlobrok; } @@ -277,8 +301,10 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { if (state.getState().equals(State.DOWN) && !reportedState.getState().oneOf("d")) { downStableStateTime = time; log.log(LogLevel.DEBUG, "Down stable state on " + toString() + " altered to " + time); - } - else if (state.getState().equals(State.UP) && !reportedState.getState().oneOf("u")) { + if (reportedState.getState() == State.INITIALIZING) { + recentlyObservedUnstableDuringInit = true; + } + } else if (state.getState().equals(State.UP) && !reportedState.getState().oneOf("u")) { upStableStateTime = time; log.log(LogLevel.DEBUG, "Up stable state on " + toString() + " altered to " + time); } @@ -403,7 +429,7 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { public void setSystemStateVersionSent(ClusterState state) { if (state == null) throw new Error("Should not clear info for last version sent"); if (systemStateVersionSent.containsKey(state.getVersion())) { - throw new IllegalStateException("We have already sent cluster state version " + version + " to " + node); + throw new IllegalStateException("We have already sent cluster state version " + state.getVersion() + " to " + node); } systemStateVersionSent.put(state.getVersion(), state); } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeStateReason.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeStateReason.java new file mode 100644 index 00000000000..da338626d5d --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeStateReason.java @@ -0,0 +1,10 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+package com.yahoo.vespa.clustercontroller.core; + +public enum NodeStateReason { + // FIXME some of these reasons may be unnecessary as they are reported implicitly by reported/wanted state changes + NODE_TOO_UNSTABLE, + WITHIN_MAINTENANCE_GRACE_PERIOD, + FORCED_INTO_MAINTENANCE, + GROUP_IS_DOWN +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java new file mode 100644 index 00000000000..83ba274c422 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java @@ -0,0 +1,530 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.jrt.Spec; +import com.yahoo.log.LogLevel; +import com.yahoo.vdslib.distribution.ConfiguredNode; +import com.yahoo.vdslib.state.*; +import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler; +import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler; + +import java.util.*; +import java.util.logging.Logger; + +/** + * This class gets node state updates and timer events and uses these to decide + * whether a new cluster state should be generated. + * + * TODO refactor logic out into smaller, separate components. Still state duplication + * between ClusterStateGenerator and StateChangeHandler, especially for temporal + * state transition configuration parameters. 
+ */ +public class StateChangeHandler { + + private static Logger log = Logger.getLogger(StateChangeHandler.class.getName()); + + private final Timer timer; + private final EventLogInterface eventLog; + private boolean stateMayHaveChanged = false; + private boolean isMaster = false; + + private Map<NodeType, Integer> maxTransitionTime = new TreeMap<>(); + private int maxInitProgressTime = 5000; + private int maxPrematureCrashes = 4; + private long stableStateTimePeriod = 60 * 60 * 1000; + private Map<Integer, String> hostnames = new HashMap<>(); + private int maxSlobrokDisconnectGracePeriod = 1000; + private static final boolean disableUnstableNodes = true; + + /** + * @param metricUpdater may be null, in which case no metrics will be recorded. + */ + public StateChangeHandler(Timer timer, EventLogInterface eventLog, MetricUpdater metricUpdater) { + this.timer = timer; + this.eventLog = eventLog; + maxTransitionTime.put(NodeType.DISTRIBUTOR, 5000); + maxTransitionTime.put(NodeType.STORAGE, 5000); + } + + public void handleAllDistributorsInSync(final ClusterState currentState, + final Set<ConfiguredNode> nodes, + final DatabaseHandler database, + final DatabaseHandler.Context dbContext) throws InterruptedException { + int startTimestampsReset = 0; + log.log(LogLevel.DEBUG, String.format("handleAllDistributorsInSync invoked for state version %d", currentState.getVersion())); + for (NodeType nodeType : NodeType.getTypes()) { + for (ConfiguredNode configuredNode : nodes) { + final Node node = new Node(nodeType, configuredNode.index()); + final NodeInfo nodeInfo = dbContext.getCluster().getNodeInfo(node); + final NodeState nodeState = currentState.getNodeState(node); + if (nodeInfo != null && nodeState != null) { + if (nodeState.getStartTimestamp() > nodeInfo.getStartTimestamp()) { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, String.format("Storing away new start timestamp for node %s (%d)", + node, nodeState.getStartTimestamp())); + } + 
nodeInfo.setStartTimestamp(nodeState.getStartTimestamp()); + } + if (nodeState.getStartTimestamp() > 0) { + if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, String.format("Resetting timestamp in cluster state for node %s", node)); + } + ++startTimestampsReset; + } + } else if (log.isLoggable(LogLevel.DEBUG)) { + log.log(LogLevel.DEBUG, node + ": " + + (nodeInfo == null ? "null" : nodeInfo.getStartTimestamp()) + ", " + + (nodeState == null ? "null" : nodeState.getStartTimestamp())); + } + } + } + if (startTimestampsReset > 0) { + eventLog.add(new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE, "Reset " + startTimestampsReset + + " start timestamps as all available distributors have seen newest cluster state.", + timer.getCurrentTimeInMillis())); + stateMayHaveChanged = true; + database.saveStartTimestamps(dbContext); + } else { + log.log(LogLevel.DEBUG, "Found no start timestamps to reset in cluster state."); + } + } + + public boolean stateMayHaveChanged() { + return stateMayHaveChanged; + } + + public void setStateChangedFlag() { stateMayHaveChanged = true; } + public void unsetStateChangedFlag() { + stateMayHaveChanged = false; + } + + public void setMaster(boolean isMaster) { + this.isMaster = isMaster; + } + + public void setMaxTransitionTime(Map<NodeType, Integer> map) { maxTransitionTime = map; } + public void setMaxInitProgressTime(int millisecs) { maxInitProgressTime = millisecs; } + public void setMaxSlobrokDisconnectGracePeriod(int millisecs) { + maxSlobrokDisconnectGracePeriod = millisecs; + } + public void setStableStateTimePeriod(long millisecs) { stableStateTimePeriod = millisecs; } + public void setMaxPrematureCrashes(int count) { maxPrematureCrashes = count; } + + // TODO nodeListener is only used via updateNodeInfoFromReportedState -> handlePrematureCrash + // TODO this will recursively invoke proposeNewNodeState, which will presumably (i.e. hopefully) be a no-op... 
+ public void handleNewReportedNodeState(final ClusterState currentClusterState, + final NodeInfo node, + final NodeState reportedState, + final NodeStateOrHostInfoChangeHandler nodeListener) + { + final NodeState currentState = currentClusterState.getNodeState(node.getNode()); + final LogLevel level = (currentState.equals(reportedState) && node.getVersion() == 0) ? LogLevel.SPAM : LogLevel.DEBUG; + if (log.isLoggable(level)) { + log.log(level, String.format("Got nodestate reply from %s: %s (Current state is %s)", + node, node.getReportedState().getTextualDifference(reportedState), currentState.toString(true))); + } + final long currentTime = timer.getCurrentTimeInMillis(); + + if (reportedState.getState().equals(State.DOWN)) { + node.setTimeOfFirstFailingConnectionAttempt(currentTime); + } + + // *** LOGGING ONLY + if ( ! reportedState.similarTo(node.getReportedState())) { + if (reportedState.getState().equals(State.DOWN)) { + eventLog.addNodeOnlyEvent(new NodeEvent(node, "Failed to get node state: " + reportedState.toString(true), NodeEvent.Type.REPORTED, currentTime), LogLevel.INFO); + } else { + eventLog.addNodeOnlyEvent(new NodeEvent(node, "Now reporting state " + reportedState.toString(true), NodeEvent.Type.REPORTED, currentTime), LogLevel.DEBUG); + } + } + + if (reportedState.equals(node.getReportedState()) && ! 
            // NOTE(review): this span begins mid-method (handleNewReportedNodeState);
            // the condition below is the tail of an if-statement whose start lies
            // before the visible region.
            reportedState.getState().equals(State.INITIALIZING)) {
            return;
        }

        // Fold the newly reported state into per-node bookkeeping (crash edges,
        // init-progress timers, transition times) before distribution-bit handling.
        updateNodeInfoFromReportedState(node, currentState, reportedState, nodeListener);

        if (reportedState.getMinUsedBits() != currentState.getMinUsedBits()) {
            final int oldCount = currentState.getMinUsedBits();
            final int newCount = reportedState.getMinUsedBits();
            log.log(LogLevel.DEBUG,
                    String.format("Altering node state to reflect that min distribution bit count has changed from %d to %d",
                            oldCount, newCount));
            eventLog.add(new NodeEvent(node, String.format("Altered min distribution bit count from %d to %d", oldCount, newCount),
                    NodeEvent.Type.CURRENT, currentTime), isMaster);
        } else if (log.isLoggable(LogLevel.DEBUG)) {
            log.log(LogLevel.DEBUG, String.format("Not altering state of %s in cluster state because new state is too similar: %s",
                    node, currentState.getTextualDifference(reportedState)));
        }

        // NOTE(review): set unconditionally — even when the report was deemed "too
        // similar" above, a state regeneration pass is still requested.
        stateMayHaveChanged = true;
    }

    /** Records that a brand new node appeared in slobrok and logs a REPORTED event for it. */
    public void handleNewNode(NodeInfo node) {
        setHostName(node);
        String message = "Found new node " + node + " in slobrok at " + node.getRpcAddress();
        eventLog.add(new NodeEvent(node, message, NodeEvent.Type.REPORTED, timer.getCurrentTimeInMillis()), isMaster);
    }

    /**
     * Handles a node disappearing from slobrok. A node that last reported STOPPING is
     * assumed to have shut down cleanly and is immediately reported as DOWN; any other
     * node is only logged, in case it reappears shortly.
     */
    public void handleMissingNode(final ClusterState currentClusterState,
                                  final NodeInfo node,
                                  final NodeStateOrHostInfoChangeHandler nodeListener)
    {
        removeHostName(node);

        final long timeNow = timer.getCurrentTimeInMillis();

        if (node.getLatestNodeStateRequestTime() != null) {
            eventLog.add(new NodeEvent(node, "Node is no longer in slobrok, but we still have a pending state request.", NodeEvent.Type.REPORTED, timeNow), isMaster);
        } else {
            eventLog.add(new NodeEvent(node, "Node is no longer in slobrok. No pending state request to node.", NodeEvent.Type.REPORTED, timeNow), isMaster);
        }

        if (node.getReportedState().getState().equals(State.STOPPING)) {
            log.log(LogLevel.DEBUG, "Node " + node.getNode() + " is no longer in slobrok. Was in stopping state, so assuming it has shut down normally. Setting node down");
            NodeState ns = node.getReportedState().clone();
            ns.setState(State.DOWN);
            handleNewReportedNodeState(currentClusterState, node, ns.clone(), nodeListener);
        } else {
            log.log(LogLevel.DEBUG, "Node " + node.getNode() + " no longer in slobrok was in state " + node.getReportedState() + ". Waiting to see if it reappears in slobrok");
        }

        stateMayHaveChanged = true;
    }

    /**
     * Propose a new state for a node. This may happen due to an administrator action, orchestration, or
     * a configuration change.
     *
     * If the newly proposed state differs from the state the node currently has in the system,
     * a cluster state regeneration will be triggered.
     */
    public void proposeNewNodeState(final ClusterState currentClusterState, final NodeInfo node, final NodeState proposedState) {
        final NodeState currentState = currentClusterState.getNodeState(node.getNode());
        final NodeState currentReported = node.getReportedState();

        if (currentState.getState().equals(proposedState.getState())) {
            return; // Proposed state equals what the cluster state already says; no-op.
        }
        stateMayHaveChanged = true;

        if (log.isLoggable(LogLevel.DEBUG)) {
            log.log(LogLevel.DEBUG, String.format("Got new wanted nodestate for %s: %s", node, currentState.getTextualDifference(proposedState)));
        }
        // Should be checked earlier before state was set in cluster
        assert(proposedState.getState().validWantedNodeState(node.getNode().getType()));
        long timeNow = timer.getCurrentTimeInMillis();
        if (proposedState.above(currentReported)) {
            // Cannot force a node into a state "better" than what it itself reports;
            // only log the intent for now.
            eventLog.add(new NodeEvent(node, String.format("Wanted state %s, but we cannot force node into that " +
                            "state yet as it is currently in %s", proposedState, currentReported),
                    NodeEvent.Type.REPORTED, timeNow), isMaster);
            return;
        }
        if ( ! proposedState.similarTo(currentState)) {
            eventLog.add(new NodeEvent(node, String.format("Node state set to %s.", proposedState),
                    NodeEvent.Type.WANTED, timeNow), isMaster);
        }
    }

    /** Logs that a known node re-registered in slobrok under a different RPC address. */
    public void handleNewRpcAddress(NodeInfo node) {
        setHostName(node);
        String message = "Node " + node + " has a new address in slobrok: " + node.getRpcAddress();
        eventLog.add(new NodeEvent(node, message, NodeEvent.Type.REPORTED, timer.getCurrentTimeInMillis()), isMaster);
    }

    /** Logs that a node reappeared in slobrok with the same RPC address it had before. */
    public void handleReturnedRpcAddress(NodeInfo node) {
        setHostName(node);
        String message = "Node got back into slobrok with same address as before: " + node.getRpcAddress();
        eventLog.add(new NodeEvent(node, message, NodeEvent.Type.REPORTED, timer.getCurrentTimeInMillis()), isMaster);
    }

    /** Caches the node's hostname, parsed from its slobrok RPC address (skips null/malformed specs). */
    private void setHostName(NodeInfo node) {
        String rpcAddress = node.getRpcAddress();
        if (rpcAddress == null) {
            // This may happen if we haven't seen the node in Slobrok yet.
            return;
        }

        Spec address = new Spec(rpcAddress);
        if (address.malformed()) {
            return;
        }

        hostnames.put(node.getNodeIndex(), address.host());
    }

    /** Re-applies all timing/crash-threshold options after a live reconfiguration. */
    void reconfigureFromOptions(FleetControllerOptions options) {
        setMaxPrematureCrashes(options.maxPrematureCrashes);
        setStableStateTimePeriod(options.stableStateTimePeriod);
        setMaxInitProgressTime(options.maxInitProgressTime);
        setMaxSlobrokDisconnectGracePeriod(options.maxSlobrokDisconnectGracePeriod);
        setMaxTransitionTime(options.maxTransitionTime);
    }

    /** Drops the cached hostname for a node that left slobrok. */
    private void removeHostName(NodeInfo node) {
        hostnames.remove(node.getNodeIndex());
    }

    /** @return unmodifiable view of node index -&gt; hostname for all known nodes. */
    Map<Integer, String> getHostnames() {
        return Collections.unmodifiableMap(hostnames);
    }

    // TODO too many hidden behavior dependencies between this and the actually
    // generated cluster state. Still a bit of a mine field...
    // TODO remove all node state mutation from this function entirely in favor of ClusterStateGenerator!
    //  `--> this will require adding more event edges and premature crash handling to it.
    // Which is fine.
    /**
     * Runs all time-dependent checks (slobrok disconnect grace, transition time,
     * init-progress timeout, crash-counter decay) for every node in the cluster.
     *
     * @return true if any timer fired, in which case a state regeneration is flagged
     */
    public boolean watchTimers(final ContentCluster cluster,
                               final ClusterState currentClusterState,
                               final NodeStateOrHostInfoChangeHandler nodeListener)
    {
        boolean triggeredAnyTimers = false;
        final long currentTime = timer.getCurrentTimeInMillis();

        for(NodeInfo node : cluster.getNodeInfo()) {
            triggeredAnyTimers |= handleTimeDependentOpsForNode(currentClusterState, nodeListener, currentTime, node);
        }

        if (triggeredAnyTimers) {
            stateMayHaveChanged = true;
        }
        return triggeredAnyTimers;
    }

    /** Applies all timer-driven checks for a single node; returns true if any fired. */
    private boolean handleTimeDependentOpsForNode(final ClusterState currentClusterState,
                                                  final NodeStateOrHostInfoChangeHandler nodeListener,
                                                  final long currentTime,
                                                  final NodeInfo node)
    {
        final NodeState currentStateInSystem = currentClusterState.getNodeState(node.getNode());
        final NodeState lastReportedState = node.getReportedState();
        boolean triggeredAnyTimers = false;

        triggeredAnyTimers = reportDownIfOutdatedSlobrokNode(
                currentClusterState, nodeListener, currentTime, node, lastReportedState);

        if (nodeStillUnavailableAfterTransitionTimeExceeded(
                currentTime, node, currentStateInSystem, lastReportedState))
        {
            eventLog.add(new NodeEvent(node, String.format(
                    "%d milliseconds without contact. Marking node down.",
                    currentTime - node.getTransitionTime()),
                    NodeEvent.Type.CURRENT, currentTime), isMaster);
            triggeredAnyTimers = true;
        }

        if (nodeInitProgressHasTimedOut(currentTime, node, currentStateInSystem, lastReportedState)) {
            eventLog.add(new NodeEvent(node, String.format(
                    "%d milliseconds without initialize progress. Marking node down. " +
                    "Premature crash count is now %d.",
                    currentTime - node.getInitProgressTime(),
                    node.getPrematureCrashCount() + 1),
                    NodeEvent.Type.CURRENT, currentTime), isMaster);
            handlePrematureCrash(node, nodeListener);
            triggeredAnyTimers = true;
        }

        // Crash counters decay to zero once a node has been stably up (or down) long enough.
        if (mayResetCrashCounterOnStableUpNode(currentTime, node, lastReportedState)) {
            node.setPrematureCrashCount(0);
            log.log(LogLevel.DEBUG, "Resetting premature crash count on node " + node + " as it has been up for a long time.");
            triggeredAnyTimers = true;
        } else if (mayResetCrashCounterOnStableDownNode(currentTime, node, lastReportedState)) {
            node.setPrematureCrashCount(0);
            log.log(LogLevel.DEBUG, "Resetting premature crash count on node " + node + " as it has been down for a long time.");
            triggeredAnyTimers = true;
        }

        return triggeredAnyTimers;
    }

    // True iff a storage node stuck in INITIALIZING has made no progress within
    // maxInitProgressTime (0 disables the check) and is not already wanted/considered Down.
    private boolean nodeInitProgressHasTimedOut(long currentTime, NodeInfo node, NodeState currentStateInSystem, NodeState lastReportedState) {
        return !currentStateInSystem.getState().equals(State.DOWN)
            && node.getWantedState().above(new NodeState(node.getNode().getType(), State.DOWN))
            && lastReportedState.getState().equals(State.INITIALIZING)
            && maxInitProgressTime != 0
            && node.getInitProgressTime() + maxInitProgressTime <= currentTime
            && node.getNode().getType().equals(NodeType.STORAGE);
    }

    // Counter may be cleared once the node has reported DOWN stably for stableStateTimePeriod.
    private boolean mayResetCrashCounterOnStableDownNode(long currentTime, NodeInfo node, NodeState lastReportedState) {
        return node.getDownStableStateTime() + stableStateTimePeriod <= currentTime
            && lastReportedState.getState().equals(State.DOWN)
            && node.getPrematureCrashCount() <= maxPrematureCrashes
            && node.getPrematureCrashCount() != 0;
    }

    // Counter may be cleared once the node has reported UP stably for stableStateTimePeriod.
    private boolean mayResetCrashCounterOnStableUpNode(long currentTime, NodeInfo node, NodeState lastReportedState) {
        return node.getUpStableStateTime() + stableStateTimePeriod <= currentTime
            && lastReportedState.getState().equals(State.UP)
            && node.getPrematureCrashCount() <= maxPrematureCrashes
            && node.getPrematureCrashCount() != 0;
    }

    // True iff a node in MAINTENANCE (that is not wanted Down) has stayed unavailable
    // past the configured per-node-type max transition time.
    private boolean nodeStillUnavailableAfterTransitionTimeExceeded(
            long currentTime,
            NodeInfo node,
            NodeState currentStateInSystem,
            NodeState lastReportedState)
    {
        return currentStateInSystem.getState().equals(State.MAINTENANCE)
            && node.getWantedState().above(new NodeState(node.getNode().getType(), State.DOWN))
            && (lastReportedState.getState().equals(State.DOWN) || node.isRpcAddressOutdated())
            && node.getTransitionTime() + maxTransitionTime.get(node.getNode().getType()) < currentTime;
    }

    /**
     * Synthesizes a DOWN report for a node whose slobrok registration has been stale
     * longer than maxSlobrokDisconnectGracePeriod. Returns true if the edge fired.
     */
    private boolean reportDownIfOutdatedSlobrokNode(ClusterState currentClusterState,
                                                    NodeStateOrHostInfoChangeHandler nodeListener,
                                                    long currentTime,
                                                    NodeInfo node,
                                                    NodeState lastReportedState)
    {
        if (node.isRpcAddressOutdated()
            && !lastReportedState.getState().equals(State.DOWN)
            && node.getRpcAddressOutdatedTimestamp() + maxSlobrokDisconnectGracePeriod <= currentTime)
        {
            final String desc = String.format(
                    "Set node down as it has been out of slobrok for %d ms which " +
                    "is more than the max limit of %d ms.",
                    currentTime - node.getRpcAddressOutdatedTimestamp(),
                    maxSlobrokDisconnectGracePeriod);
            node.abortCurrentNodeStateRequests();
            NodeState state = lastReportedState.clone();
            state.setState(State.DOWN);
            if (!state.hasDescription()) {
                state.setDescription(desc);
            }
            eventLog.add(new NodeEvent(node, desc, NodeEvent.Type.CURRENT, currentTime), isMaster);
            handleNewReportedNodeState(currentClusterState, node, state.clone(), nodeListener);
            node.setReportedState(state, currentTime);
            return true;
        }
        return false;
    }

    // A STOPPING state whose description indicates SIGTERM / controlled shutdown is
    // treated as deliberate and does not count towards premature crashes.
    private boolean isControlledShutdown(NodeState state) {
        return (state.getState() == State.STOPPING
                && (state.getDescription().contains("Received signal 15 (SIGTERM - Termination signal)")
                    || state.getDescription().contains("controlled shutdown")));
    }

    /**
     * Modify a node's cross-state information in the cluster based on a newly arrived reported
     * state.
     *
     * @param node the node we are computing the state of
     * @param currentState the current state of the node
     * @param reportedState the new state reported by (or, in the case of down - inferred from) the node
     * @param nodeListener this listener is notified for some of the system state changes that this will return
     */
    private void updateNodeInfoFromReportedState(final NodeInfo node,
                                                 final NodeState currentState,
                                                 final NodeState reportedState,
                                                 final NodeStateOrHostInfoChangeHandler nodeListener) {
        final long timeNow = timer.getCurrentTimeInMillis();
        if (log.isLoggable(LogLevel.DEBUG)) {
            log.log(LogLevel.DEBUG, String.format("Finding new cluster state entry for %s switching state %s",
                    node, currentState.getTextualDifference(reportedState)));
        }

        // Crash edges short-circuit the rest of the bookkeeping.
        if (handleReportedNodeCrashEdge(node, currentState, reportedState, nodeListener, timeNow)) {
            return;
        }
        if (initializationProgressHasIncreased(currentState, reportedState)) {
            node.setInitProgressTime(timeNow);
            if (log.isLoggable(LogLevel.SPAM)) {
                log.log(LogLevel.SPAM, "Reset initialize timer on " + node + " to " + node.getInitProgressTime());
            }
        }
        if (handleImplicitCrashEdgeFromReverseInitProgress(node, currentState, reportedState, nodeListener, timeNow)) {
            return;
        }
        markNodeUnstableIfDownEdgeDuringInit(node, currentState, reportedState, nodeListener, timeNow);
    }

    // If we go down while initializing, mark node unstable, such that we don't mark it initializing again before it is up.
    private void markNodeUnstableIfDownEdgeDuringInit(final NodeInfo node,
                                                      final NodeState currentState,
                                                      final NodeState reportedState,
                                                      final NodeStateOrHostInfoChangeHandler nodeListener,
                                                      final long timeNow) {
        if (currentState.getState().equals(State.INITIALIZING)
            && reportedState.getState().oneOf("ds")
            && !isControlledShutdown(reportedState))
        {
            eventLog.add(new NodeEvent(node, String.format("Stop or crash during initialization. " +
                            "Premature crash count is now %d.", node.getPrematureCrashCount() + 1),
                    NodeEvent.Type.CURRENT, timeNow), isMaster);
            handlePrematureCrash(node, nodeListener);
        }
    }

    // TODO do we need this when we have startup timestamps? at least it's unit tested.
    // TODO this seems fairly contrived...
    // If we get reverse initialize progress, mark node unstable, such that we don't mark it initializing again before it is up.
    private boolean handleImplicitCrashEdgeFromReverseInitProgress(final NodeInfo node,
                                                                   final NodeState currentState,
                                                                   final NodeState reportedState,
                                                                   final NodeStateOrHostInfoChangeHandler nodeListener,
                                                                   final long timeNow) {
        if (currentState.getState().equals(State.INITIALIZING) &&
            (reportedState.getState().equals(State.INITIALIZING) && reportedState.getInitProgress() < currentState.getInitProgress()))
        {
            eventLog.add(new NodeEvent(node, String.format(
                    "Stop or crash during initialization detected from reverse initializing progress." +
                    " Progress was %g but is now %g. Premature crash count is now %d.",
                    currentState.getInitProgress(), reportedState.getInitProgress(),
                    node.getPrematureCrashCount() + 1),
                    NodeEvent.Type.CURRENT, timeNow), isMaster);
            node.setRecentlyObservedUnstableDuringInit(true);
            handlePrematureCrash(node, nodeListener);
            return true;
        }
        return false;
    }

    /**
     * Handles an up/retired -&gt; down/stopping edge. Counts it as a premature crash if it
     * happened before the node had been stably up for stableStateTimePeriod (and the
     * shutdown was not a controlled one). Returns true if premature-crash handling
     * consumed the edge.
     */
    private boolean handleReportedNodeCrashEdge(NodeInfo node, NodeState currentState,
                                                NodeState reportedState, NodeStateOrHostInfoChangeHandler nodeListener,
                                                long timeNow) {
        if (nodeUpToDownEdge(node, currentState, reportedState)) {
            node.setTransitionTime(timeNow);
            if (node.getUpStableStateTime() + stableStateTimePeriod > timeNow && !isControlledShutdown(reportedState)) {
                log.log(LogLevel.DEBUG, "Stable state: " + node.getUpStableStateTime() + " + " + stableStateTimePeriod + " > " + timeNow);
                eventLog.add(new NodeEvent(node,
                        String.format("Stopped or possibly crashed after %d ms, which is before " +
                                      "stable state time period. Premature crash count is now %d.",
                                timeNow - node.getUpStableStateTime(), node.getPrematureCrashCount() + 1),
                        NodeEvent.Type.CURRENT,
                        timeNow), isMaster);
                if (handlePrematureCrash(node, nodeListener)) {
                    return true;
                }
            }
        }
        return false;
    }

    // True when the node reports INITIALIZING and either just entered it or its
    // progress value strictly increased.
    private boolean initializationProgressHasIncreased(NodeState currentState, NodeState reportedState) {
        return reportedState.getState().equals(State.INITIALIZING) &&
                (!currentState.getState().equals(State.INITIALIZING) ||
                 reportedState.getInitProgress() > currentState.getInitProgress());
    }

    // Up/Retired -> Down/Initializing/Stopping edge; INITIALIZING only counts as "down"
    // for nodes whose wanted state is RETIRED.
    private boolean nodeUpToDownEdge(NodeInfo node, NodeState currentState, NodeState reportedState) {
        return currentState.getState().oneOf("ur") && reportedState.getState().oneOf("dis")
            && (node.getWantedState().getState().equals(State.RETIRED) || !reportedState.getState().equals(State.INITIALIZING));
    }

    /**
     * Increments the node's premature crash counter; once it exceeds maxPrematureCrashes
     * (and unstable-node disabling is on) the node's wanted state is forced to DOWN.
     * Returns true if the node was disabled this way.
     * NOTE(review): the method body continues past the end of this span.
     */
    private boolean handlePrematureCrash(NodeInfo node, NodeStateOrHostInfoChangeHandler changeListener) {
        node.setPrematureCrashCount(node.getPrematureCrashCount() + 1);
        if (disableUnstableNodes && node.getPrematureCrashCount() > maxPrematureCrashes) {
            NodeState wantedState = new NodeState(node.getNode().getType(), State.DOWN)
                    .setDescription("Disabled by fleet controller as it prematurely shut down " + node.getPrematureCrashCount() + " times in a row");
            NodeState oldState = node.getWantedState();
            node.setWantedState(wantedState);
            if ( !
                 oldState.equals(wantedState)) {
                // Only notify listeners on an actual wanted-state change.
                changeListener.handleNewWantedNodeState(node, wantedState);
            }
            return true;
        }
        return false;
    }

}
// ---- end of file (node state change handler class closed above) ----

// ---- patch boundary: new file ----
// diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java
//          b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java
// new file mode 100644, index 00000000000..f5a67ca9434

// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;

import com.yahoo.vdslib.state.ClusterState;
import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

/**
 * Keeps track of the active cluster state and handles the transition edges between
 * one state to the next. In particular, it ensures that states have strictly increasing
 * version numbers.
 *
 * Wraps ClusterStateView to ensure its knowledge of available nodes stays up to date.
 */
public class StateVersionTracker {

    // We always increment the version _before_ publishing, so the effective first cluster
    // state version when starting from 1 will be 2. This matches legacy behavior and a bunch
    // of existing tests expect it.
    private int currentVersion = 1;
    private int lastZooKeeperVersion = 0;

    // The lowest published distribution bit count for the lifetime of this controller.
    // TODO this mirrors legacy behavior, but should be moved into stable ZK state.
    private int lowestObservedDistributionBits = 16;

    // Last published state with version stripped (used for change detection).
    private ClusterState currentUnversionedState = ClusterState.emptyState();
    private AnnotatedClusterState latestCandidateState = AnnotatedClusterState.emptyState();
    private AnnotatedClusterState currentClusterState = latestCandidateState;

    private final MetricUpdater metricUpdater;
    private ClusterStateView clusterStateView;

    private final LinkedList<ClusterStateHistoryEntry> clusterStateHistory = new LinkedList<>();
    private int maxHistoryEntryCount = 50;

    StateVersionTracker(final MetricUpdater metricUpdater) {
        this.metricUpdater = metricUpdater;
        clusterStateView = ClusterStateView.create(currentUnversionedState, metricUpdater);
    }

    // Adopts a version persisted in ZooKeeper; versions below 1 are clamped to 1.
    void setVersionRetrievedFromZooKeeper(final int version) {
        this.currentVersion = Math.max(1, version);
        this.lastZooKeeperVersion = this.currentVersion;
    }

    /**
     * Sets limit on how many cluster states can be kept in the in-memory queue. Once
     * the list exceeds this limit, the oldest state is repeatedly removed until the limit
     * is no longer exceeded.
     *
     * Takes effect upon the next invocation of promoteCandidateToVersionedState().
     */
    void setMaxHistoryEntryCount(final int maxHistoryEntryCount) {
        this.maxHistoryEntryCount = maxHistoryEntryCount;
    }

    int getCurrentVersion() {
        return this.currentVersion;
    }

    // True until the first promoteCandidateToVersionedState() after a ZK version load
    // bumps currentVersion past the ZooKeeper-provided value.
    boolean hasReceivedNewVersionFromZooKeeper() {
        return currentVersion <= lastZooKeeperVersion;
    }

    int getLowestObservedDistributionBits() {
        return lowestObservedDistributionBits;
    }

    AnnotatedClusterState getAnnotatedVersionedClusterState() {
        return currentClusterState;
    }

    public ClusterState getVersionedClusterState() {
        return currentClusterState.getClusterState();
    }

    public void updateLatestCandidateState(final AnnotatedClusterState candidate) {
        // Candidates must arrive unversioned; versioning happens only on promotion.
        assert(latestCandidateState.getClusterState().getVersion() == 0);
        latestCandidateState = candidate;
    }

    /**
     * Returns the last state provided to updateLatestCandidateState, which _may or may not_ be
     * a published state. Primary use case for this function is a caller which is interested in
     * changes that may not be reflected in the published state. The best example of this would
     * be node state changes when a cluster is marked as Down.
     */
    public AnnotatedClusterState getLatestCandidateState() {
        return latestCandidateState;
    }

    public List<ClusterStateHistoryEntry> getClusterStateHistory() {
        return Collections.unmodifiableList(clusterStateHistory);
    }

    boolean candidateChangedEnoughFromCurrentToWarrantPublish() {
        return !currentUnversionedState.similarToIgnoringInitProgress(latestCandidateState.getClusterState());
    }

    // Stamps the candidate with the next version, makes it current, and archives it.
    void promoteCandidateToVersionedState(final long currentTimeMs) {
        final int newVersion = currentVersion + 1;
        updateStatesForNewVersion(latestCandidateState, newVersion);
        currentVersion = newVersion;

        recordCurrentStateInHistoryAtTime(currentTimeMs);
    }

    private void updateStatesForNewVersion(final AnnotatedClusterState newState, final int newVersion) {
        currentClusterState = new AnnotatedClusterState(
                newState.getClusterState().clone(), // Because we mutate version below
                newState.getClusterStateReason(),
                newState.getNodeStateReasons());
        currentClusterState.getClusterState().setVersion(newVersion);
        currentUnversionedState = newState.getClusterState().clone();
        lowestObservedDistributionBits = Math.min(
                lowestObservedDistributionBits,
                newState.getClusterState().getDistributionBitCount());
        // TODO should this take place in updateLatestCandidateState instead? I.e. does it require a consolidated state?
        clusterStateView = ClusterStateView.create(currentClusterState.getClusterState(), metricUpdater);
    }

    private void recordCurrentStateInHistoryAtTime(final long currentTimeMs) {
        clusterStateHistory.addFirst(new ClusterStateHistoryEntry(
                currentClusterState.getClusterState(), currentTimeMs));
        while (clusterStateHistory.size() > maxHistoryEntryCount) {
            clusterStateHistory.removeLast();
        }
    }

    void handleUpdatedHostInfo(final Map<Integer, String> hostnames, final NodeInfo node, final HostInfo hostInfo) {
        // TODO the wiring here isn't unit tested. Need mockable integration points.
        clusterStateView.handleUpdatedHostInfo(hostnames, node, hostInfo);
    }

}

// ---- patch boundary: file DELETED by this patch (941 lines) ----
// diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateGenerator.java
//          b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateGenerator.java
// deleted file mode 100644, index 7edff399633..00000000000
// The remainder of this chunk is the REMOVED content, kept for review reference.

// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;

import com.yahoo.jrt.Spec;
import com.yahoo.log.LogLevel;
import com.yahoo.vdslib.distribution.ConfiguredNode;
import com.yahoo.vdslib.distribution.Distribution;
import com.yahoo.vdslib.state.*;
import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler;
import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo;
import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler;
import com.yahoo.vespa.clustercontroller.core.listeners.SystemStateListener;

import java.util.*;
import java.util.logging.Logger;
import java.text.ParseException;
import java.util.stream.Collectors;

/**
 * This class get node state updates and uses them to decide the cluster state.
 */
// TODO: Remove all current state from this and make it rely on state from ClusterInfo instead
// TODO: Do this ASAP! SystemStateGenerator should ideally behave as a pure function!
// NOTE(review): everything below is code REMOVED by this patch (deleted file
// SystemStateGenerator.java); it is annotated here only to document what the
// replacement (StateVersionTracker / ClusterStateGenerator) must subsume.
public class SystemStateGenerator {

    private static Logger log = Logger.getLogger(SystemStateGenerator.class.getName());

    private final Timer timer;
    private final EventLogInterface eventLog;
    private ClusterStateView currentClusterStateView;
    private ClusterStateView nextClusterStateView;
    private Distribution distribution;
    private boolean nextStateViewChanged = false;
    private boolean isMaster = false;

    private Map<NodeType, Integer> maxTransitionTime = new TreeMap<>();
    private int maxInitProgressTime = 5000;
    private int maxPrematureCrashes = 4;
    private long stableStateTimePeriod = 60 * 60 * 1000;
    private static final int maxHistorySize = 50;
    private Set<ConfiguredNode> nodes;
    private Map<Integer, String> hostnames = new HashMap<>();
    private int minDistributorNodesUp = 1;
    private int minStorageNodesUp = 1;
    private double minRatioOfDistributorNodesUp = 0.50;
    private double minRatioOfStorageNodesUp = 0.50;
    private double minNodeRatioPerGroup = 0.0;
    private int maxSlobrokDisconnectGracePeriod = 1000;
    private int idealDistributionBits = 16;
    private static final boolean disableUnstableNodes = true;

    private final LinkedList<SystemStateHistoryEntry> systemStateHistory = new LinkedList<>();

    /**
     * @param metricUpdater may be null, in which case no metrics will be recorded.
     */
    public SystemStateGenerator(Timer timer, EventLogInterface eventLog, MetricUpdater metricUpdater) {
        try {
            currentClusterStateView = ClusterStateView.create("", metricUpdater);
            nextClusterStateView = ClusterStateView.create("", metricUpdater);
        } catch (ParseException e) {
            throw new RuntimeException("Parsing empty string should always work");
        }
        this.timer = timer;
        this.eventLog = eventLog;
        maxTransitionTime.put(NodeType.DISTRIBUTOR, 5000);
        maxTransitionTime.put(NodeType.STORAGE, 5000);
    }

    // Once every distributor has observed the newest cluster state, start timestamps
    // are persisted on the node infos and cleared from the next cluster state.
    public void handleAllDistributorsInSync(DatabaseHandler database,
                                            DatabaseHandler.Context dbContext) throws InterruptedException {
        int startTimestampsReset = 0;
        for (NodeType nodeType : NodeType.getTypes()) {
            for (ConfiguredNode configuredNode : nodes) {
                Node node = new Node(nodeType, configuredNode.index());
                NodeInfo nodeInfo = dbContext.getCluster().getNodeInfo(node);
                NodeState nodeState = nextClusterStateView.getClusterState().getNodeState(node);
                if (nodeInfo != null && nodeState != null) {
                    if (nodeState.getStartTimestamp() > nodeInfo.getStartTimestamp()) {
                        log.log(LogLevel.DEBUG, "Storing away new start timestamp for node " + node);
                        nodeInfo.setStartTimestamp(nodeState.getStartTimestamp());
                    }
                    if (nodeState.getStartTimestamp() > 0) {
                        log.log(LogLevel.DEBUG, "Resetting timestamp in cluster state for node " + node);
                        nodeState.setStartTimestamp(0);
                        nextClusterStateView.getClusterState().setNodeState(node, nodeState);
                        ++startTimestampsReset;
                    }
                } else {
                    log.log(LogLevel.DEBUG, node + ": " +
                            (nodeInfo == null ? "null" : nodeInfo.getStartTimestamp()) + ", " +
                            (nodeState == null ? "null" : nodeState.getStartTimestamp()));
                }
            }
        }
        if (startTimestampsReset > 0) {
            eventLog.add(new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE, "Reset " + startTimestampsReset +
                    " start timestamps as all available distributors have seen newest cluster state.", timer.getCurrentTimeInMillis()));
            nextStateViewChanged = true;
            database.saveStartTimestamps(dbContext);
        } else {
            log.log(LogLevel.DEBUG, "Found no start timestamps to reset in cluster state.");
        }
    }

    public void setMaxTransitionTime(Map<NodeType, Integer> map) { maxTransitionTime = map; }
    public void setMaxInitProgressTime(int millisecs) { maxInitProgressTime = millisecs; }
    public void setMaxPrematureCrashes(int count) { maxPrematureCrashes = count; }
    public void setStableStateTimePeriod(long millisecs) { stableStateTimePeriod = millisecs; }

    public ClusterStateView currentClusterStateView() { return currentClusterStateView; }

    /** Returns an immutable list of the historical states this has generated */
    public List<SystemStateHistoryEntry> systemStateHistory() {
        return Collections.unmodifiableList(systemStateHistory);
    }

    public void setMinNodesUp(int minDistNodes, int minStorNodes, double minDistRatio, double minStorRatio) {
        minDistributorNodesUp = minDistNodes;
        minStorageNodesUp = minStorNodes;
        minRatioOfDistributorNodesUp = minDistRatio;
        minRatioOfStorageNodesUp = minStorRatio;
        nextStateViewChanged = true;
    }

    public void setMinNodeRatioPerGroup(double upRatio) {
        this.minNodeRatioPerGroup = upRatio;
        nextStateViewChanged = true;
    }

    /** Sets the nodes of this and attempts to keep the node state in sync */
    public void setNodes(ClusterInfo newClusterInfo) {
        this.nodes = new HashSet<>(newClusterInfo.getConfiguredNodes().values());

        for (ConfiguredNode node : this.nodes) {
            NodeInfo newNodeInfo = newClusterInfo.getStorageNodeInfo(node.index());
            NodeState currentState = currentClusterStateView.getClusterState().getNodeState(new Node(NodeType.STORAGE, node.index()));
            if (currentState.getState() == State.RETIRED || currentState.getState() == State.UP) { // then correct to configured state
                proposeNewNodeState(newNodeInfo, new NodeState(NodeType.STORAGE, node.retired() ? State.RETIRED : State.UP));
            }
        }

        // Ensure that any nodes that have been removed from the config are also
        // promptly removed from the next (and subsequent) generated cluster states.
        pruneAllNodesNotContainedInConfig();

        nextStateViewChanged = true;
    }

    private void pruneAllNodesNotContainedInConfig() {
        Set<Integer> configuredIndices = this.nodes.stream().map(ConfiguredNode::index).collect(Collectors.toSet());
        final ClusterState candidateNextState = nextClusterStateView.getClusterState();
        pruneNodesNotContainedInConfig(candidateNextState, configuredIndices, NodeType.DISTRIBUTOR);
        pruneNodesNotContainedInConfig(candidateNextState, configuredIndices, NodeType.STORAGE);
    }

    public void setDistribution(Distribution distribution) {
        this.distribution = distribution;
        nextStateViewChanged = true;
    }

    public void setMaster(boolean isMaster) {
        this.isMaster = isMaster;
    }
    public void setMaxSlobrokDisconnectGracePeriod(int millisecs) { maxSlobrokDisconnectGracePeriod = millisecs; }

    // Changes the ideal bit count and re-derives the effective count from reporting nodes.
    public void setDistributionBits(int bits) {
        if (bits == idealDistributionBits) return;
        idealDistributionBits = bits;
        int currentDistributionBits = calculateMinDistributionBitCount();
        if (currentDistributionBits != nextClusterStateView.getClusterState().getDistributionBitCount()) {
            nextClusterStateView.getClusterState().setDistributionBits(currentDistributionBits);
            nextStateViewChanged = true;
        }
    }

    public int getDistributionBits() { return idealDistributionBits; }

    // Effective bit count is capped by the lowest min-used-bits among available
    // (initializing/up/retired) storage nodes.
    public int calculateMinDistributionBitCount() {
        int currentDistributionBits = idealDistributionBits;
        int minNode = -1;
        for (ConfiguredNode node : nodes) {
            NodeState ns = nextClusterStateView.getClusterState().getNodeState(new Node(NodeType.STORAGE, node.index()));
            if (ns.getState().oneOf("iur")) {
                if (ns.getMinUsedBits() < currentDistributionBits) {
                    currentDistributionBits = ns.getMinUsedBits();
                    minNode = node.index();
                }
            }
        }
        if (minNode == -1) {
            log.log(LogLevel.DEBUG, "Distribution bit count should still be default as all available nodes have at least split to " + idealDistributionBits + " bits");
        } else {
            log.log(LogLevel.DEBUG, "Distribution bit count is limited to " + currentDistributionBits + " due to storage node " + minNode);
        }
        return currentDistributionBits;
    }

    public ClusterState getClusterState() { return currentClusterStateView.getClusterState(); }

    /**
     * Return the current cluster state, but if the cluster is down, modify the node states with the
     * actual node states from the temporary next state.
     */
    public ClusterState getConsolidatedClusterState() {
        ClusterState currentState = currentClusterStateView.getClusterState();
        if (currentState.getClusterState().equals(State.UP)) {
            return currentState;
        }

        ClusterState nextState = nextClusterStateView.getClusterState();
        if (!currentState.getClusterState().equals(nextState.getClusterState())) {
            log.warning("Expected current cluster state object to have same global state as the under creation instance.");
        }
        ClusterState state = nextState.clone();
        state.setVersion(currentState.getVersion());
        state.setOfficial(false);
        return state;
    }

    // Checks absolute-count and ratio thresholds for available distributors/storage
    // nodes; returns the cluster-down event to emit, if any threshold is violated.
    private Optional<Event> getDownDueToTooFewNodesEvent(ClusterState nextClusterState) {
        int upStorageCount = 0, upDistributorCount = 0;
        int dcount = nodes.size();
        int scount = nodes.size();
        for (NodeType type : NodeType.getTypes()) {
            for (ConfiguredNode node : nodes) {
                NodeState ns = nextClusterState.getNodeState(new Node(type, node.index()));
                if (ns.getState() == State.UP || ns.getState() == State.RETIRED || ns.getState() == State.INITIALIZING) {
                    if (type.equals(NodeType.STORAGE))
                        ++upStorageCount;
                    else
                        ++upDistributorCount;
                }
            }
        }

        long timeNow = timer.getCurrentTimeInMillis();
        if (upStorageCount < minStorageNodesUp) {
            return Optional.of(new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE,
                    "Less than " + minStorageNodesUp + " storage nodes available (" + upStorageCount + "). Setting cluster state down.",
                    timeNow));
        }
        if (upDistributorCount < minDistributorNodesUp) {
            return Optional.of(new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE,
                    "Less than " + minDistributorNodesUp + " distributor nodes available (" + upDistributorCount + "). Setting cluster state down.",
                    timeNow));
        }
        if (minRatioOfStorageNodesUp * scount > upStorageCount) {
            return Optional.of(new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE,
                    "Less than " + (100 * minRatioOfStorageNodesUp) + " % of storage nodes are available ("
                            + upStorageCount + "/" + scount + "). Setting cluster state down.",
                    timeNow));
        }
        if (minRatioOfDistributorNodesUp * dcount > upDistributorCount) {
            return Optional.of(new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE,
                    "Less than " + (100 * minRatioOfDistributorNodesUp) + " % of distributor nodes are available ("
                            + upDistributorCount + "/" + dcount + "). Setting cluster state down.",
                    timeNow));
        }
        return Optional.empty();
    }

    private static Node storageNode(int index) {
        return new Node(NodeType.STORAGE, index);
    }

    private void performImplicitStorageNodeStateTransitions(ClusterState candidateState, ContentCluster cluster) {
        if (distribution == null) {
            return; // FIXME due to tests that don't bother setting distr config! Never happens in prod.
        }
        // First clear the states of any nodes that according to reported/wanted state alone
        // should have their states cleared. We might still take these down again based on the
        // decisions of the group availability calculator, but this way we ensure that groups
        // that no longer should be down will have their nodes implicitly made available again.
        // TODO this will be void once SystemStateGenerator has been rewritten to be stateless.
        final Set<Integer> clearedNodes = clearDownStateForStorageNodesThatCanBeUp(candidateState, cluster);

        final GroupAvailabilityCalculator calc = new GroupAvailabilityCalculator.Builder()
                .withMinNodeRatioPerGroup(minNodeRatioPerGroup)
                .withDistribution(distribution)
                .build();
        final Set<Integer> nodesToTakeDown = calc.nodesThatShouldBeDown(candidateState);
        markNodesAsDownDueToGroupUnavailability(cluster, candidateState, nodesToTakeDown, clearedNodes);

        clearedNodes.removeAll(nodesToTakeDown);
        logEventsForNodesThatWereTakenUp(clearedNodes, cluster);
    }

    private void logEventsForNodesThatWereTakenUp(Set<Integer> newlyUpNodes, ContentCluster cluster) {
        newlyUpNodes.forEach(i -> {
            final NodeInfo info = cluster.getNodeInfo(storageNode(i)); // Should always be non-null here.
            // TODO the fact that this only happens for group up events is implementation specific
            // should generalize this if we get other such events.
            eventLog.addNodeOnlyEvent(new NodeEvent(info,
                    "Group availability restored; taking node back up",
                    NodeEvent.Type.CURRENT, timer.getCurrentTimeInMillis()), LogLevel.INFO);
        });
    }

    private void markNodesAsDownDueToGroupUnavailability(ContentCluster cluster,
                                                         ClusterState candidateState,
                                                         Set<Integer> nodesToTakeDown,
                                                         Set<Integer> clearedNodes)
    {
        for (Integer idx : nodesToTakeDown) {
            final Node node = storageNode(idx);
            NodeState newState = new NodeState(NodeType.STORAGE, State.DOWN);
            newState.setDescription("group node availability below configured threshold");
            candidateState.setNodeState(node, newState);

            logNodeGroupDownEdgeEventOnce(clearedNodes, node, cluster);
        }
    }

    private void logNodeGroupDownEdgeEventOnce(Set<Integer> clearedNodes, Node node, ContentCluster cluster) {
        final NodeInfo nodeInfo = cluster.getNodeInfo(node);
        // If clearedNodes contains the index it means we're just re-downing a node
        // that was previously down. If this is the case, we'd cause a duplicate
        // event if we logged it now as well.
        if (nodeInfo != null && !clearedNodes.contains(node.getIndex())) {
            eventLog.addNodeOnlyEvent(new NodeEvent(nodeInfo,
                    "Setting node down as the total availability of its group is " +
                    "below the configured threshold",
                    NodeEvent.Type.CURRENT, timer.getCurrentTimeInMillis()), LogLevel.INFO);
        }
    }

    // Merges a node's reported state with its wanted state into the state the
    // generated cluster state should carry for it.
    private NodeState baselineNodeState(NodeInfo info) {
        NodeState reported = info.getReportedState();
        NodeState wanted = info.getWantedState();

        final NodeState baseline = reported.clone();
        if (wanted.getState() != State.UP) {
            baseline.setDescription(wanted.getDescription());
            if (reported.above(wanted)) {
                baseline.setState(wanted.getState());
            }
        }
        // Don't reintroduce start timestamp to the node's state if it has already been
        // observed by all distributors. This matches how handleNewReportedNodeState() sets timestamps.
        // TODO make timestamp semantics clearer. Non-obvious what the two different timestamp stores imply.
        // For posterity: reported.getStartTimestamp() is the start timestamp the node itself has stated.
        // info.getStartTimestamp() is the timestamp written as having been observed by all distributors
        // (which is done in handleAllDistributorsInSync()).
        if (reported.getStartTimestamp() <= info.getStartTimestamp()) {
            baseline.setStartTimestamp(0);
        }

        return baseline;
    }

    // Returns set of nodes whose state was cleared
    private Set<Integer> clearDownStateForStorageNodesThatCanBeUp(
            ClusterState candidateState, ContentCluster cluster)
    {
        final int nodeCount = candidateState.getNodeCount(NodeType.STORAGE);
        final Set<Integer> clearedNodes = new HashSet<>();
        for (int i = 0; i < nodeCount; ++i) {
            final Node node = storageNode(i);
            final NodeInfo info = cluster.getNodeInfo(node);
            final NodeState currentState = candidateState.getNodeState(node);
            if (mayClearCurrentNodeState(currentState, info)) {
                candidateState.setNodeState(node, baselineNodeState(info));
                clearedNodes.add(i);
            }
        }
        return clearedNodes;
    }

    // NOTE(review): method continues past the end of the visible span (cut mid-comment).
    private boolean mayClearCurrentNodeState(NodeState currentState, NodeInfo info) {
        if (currentState.getState() != State.DOWN) {
            return false;
        }
        if (info == null) {
            // Nothing known about node in cluster info; we definitely don't want it
            // to be taken up at this point.
            return false;
        }
        // There exists an edge in watchTimers where a node in Maintenance is implicitly
        // transitioned into Down without being Down in either reported or wanted states
        // iff isRpcAddressOutdated() is true. To avoid getting into an edge where we
        // inadvertently clear this state because its reported/wanted states seem fine,
        // we must also check if that particular edge could have happened. I.e. whether
        // the node's RPC address is marked as outdated.
- // It also makes sense in general to not allow taking a node back up automatically - // if its RPC connectivity appears to be bad. - if (info.isRpcAddressOutdated()) { - return false; - } - // Rationale: we can only enter this statement if the _current_ (generated) state - // of the node is Down. Aside from the group take-down logic, there should not exist - // any other edges in the cluster controller state transition logic where a node - // may be set Down while both its reported state and wanted state imply that a better - // state should already have been chosen. Consequently we allow the node to have its - // Down-state cleared. - return (info.getReportedState().getState() != State.DOWN - && !info.getWantedState().getState().oneOf("d")); - } - - private ClusterStateView createNextVersionOfClusterStateView(ContentCluster cluster) { - // If you change this method, see *) in notifyIfNewSystemState - ClusterStateView candidateClusterStateView = nextClusterStateView.cloneForNewState(); - ClusterState candidateClusterState = candidateClusterStateView.getClusterState(); - - int currentDistributionBits = calculateMinDistributionBitCount(); - if (currentDistributionBits != nextClusterStateView.getClusterState().getDistributionBitCount()) { - candidateClusterState.setDistributionBits(currentDistributionBits); - } - performImplicitStorageNodeStateTransitions(candidateClusterState, cluster); - - return candidateClusterStateView; - } - - private void pruneNodesNotContainedInConfig(ClusterState candidateClusterState, - Set<Integer> configuredIndices, - NodeType nodeType) - { - final int nodeCount = candidateClusterState.getNodeCount(nodeType); - for (int i = 0; i < nodeCount; ++i) { - final Node node = new Node(nodeType, i); - final NodeState currentState = candidateClusterState.getNodeState(node); - if (!configuredIndices.contains(i) && !currentState.getState().equals(State.DOWN)) { - log.log(LogLevel.INFO, "Removing node " + node + " from state as it is no longer 
present in config"); - candidateClusterState.setNodeState(node, new NodeState(nodeType, State.DOWN)); - } - } - } - - private void recordNewClusterStateHasBeenChosen( - ClusterState currentClusterState, ClusterState newClusterState, Event clusterEvent) { - long timeNow = timer.getCurrentTimeInMillis(); - - if (!currentClusterState.getClusterState().equals(State.UP) && - newClusterState.getClusterState().equals(State.UP)) { - eventLog.add(new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE, - "Enough nodes available for system to become up.", timeNow), isMaster); - } else if (currentClusterState.getClusterState().equals(State.UP) && - ! newClusterState.getClusterState().equals(State.UP)) { - assert(clusterEvent != null); - eventLog.add(clusterEvent, isMaster); - } - - if (newClusterState.getDistributionBitCount() != currentClusterState.getDistributionBitCount()) { - eventLog.add(new ClusterEvent( - ClusterEvent.Type.SYSTEMSTATE, - "Altering distribution bits in system from " - + currentClusterState.getDistributionBitCount() + " to " + - currentClusterState.getDistributionBitCount(), - timeNow), isMaster); - } - - eventLog.add(new ClusterEvent( - ClusterEvent.Type.SYSTEMSTATE, - "New cluster state version " + newClusterState.getVersion() + ". 
Change from last: " + - currentClusterState.getTextualDifference(newClusterState), - timeNow), isMaster); - - log.log(LogLevel.DEBUG, "Created new cluster state version: " + newClusterState.toString(true)); - systemStateHistory.addFirst(new SystemStateHistoryEntry(newClusterState, timeNow)); - if (systemStateHistory.size() > maxHistorySize) { - systemStateHistory.removeLast(); - } - } - - private void mergeIntoNextClusterState(ClusterState sourceState) { - final ClusterState nextState = nextClusterStateView.getClusterState(); - final int nodeCount = sourceState.getNodeCount(NodeType.STORAGE); - for (int i = 0; i < nodeCount; ++i) { - final Node node = storageNode(i); - final NodeState stateInSource = sourceState.getNodeState(node); - final NodeState stateInTarget = nextState.getNodeState(node); - if (stateInSource.getState() != stateInTarget.getState()) { - nextState.setNodeState(node, stateInSource); - } - } - } - - public boolean notifyIfNewSystemState(ContentCluster cluster, SystemStateListener stateListener) { - if ( ! nextStateViewChanged) return false; - - ClusterStateView newClusterStateView = createNextVersionOfClusterStateView(cluster); - - ClusterState newClusterState = newClusterStateView.getClusterState(); - // Creating the next version of the state may implicitly take down nodes, so our checks - // for taking the entire cluster down must happen _after_ this - Optional<Event> clusterDown = getDownDueToTooFewNodesEvent(newClusterState); - newClusterState.setClusterState(clusterDown.isPresent() ? State.DOWN : State.UP); - - if (newClusterState.similarTo(currentClusterStateView.getClusterState())) { - log.log(LogLevel.DEBUG, - "State hasn't changed enough to warrant new cluster state. Not creating new state: " + - currentClusterStateView.getClusterState().getTextualDifference(newClusterState)); - return false; - } - - // Update the version of newClusterState now. This cannot be done prior to similarTo(), - // since it makes the cluster states different. 
From now on, the new cluster state is immutable. - newClusterState.setVersion(currentClusterStateView.getClusterState().getVersion() + 1); - - recordNewClusterStateHasBeenChosen(currentClusterStateView.getClusterState(), - newClusterStateView.getClusterState(), clusterDown.orElse(null)); - - // *) Ensure next state is still up to date. - // This should make nextClusterStateView a deep-copy of currentClusterStateView. - // If more than the distribution bits and state are deep-copied in - // createNextVersionOfClusterStateView(), we need to add corresponding statements here. - // This seems like a hack... - nextClusterStateView.getClusterState().setDistributionBits(newClusterState.getDistributionBitCount()); - nextClusterStateView.getClusterState().setClusterState(newClusterState.getClusterState()); - mergeIntoNextClusterState(newClusterState); - - currentClusterStateView = newClusterStateView; - nextStateViewChanged = false; - - stateListener.handleNewSystemState(currentClusterStateView.getClusterState()); - - return true; - } - - public void setLatestSystemStateVersion(int version) { - currentClusterStateView.getClusterState().setVersion(Math.max(1, version)); - nextStateViewChanged = true; - } - - private void setNodeState(NodeInfo node, NodeState newState) { - NodeState oldState = nextClusterStateView.getClusterState().getNodeState(node.getNode()); - - // Correct UP to RETIRED if the node wants to be retired - if (newState.above(node.getWantedState())) - newState.setState(node.getWantedState().getState()); - - // Keep old description if a new one is not set and we're not going up or in initializing mode - if ( ! 
newState.getState().oneOf("ui") && oldState.hasDescription()) { - newState.setDescription(oldState.getDescription()); - } - - // Keep disk information if not set in new state - if (newState.getDiskCount() == 0 && oldState.getDiskCount() != 0) { - newState.setDiskCount(oldState.getDiskCount()); - for (int i=0; i<oldState.getDiskCount(); ++i) { - newState.setDiskState(i, oldState.getDiskState(i)); - } - } - if (newState.equals(oldState)) { - return; - } - - eventLog.add(new NodeEvent(node, "Altered node state in cluster state from '" + oldState.toString(true) - + "' to '" + newState.toString(true) + "'.", - NodeEvent.Type.CURRENT, timer.getCurrentTimeInMillis()), isMaster); - nextClusterStateView.getClusterState().setNodeState(node.getNode(), newState); - nextStateViewChanged = true; - } - - public void handleNewReportedNodeState(NodeInfo node, NodeState reportedState, NodeStateOrHostInfoChangeHandler nodeListener) { - ClusterState nextState = nextClusterStateView.getClusterState(); - NodeState currentState = nextState.getNodeState(node.getNode()); - log.log(currentState.equals(reportedState) && node.getVersion() == 0 ? LogLevel.SPAM : LogLevel.DEBUG, - "Got nodestate reply from " + node + ": " - + node.getReportedState().getTextualDifference(reportedState) + " (Current state is " + currentState.toString(true) + ")"); - long currentTime = timer.getCurrentTimeInMillis(); - if (reportedState.getState().equals(State.DOWN)) { - node.setTimeOfFirstFailingConnectionAttempt(currentTime); - } - if ( ! 
reportedState.similarTo(node.getReportedState())) { - if (reportedState.getState().equals(State.DOWN)) { - eventLog.addNodeOnlyEvent(new NodeEvent(node, "Failed to get node state: " + reportedState.toString(true), NodeEvent.Type.REPORTED, currentTime), LogLevel.INFO); - } else { - eventLog.addNodeOnlyEvent(new NodeEvent(node, "Now reporting state " + reportedState.toString(true), NodeEvent.Type.REPORTED, currentTime), LogLevel.DEBUG); - } - } - if (reportedState.equals(node.getReportedState()) && ! reportedState.getState().equals(State.INITIALIZING)) - return; - - NodeState alteredState = decideNodeStateGivenReportedState(node, currentState, reportedState, nodeListener); - if (alteredState != null) { - ClusterState clusterState = currentClusterStateView.getClusterState(); - - if (alteredState.above(node.getWantedState())) { - log.log(LogLevel.DEBUG, "Cannot set node in state " + alteredState.getState() + " when wanted state is " + node.getWantedState()); - alteredState.setState(node.getWantedState().getState()); - } - if (reportedState.getStartTimestamp() > node.getStartTimestamp()) { - alteredState.setStartTimestamp(reportedState.getStartTimestamp()); - } else { - alteredState.setStartTimestamp(0); - } - if (!alteredState.similarTo(currentState)) { - setNodeState(node, alteredState); - } else if (!alteredState.equals(currentState)) { - if (currentState.getState().equals(State.INITIALIZING) && alteredState.getState().equals(State.INITIALIZING) && - Math.abs(currentState.getInitProgress() - alteredState.getInitProgress()) > 0.000000001) - { - log.log(LogLevel.DEBUG, "Only silently updating init progress for " + node + " in cluster state because new " - + "state is too similar to tag new version: " + currentState.getTextualDifference(alteredState)); - currentState.setInitProgress(alteredState.getInitProgress()); - nextState.setNodeState(node.getNode(), currentState); - - NodeState currentNodeState = clusterState.getNodeState(node.getNode()); - if 
(currentNodeState.getState().equals(State.INITIALIZING)) { - currentNodeState.setInitProgress(alteredState.getInitProgress()); - clusterState.setNodeState(node.getNode(), currentNodeState); - } - } else if (alteredState.getMinUsedBits() != currentState.getMinUsedBits()) { - log.log(LogLevel.DEBUG, "Altering node state to reflect that min distribution bit count have changed from " - + currentState.getMinUsedBits() + " to " + alteredState.getMinUsedBits()); - int oldCount = currentState.getMinUsedBits(); - currentState.setMinUsedBits(alteredState.getMinUsedBits()); - nextState.setNodeState(node.getNode(), currentState); - int minDistBits = calculateMinDistributionBitCount(); - if (minDistBits < nextState.getDistributionBitCount() - || (nextState.getDistributionBitCount() < this.idealDistributionBits && minDistBits >= this.idealDistributionBits)) - { - // If this will actually affect global cluster state. - eventLog.add(new NodeEvent(node, "Altered min distribution bit count from " + oldCount - + " to " + currentState.getMinUsedBits() + ". Updated cluster state.", NodeEvent.Type.CURRENT, currentTime), isMaster); - nextStateViewChanged = true; - } else { - log.log(LogLevel.DEBUG, "Altered min distribution bit count from " + oldCount - + " to " + currentState.getMinUsedBits() + ". 
No effect for cluster state with ideal " + this.idealDistributionBits - + ", new " + minDistBits + ", old " + nextState.getDistributionBitCount() + " though."); - clusterState.setNodeState(node.getNode(), currentState); - } - } else { - log.log(LogLevel.DEBUG, "Not altering state of " + node + " in cluster state because new state is too similar: " - + currentState.getTextualDifference(alteredState)); - } - } else if (alteredState.getDescription().contains("Listing buckets")) { - currentState.setDescription(alteredState.getDescription()); - nextState.setNodeState(node.getNode(), currentState); - NodeState currentNodeState = clusterState.getNodeState(node.getNode()); - currentNodeState.setDescription(alteredState.getDescription()); - clusterState.setNodeState(node.getNode(), currentNodeState); - } - } - } - - public void handleNewNode(NodeInfo node) { - setHostName(node); - String message = "Found new node " + node + " in slobrok at " + node.getRpcAddress(); - eventLog.add(new NodeEvent(node, message, NodeEvent.Type.REPORTED, timer.getCurrentTimeInMillis()), isMaster); - } - - public void handleMissingNode(NodeInfo node, NodeStateOrHostInfoChangeHandler nodeListener) { - removeHostName(node); - - long timeNow = timer.getCurrentTimeInMillis(); - - if (node.getLatestNodeStateRequestTime() != null) { - eventLog.add(new NodeEvent(node, "Node is no longer in slobrok, but we still have a pending state request.", NodeEvent.Type.REPORTED, timeNow), isMaster); - } else { - eventLog.add(new NodeEvent(node, "Node is no longer in slobrok. No pending state request to node.", NodeEvent.Type.REPORTED, timeNow), isMaster); - } - if (node.getReportedState().getState().equals(State.STOPPING)) { - log.log(LogLevel.DEBUG, "Node " + node.getNode() + " is no longer in slobrok. Was in stopping state, so assuming it has shut down normally. 
Setting node down"); - NodeState ns = node.getReportedState().clone(); - ns.setState(State.DOWN); - handleNewReportedNodeState(node, ns.clone(), nodeListener); - node.setReportedState(ns, timer.getCurrentTimeInMillis()); // Must reset it to null to get connection attempts counted - } else { - log.log(LogLevel.DEBUG, "Node " + node.getNode() + " no longer in slobrok was in state " + node.getReportedState() + ". Waiting to see if it reappears in slobrok"); - } - } - - /** - * Propose a new state for a node. This may happen due to an administrator action, orchestration, or - * a configuration change. - */ - public void proposeNewNodeState(NodeInfo node, NodeState proposedState) { - NodeState currentState = nextClusterStateView.getClusterState().getNodeState(node.getNode()); - NodeState currentReported = node.getReportedState(); // TODO: Is there a reason to have both of this and the above? - - NodeState newCurrentState = currentReported.clone(); - - newCurrentState.setState(proposedState.getState()).setDescription(proposedState.getDescription()); - - if (currentState.getState().equals(newCurrentState.getState())) return; - - log.log(LogLevel.DEBUG, "Got new wanted nodestate for " + node + ": " + currentState.getTextualDifference(proposedState)); - // Should be checked earlier before state was set in cluster - assert(newCurrentState.getState().validWantedNodeState(node.getNode().getType())); - long timeNow = timer.getCurrentTimeInMillis(); - if (newCurrentState.above(currentReported)) { - eventLog.add(new NodeEvent(node, "Wanted state " + newCurrentState + ", but we cannot force node into that state yet as it is currently in " + currentReported, NodeEvent.Type.REPORTED, timeNow), isMaster); - return; - } - if ( ! 
newCurrentState.similarTo(currentState)) { - eventLog.add(new NodeEvent(node, "Node state set to " + newCurrentState + ".", NodeEvent.Type.WANTED, timeNow), isMaster); - } - setNodeState(node, newCurrentState); - } - - public void handleNewRpcAddress(NodeInfo node) { - setHostName(node); - String message = "Node " + node + " has a new address in slobrok: " + node.getRpcAddress(); - eventLog.add(new NodeEvent(node, message, NodeEvent.Type.REPORTED, timer.getCurrentTimeInMillis()), isMaster); - } - - public void handleReturnedRpcAddress(NodeInfo node) { - setHostName(node); - String message = "Node got back into slobrok with same address as before: " + node.getRpcAddress(); - eventLog.add(new NodeEvent(node, message, NodeEvent.Type.REPORTED, timer.getCurrentTimeInMillis()), isMaster); - } - - private void setHostName(NodeInfo node) { - String rpcAddress = node.getRpcAddress(); - if (rpcAddress == null) { - // This may happen if we haven't seen the node in Slobrok yet. - return; - } - - Spec address = new Spec(rpcAddress); - if (address.malformed()) { - return; - } - - hostnames.put(node.getNodeIndex(), address.host()); - } - - private void removeHostName(NodeInfo node) { - hostnames.remove(node.getNodeIndex()); - } - - public boolean watchTimers(ContentCluster cluster, NodeStateOrHostInfoChangeHandler nodeListener) { - boolean triggeredAnyTimers = false; - long currentTime = timer.getCurrentTimeInMillis(); - for(NodeInfo node : cluster.getNodeInfo()) { - NodeState currentStateInSystem = nextClusterStateView.getClusterState().getNodeState(node.getNode()); - NodeState lastReportedState = node.getReportedState(); - - // If we haven't had slobrok contact in a given amount of time and node is still not considered down, - // mark it down. 
- if (node.isRpcAddressOutdated() - && !lastReportedState.getState().equals(State.DOWN) - && node.getRpcAddressOutdatedTimestamp() + maxSlobrokDisconnectGracePeriod <= currentTime) - { - StringBuilder sb = new StringBuilder().append("Set node down as it has been out of slobrok for ") - .append(currentTime - node.getRpcAddressOutdatedTimestamp()).append(" ms which is more than the max limit of ") - .append(maxSlobrokDisconnectGracePeriod).append(" ms."); - node.abortCurrentNodeStateRequests(); - NodeState state = lastReportedState.clone(); - state.setState(State.DOWN); - if (!state.hasDescription()) state.setDescription(sb.toString()); - eventLog.add(new NodeEvent(node, sb.toString(), NodeEvent.Type.CURRENT, currentTime), isMaster); - handleNewReportedNodeState(node, state.clone(), nodeListener); - node.setReportedState(state, currentTime); - triggeredAnyTimers = true; - } - - // If node is still unavailable after transition time, mark it down - if (currentStateInSystem.getState().equals(State.MAINTENANCE) - && ( ! nextStateViewChanged || ! this.nextClusterStateView.getClusterState().getNodeState(node.getNode()).getState().equals(State.DOWN)) - && node.getWantedState().above(new NodeState(node.getNode().getType(), State.DOWN)) - && (lastReportedState.getState().equals(State.DOWN) || node.isRpcAddressOutdated()) - && node.getTransitionTime() + maxTransitionTime.get(node.getNode().getType()) < currentTime) - { - eventLog.add(new NodeEvent(node, (currentTime - node.getTransitionTime()) - + " milliseconds without contact. Marking node down.", NodeEvent.Type.CURRENT, currentTime), isMaster); - NodeState newState = new NodeState(node.getNode().getType(), State.DOWN).setDescription( - (currentTime - node.getTransitionTime()) + " ms without contact. Too long to keep in maintenance. 
Marking node down"); - // Keep old description if there is one as it is likely closer to the cause of the problem - if (currentStateInSystem.hasDescription()) newState.setDescription(currentStateInSystem.getDescription()); - setNodeState(node, newState); - triggeredAnyTimers = true; - } - - // If node hasn't increased its initializing progress within initprogresstime, mark it down. - if (!currentStateInSystem.getState().equals(State.DOWN) - && node.getWantedState().above(new NodeState(node.getNode().getType(), State.DOWN)) - && lastReportedState.getState().equals(State.INITIALIZING) - && maxInitProgressTime != 0 - && node.getInitProgressTime() + maxInitProgressTime <= currentTime - && node.getNode().getType().equals(NodeType.STORAGE)) - { - eventLog.add(new NodeEvent(node, (currentTime - node.getInitProgressTime()) + " milliseconds " - + "without initialize progress. Marking node down." - + " Premature crash count is now " + (node.getPrematureCrashCount() + 1) + ".", NodeEvent.Type.CURRENT, currentTime), isMaster); - NodeState newState = new NodeState(node.getNode().getType(), State.DOWN).setDescription( - (currentTime - node.getInitProgressTime()) + " ms without initialize progress. 
Assuming node has deadlocked."); - setNodeState(node, newState); - handlePrematureCrash(node, nodeListener); - triggeredAnyTimers = true; - } - if (node.getUpStableStateTime() + stableStateTimePeriod <= currentTime - && lastReportedState.getState().equals(State.UP) - && node.getPrematureCrashCount() <= maxPrematureCrashes - && node.getPrematureCrashCount() != 0) - { - node.setPrematureCrashCount(0); - log.log(LogLevel.DEBUG, "Resetting premature crash count on node " + node + " as it has been up for a long time."); - triggeredAnyTimers = true; - } else if (node.getDownStableStateTime() + stableStateTimePeriod <= currentTime - && lastReportedState.getState().equals(State.DOWN) - && node.getPrematureCrashCount() <= maxPrematureCrashes - && node.getPrematureCrashCount() != 0) - { - node.setPrematureCrashCount(0); - log.log(LogLevel.DEBUG, "Resetting premature crash count on node " + node + " as it has been down for a long time."); - triggeredAnyTimers = true; - } - } - return triggeredAnyTimers; - } - - private boolean isControlledShutdown(NodeState state) { - return (state.getState() == State.STOPPING && (state.getDescription().contains("Received signal 15 (SIGTERM - Termination signal)") - || state.getDescription().contains("controlled shutdown"))); - } - - /** - * Decide the state assigned to a new node given the state it reported - * - * @param node the node we are computing the state of - * @param currentState the current state of the node - * @param reportedState the new state reported by (or, in the case of down - inferred from) the node - * @param nodeListener this listener is notified for some of the system state changes that this will return - * @return the node node state, or null to keep the nodes current state - */ - private NodeState decideNodeStateGivenReportedState(NodeInfo node, NodeState currentState, NodeState reportedState, - NodeStateOrHostInfoChangeHandler nodeListener) { - long timeNow = timer.getCurrentTimeInMillis(); - - 
log.log(LogLevel.DEBUG, "Finding new cluster state entry for " + node + " switching state " + currentState.getTextualDifference(reportedState)); - - // Set nodes in maintenance if 1) down, or 2) initializing but set retired, to avoid migrating data - // to the retired node while it is initializing - if (currentState.getState().oneOf("ur") && reportedState.getState().oneOf("dis") - && (node.getWantedState().getState().equals(State.RETIRED) || !reportedState.getState().equals(State.INITIALIZING))) - { - long currentTime = timer.getCurrentTimeInMillis(); - node.setTransitionTime(currentTime); - if (node.getUpStableStateTime() + stableStateTimePeriod > currentTime && !isControlledShutdown(reportedState)) { - log.log(LogLevel.DEBUG, "Stable state: " + node.getUpStableStateTime() + " + " + stableStateTimePeriod + " > " + currentTime); - eventLog.add(new NodeEvent(node, - "Stopped or possibly crashed after " + (currentTime - node.getUpStableStateTime()) - + " ms, which is before stable state time period." - + " Premature crash count is now " + (node.getPrematureCrashCount() + 1) + ".", - NodeEvent.Type.CURRENT, - timeNow), isMaster); - if (handlePrematureCrash(node, nodeListener)) return null; - } - if (maxTransitionTime.get(node.getNode().getType()) != 0) { - return new NodeState(node.getNode().getType(), State.MAINTENANCE).setDescription(reportedState.getDescription()); - } - } - - // If we got increasing initialization progress, reset initialize timer - if (reportedState.getState().equals(State.INITIALIZING) && - (!currentState.getState().equals(State.INITIALIZING) || - reportedState.getInitProgress() > currentState.getInitProgress())) - { - node.setInitProgressTime(timer.getCurrentTimeInMillis()); - log.log(LogLevel.DEBUG, "Reset initialize timer on " + node + " to " + node.getInitProgressTime()); - } - - // If we get reverse initialize progress, mark node unstable, such that we don't mark it initializing again before it is up. 
- if (currentState.getState().equals(State.INITIALIZING) && - (reportedState.getState().equals(State.INITIALIZING) && reportedState.getInitProgress() < currentState.getInitProgress())) - { - eventLog.add(new NodeEvent(node, "Stop or crash during initialization detected from reverse initializing progress." - + " Progress was " + currentState.getInitProgress() + " but is now " + reportedState.getInitProgress() + "." - + " Premature crash count is now " + (node.getPrematureCrashCount() + 1) + ".", - NodeEvent.Type.CURRENT, timeNow), isMaster); - return (handlePrematureCrash(node, nodeListener) ? null : new NodeState(node.getNode().getType(), State.DOWN).setDescription( - "Got reverse intialize progress. Assuming node have prematurely crashed")); - } - - // If we go down while initializing, mark node unstable, such that we don't mark it initializing again before it is up. - if (currentState.getState().equals(State.INITIALIZING) && reportedState.getState().oneOf("ds") && !isControlledShutdown(reportedState)) - { - eventLog.add(new NodeEvent(node, "Stop or crash during initialization." - + " Premature crash count is now " + (node.getPrematureCrashCount() + 1) + ".", - NodeEvent.Type.CURRENT, timeNow), isMaster); - return (handlePrematureCrash(node, nodeListener) ? null : new NodeState(node.getNode().getType(), State.DOWN).setDescription(reportedState.getDescription())); - } - - // Ignore further unavailable states when node is set in maintenance - if (currentState.getState().equals(State.MAINTENANCE) && reportedState.getState().oneOf("dis")) - { - if (node.getWantedState().getState().equals(State.RETIRED) || !reportedState.getState().equals(State.INITIALIZING) - || reportedState.getInitProgress() <= NodeState.getListingBucketsInitProgressLimit() + 0.00001) { - log.log(LogLevel.DEBUG, "Ignoring down and initializing reports while in maintenance mode on " + node + "."); - return null; - } - } - - // Hide initializing state if node has been unstable. 
(Not for distributors as these own buckets while initializing) - if ((currentState.getState().equals(State.DOWN) || currentState.getState().equals(State.UP)) && - reportedState.getState().equals(State.INITIALIZING) && node.getPrematureCrashCount() > 0 && - !node.isDistributor()) - { - log.log(LogLevel.DEBUG, "Not setting " + node + " initializing again as it crashed prematurely earlier."); - return new NodeState(node.getNode().getType(), State.DOWN).setDescription("Not setting node back up as it failed prematurely at last attempt"); - } - // Hide initializing state in cluster state if initialize progress is so low that we haven't listed buckets yet - if (!node.isDistributor() && reportedState.getState().equals(State.INITIALIZING) && - reportedState.getInitProgress() <= NodeState.getListingBucketsInitProgressLimit() + 0.00001) - { - log.log(LogLevel.DEBUG, "Not setting " + node + " initializing in cluster state quite yet, as initializing progress still indicate it is listing buckets."); - return new NodeState(node.getNode().getType(), State.DOWN).setDescription("Listing buckets. Progress " + (100 * reportedState.getInitProgress()) + " %."); - } - return reportedState.clone(); - } - - public boolean handlePrematureCrash(NodeInfo node, NodeStateOrHostInfoChangeHandler changeListener) { - node.setPrematureCrashCount(node.getPrematureCrashCount() + 1); - if (disableUnstableNodes && node.getPrematureCrashCount() > maxPrematureCrashes) { - NodeState wantedState = new NodeState(node.getNode().getType(), State.DOWN) - .setDescription("Disabled by fleet controller as it prematurely shut down " + node.getPrematureCrashCount() + " times in a row"); - NodeState oldState = node.getWantedState(); - node.setWantedState(wantedState); - if ( ! 
oldState.equals(wantedState)) { - changeListener.handleNewWantedNodeState(node, wantedState); - } - return true; - } - return false; - } - - public void handleUpdatedHostInfo(NodeInfo nodeInfo, HostInfo hostInfo) { - // Only pass the host info to the latest cluster state view. - currentClusterStateView.handleUpdatedHostInfo(hostnames, nodeInfo, hostInfo); - } - - public class SystemStateHistoryEntry { - - private final ClusterState state; - private final long time; - - SystemStateHistoryEntry(ClusterState state, long time) { - this.state = state; - this.time = time; - } - - public ClusterState state() { return state; } - - public long time() { return time; } - - } - -} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java index a21ed994d5d..c4e7c6897e1 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java @@ -248,6 +248,8 @@ public class DatabaseHandler { log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex + ": Attempting to store last system state version " + pendingStore.lastSystemStateVersion + " into zookeeper."); + // TODO guard version write with a CaS predicated on the version we last read/wrote. + // TODO Drop leadership status if there is a mismatch, as it implies we're racing with another leader. 
if (database.storeLatestSystemStateVersion(pendingStore.lastSystemStateVersion)) { currentlyStored.lastSystemStateVersion = pendingStore.lastSystemStateVersion; pendingStore.lastSystemStateVersion = null; diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/MasterDataGatherer.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/MasterDataGatherer.java index cd9c66d18f0..f952f842151 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/MasterDataGatherer.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/MasterDataGatherer.java @@ -51,7 +51,7 @@ public class MasterDataGatherer { public void process(WatchedEvent watchedEvent) { switch (watchedEvent.getType()) { case NodeChildrenChanged: // Fleetcontrollers have either connected or disconnected to ZooKeeper - log.log(LogLevel.INFO, "Fleetcontroller " + nodeIndex + ": A change occured in the list of registered fleetcontrollers. Requesting new information"); + log.log(LogLevel.INFO, "Fleetcontroller " + nodeIndex + ": A change occurred in the list of registered fleetcontrollers. Requesting new information"); session.getChildren(zooKeeperRoot + "indexes", this, childListener, null); break; case NodeDataChanged: // A fleetcontroller has changed what node it is voting for @@ -160,7 +160,7 @@ public class MasterDataGatherer { } } - /** Calling restart, ignores what we currently know and starts another circly. Typically called after reconnecting to ZooKeeperServer. */ + /** Calling restart, ignores what we currently know and starts another cycle. Typically called after reconnecting to ZooKeeperServer. 
*/ public void restart() { synchronized (nextMasterData) { masterData = new TreeMap<Integer, Integer>(); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/ClusterStateRequestHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/ClusterStateRequestHandler.java index 6de9205bbe3..9428370faf5 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/ClusterStateRequestHandler.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/ClusterStateRequestHandler.java @@ -2,19 +2,19 @@ package com.yahoo.vespa.clustercontroller.core.status; import com.yahoo.vdslib.state.ClusterState; +import com.yahoo.vespa.clustercontroller.core.StateVersionTracker; import com.yahoo.vespa.clustercontroller.core.status.statuspage.StatusPageResponse; import com.yahoo.vespa.clustercontroller.core.status.statuspage.StatusPageServer; -import com.yahoo.vespa.clustercontroller.core.SystemStateGenerator; public class ClusterStateRequestHandler implements StatusPageServer.RequestHandler { - private final SystemStateGenerator systemStateGenerator; + private final StateVersionTracker stateVersionTracker; - public ClusterStateRequestHandler(SystemStateGenerator systemStateGenerator) { - this.systemStateGenerator = systemStateGenerator; + public ClusterStateRequestHandler(StateVersionTracker stateVersionTracker) { + this.stateVersionTracker = stateVersionTracker; } @Override public StatusPageResponse handle(StatusPageServer.HttpRequest request) { - ClusterState cs = systemStateGenerator.getClusterState(); + ClusterState cs = stateVersionTracker.getVersionedClusterState(); StatusPageResponse response = new StatusPageResponse(); response.setContentType("text/plain"); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java 
b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java index 85db0ac0ef9..ec75ba3532d 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java @@ -17,21 +17,22 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa private final Timer timer; private final ContentCluster cluster; private final MasterElectionHandler masterElectionHandler; - private final SystemStateGenerator systemStateGenerator; + private final StateVersionTracker stateVersionTracker; private final EventLog eventLog; private final long startedTime; private final RunDataExtractor data; private boolean showLocalSystemStatesInLog = true; public LegacyIndexPageRequestHandler(Timer timer, boolean showLocalSystemStatesInLog, ContentCluster cluster, - MasterElectionHandler masterElectionHandler, SystemStateGenerator systemStateGenerator, + MasterElectionHandler masterElectionHandler, + StateVersionTracker stateVersionTracker, EventLog eventLog, long startedTime, RunDataExtractor data) { this.timer = timer; this.showLocalSystemStatesInLog = showLocalSystemStatesInLog; this.cluster = cluster; this.masterElectionHandler = masterElectionHandler; - this.systemStateGenerator = systemStateGenerator; + this.stateVersionTracker = stateVersionTracker; this.eventLog = eventLog; this.startedTime = startedTime; this.data = data; @@ -63,7 +64,7 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa new VdsClusterHtmlRendrer(), content, timer, - systemStateGenerator.getClusterState(), + stateVersionTracker.getVersionedClusterState(), data.getOptions().storageDistribution, data.getOptions(), eventLog, @@ -71,7 +72,7 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa // 
Overview of current config data.getOptions().writeHtmlState(content, request); // Current cluster state and cluster state history - writeHtmlState(systemStateGenerator, content, request); + writeHtmlState(stateVersionTracker, content, request); } else { // Overview of current config data.getOptions().writeHtmlState(content, request); @@ -84,7 +85,7 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa return response; } - public void writeHtmlState(SystemStateGenerator systemStateGenerator, StringBuilder sb, StatusPageServer.HttpRequest request) { + public void writeHtmlState(StateVersionTracker stateVersionTracker, StringBuilder sb, StatusPageServer.HttpRequest request) { boolean showLocal = showLocalSystemStatesInLog; if (request.hasQueryParameter("showlocal")) { showLocal = true; @@ -93,9 +94,9 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa } sb.append("<h2 id=\"clusterstates\">Cluster states</h2>\n") - .append("<p>Current cluster state:<br><code>").append(systemStateGenerator.currentClusterStateView().toString()).append("</code></p>\n"); + .append("<p>Current cluster state:<br><code>").append(stateVersionTracker.getVersionedClusterState().toString()).append("</code></p>\n"); - if ( ! systemStateGenerator.systemStateHistory().isEmpty()) { + if ( ! 
stateVersionTracker.getClusterStateHistory().isEmpty()) { TimeZone tz = TimeZone.getTimeZone("UTC"); sb.append("<h3 id=\"clusterstatehistory\">Cluster state history</h3>\n"); if (showLocal) { @@ -106,10 +107,10 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa .append(" <th>Cluster state</th>\n") .append("</tr>\n"); // Write cluster state history in reverse order (newest on top) - Iterator<SystemStateGenerator.SystemStateHistoryEntry> stateIterator = systemStateGenerator.systemStateHistory().iterator(); - SystemStateGenerator.SystemStateHistoryEntry current = null; + Iterator<ClusterStateHistoryEntry> stateIterator = stateVersionTracker.getClusterStateHistory().iterator(); + ClusterStateHistoryEntry current = null; while (stateIterator.hasNext()) { - SystemStateGenerator.SystemStateHistoryEntry nextEntry = stateIterator.next(); + ClusterStateHistoryEntry nextEntry = stateIterator.next(); if (nextEntry.state().isOfficial() || showLocal) { if (current != null) writeClusterStateEntry(current, nextEntry, sb, tz); current = nextEntry; @@ -120,7 +121,7 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa } } - private void writeClusterStateEntry(SystemStateGenerator.SystemStateHistoryEntry entry, SystemStateGenerator.SystemStateHistoryEntry last, StringBuilder sb, TimeZone tz) { + private void writeClusterStateEntry(ClusterStateHistoryEntry entry, ClusterStateHistoryEntry last, StringBuilder sb, TimeZone tz) { sb.append("<tr><td>").append(RealTimer.printDate(entry.time(), tz)) .append("</td><td>").append(entry.state().isOfficial() ? 
"" : "<font color=\"grey\">"); sb.append(entry.state()); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/StaticResourceRequestHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/StaticResourceRequestHandler.java deleted file mode 100644 index fa8128753f6..00000000000 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/StaticResourceRequestHandler.java +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -package com.yahoo.vespa.clustercontroller.core.status; - -import com.yahoo.vespa.clustercontroller.core.status.statuspage.StatusPageResponse; -import com.yahoo.vespa.clustercontroller.core.status.statuspage.StatusPageServer; - -import java.io.ByteArrayOutputStream; -import java.io.InputStream; -import java.io.IOException; - -/** - * HTTP request handler for serving a single JAR resource as if it were - * a regular file hosted on the server. Always serves the content verbatim - * (i.e. as a byte stream), specifying a Content-Type provided when creating - * the handler. 
- * - * @author <a href="mailto:vekterli@yahoo-inc.com">Tor Brede Vekterli</a> - * @since 5.28 - */ -public class StaticResourceRequestHandler implements StatusPageServer.RequestHandler { - private final byte[] resourceData; - private final String contentType; - - public StaticResourceRequestHandler(String resourcePath, - String contentType) - throws IOException - { - this.resourceData = loadResource(resourcePath); - this.contentType = contentType; - } - - private byte[] loadResource(String resourcePath) throws IOException { - InputStream resourceStream = getClass().getClassLoader().getResourceAsStream(resourcePath); - if (resourceStream == null) { - throw new IOException("No resource with path '" + resourcePath + "' could be found"); - } - return readStreamData(resourceStream); - } - - @Override - public StatusPageResponse handle(StatusPageServer.HttpRequest request) { - final StatusPageResponse response = new StatusPageResponse(); - response.setClientCachingEnabled(true); - response.setContentType(contentType); - try { - response.getOutputStream().write(resourceData); - } catch (IOException e) { - response.setResponseCode(StatusPageResponse.ResponseCode.INTERNAL_SERVER_ERROR); - } - return response; - } - - private byte[] readStreamData(InputStream resourceStream) throws IOException { - final byte[] buf = new byte[4096]; - final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - while (true) { - int read = resourceStream.read(buf); - if (read < 0) { - break; - } - outputStream.write(buf, 0, read); - } - outputStream.close(); - return outputStream.toByteArray(); - } -} diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java index aca26000931..3eda886e721 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java +++ 
b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java @@ -3,21 +3,18 @@ package com.yahoo.vespa.clustercontroller.core; import com.yahoo.vdslib.distribution.ConfiguredNode; import com.yahoo.vdslib.distribution.Distribution; +import com.yahoo.vdslib.state.ClusterState; import com.yahoo.vdslib.state.Node; import com.yahoo.vdslib.state.NodeState; import com.yahoo.vdslib.state.NodeType; import com.yahoo.vdslib.state.State; import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler; -import com.yahoo.vespa.clustercontroller.core.mocks.TestEventLog; import com.yahoo.vespa.clustercontroller.utils.util.NoMetricReporter; -import com.yahoo.vespa.config.content.StorDistributionConfig; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import static org.mockito.Mockito.mock; @@ -26,98 +23,163 @@ class ClusterFixture { public final Distribution distribution; public final FakeTimer timer; public final EventLogInterface eventLog; - public final SystemStateGenerator generator; + public final StateChangeHandler nodeStateChangeHandler; + public final ClusterStateGenerator.Params params = new ClusterStateGenerator.Params(); - public ClusterFixture(ContentCluster cluster, Distribution distribution) { + ClusterFixture(ContentCluster cluster, Distribution distribution) { this.cluster = cluster; this.distribution = distribution; this.timer = new FakeTimer(); this.eventLog = mock(EventLogInterface.class); - this.generator = createGeneratorForFixtureCluster(); + this.nodeStateChangeHandler = createNodeStateChangeHandlerForCluster(); + this.params.cluster(this.cluster); } - public SystemStateGenerator createGeneratorForFixtureCluster() { + StateChangeHandler createNodeStateChangeHandlerForCluster() { final int controllerIndex = 0; MetricUpdater metricUpdater = new MetricUpdater(new 
NoMetricReporter(), controllerIndex); - SystemStateGenerator generator = new SystemStateGenerator(timer, eventLog, metricUpdater); - generator.setNodes(cluster.clusterInfo()); - generator.setDistribution(distribution); - return generator; + return new StateChangeHandler(timer, eventLog, metricUpdater); } - public void bringEntireClusterUp() { + ClusterFixture bringEntireClusterUp() { cluster.clusterInfo().getConfiguredNodes().forEach((idx, node) -> { reportStorageNodeState(idx, State.UP); reportDistributorNodeState(idx, State.UP); }); + return this; } - public void reportStorageNodeState(final int index, State state) { - final Node node = new Node(NodeType.STORAGE, index); - final NodeState nodeState = new NodeState(NodeType.STORAGE, state); - nodeState.setDescription("mockdesc"); + ClusterFixture markEntireClusterDown() { + cluster.clusterInfo().getConfiguredNodes().forEach((idx, node) -> { + reportStorageNodeState(idx, State.DOWN); + reportDistributorNodeState(idx, State.DOWN); + }); + return this; + } + + private void doReportNodeState(final Node node, final NodeState nodeState) { + final ClusterState stateBefore = rawGeneratedClusterState(); + NodeStateOrHostInfoChangeHandler handler = mock(NodeStateOrHostInfoChangeHandler.class); NodeInfo nodeInfo = cluster.getNodeInfo(node); - generator.handleNewReportedNodeState(nodeInfo, nodeState, handler); + nodeStateChangeHandler.handleNewReportedNodeState(stateBefore, nodeInfo, nodeState, handler); nodeInfo.setReportedState(nodeState, timer.getCurrentTimeInMillis()); } - public void reportStorageNodeState(final int index, NodeState nodeState) { + ClusterFixture reportStorageNodeState(final int index, State state, String description) { final Node node = new Node(NodeType.STORAGE, index); - final NodeInfo nodeInfo = cluster.getNodeInfo(node); - final long mockTime = 1234; - NodeStateOrHostInfoChangeHandler changeListener = mock(NodeStateOrHostInfoChangeHandler.class); - generator.handleNewReportedNodeState(nodeInfo, 
nodeState, changeListener); - nodeInfo.setReportedState(nodeState, mockTime); + final NodeState nodeState = new NodeState(NodeType.STORAGE, state); + nodeState.setDescription(description); + doReportNodeState(node, nodeState); + return this; } - public void reportDistributorNodeState(final int index, State state) { + ClusterFixture reportStorageNodeState(final int index, State state) { + return reportStorageNodeState(index, state, "mockdesc"); + } + + ClusterFixture reportStorageNodeState(final int index, NodeState nodeState) { + doReportNodeState(new Node(NodeType.STORAGE, index), nodeState); + return this; + } + + ClusterFixture reportDistributorNodeState(final int index, State state) { final Node node = new Node(NodeType.DISTRIBUTOR, index); final NodeState nodeState = new NodeState(NodeType.DISTRIBUTOR, state); - NodeStateOrHostInfoChangeHandler handler = mock(NodeStateOrHostInfoChangeHandler.class); + doReportNodeState(node, nodeState); + return this; + } + + ClusterFixture reportDistributorNodeState(final int index, NodeState nodeState) { + doReportNodeState(new Node(NodeType.DISTRIBUTOR, index), nodeState); + return this; + } + + private void doProposeWantedState(final Node node, final NodeState nodeState, String description) { + final ClusterState stateBefore = rawGeneratedClusterState(); + + nodeState.setDescription(description); NodeInfo nodeInfo = cluster.getNodeInfo(node); + nodeInfo.setWantedState(nodeState); - generator.handleNewReportedNodeState(nodeInfo, nodeState, handler); - nodeInfo.setReportedState(nodeState, timer.getCurrentTimeInMillis()); + nodeStateChangeHandler.proposeNewNodeState(stateBefore, nodeInfo, nodeState); } - public void proposeStorageNodeWantedState(final int index, State state) { + ClusterFixture proposeStorageNodeWantedState(final int index, State state, String description) { final Node node = new Node(NodeType.STORAGE, index); final NodeState nodeState = new NodeState(NodeType.STORAGE, state); + doProposeWantedState(node, 
nodeState, description); + return this; + } + + ClusterFixture proposeStorageNodeWantedState(final int index, State state) { + return proposeStorageNodeWantedState(index, state, "mockdesc"); + } + + ClusterFixture proposeDistributorWantedState(final int index, State state) { + final ClusterState stateBefore = rawGeneratedClusterState(); + final Node node = new Node(NodeType.DISTRIBUTOR, index); + final NodeState nodeState = new NodeState(NodeType.DISTRIBUTOR, state); nodeState.setDescription("mockdesc"); NodeInfo nodeInfo = cluster.getNodeInfo(node); nodeInfo.setWantedState(nodeState); - generator.proposeNewNodeState(nodeInfo, nodeState); + nodeStateChangeHandler.proposeNewNodeState(stateBefore, nodeInfo, nodeState); + return this; + } + ClusterFixture disableAutoClusterTakedown() { + setMinNodesUp(0, 0, 0.0, 0.0); + return this; } - public void disableAutoClusterTakedown() { - generator.setMinNodesUp(0, 0, 0.0, 0.0); + ClusterFixture setMinNodesUp(int minDistNodes, int minStorNodes, double minDistRatio, double minStorRatio) { + params.minStorageNodesUp(minStorNodes) + .minDistributorNodesUp(minDistNodes) + .minRatioOfStorageNodesUp(minStorRatio) + .minRatioOfDistributorNodesUp(minDistRatio); + return this; } - public void disableTransientMaintenanceModeOnDown() { - Map<NodeType, Integer> maxTransitionTime = new TreeMap<>(); - maxTransitionTime.put(NodeType.DISTRIBUTOR, 0); - maxTransitionTime.put(NodeType.STORAGE, 0); - generator.setMaxTransitionTime(maxTransitionTime); + ClusterFixture setMinNodeRatioPerGroup(double upRatio) { + params.minNodeRatioPerGroup(upRatio); + return this; } - public void enableTransientMaintenanceModeOnDown(final int transitionTime) { + static Map<NodeType, Integer> buildTransitionTimeMap(int distributorTransitionTime, int storageTransitionTime) { Map<NodeType, Integer> maxTransitionTime = new TreeMap<>(); - maxTransitionTime.put(NodeType.DISTRIBUTOR, transitionTime); - maxTransitionTime.put(NodeType.STORAGE, transitionTime); - 
generator.setMaxTransitionTime(maxTransitionTime); + maxTransitionTime.put(NodeType.DISTRIBUTOR, distributorTransitionTime); + maxTransitionTime.put(NodeType.STORAGE, storageTransitionTime); + return maxTransitionTime; } - public String generatedClusterState() { - return generator.getClusterState().toString(); + void disableTransientMaintenanceModeOnDown() { + this.params.transitionTimes(0); } - public String verboseGeneratedClusterState() { return generator.getClusterState().toString(true); } + void enableTransientMaintenanceModeOnDown(final int transitionTimeMs) { + this.params.transitionTimes(transitionTimeMs); + } + + AnnotatedClusterState annotatedGeneratedClusterState() { + params.currentTimeInMilllis(timer.getCurrentTimeInMillis()); + return ClusterStateGenerator.generatedStateFrom(params); + } - public static ClusterFixture forFlatCluster(int nodeCount) { + ClusterState rawGeneratedClusterState() { + return annotatedGeneratedClusterState().getClusterState(); + } + + String generatedClusterState() { + return annotatedGeneratedClusterState().getClusterState().toString(); + } + + String verboseGeneratedClusterState() { + return annotatedGeneratedClusterState().getClusterState().toString(true); + } + + static ClusterFixture forFlatCluster(int nodeCount) { Collection<ConfiguredNode> nodes = DistributionBuilder.buildConfiguredNodes(nodeCount); Distribution distribution = DistributionBuilder.forFlatCluster(nodeCount); @@ -126,11 +188,27 @@ class ClusterFixture { return new ClusterFixture(cluster, distribution); } - public static ClusterFixture forHierarchicCluster(DistributionBuilder.GroupBuilder root) { + static ClusterFixture forHierarchicCluster(DistributionBuilder.GroupBuilder root) { List<ConfiguredNode> nodes = DistributionBuilder.buildConfiguredNodes(root.totalNodeCount()); Distribution distribution = DistributionBuilder.forHierarchicCluster(root); ContentCluster cluster = new ContentCluster("foo", nodes, distribution, 0, 0.0); return new 
ClusterFixture(cluster, distribution); } + + ClusterStateGenerator.Params generatorParams() { + return new ClusterStateGenerator.Params().cluster(cluster); + } + + ContentCluster cluster() { + return this.cluster; + } + + static Node storageNode(int index) { + return new Node(NodeType.STORAGE, index); + } + + static Node distributorNode(int index) { + return new Node(NodeType.DISTRIBUTOR, index); + } } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateGeneratorTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateGeneratorTest.java new file mode 100644 index 00000000000..b9b97c27949 --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateGeneratorTest.java @@ -0,0 +1,895 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vdslib.distribution.ConfiguredNode; +import com.yahoo.vdslib.state.DiskState; +import com.yahoo.vdslib.state.Node; +import com.yahoo.vdslib.state.NodeState; +import com.yahoo.vdslib.state.NodeType; +import com.yahoo.vdslib.state.State; +import org.junit.Test; + +import java.util.List; +import java.util.Optional; + +import static com.yahoo.vespa.clustercontroller.core.matchers.HasStateReasonForNode.hasStateReasonForNode; +import static com.yahoo.vespa.clustercontroller.core.ClusterFixture.storageNode; + +import static org.hamcrest.core.IsEqual.equalTo; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; + +public class ClusterStateGeneratorTest { + + private static AnnotatedClusterState generateFromFixtureWithDefaultParams(ClusterFixture fixture) { + final ClusterStateGenerator.Params params = new ClusterStateGenerator.Params(); + params.cluster = fixture.cluster; + params.transitionTimes = ClusterFixture.buildTransitionTimeMap(0, 0); + 
params.currentTimeInMillis = 0; + return ClusterStateGenerator.generatedStateFrom(params); + } + + @Test + public void cluster_with_all_nodes_reported_down_has_state_down() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(6).markEntireClusterDown(); + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + + assertThat(state.getClusterState().getClusterState(), is(State.DOWN)); + // The returned message in this case depends on which "is cluster down?" check + // kicks in first. Currently, the minimum storage node count does. + assertThat(state.getClusterStateReason(), equalTo(Optional.of(ClusterStateReason.TOO_FEW_STORAGE_NODES_AVAILABLE))); + } + + @Test + public void cluster_with_all_nodes_up_state_correct_distributor_and_storage_count() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(6).bringEntireClusterUp(); + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + + assertThat(state.toString(), equalTo("distributor:6 storage:6")); + } + + @Test + public void distributor_reported_states_reflected_in_generated_state() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(9) + .bringEntireClusterUp() + .reportDistributorNodeState(2, State.DOWN) + .reportDistributorNodeState(4, State.STOPPING); + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + + assertThat(state.toString(), equalTo("distributor:9 .2.s:d .4.s:s storage:9")); + } + + // NOTE: initializing state tested separately since it involves init progress state info + @Test + public void storage_reported_states_reflected_in_generated_state() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(9) + .bringEntireClusterUp() + .reportStorageNodeState(0, State.DOWN) + .reportStorageNodeState(4, State.STOPPING); + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + + assertThat(state.toString(), equalTo("distributor:9 storage:9 .0.s:d 
.4.s:s")); + } + + @Test + public void storage_reported_disk_state_included_in_generated_state() { + final NodeState stateWithDisks = new NodeState(NodeType.STORAGE, State.UP); + stateWithDisks.setDiskCount(7); + stateWithDisks.setDiskState(5, new DiskState(State.DOWN)); + + final ClusterFixture fixture = ClusterFixture.forFlatCluster(9) + .bringEntireClusterUp() + .reportStorageNodeState(2, stateWithDisks); + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + + assertThat(state.toString(), equalTo("distributor:9 storage:9 .2.d:7 .2.d.5.s:d")); + } + + @Test + public void worse_distributor_wanted_state_overrides_reported_state() { + // Maintenance mode is illegal for distributors and therefore not tested + final ClusterFixture fixture = ClusterFixture.forFlatCluster(7) + .bringEntireClusterUp() + .proposeDistributorWantedState(5, State.DOWN) // Down worse than Up + .reportDistributorNodeState(2, State.STOPPING) + .proposeDistributorWantedState(2, State.DOWN); // Down worse than Stopping + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + + assertThat(state.toString(), equalTo("distributor:7 .2.s:d .5.s:d storage:7")); + } + + @Test + public void worse_storage_wanted_state_overrides_reported_state() { + // Does not test all maintenance mode overrides; see maintenance_mode_overrides_reported_state + // for that. 
+ final ClusterFixture fixture = ClusterFixture.forFlatCluster(7) + .bringEntireClusterUp() + .reportStorageNodeState(2, State.STOPPING) + .proposeStorageNodeWantedState(2, State.MAINTENANCE) // Maintenance worse than Stopping + .proposeStorageNodeWantedState(4, State.RETIRED) // Retired is "worse" than Up + .proposeStorageNodeWantedState(5, State.DOWN); // Down worse than Up + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + + assertThat(state.toString(), equalTo("distributor:7 storage:7 .2.s:m .4.s:r .5.s:d")); + } + + @Test + public void better_distributor_wanted_state_does_not_override_reported_state() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(7) + .bringEntireClusterUp() + .reportDistributorNodeState(0, State.DOWN) + .proposeDistributorWantedState(0, State.UP); + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + + assertThat(state.toString(), equalTo("distributor:7 .0.s:d storage:7")); + } + + @Test + public void better_storage_wanted_state_does_not_override_reported_state() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(7) + .bringEntireClusterUp() + .reportStorageNodeState(1, State.DOWN) + .proposeStorageNodeWantedState(1, State.UP) + .reportStorageNodeState(2, State.DOWN) + .proposeStorageNodeWantedState(2, State.RETIRED); + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + + assertThat(state.toString(), equalTo("distributor:7 storage:7 .1.s:d .2.s:d")); + } + + /** + * If we let a Retired node be published as Initializing when it is in init state, we run + * the risk of having both feed and merge ops be sent towards it, which is not what we want. + * Consequently we pretend such nodes are never in init state and just transition them + * directly from Maintenance -> Up. 
+ */ + @Test + public void retired_node_in_init_state_is_set_to_maintenance() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(3) + .bringEntireClusterUp() + .reportStorageNodeState(1, State.INITIALIZING) + .proposeStorageNodeWantedState(1, State.RETIRED); + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + + assertThat(state.toString(), equalTo("distributor:3 storage:3 .1.s:m")); + } + + /** + * A storage node will report itself as being in initializing mode immediately when + * starting up. It can only accept external operations once it has finished listing + * the set of buckets (but not necessarily their contents). As a consequence of this, + * we have to map reported init state while in bucket listing mode to Down. This will + * prevent clients from thinking they can use the node and prevent distributors from + * trying to fetch yet non-existent bucket sets from it. + * + * Detecting the bucket-listing stage is currently done by inspecting its init progress + * value and triggering on a sufficiently low value. + */ + @Test + public void storage_node_in_init_mode_while_listing_buckets_is_marked_down() { + final NodeState initWhileListingBuckets = new NodeState(NodeType.STORAGE, State.INITIALIZING); + initWhileListingBuckets.setInitProgress(0.0); + + final ClusterFixture fixture = ClusterFixture.forFlatCluster(3) + .bringEntireClusterUp() + .reportStorageNodeState(1, initWhileListingBuckets); + + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + assertThat(state.toString(), equalTo("distributor:3 storage:3 .1.s:d")); + } + + /** + * Implicit down while reported as init should not kick into effect if the Wanted state + * is set to Maintenance.
+ */ + @Test + public void implicit_down_while_listing_buckets_does_not_override_wanted_state() { + final NodeState initWhileListingBuckets = new NodeState(NodeType.STORAGE, State.INITIALIZING); + initWhileListingBuckets.setInitProgress(0.0); + + final ClusterFixture fixture = ClusterFixture.forFlatCluster(3) + .bringEntireClusterUp() + .reportStorageNodeState(1, initWhileListingBuckets) + .proposeStorageNodeWantedState(1, State.MAINTENANCE); + + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + assertThat(state.toString(), equalTo("distributor:3 storage:3 .1.s:m")); + } + + @Test + public void distributor_nodes_in_init_mode_are_not_mapped_to_down() { + final NodeState initWhileListingBuckets = new NodeState(NodeType.DISTRIBUTOR, State.INITIALIZING); + initWhileListingBuckets.setInitProgress(0.0); + + final ClusterFixture fixture = ClusterFixture.forFlatCluster(3) + .bringEntireClusterUp() + .reportDistributorNodeState(1, initWhileListingBuckets); + + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + assertThat(state.toString(), equalTo("distributor:3 .1.s:i .1.i:0.0 storage:3")); + } + + /** + * Maintenance mode overrides all reported states, even Down. 
+ */ + @Test + public void maintenance_mode_wanted_state_overrides_reported_state() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(7) + .bringEntireClusterUp() + .proposeStorageNodeWantedState(0, State.MAINTENANCE) + .reportStorageNodeState(2, State.STOPPING) + .proposeStorageNodeWantedState(2, State.MAINTENANCE) + .reportStorageNodeState(3, State.DOWN) + .proposeStorageNodeWantedState(3, State.MAINTENANCE) + .reportStorageNodeState(4, State.INITIALIZING) + .proposeStorageNodeWantedState(4, State.MAINTENANCE); + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + + assertThat(state.toString(), equalTo("distributor:7 storage:7 .0.s:m .2.s:m .3.s:m .4.s:m")); + } + + @Test + public void wanted_state_description_carries_over_to_generated_state() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(7) + .bringEntireClusterUp() + .proposeStorageNodeWantedState(1, State.MAINTENANCE, "foo") + .proposeStorageNodeWantedState(2, State.DOWN, "bar") + .proposeStorageNodeWantedState(3, State.RETIRED, "baz"); + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + + // We have to use toString(true) to get verbose printing including the descriptions, + // as these are omitted by default. 
+ assertThat(state.toString(true), equalTo("distributor:7 storage:7 .1.s:m .1.m:foo " + + ".2.s:d .2.m:bar .3.s:r .3.m:baz")); + } + + @Test + public void reported_disk_state_not_hidden_by_wanted_state() { + final NodeState stateWithDisks = new NodeState(NodeType.STORAGE, State.UP); + stateWithDisks.setDiskCount(5); + stateWithDisks.setDiskState(3, new DiskState(State.DOWN)); + + final ClusterFixture fixture = ClusterFixture.forFlatCluster(9) + .bringEntireClusterUp() + .reportStorageNodeState(2, stateWithDisks) + .proposeStorageNodeWantedState(2, State.RETIRED) + .reportStorageNodeState(3, stateWithDisks) + .proposeStorageNodeWantedState(3, State.MAINTENANCE); + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + + // We do not publish disk states for nodes in Down state. This differs from how the + // legacy controller did things, but such states cannot be counted on for ideal state + // calculations either way. In particular, reported disk states are not persisted and + // only exist transiently in the cluster controller's memory. A controller restart is + // sufficient to clear all disk states that have been incidentally remembered for now + // downed nodes. + // The keen reader may choose to convince themselves of this independently by reading the + // code in com.yahoo.vdslib.distribution.Distribution#getIdealStorageNodes and observing + // how disk states for nodes that are in a down-state are never considered. 
+ assertThat(state.toString(), equalTo("distributor:9 storage:9 .2.s:r .2.d:5 .2.d.3.s:d " + + ".3.s:m .3.d:5 .3.d.3.s:d")); + } + + @Test + public void config_retired_mode_is_reflected_in_generated_state() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(5).bringEntireClusterUp(); + List<ConfiguredNode> nodes = DistributionBuilder.buildConfiguredNodes(5); + nodes.set(2, new ConfiguredNode(2, true)); + fixture.cluster.setNodes(nodes); + + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + + assertThat(state.toString(), equalTo("distributor:5 storage:5 .2.s:r")); + } + + private void do_test_change_within_node_transition_time_window_generates_maintenance(State reportedState) { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(5).bringEntireClusterUp(); + final ClusterStateGenerator.Params params = fixture.generatorParams() + .currentTimeInMilllis(10_000) + .transitionTimes(2000); + + fixture.reportStorageNodeState(1, reportedState); + final NodeInfo nodeInfo = fixture.cluster.getNodeInfo(new Node(NodeType.STORAGE, 1)); + // Node 1 transitioned to reported `reportedState` at time 9000ms after epoch. This means that according to the + // above transition time config, it should remain in generated maintenance mode until time 11000ms, + // at which point it should finally transition to generated state Down. 
+ nodeInfo.setTransitionTime(9000); + { + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.toString(), equalTo("distributor:5 storage:5 .1.s:m")); + } + + nodeInfo.setTransitionTime(10999); + { + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.toString(), equalTo("distributor:5 storage:5 .1.s:m")); + } + } + + @Test + public void reported_down_node_within_transition_time_has_maintenance_generated_state() { + do_test_change_within_node_transition_time_window_generates_maintenance(State.DOWN); + } + + @Test + public void reported_stopping_node_within_transition_time_has_maintenance_generated_state() { + do_test_change_within_node_transition_time_window_generates_maintenance(State.STOPPING); + } + + @Test + public void reported_node_down_after_transition_time_has_down_generated_state() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(5).bringEntireClusterUp(); + final ClusterStateGenerator.Params params = fixture.generatorParams() + .currentTimeInMilllis(11_000) + .transitionTimes(2000); + + fixture.reportStorageNodeState(1, State.DOWN); + final NodeInfo nodeInfo = fixture.cluster.getNodeInfo(new Node(NodeType.STORAGE, 1)); + nodeInfo.setTransitionTime(9000); + + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.toString(), equalTo("distributor:5 storage:5 .1.s:d")); + } + + @Test + public void distributor_nodes_are_not_implicitly_transitioned_to_maintenance_mode() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(5).bringEntireClusterUp(); + final ClusterStateGenerator.Params params = fixture.generatorParams() + .currentTimeInMilllis(10_000) + .transitionTimes(2000); + + fixture.reportDistributorNodeState(2, State.DOWN); + final NodeInfo nodeInfo = fixture.cluster.getNodeInfo(new Node(NodeType.DISTRIBUTOR, 2)); + nodeInfo.setTransitionTime(9000); + + final 
AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.toString(), equalTo("distributor:5 .2.s:d storage:5")); + } + + @Test + public void transient_maintenance_mode_does_not_override_wanted_down_state() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(5).bringEntireClusterUp(); + final ClusterStateGenerator.Params params = fixture.generatorParams() + .currentTimeInMilllis(10_000) + .transitionTimes(2000); + + fixture.proposeStorageNodeWantedState(2, State.DOWN); + fixture.reportStorageNodeState(2, State.DOWN); + final NodeInfo nodeInfo = fixture.cluster.getNodeInfo(new Node(NodeType.STORAGE, 2)); + nodeInfo.setTransitionTime(9000); + + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + // Should _not_ be in maintenance mode, since we explicitly want it to stay down. + assertThat(state.toString(), equalTo("distributor:5 storage:5 .2.s:d")); + } + + @Test + public void reported_down_retired_node_within_transition_time_transitions_to_maintenance() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(5).bringEntireClusterUp(); + final ClusterStateGenerator.Params params = fixture.generatorParams() + .currentTimeInMilllis(10_000) + .transitionTimes(2000); + + fixture.proposeStorageNodeWantedState(2, State.RETIRED); + fixture.reportStorageNodeState(2, State.DOWN); + final NodeInfo nodeInfo = fixture.cluster.getNodeInfo(new Node(NodeType.STORAGE, 2)); + nodeInfo.setTransitionTime(9000); + + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.toString(), equalTo("distributor:5 storage:5 .2.s:m")); + } + + @Test + public void crash_count_exceeding_limit_marks_node_as_down() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(5).bringEntireClusterUp(); + final ClusterStateGenerator.Params params = fixture.generatorParams().maxPrematureCrashes(10); + + final NodeInfo nodeInfo = 
fixture.cluster.getNodeInfo(new Node(NodeType.STORAGE, 3)); + nodeInfo.setPrematureCrashCount(11); + + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.toString(), equalTo("distributor:5 storage:5 .3.s:d")); + } + + @Test + public void crash_count_not_exceeding_limit_does_not_mark_node_as_down() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(5).bringEntireClusterUp(); + final ClusterStateGenerator.Params params = fixture.generatorParams().maxPrematureCrashes(10); + + final NodeInfo nodeInfo = fixture.cluster.getNodeInfo(new Node(NodeType.STORAGE, 3)); + nodeInfo.setPrematureCrashCount(10); // "Max crashes" range is inclusive + + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.toString(), equalTo("distributor:5 storage:5")); + } + + @Test + public void exceeded_crash_count_does_not_override_wanted_maintenance_state() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(5) + .bringEntireClusterUp() + .proposeStorageNodeWantedState(1, State.MAINTENANCE); + final ClusterStateGenerator.Params params = fixture.generatorParams().maxPrematureCrashes(10); + + final NodeInfo nodeInfo = fixture.cluster.getNodeInfo(new Node(NodeType.STORAGE, 1)); + nodeInfo.setPrematureCrashCount(11); + + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.toString(), equalTo("distributor:5 storage:5 .1.s:m")); + } + + // Stopping -> Down is expected and does not indicate an unstable node. 
+ @Test + public void transition_from_controlled_stop_to_down_does_not_add_to_crash_counter() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(2) + .bringEntireClusterUp() + .reportStorageNodeState(1, State.STOPPING, "controlled shutdown") // urgh, string matching logic + .reportStorageNodeState(1, State.DOWN); + final NodeInfo nodeInfo = fixture.cluster.getNodeInfo(new Node(NodeType.STORAGE, 1)); + assertThat(nodeInfo.getPrematureCrashCount(), equalTo(0)); + } + + @Test + public void non_observed_storage_node_start_timestamp_is_included_in_state() { + final NodeState nodeState = new NodeState(NodeType.STORAGE, State.UP); + // A reported state timestamp that is not yet marked as observed in the NodeInfo + // for the same node is considered not observed by other nodes and must therefore + // be included in the generated cluster state + nodeState.setStartTimestamp(5000); + + final ClusterFixture fixture = ClusterFixture.forFlatCluster(5) + .bringEntireClusterUp() + .reportStorageNodeState(0, nodeState); + + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + assertThat(state.toString(), equalTo("distributor:5 storage:5 .0.t:5000")); + } + + @Test + public void non_observed_distributor_start_timestamp_is_included_in_state() { + final NodeState nodeState = new NodeState(NodeType.DISTRIBUTOR, State.UP); + nodeState.setStartTimestamp(6000); + + final ClusterFixture fixture = ClusterFixture.forFlatCluster(5) + .bringEntireClusterUp() + .reportDistributorNodeState(1, nodeState); + + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + assertThat(state.toString(), equalTo("distributor:5 .1.t:6000 storage:5")); + } + + @Test + public void fully_observed_storage_node_timestamp_not_included_in_state() { + final NodeState nodeState = new NodeState(NodeType.STORAGE, State.UP); + nodeState.setStartTimestamp(5000); + + final ClusterFixture fixture = ClusterFixture.forFlatCluster(5) + 
.bringEntireClusterUp() + .reportStorageNodeState(0, nodeState); + + final NodeInfo nodeInfo = fixture.cluster.getNodeInfo(new Node(NodeType.STORAGE, 0)); + nodeInfo.setStartTimestamp(5000); + + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + assertThat(state.toString(), equalTo("distributor:5 storage:5")); + } + + @Test + public void fully_observed_distributor_timestamp_not_included_in_state() { + final NodeState nodeState = new NodeState(NodeType.DISTRIBUTOR, State.UP); + nodeState.setStartTimestamp(6000); + + final ClusterFixture fixture = ClusterFixture.forFlatCluster(5) + .bringEntireClusterUp() + .reportDistributorNodeState(0, nodeState); + + final NodeInfo nodeInfo = fixture.cluster.getNodeInfo(new Node(NodeType.DISTRIBUTOR, 0)); + nodeInfo.setStartTimestamp(6000); + + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + assertThat(state.toString(), equalTo("distributor:5 storage:5")); + } + + @Test + public void cluster_down_if_less_than_min_count_of_storage_nodes_available() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(3) + .bringEntireClusterUp() + .reportStorageNodeState(0, State.DOWN) + .reportStorageNodeState(2, State.DOWN); + final ClusterStateGenerator.Params params = fixture.generatorParams().minStorageNodesUp(2); + + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.toString(), equalTo("cluster:d distributor:3 storage:2 .0.s:d")); + assertThat(state.getClusterStateReason(), equalTo(Optional.of(ClusterStateReason.TOO_FEW_STORAGE_NODES_AVAILABLE))); + } + + @Test + public void cluster_not_down_if_more_than_min_count_of_storage_nodes_are_available() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(3) + .bringEntireClusterUp() + .reportStorageNodeState(0, State.DOWN); + final ClusterStateGenerator.Params params = fixture.generatorParams().minStorageNodesUp(2); + + final AnnotatedClusterState 
state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.toString(), equalTo("distributor:3 storage:3 .0.s:d")); + assertThat(state.getClusterStateReason(), equalTo(Optional.empty())); + } + + @Test + public void cluster_down_if_less_than_min_count_of_distributors_available() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(3) + .bringEntireClusterUp() + .reportDistributorNodeState(0, State.DOWN) + .reportDistributorNodeState(2, State.DOWN); + final ClusterStateGenerator.Params params = fixture.generatorParams().minDistributorNodesUp(2); + + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.toString(), equalTo("cluster:d distributor:2 .0.s:d storage:3")); + assertThat(state.getClusterStateReason(), equalTo(Optional.of(ClusterStateReason.TOO_FEW_DISTRIBUTOR_NODES_AVAILABLE))); + } + + @Test + public void cluster_not_down_if_more_than_min_count_of_distributors_are_available() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(3) + .bringEntireClusterUp() + .reportDistributorNodeState(0, State.DOWN); + final ClusterStateGenerator.Params params = fixture.generatorParams().minDistributorNodesUp(2); + + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.toString(), equalTo("distributor:3 .0.s:d storage:3")); + assertThat(state.getClusterStateReason(), equalTo(Optional.empty())); + } + + @Test + public void maintenance_mode_counted_as_down_for_cluster_availability() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(3) + .bringEntireClusterUp() + .reportStorageNodeState(0, State.DOWN) + .proposeStorageNodeWantedState(2, State.MAINTENANCE); + final ClusterStateGenerator.Params params = fixture.generatorParams().minStorageNodesUp(2); + + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.toString(), equalTo("cluster:d distributor:3 storage:3 .0.s:d 
.2.s:m")); + } + + @Test + public void init_and_retired_counted_as_up_for_cluster_availability() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(3) + .bringEntireClusterUp() + .reportStorageNodeState(0, State.INITIALIZING) + .proposeStorageNodeWantedState(1, State.RETIRED); + // Any node being treated as down should take down the cluster here + final ClusterStateGenerator.Params params = fixture.generatorParams().minStorageNodesUp(3); + + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.toString(), equalTo("distributor:3 storage:3 .0.s:i .0.i:1.0 .1.s:r")); + } + + @Test + public void cluster_down_if_less_than_min_ratio_of_storage_nodes_available() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(3) + .bringEntireClusterUp() + .reportStorageNodeState(0, State.DOWN) + .reportStorageNodeState(2, State.DOWN); + final ClusterStateGenerator.Params params = fixture.generatorParams().minRatioOfStorageNodesUp(0.5); + + // TODO de-dupe a lot of these tests? + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.toString(), equalTo("cluster:d distributor:3 storage:2 .0.s:d")); + assertThat(state.getClusterStateReason(), equalTo(Optional.of(ClusterStateReason.TOO_LOW_AVAILABLE_STORAGE_NODE_RATIO))); + } + + @Test + public void cluster_not_down_if_more_than_min_ratio_of_storage_nodes_available() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(3) + .bringEntireClusterUp() + .reportStorageNodeState(0, State.DOWN); + // Min node ratio is inclusive, i.e. 0.5 of 2 nodes is enough for cluster to be up. 
+ final ClusterStateGenerator.Params params = fixture.generatorParams().minRatioOfStorageNodesUp(0.5); + + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.toString(), equalTo("distributor:3 storage:3 .0.s:d")); + assertThat(state.getClusterStateReason(), equalTo(Optional.empty())); + } + + @Test + public void cluster_down_if_less_than_min_ratio_of_distributors_available() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(3) + .bringEntireClusterUp() + .reportDistributorNodeState(0, State.DOWN) + .reportDistributorNodeState(2, State.DOWN); + final ClusterStateGenerator.Params params = fixture.generatorParams().minRatioOfDistributorNodesUp(0.5); + + // TODO de-dupe a lot of these tests? + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.toString(), equalTo("cluster:d distributor:2 .0.s:d storage:3")); + assertThat(state.getClusterStateReason(), equalTo(Optional.of(ClusterStateReason.TOO_LOW_AVAILABLE_DISTRIBUTOR_NODE_RATIO))); + } + + @Test + public void cluster_not_down_if_more_than_min_ratio_of_distributors_available() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(3) + .bringEntireClusterUp() + .reportDistributorNodeState(0, State.DOWN); + final ClusterStateGenerator.Params params = fixture.generatorParams().minRatioOfDistributorNodesUp(0.5); + + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.toString(), equalTo("distributor:3 .0.s:d storage:3")); + assertThat(state.getClusterStateReason(), equalTo(Optional.empty())); + } + + @Test + public void group_nodes_are_marked_down_if_group_availability_too_low() { + final ClusterFixture fixture = ClusterFixture + .forHierarchicCluster(DistributionBuilder.withGroups(3).eachWithNodeCount(3)) + .bringEntireClusterUp() + .reportStorageNodeState(4, State.DOWN); + final ClusterStateGenerator.Params params = 
fixture.generatorParams().minNodeRatioPerGroup(0.68); + + // Node 4 is down, which is more than 32% of nodes down in group #2. Nodes 3,5 should be implicitly + // marked down as they are in the same group. + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.toString(), equalTo("distributor:9 storage:9 .3.s:d .4.s:d .5.s:d")); + } + + @Test + public void group_nodes_are_not_marked_down_if_group_availability_sufficiently_high() { + final ClusterFixture fixture = ClusterFixture + .forHierarchicCluster(DistributionBuilder.withGroups(3).eachWithNodeCount(3)) + .bringEntireClusterUp() + .reportStorageNodeState(4, State.DOWN); + final ClusterStateGenerator.Params params = fixture.generatorParams().minNodeRatioPerGroup(0.65); + + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.toString(), equalTo("distributor:9 storage:9 .4.s:d")); // No other nodes down implicitly + } + + @Test + public void implicitly_downed_group_nodes_receive_a_state_description() { + final ClusterFixture fixture = ClusterFixture + .forHierarchicCluster(DistributionBuilder.withGroups(2).eachWithNodeCount(2)) + .bringEntireClusterUp() + .reportStorageNodeState(3, State.DOWN); + final ClusterStateGenerator.Params params = fixture.generatorParams().minNodeRatioPerGroup(0.51); + + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.toString(true), equalTo("distributor:4 storage:4 " + + ".2.s:d .2.m:group\x20node\x20availability\x20below\x20configured\x20threshold " + + ".3.s:d .3.m:mockdesc")); // Preserve description for non-implicitly taken down node + } + + @Test + public void implicitly_downed_group_nodes_are_annotated_with_group_reason() { + final ClusterFixture fixture = ClusterFixture + .forHierarchicCluster(DistributionBuilder.withGroups(2).eachWithNodeCount(2)) + .bringEntireClusterUp() + .reportStorageNodeState(3, State.DOWN); + 
final ClusterStateGenerator.Params params = fixture.generatorParams().minNodeRatioPerGroup(0.51); + + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.getNodeStateReasons(), + hasStateReasonForNode(storageNode(2), NodeStateReason.GROUP_IS_DOWN)); + } + + @Test + public void maintenance_nodes_in_downed_group_are_not_affected() { + final ClusterFixture fixture = ClusterFixture + .forHierarchicCluster(DistributionBuilder.withGroups(3).eachWithNodeCount(3)) + .bringEntireClusterUp() + .proposeStorageNodeWantedState(3, State.MAINTENANCE) + .reportStorageNodeState(4, State.DOWN); + final ClusterStateGenerator.Params params = fixture.generatorParams().minNodeRatioPerGroup(0.68); + + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + // 4 is down by itself, 5 is down implicitly and 3 should happily stay in Maintenance mode. + // Side note: most special cases for when a node should and should not be affected by group + // down edges are covered in GroupAvailabilityCalculatorTest and GroupAutoTakedownTest. + // We test this case explicitly since it's an assurance that code integration works as expected. + assertThat(state.toString(), equalTo("distributor:9 storage:9 .3.s:m .4.s:d .5.s:d")); + } + + /** + * Cluster-wide distribution bit count cannot be higher than the lowest split bit + * count reported by the set of storage nodes. This is because the distribution bit + * directly impacts which level of the bucket tree is considered the root level, + * and any buckets caught over this level would not be accessible in the data space. 
+ */ + @Test + public void distribution_bits_bounded_by_reported_min_bits_from_storage_node() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(3) + .bringEntireClusterUp() + .reportStorageNodeState(1, new NodeState(NodeType.STORAGE, State.UP).setMinUsedBits(7)); + + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + assertThat(state.toString(), equalTo("bits:7 distributor:3 storage:3")); + } + + @Test + public void distribution_bits_bounded_by_lowest_reporting_storage_node() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(3) + .bringEntireClusterUp() + .reportStorageNodeState(0, new NodeState(NodeType.STORAGE, State.UP).setMinUsedBits(6)) + .reportStorageNodeState(1, new NodeState(NodeType.STORAGE, State.UP).setMinUsedBits(5)); + + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + assertThat(state.toString(), equalTo("bits:5 distributor:3 storage:3")); + } + + @Test + public void distribution_bits_bounded_by_config_parameter() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(3).bringEntireClusterUp(); + + final ClusterStateGenerator.Params params = fixture.generatorParams().idealDistributionBits(12); + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.toString(), equalTo("bits:12 distributor:3 storage:3")); + } + + // TODO do we really want this behavior? It's the legacy one, but it seems... dangerous.. Especially for maintenance + // TODO We generally want to avoid distribution bit decreases if at all possible, since "collapsing" + // the top-level bucket space can cause data loss on timestamp collisions across super buckets. 
+ @Test + public void distribution_bit_not_influenced_by_nodes_down_or_in_maintenance() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(3) + .bringEntireClusterUp() + .reportStorageNodeState(0, new NodeState(NodeType.STORAGE, State.UP).setMinUsedBits(7)) + .reportStorageNodeState(1, new NodeState(NodeType.STORAGE, State.DOWN).setMinUsedBits(6)) + .reportStorageNodeState(2, new NodeState(NodeType.STORAGE, State.UP).setMinUsedBits(5)) + .proposeStorageNodeWantedState(2, State.MAINTENANCE); + + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + assertThat(state.toString(), equalTo("bits:7 distributor:3 storage:3 .1.s:d .2.s:m")); + } + + private String do_test_distribution_bit_watermark(int lowestObserved, int node0MinUsedBits) { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(3) + .bringEntireClusterUp() + .reportStorageNodeState(0, new NodeState(NodeType.STORAGE, State.UP).setMinUsedBits(node0MinUsedBits)); + + final ClusterStateGenerator.Params params = fixture.generatorParams() + .highestObservedDistributionBitCount(8) // TODO is this even needed for our current purposes? + .lowestObservedDistributionBitCount(lowestObserved); + + return ClusterStateGenerator.generatedStateFrom(params).toString(); + } + + /** + * Distribution bit increases should not take place incrementally. Doing so would + * let e.g. a transition from 10 bits to 20 bits cause 10 interim full re-distributions. 
+ */ + @Test + public void published_distribution_bit_bound_by_low_watermark_when_nodes_report_less_than_config_bits() { + assertThat(do_test_distribution_bit_watermark(5, 5), + equalTo("bits:5 distributor:3 storage:3")); + assertThat(do_test_distribution_bit_watermark(5, 6), + equalTo("bits:5 distributor:3 storage:3")); + assertThat(do_test_distribution_bit_watermark(5, 15), + equalTo("bits:5 distributor:3 storage:3")); + } + + @Test + public void published_state_jumps_to_configured_ideal_bits_when_all_nodes_report_it() { + // Note: the rest of the mocked nodes always report 16 bits by default + assertThat(do_test_distribution_bit_watermark(5, 16), + equalTo("distributor:3 storage:3")); // "bits:16" implied + } + + private String do_test_storage_node_with_no_init_progress(State wantedState) { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(3) + .bringEntireClusterUp() + .reportStorageNodeState(0, new NodeState(NodeType.STORAGE, State.INITIALIZING).setInitProgress(0.5)) + .proposeStorageNodeWantedState(0, wantedState); + + final NodeInfo nodeInfo = fixture.cluster.getNodeInfo(new Node(NodeType.STORAGE, 0)); + nodeInfo.setInitProgressTime(10_000); + + final ClusterStateGenerator.Params params = fixture.generatorParams() + .maxInitProgressTime(1000) + .currentTimeInMilllis(11_000); + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + return state.toString(); + } + + @Test + public void storage_node_with_no_init_progress_within_timeout_is_marked_down() { + assertThat(do_test_storage_node_with_no_init_progress(State.UP), + equalTo("distributor:3 storage:3 .0.s:d")); + } + + /** + * As per usual, we shouldn't transition implicitly to Down if Maintenance is set + * as the wanted state. 
+ */ + @Test + public void maintenance_wanted_state_overrides_storage_node_with_no_init_progress() { + assertThat(do_test_storage_node_with_no_init_progress(State.MAINTENANCE), + equalTo("distributor:3 storage:3 .0.s:m")); + } + + /** + * Legacy behavior: if a node has crashed (i.e. transition into Down) at least once + * while in Init mode, its subsequent init mode will not be made public. + * This means the node will remain in a Down-state until it has finished + * initializing. This is presumably because unstable nodes may not be able to finish + * their init stage and would otherwise pop in and out of the cluster state. + */ + @Test + public void unstable_init_storage_node_has_init_state_substituted_by_down() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(5) + .bringEntireClusterUp() + .reportStorageNodeState(0, State.INITIALIZING) + .reportStorageNodeState(0, State.DOWN) // Init -> Down triggers unstable init flag + .reportStorageNodeState(0, new NodeState(NodeType.STORAGE, State.INITIALIZING).setInitProgress(0.5)); + + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + assertThat(state.toString(), equalTo("distributor:5 storage:5 .0.s:d")); + } + + @Test + public void storage_node_with_crashes_but_not_unstable_init_does_not_have_init_state_substituted_by_down() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(5) + .bringEntireClusterUp() + .reportStorageNodeState(0, new NodeState(NodeType.STORAGE, State.INITIALIZING).setInitProgress(0.5)); + final NodeInfo nodeInfo = fixture.cluster.getNodeInfo(new Node(NodeType.STORAGE, 0)); + nodeInfo.setPrematureCrashCount(5); + + final AnnotatedClusterState state = generateFromFixtureWithDefaultParams(fixture); + assertThat(state.toString(), equalTo("distributor:5 storage:5 .0.s:i .0.i:0.5")); + } + + /** + * The generated state must be considered over the Reported state when deciding whether + * to override it with the Wanted state. 
Otherwise, an unstable retired node could have + * its generated state be Retired instead of Down. We want it to stay down instead of + * potentially contributing additional instability to the cluster. + */ + @Test + public void unstable_retired_node_should_be_marked_down() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(5) + .bringEntireClusterUp() + .proposeStorageNodeWantedState(3, State.RETIRED); + final ClusterStateGenerator.Params params = fixture.generatorParams().maxPrematureCrashes(10); + + final NodeInfo nodeInfo = fixture.cluster.getNodeInfo(new Node(NodeType.STORAGE, 3)); + nodeInfo.setPrematureCrashCount(11); + + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.toString(), equalTo("distributor:5 storage:5 .3.s:d")); + } + + @Test + public void generator_params_can_inherit_values_from_controller_options() { + FleetControllerOptions options = new FleetControllerOptions("foocluster"); + options.maxPrematureCrashes = 1; + options.minStorageNodesUp = 2; + options.minDistributorNodesUp = 3; + options.minRatioOfStorageNodesUp = 0.4; + options.minRatioOfDistributorNodesUp = 0.5; + options.minNodeRatioPerGroup = 0.6; + options.distributionBits = 7; + options.maxTransitionTime = ClusterStateGenerator.Params.buildTransitionTimeMap(1000, 2000); + final ClusterStateGenerator.Params params = ClusterStateGenerator.Params.fromOptions(options); + assertThat(params.maxPrematureCrashes, equalTo(options.maxPrematureCrashes)); + assertThat(params.minStorageNodesUp, equalTo(options.minStorageNodesUp)); + assertThat(params.minDistributorNodesUp, equalTo(options.minDistributorNodesUp)); + assertThat(params.minRatioOfStorageNodesUp, equalTo(options.minRatioOfStorageNodesUp)); + assertThat(params.minRatioOfDistributorNodesUp, equalTo(options.minRatioOfDistributorNodesUp)); + assertThat(params.minNodeRatioPerGroup, equalTo(options.minNodeRatioPerGroup)); + assertThat(params.transitionTimes, 
equalTo(options.maxTransitionTime)); + } + + @Test + public void configured_zero_init_progress_time_disables_auto_init_to_down_feature() { + final ClusterFixture fixture = ClusterFixture.forFlatCluster(3) + .bringEntireClusterUp() + .reportStorageNodeState(0, new NodeState(NodeType.STORAGE, State.INITIALIZING).setInitProgress(0.5)); + + final NodeInfo nodeInfo = fixture.cluster.getNodeInfo(new Node(NodeType.STORAGE, 0)); + nodeInfo.setInitProgressTime(10_000); + + final ClusterStateGenerator.Params params = fixture.generatorParams() + .maxInitProgressTime(0) + .currentTimeInMilllis(11_000); + final AnnotatedClusterState state = ClusterStateGenerator.generatedStateFrom(params); + assertThat(state.toString(), equalTo("distributor:3 storage:3 .0.s:i .0.i:0.5")); + } + +} diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java index 1adb0dcad7d..74661147085 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java @@ -74,13 +74,14 @@ public class DistributionBitCountTest extends FleetControllerTest { nodes.get(3).setNodeState(new NodeState(NodeType.STORAGE, State.UP).setMinUsedBits(11)); ClusterState startState = waitForState("version:\\d+ bits:11 distributor:10 storage:10"); - ClusterState state = waitForClusterStateIncludingNodesWithMinUsedBits(11, 2); nodes.get(1).setNodeState(new NodeState(NodeType.STORAGE, State.UP).setMinUsedBits(12)); - assertEquals(state + "->" + fleetController.getSystemState(), startState.getVersion(), fleetController.getSystemState().getVersion()); + assertEquals(startState + "->" + fleetController.getSystemState(), + startState.getVersion(), fleetController.getSystemState().getVersion()); for (int i = 
0; i < 10; ++i) { - nodes.get(i).setNodeState(new NodeState(NodeType.STORAGE, State.UP).setMinUsedBits(17)); + // nodes is array of [distr.0, stor.0, distr.1, stor.1, ...] and we just want the storage nodes + nodes.get(i*2 + 1).setNodeState(new NodeState(NodeType.STORAGE, State.UP).setMinUsedBits(17)); } assertEquals(startState.getVersion() + 1, waitForState("version:\\d+ bits:17 distributor:10 storage:10").getVersion()); } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/EventDiffCalculatorTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/EventDiffCalculatorTest.java new file mode 100644 index 00000000000..2a5b3adcfe7 --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/EventDiffCalculatorTest.java @@ -0,0 +1,319 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import static com.yahoo.vespa.clustercontroller.core.matchers.EventForNode.eventForNode; +import static com.yahoo.vespa.clustercontroller.core.matchers.NodeEventWithDescription.nodeEventWithDescription; +import static com.yahoo.vespa.clustercontroller.core.matchers.ClusterEventWithDescription.clusterEventWithDescription; +import static com.yahoo.vespa.clustercontroller.core.matchers.EventTypeIs.eventTypeIs; +import static com.yahoo.vespa.clustercontroller.core.matchers.EventTimeIs.eventTimeIs; +import static org.hamcrest.CoreMatchers.allOf; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.hamcrest.CoreMatchers.hasItem; + +import static com.yahoo.vespa.clustercontroller.core.ClusterFixture.storageNode; +import static com.yahoo.vespa.clustercontroller.core.ClusterFixture.distributorNode; + +import com.yahoo.vdslib.state.ClusterState; +import com.yahoo.vdslib.state.Node; +import org.junit.Test; + +import 
java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class EventDiffCalculatorTest { + + private static Map<Node, NodeStateReason> emptyNodeStateReasons() { + return Collections.emptyMap(); + } + + private static class EventFixture { + final ClusterFixture clusterFixture; + // TODO could reasonably put shared state into a common class to avoid dupes for both before/after + Optional<ClusterStateReason> clusterReasonBefore = Optional.empty(); + Optional<ClusterStateReason> clusterReasonAfter = Optional.empty(); + ClusterState clusterStateBefore = ClusterState.emptyState(); + ClusterState clusterStateAfter = ClusterState.emptyState(); + final Map<Node, NodeStateReason> nodeReasonsBefore = new HashMap<>(); + final Map<Node, NodeStateReason> nodeReasonsAfter = new HashMap<>(); + long currentTimeMs = 0; + + EventFixture(int nodeCount) { + this.clusterFixture = ClusterFixture.forFlatCluster(nodeCount); + } + + EventFixture clusterStateBefore(String stateStr) { + clusterStateBefore = ClusterState.stateFromString(stateStr); + return this; + } + EventFixture clusterStateAfter(String stateStr) { + clusterStateAfter = ClusterState.stateFromString(stateStr); + return this; + } + EventFixture storageNodeReasonBefore(int index, NodeStateReason reason) { + nodeReasonsBefore.put(storageNode(index), reason); + return this; + } + EventFixture storageNodeReasonAfter(int index, NodeStateReason reason) { + nodeReasonsAfter.put(storageNode(index), reason); + return this; + } + EventFixture clusterReasonBefore(ClusterStateReason reason) { + this.clusterReasonBefore = Optional.of(reason); + return this; + } + EventFixture clusterReasonAfter(ClusterStateReason reason) { + this.clusterReasonAfter = Optional.of(reason); + return this; + } + EventFixture currentTimeMs(long timeMs) { + this.currentTimeMs = timeMs; + return this; + } + + List<Event> computeEventDiff() { + final AnnotatedClusterState stateBefore = 
new AnnotatedClusterState( + clusterStateBefore, clusterReasonBefore, nodeReasonsBefore); + final AnnotatedClusterState stateAfter = new AnnotatedClusterState( + clusterStateAfter, clusterReasonAfter, nodeReasonsAfter); + + return EventDiffCalculator.computeEventDiff( + EventDiffCalculator.params() + .cluster(clusterFixture.cluster()) + .fromState(stateBefore) + .toState(stateAfter) + .currentTimeMs(currentTimeMs)); + } + + static EventFixture createForNodes(int nodeCount) { + return new EventFixture(nodeCount); + } + + } + + @Test + public void single_storage_node_state_transition_emits_altered_node_state_event() { + final EventFixture fixture = EventFixture.createForNodes(3) + .clusterStateBefore("distributor:3 storage:3") + .clusterStateAfter("distributor:3 storage:3 .0.s:d"); + + final List<Event> events = fixture.computeEventDiff(); + assertThat(events.size(), equalTo(1)); + assertThat(events, hasItem(allOf( + eventForNode(storageNode(0)), + eventTypeIs(NodeEvent.Type.CURRENT), + nodeEventWithDescription("Altered node state in cluster state from 'U' to 'D'")))); + } + + @Test + public void single_distributor_node_state_transition_emits_altered_node_state_event() { + final EventFixture fixture = EventFixture.createForNodes(3) + .clusterStateBefore("distributor:3 storage:3") + .clusterStateAfter("distributor:3 .1.s:d storage:3"); + + final List<Event> events = fixture.computeEventDiff(); + assertThat(events.size(), equalTo(1)); + assertThat(events, hasItem(allOf( + eventForNode(distributorNode(1)), + eventTypeIs(NodeEvent.Type.CURRENT), + nodeEventWithDescription("Altered node state in cluster state from 'U' to 'D'")))); + } + + @Test + public void node_state_change_event_is_tagged_with_given_time() { + final EventFixture fixture = EventFixture.createForNodes(3) + .clusterStateBefore("distributor:3 storage:3") + .clusterStateAfter("distributor:3 storage:3 .0.s:d") + .currentTimeMs(123456); + + final List<Event> events = fixture.computeEventDiff(); + 
assertThat(events.size(), equalTo(1)); + assertThat(events, hasItem(eventTimeIs(123456))); + } + + @Test + public void multiple_node_state_transitions_emit_multiple_node_state_events() { + final EventFixture fixture = EventFixture.createForNodes(3) + .clusterStateBefore("distributor:3 storage:3 .1.s:d") + .clusterStateAfter("distributor:3 .2.s:d storage:3 .0.s:r"); + + final List<Event> events = fixture.computeEventDiff(); + assertThat(events.size(), equalTo(3)); + assertThat(events, hasItem(allOf( + eventForNode(distributorNode(2)), + nodeEventWithDescription("Altered node state in cluster state from 'U' to 'D'")))); + assertThat(events, hasItem(allOf( + eventForNode(storageNode(0)), + nodeEventWithDescription("Altered node state in cluster state from 'U' to 'R'")))); + assertThat(events, hasItem(allOf( + eventForNode(storageNode(1)), + nodeEventWithDescription("Altered node state in cluster state from 'D' to 'U'")))); + } + + @Test + public void no_emitted_node_state_event_when_node_state_not_changed() { + final EventFixture fixture = EventFixture.createForNodes(3) + .clusterStateBefore("distributor:3 storage:3") + .clusterStateAfter("distributor:3 storage:3"); + + final List<Event> events = fixture.computeEventDiff(); + assertThat(events.size(), equalTo(0)); + } + + @Test + public void node_down_edge_with_group_down_reason_has_separate_event_emitted() { + // We sneakily use a flat cluster here but still use a 'group down' reason. Differ doesn't currently care. 
+ final EventFixture fixture = EventFixture.createForNodes(3) + .clusterStateBefore("distributor:3 storage:3") + .clusterStateAfter("distributor:3 storage:3 .1.s:d") + .storageNodeReasonAfter(1, NodeStateReason.GROUP_IS_DOWN); + + final List<Event> events = fixture.computeEventDiff(); + assertThat(events.size(), equalTo(2)); + // Both the regular edge event and the group down event is emitted + assertThat(events, hasItem(allOf( + eventForNode(storageNode(1)), + nodeEventWithDescription("Altered node state in cluster state from 'U' to 'D'")))); + assertThat(events, hasItem(allOf( + eventForNode(storageNode(1)), + eventTypeIs(NodeEvent.Type.CURRENT), + nodeEventWithDescription("Group node availability is below configured threshold")))); + } + + @Test + public void group_down_to_group_down_does_not_emit_new_event() { + final EventFixture fixture = EventFixture.createForNodes(3) + .clusterStateBefore("distributor:3 storage:3 .1.s:d") + .clusterStateAfter("distributor:3 storage:3 .1.s:m") + .storageNodeReasonBefore(1, NodeStateReason.GROUP_IS_DOWN) + .storageNodeReasonAfter(1, NodeStateReason.GROUP_IS_DOWN); + + final List<Event> events = fixture.computeEventDiff(); + assertThat(events.size(), equalTo(1)); + // Should not get a group availability event since nothing has changed in this regard + assertThat(events, hasItem(allOf( + eventForNode(storageNode(1)), + nodeEventWithDescription("Altered node state in cluster state from 'D' to 'M'")))); + } + + @Test + public void group_down_to_clear_reason_emits_group_up_event() { + final EventFixture fixture = EventFixture.createForNodes(3) + .clusterStateBefore("distributor:3 storage:3 .2.s:d") + .clusterStateAfter("distributor:3 storage:3") + .storageNodeReasonBefore(2, NodeStateReason.GROUP_IS_DOWN); // But no after-reason. 
+ + final List<Event> events = fixture.computeEventDiff(); + assertThat(events.size(), equalTo(2)); + assertThat(events, hasItem(allOf( + eventForNode(storageNode(2)), + nodeEventWithDescription("Altered node state in cluster state from 'D' to 'U'")))); + assertThat(events, hasItem(allOf( + eventForNode(storageNode(2)), + eventTypeIs(NodeEvent.Type.CURRENT), + nodeEventWithDescription("Group node availability has been restored")))); + } + + @Test + public void cluster_up_edge_emits_sufficient_node_availability_event() { + final EventFixture fixture = EventFixture.createForNodes(3) + .clusterStateBefore("cluster:d distributor:3 storage:3") + .clusterStateAfter("distributor:3 storage:3"); + + final List<Event> events = fixture.computeEventDiff(); + assertThat(events.size(), equalTo(1)); + assertThat(events, hasItem( + clusterEventWithDescription("Enough nodes available for system to become up"))); + } + + @Test + public void cluster_down_event_without_reason_annotation_emits_generic_down_event() { + final EventFixture fixture = EventFixture.createForNodes(3) + .clusterStateBefore("distributor:3 storage:3") + .clusterStateAfter("cluster:d distributor:3 storage:3"); + + final List<Event> events = fixture.computeEventDiff(); + assertThat(events.size(), equalTo(1)); + assertThat(events, hasItem( + clusterEventWithDescription("Cluster is down"))); + } + + @Test + public void cluster_event_is_tagged_with_given_time() { + final EventFixture fixture = EventFixture.createForNodes(3) + .clusterStateBefore("distributor:3 storage:3") + .clusterStateAfter("cluster:d distributor:3 storage:3") + .currentTimeMs(56789); + + final List<Event> events = fixture.computeEventDiff(); + assertThat(events.size(), equalTo(1)); + assertThat(events, hasItem(eventTimeIs(56789))); + } + + @Test + public void no_event_emitted_for_cluster_down_to_down_edge() { + final EventFixture fixture = EventFixture.createForNodes(3) + .clusterStateBefore("cluster:d distributor:3 storage:3") + 
.clusterStateAfter("cluster:d distributor:3 storage:3"); + + final List<Event> events = fixture.computeEventDiff(); + assertThat(events.size(), equalTo(0)); + } + + @Test + public void too_few_storage_nodes_cluster_down_reason_emits_corresponding_event() { + final EventFixture fixture = EventFixture.createForNodes(3) + .clusterStateBefore("distributor:3 storage:3") + .clusterStateAfter("cluster:d distributor:3 storage:3") + .clusterReasonAfter(ClusterStateReason.TOO_FEW_STORAGE_NODES_AVAILABLE); + + final List<Event> events = fixture.computeEventDiff(); + assertThat(events.size(), equalTo(1)); + // TODO(?) these messages currently don't include the current configured limits + assertThat(events, hasItem( + clusterEventWithDescription("Too few storage nodes available in cluster. Setting cluster state down"))); + } + + @Test + public void too_few_distributor_nodes_cluster_down_reason_emits_corresponding_event() { + final EventFixture fixture = EventFixture.createForNodes(3) + .clusterStateBefore("distributor:3 storage:3") + .clusterStateAfter("cluster:d distributor:3 storage:3") + .clusterReasonAfter(ClusterStateReason.TOO_FEW_DISTRIBUTOR_NODES_AVAILABLE); + + final List<Event> events = fixture.computeEventDiff(); + assertThat(events.size(), equalTo(1)); + assertThat(events, hasItem( + clusterEventWithDescription("Too few distributor nodes available in cluster. Setting cluster state down"))); + } + + @Test + public void too_low_storage_node_ratio_cluster_down_reason_emits_corresponding_event() { + final EventFixture fixture = EventFixture.createForNodes(3) + .clusterStateBefore("distributor:3 storage:3") + .clusterStateAfter("cluster:d distributor:3 storage:3") + .clusterReasonAfter(ClusterStateReason.TOO_LOW_AVAILABLE_STORAGE_NODE_RATIO); + + final List<Event> events = fixture.computeEventDiff(); + assertThat(events.size(), equalTo(1)); + assertThat(events, hasItem( + clusterEventWithDescription("Too low ratio of available storage nodes. 
Setting cluster state down"))); + } + + @Test + public void too_low_distributor_node_ratio_cluster_down_reason_emits_corresponding_event() { + final EventFixture fixture = EventFixture.createForNodes(3) + .clusterStateBefore("distributor:3 storage:3") + .clusterStateAfter("cluster:d distributor:3 storage:3") + .clusterReasonAfter(ClusterStateReason.TOO_LOW_AVAILABLE_DISTRIBUTOR_NODE_RATIO); + + final List<Event> events = fixture.computeEventDiff(); + assertThat(events.size(), equalTo(1)); + assertThat(events, hasItem( + clusterEventWithDescription("Too low ratio of available distributor nodes. Setting cluster state down"))); + } + +} diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java index f4b3e648f63..d0aa0bceba9 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java @@ -6,13 +6,11 @@ import com.yahoo.jrt.slobrok.server.Slobrok; import com.yahoo.log.LogLevel; import com.yahoo.log.LogSetup; import com.yahoo.vdslib.distribution.ConfiguredNode; -import com.yahoo.vdslib.distribution.Distribution; import com.yahoo.vdslib.state.ClusterState; import com.yahoo.vdslib.state.Node; import com.yahoo.vdslib.state.NodeState; import com.yahoo.vdslib.state.NodeType; import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler; -import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo; import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator; import com.yahoo.vespa.clustercontroller.core.rpc.RpcServer; import com.yahoo.vespa.clustercontroller.core.rpc.SlobrokClient; @@ -150,7 +148,7 @@ public abstract class FleetControllerTest implements Waiter { } RpcServer rpcServer = new RpcServer(timer, timer, options.clusterName, 
options.fleetControllerIndex, options.slobrokBackOffPolicy); DatabaseHandler database = new DatabaseHandler(timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer); - SystemStateGenerator stateGenerator = new SystemStateGenerator(timer, log, metricUpdater); + StateChangeHandler stateGenerator = new StateChangeHandler(timer, log, metricUpdater); SystemStateBroadcaster stateBroadcaster = new SystemStateBroadcaster(timer, timer); MasterElectionHandler masterElectionHandler = new MasterElectionHandler(options.fleetControllerIndex, options.fleetControllerCount, timer, timer); FleetController controller = new FleetController(timer, log, cluster, stateGatherer, communicator, status, rpcServer, lookUp, database, stateGenerator, stateBroadcaster, masterElectionHandler, metricUpdater, options); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/GroupAutoTakedownTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/GroupAutoTakedownTest.java index be60fba234a..a7307e0180a 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/GroupAutoTakedownTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/GroupAutoTakedownTest.java @@ -9,19 +9,22 @@ import com.yahoo.vdslib.state.NodeType; import com.yahoo.vdslib.state.State; import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler; import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler; -import com.yahoo.vespa.clustercontroller.core.listeners.SystemStateListener; + +import static com.yahoo.vespa.clustercontroller.core.matchers.EventForNode.eventForNode; +import static com.yahoo.vespa.clustercontroller.core.matchers.NodeEventWithDescription.nodeEventWithDescription; import org.junit.Test; -import org.mockito.ArgumentMatcher; import java.util.HashSet; +import java.util.List; import java.util.Set; import static 
org.hamcrest.core.AllOf.allOf; +import static org.hamcrest.core.IsCollectionContaining.hasItem; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -43,26 +46,29 @@ public class GroupAutoTakedownTest { } private static void setSharedFixtureOptions(ClusterFixture fixture, double minNodeRatioPerGroup) { - fixture.generator.setMinNodeRatioPerGroup(minNodeRatioPerGroup); + fixture.setMinNodeRatioPerGroup(minNodeRatioPerGroup); fixture.disableTransientMaintenanceModeOnDown(); fixture.disableAutoClusterTakedown(); fixture.bringEntireClusterUp(); } private String stateAfterStorageTransition(ClusterFixture fixture, final int index, final State state) { - transitionStoreNodeToState(fixture, index, state); + transitionStorageNodeToState(fixture, index, state); return fixture.generatedClusterState(); } private String verboseStateAfterStorageTransition(ClusterFixture fixture, final int index, final State state) { - transitionStoreNodeToState(fixture, index, state); + transitionStorageNodeToState(fixture, index, state); return fixture.verboseGeneratedClusterState(); } - private void transitionStoreNodeToState(ClusterFixture fixture, int index, State state) { + private void transitionStorageNodeToState(ClusterFixture fixture, int index, State state) { fixture.reportStorageNodeState(index, state); - SystemStateListener listener = mock(SystemStateListener.class); - assertTrue(fixture.generator.notifyIfNewSystemState(fixture.cluster, listener)); + } + + private AnnotatedClusterState annotatedStateAfterStorageTransition(ClusterFixture fixture, final int index, final State state) { + transitionStorageNodeToState(fixture, index, state); + return 
fixture.annotatedGeneratedClusterState(); } /** @@ -74,12 +80,9 @@ public class GroupAutoTakedownTest { public void config_does_not_apply_to_flat_hierarchy_clusters() { ClusterFixture fixture = createFixtureForAllUpFlatCluster(5, 0.99); - SystemStateListener listener = mock(SystemStateListener.class); - // First invocation; generates initial state and clears "new state" flag - assertTrue(fixture.generator.notifyIfNewSystemState(fixture.cluster, listener)); - assertEquals("version:1 distributor:5 storage:5", fixture.generatedClusterState()); + assertEquals("distributor:5 storage:5", fixture.generatedClusterState()); - assertEquals("version:2 distributor:5 storage:5 .1.s:d", + assertEquals("distributor:5 storage:5 .1.s:d", stateAfterStorageTransition(fixture, 1, State.DOWN)); } @@ -88,15 +91,13 @@ public class GroupAutoTakedownTest { ClusterFixture fixture = createFixtureForAllUpHierarchicCluster( DistributionBuilder.withGroups(3).eachWithNodeCount(2), 0.51); - SystemStateListener listener = mock(SystemStateListener.class); - assertTrue(fixture.generator.notifyIfNewSystemState(fixture.cluster, listener)); - assertEquals("version:1 distributor:6 storage:6", fixture.generatedClusterState()); + assertEquals("distributor:6 storage:6", fixture.generatedClusterState()); // Same group as node 4 - assertEquals("version:2 distributor:6 storage:4", + assertEquals("distributor:6 storage:4", stateAfterStorageTransition(fixture, 5, State.DOWN)); // Same group as node 1 - assertEquals("version:3 distributor:6 storage:4 .0.s:d .1.s:d", + assertEquals("distributor:6 storage:4 .0.s:d .1.s:d", stateAfterStorageTransition(fixture, 0, State.DOWN)); } @@ -106,11 +107,11 @@ public class GroupAutoTakedownTest { DistributionBuilder.withGroups(3).eachWithNodeCount(2), 0.51); // Group #2 -> down - assertEquals("version:1 distributor:6 storage:4", + assertEquals("distributor:6 storage:4", stateAfterStorageTransition(fixture, 5, State.DOWN)); // Group #2 -> back up again - 
assertEquals("version:2 distributor:6 storage:6", + assertEquals("distributor:6 storage:6", stateAfterStorageTransition(fixture, 5, State.UP)); } @@ -119,16 +120,12 @@ public class GroupAutoTakedownTest { ClusterFixture fixture = createFixtureForAllUpHierarchicCluster( DistributionBuilder.withGroups(3).eachWithNodeCount(2), 0.51); - assertEquals("version:1 distributor:6 storage:4", + assertEquals("distributor:6 storage:4", stateAfterStorageTransition(fixture, 5, State.DOWN)); // 4, 5 in same group; this should not cause a new state since it's already implicitly down fixture.reportStorageNodeState(4, State.DOWN); - - SystemStateListener listener = mock(SystemStateListener.class); - assertFalse(fixture.generator.notifyIfNewSystemState(fixture.cluster, listener)); - - assertEquals("version:1 distributor:6 storage:4", fixture.generatedClusterState()); + assertEquals("distributor:6 storage:4", fixture.generatedClusterState()); } @Test @@ -139,7 +136,7 @@ public class GroupAutoTakedownTest { // Nodes 6 and 7 are taken down implicitly and should have a message reflecting this. // Node 8 is taken down by the fixture and gets a fixture-assigned message that // we should _not_ lose/overwrite. - assertEquals("version:1 distributor:9 storage:9 .6.s:d " + + assertEquals("distributor:9 storage:9 .6.s:d " + ".6.m:group\\x20node\\x20availability\\x20below\\x20configured\\x20threshold " + ".7.s:d " + ".7.m:group\\x20node\\x20availability\\x20below\\x20configured\\x20threshold " + @@ -151,12 +148,12 @@ public class GroupAutoTakedownTest { public void legacy_cluster_wide_availabilty_ratio_is_computed_after_group_takedowns() { ClusterFixture fixture = createFixtureForAllUpHierarchicCluster( DistributionBuilder.withGroups(3).eachWithNodeCount(2), 0.51); - fixture.generator.setMinNodesUp(5, 5, 0.51, 0.51); + fixture.setMinNodesUp(5, 5, 0.51, 0.51); // Taking down a node in a group forces the entire group down, which leaves us with // only 4 content nodes (vs. 
minimum of 5 as specified above). The entire cluster // should be marked as down in this case. - assertEquals("version:1 cluster:d distributor:6 storage:4", + assertEquals("cluster:d distributor:6 storage:4", stateAfterStorageTransition(fixture, 5, State.DOWN)); } @@ -165,16 +162,12 @@ public class GroupAutoTakedownTest { ClusterFixture fixture = createFixtureForAllUpHierarchicCluster( DistributionBuilder.withGroups(3).eachWithNodeCount(3), 0.99); - NodeInfo nodeInfo = fixture.cluster.getNodeInfo(new Node(NodeType.STORAGE, 5)); - fixture.generator.proposeNewNodeState(nodeInfo, new NodeState(NodeType.STORAGE, State.MAINTENANCE)); - SystemStateListener listener = mock(SystemStateListener.class); - assertTrue(fixture.generator.notifyIfNewSystemState(fixture.cluster, listener)); - + fixture.proposeStorageNodeWantedState(5, State.MAINTENANCE); // Maintenance not counted as down, so group still up - assertEquals("version:1 distributor:9 storage:9 .5.s:m", fixture.generatedClusterState()); + assertEquals("distributor:9 storage:9 .5.s:m", fixture.generatedClusterState()); // Group goes down, but maintenance node should still be in maintenance - assertEquals("version:2 distributor:9 storage:9 .3.s:d .4.s:d .5.s:m", + assertEquals("distributor:9 storage:9 .3.s:d .4.s:d .5.s:m", stateAfterStorageTransition(fixture, 4, State.DOWN)); } @@ -186,51 +179,16 @@ public class GroupAutoTakedownTest { // Our timers are mocked, so taking down node 4 will deterministically transition to // a transient maintenance mode. Group should not be taken down here. - assertEquals("version:1 distributor:9 storage:9 .4.s:m", + assertEquals("distributor:9 storage:9 .4.s:m", stateAfterStorageTransition(fixture, 4, State.DOWN)); // However, once grace period expires the group should be taken down. 
fixture.timer.advanceTime(1001); NodeStateOrHostInfoChangeHandler changeListener = mock(NodeStateOrHostInfoChangeHandler.class); - fixture.generator.watchTimers(fixture.cluster, changeListener); - SystemStateListener stateListener = mock(SystemStateListener.class); - assertTrue(fixture.generator.notifyIfNewSystemState(fixture.cluster, stateListener)); - - assertEquals("version:2 distributor:9 storage:9 .3.s:d .4.s:d .5.s:d", fixture.generatedClusterState()); - } - - private static class NodeEventWithDescription extends ArgumentMatcher<NodeEvent> { - private final String expected; - - NodeEventWithDescription(String expected) { - this.expected = expected; - } - - @Override - public boolean matches(Object o) { - return expected.equals(((NodeEvent)o).getDescription()); - } - } + fixture.nodeStateChangeHandler.watchTimers( + fixture.cluster, fixture.annotatedGeneratedClusterState().getClusterState(), changeListener); - private static NodeEventWithDescription nodeEventWithDescription(String description) { - return new NodeEventWithDescription(description); - } - - private static class EventForNode extends ArgumentMatcher<NodeEvent> { - private final Node expected; - - EventForNode(Node expected) { - this.expected = expected; - } - - @Override - public boolean matches(Object o) { - return ((NodeEvent)o).getNode().getNode().equals(expected); - } - } - - private static EventForNode eventForNode(Node expected) { - return new EventForNode(expected); + assertEquals("distributor:9 storage:9 .3.s:d .4.s:d .5.s:d", fixture.generatedClusterState()); } private static Node contentNode(int index) { @@ -242,13 +200,14 @@ public class GroupAutoTakedownTest { ClusterFixture fixture = createFixtureForAllUpHierarchicCluster( DistributionBuilder.withGroups(3).eachWithNodeCount(2), 0.51); - assertEquals("version:1 distributor:6 storage:4", - stateAfterStorageTransition(fixture, 5, State.DOWN)); + final List<Event> events = EventDiffCalculator.computeEventDiff(EventDiffCalculator.params() + 
.cluster(fixture.cluster) + .fromState(fixture.annotatedGeneratedClusterState()) + .toState(annotatedStateAfterStorageTransition(fixture, 5, State.DOWN))); - verify(fixture.eventLog).addNodeOnlyEvent(argThat(allOf( - nodeEventWithDescription("Setting node down as the total availability of its group is " + - "below the configured threshold"), - eventForNode(contentNode(4)))), any()); + assertThat(events, hasItem(allOf( + nodeEventWithDescription("Group node availability is below configured threshold"), + eventForNode(contentNode(4))))); } @Test @@ -256,30 +215,31 @@ public class GroupAutoTakedownTest { ClusterFixture fixture = createFixtureForAllUpHierarchicCluster( DistributionBuilder.withGroups(3).eachWithNodeCount(2), 0.51); - assertEquals("version:1 distributor:6 storage:4", + assertEquals("distributor:6 storage:4", stateAfterStorageTransition(fixture, 5, State.DOWN)); - assertEquals("version:2 distributor:6 storage:6", - stateAfterStorageTransition(fixture, 5, State.UP)); - verify(fixture.eventLog).addNodeOnlyEvent(argThat(allOf( - nodeEventWithDescription("Group availability restored; taking node back up"), - eventForNode(contentNode(4)))), any()); + final List<Event> events = EventDiffCalculator.computeEventDiff(EventDiffCalculator.params() + .cluster(fixture.cluster) + .fromState(fixture.annotatedGeneratedClusterState()) + .toState(annotatedStateAfterStorageTransition(fixture, 5, State.UP))); + + assertThat(events, hasItem(allOf( + nodeEventWithDescription("Group node availability has been restored"), + eventForNode(contentNode(4))))); } @Test - public void wanted_state_retired_implicitly_down_node_transitioned_it_to_retired_mode_immediately() { + public void wanted_state_retired_implicitly_down_node_is_transitioned_to_retired_mode_immediately() { ClusterFixture fixture = createFixtureForAllUpHierarchicCluster( DistributionBuilder.withGroups(3).eachWithNodeCount(3), 0.99); - assertEquals("version:1 distributor:9 storage:6", + assertEquals("distributor:9 
storage:6", stateAfterStorageTransition(fixture, 6, State.DOWN)); // Node 7 is implicitly down. Mark wanted state as retired. It should now be Retired // but not Down. fixture.proposeStorageNodeWantedState(7, State.RETIRED); - SystemStateListener stateListener = mock(SystemStateListener.class); - assertTrue(fixture.generator.notifyIfNewSystemState(fixture.cluster, stateListener)); - assertEquals("version:2 distributor:9 storage:8 .6.s:d .7.s:r", fixture.generatedClusterState()); + assertEquals("distributor:9 storage:8 .6.s:d .7.s:r", fixture.generatedClusterState()); } @Test @@ -287,9 +247,9 @@ public class GroupAutoTakedownTest { ClusterFixture fixture = createFixtureForAllUpHierarchicCluster( DistributionBuilder.withGroups(3).eachWithNodeCount(2), 0.49); - assertEquals("version:1 distributor:6 storage:6 .4.s:d", + assertEquals("distributor:6 storage:6 .4.s:d", stateAfterStorageTransition(fixture, 4, State.DOWN)); - assertEquals("version:2 distributor:6 storage:4", + assertEquals("distributor:6 storage:4", stateAfterStorageTransition(fixture, 5, State.DOWN)); // Node 5 gets config-retired under our feet. @@ -299,9 +259,8 @@ public class GroupAutoTakedownTest { // TODO this should ideally also set the retired flag in the distribution // config, but only the ConfiguredNodes are actually looked at currently. 
fixture.cluster.setNodes(nodes); - fixture.generator.setNodes(fixture.cluster.clusterInfo()); - assertEquals("version:3 distributor:6 storage:6 .4.s:d .5.s:r", + assertEquals("distributor:6 storage:6 .4.s:d .5.s:r", stateAfterStorageTransition(fixture, 5, State.UP)); } @@ -314,14 +273,12 @@ public class GroupAutoTakedownTest { newState.setInitProgress(0.5); fixture.reportStorageNodeState(4, newState); - SystemStateListener stateListener = mock(SystemStateListener.class); - assertTrue(fixture.generator.notifyIfNewSystemState(fixture.cluster, stateListener)); - assertEquals("version:1 distributor:6 storage:6 .4.s:i .4.i:0.5", fixture.generatedClusterState()); + assertEquals("distributor:6 storage:6 .4.s:i .4.i:0.5", fixture.generatedClusterState()); - assertEquals("version:2 distributor:6 storage:4", + assertEquals("distributor:6 storage:4", stateAfterStorageTransition(fixture, 5, State.DOWN)); - assertEquals("version:3 distributor:6 storage:6 .4.s:i .4.i:0.5", + assertEquals("distributor:6 storage:6 .4.s:i .4.i:0.5", stateAfterStorageTransition(fixture, 5, State.UP)); } @@ -330,20 +287,17 @@ public class GroupAutoTakedownTest { ClusterFixture fixture = createFixtureForAllUpHierarchicCluster( DistributionBuilder.withGroups(3).eachWithNodeCount(2), 0.51); - final Node node = new Node(NodeType.STORAGE, 4); final NodeState newState = new NodeState(NodeType.STORAGE, State.UP); newState.setDiskCount(7); newState.setDiskState(5, new DiskState(State.DOWN)); fixture.reportStorageNodeState(4, newState); - SystemStateListener stateListener = mock(SystemStateListener.class); - assertTrue(fixture.generator.notifyIfNewSystemState(fixture.cluster, stateListener)); - assertEquals("version:1 distributor:6 storage:6 .4.d:7 .4.d.5.s:d", fixture.generatedClusterState()); + assertEquals("distributor:6 storage:6 .4.d:7 .4.d.5.s:d", fixture.generatedClusterState()); - assertEquals("version:2 distributor:6 storage:4", + assertEquals("distributor:6 storage:4", 
stateAfterStorageTransition(fixture, 5, State.DOWN)); - assertEquals("version:3 distributor:6 storage:6 .4.d:7 .4.d.5.s:d", + assertEquals("distributor:6 storage:6 .4.d:7 .4.d.5.s:d", stateAfterStorageTransition(fixture, 5, State.UP)); } @@ -352,19 +306,15 @@ public class GroupAutoTakedownTest { ClusterFixture fixture = createFixtureForAllUpHierarchicCluster( DistributionBuilder.withGroups(3).eachWithNodeCount(3), 0.60); - NodeInfo nodeInfo = fixture.cluster.getNodeInfo(new Node(NodeType.STORAGE, 5)); - nodeInfo.setWantedState(new NodeState(NodeType.STORAGE, State.DOWN).setDescription("borkbork")); - fixture.generator.proposeNewNodeState(nodeInfo, nodeInfo.getWantedState()); - SystemStateListener listener = mock(SystemStateListener.class); - assertTrue(fixture.generator.notifyIfNewSystemState(fixture.cluster, listener)); + fixture.proposeStorageNodeWantedState(5, State.DOWN, "borkbork"); - assertEquals("version:1 distributor:9 storage:9 .5.s:d .5.m:borkbork", fixture.verboseGeneratedClusterState()); + assertEquals("distributor:9 storage:9 .5.s:d .5.m:borkbork", fixture.verboseGeneratedClusterState()); - assertEquals("version:2 distributor:9 storage:9 " + + assertEquals("distributor:9 storage:9 " + ".3.s:d .3.m:group\\x20node\\x20availability\\x20below\\x20configured\\x20threshold " + ".4.s:d .4.m:mockdesc .5.s:d .5.m:borkbork", verboseStateAfterStorageTransition(fixture, 4, State.DOWN)); - assertEquals("version:3 distributor:9 storage:9 .5.s:d .5.m:borkbork", + assertEquals("distributor:9 storage:9 .5.s:d .5.m:borkbork", verboseStateAfterStorageTransition(fixture, 4, State.UP)); } @@ -378,25 +328,23 @@ public class GroupAutoTakedownTest { fixture.reportStorageNodeState(4, newState); - SystemStateListener listener = mock(SystemStateListener.class); - assertTrue(fixture.generator.notifyIfNewSystemState(fixture.cluster, listener)); - - assertEquals("version:1 distributor:6 storage:6 .4.t:123456", fixture.generatedClusterState()); + assertEquals("distributor:6 
storage:6 .4.t:123456", fixture.generatedClusterState()); DatabaseHandler handler = mock(DatabaseHandler.class); DatabaseHandler.Context context = mock(DatabaseHandler.Context.class); when(context.getCluster()).thenReturn(fixture.cluster); - fixture.generator.handleAllDistributorsInSync(handler, context); - assertTrue(fixture.generator.notifyIfNewSystemState(fixture.cluster, listener)); + Set<ConfiguredNode> nodes = new HashSet<>(fixture.cluster.clusterInfo().getConfiguredNodes().values()); + fixture.nodeStateChangeHandler.handleAllDistributorsInSync( + fixture.annotatedGeneratedClusterState().getClusterState(), nodes, handler, context); // Timestamp should now be cleared from state - assertEquals("version:2 distributor:6 storage:6", fixture.generatedClusterState()); + assertEquals("distributor:6 storage:6", fixture.generatedClusterState()); // Trigger a group down+up edge. Timestamp should _not_ be reintroduced since it was previously cleared. - assertEquals("version:3 distributor:6 storage:4", + assertEquals("distributor:6 storage:4", stateAfterStorageTransition(fixture, 5, State.DOWN)); - assertEquals("version:4 distributor:6 storage:6", + assertEquals("distributor:6 storage:6", stateAfterStorageTransition(fixture, 5, State.UP)); } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java index ba2cd287a9a..80435ee7c7d 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MasterElectionTest.java @@ -191,15 +191,15 @@ public class MasterElectionTest extends FleetControllerTest { log.log(LogLevel.INFO, "Leaving waitForMaster"); } - private static class VersionMonotonicityChecker { + private static class StrictlyIncreasingVersionChecker { private ClusterState lastState; - 
private VersionMonotonicityChecker(ClusterState initialState) { + private StrictlyIncreasingVersionChecker(ClusterState initialState) { this.lastState = initialState; } - public static VersionMonotonicityChecker bootstrappedWith(ClusterState initialState) { - return new VersionMonotonicityChecker(initialState); + public static StrictlyIncreasingVersionChecker bootstrappedWith(ClusterState initialState) { + return new StrictlyIncreasingVersionChecker(initialState); } public void updateAndVerify(ClusterState currentState) { @@ -207,7 +207,7 @@ public class MasterElectionTest extends FleetControllerTest { lastState = currentState; if (currentState.getVersion() <= last.getVersion()) { throw new IllegalStateException( - String.format("Cluster state version monotonicity invariant broken! " + + String.format("Cluster state version strict increase invariant broken! " + "Old state was '%s', new state is '%s'", last, currentState)); } } @@ -226,7 +226,8 @@ public class MasterElectionTest extends FleetControllerTest { waitForStableSystem(); waitForMaster(0); Arrays.asList(0, 1, 2, 3, 4).stream().forEach(this::waitForCompleteCycle); - VersionMonotonicityChecker checker = VersionMonotonicityChecker.bootstrappedWith(fleetControllers.get(0).getClusterState()); + StrictlyIncreasingVersionChecker checker = StrictlyIncreasingVersionChecker.bootstrappedWith( + fleetControllers.get(0).getClusterState()); fleetControllers.get(0).shutdown(); waitForMaster(1); Arrays.asList(1, 2, 3, 4).stream().forEach(this::waitForCompleteCycle); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/NodeInfoTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/NodeInfoTest.java new file mode 100644 index 00000000000..bf0adf7736c --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/NodeInfoTest.java @@ -0,0 +1,80 @@ +// Copyright 2016 Yahoo Inc. 
Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vdslib.state.Node; +import com.yahoo.vdslib.state.NodeType; +import com.yahoo.vdslib.state.State; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class NodeInfoTest { + + @Test + public void unstable_init_flag_is_initially_clear() { + ClusterFixture fixture = ClusterFixture.forFlatCluster(3); + final NodeInfo nodeInfo = fixture.cluster.getNodeInfo(new Node(NodeType.STORAGE, 1)); + assertFalse(nodeInfo.recentlyObservedUnstableDuringInit()); + } + + private static ClusterFixture fixtureWithNodeMarkedAsUnstableInit(int nodeIndex) { + return ClusterFixture.forFlatCluster(3) + .reportStorageNodeState(nodeIndex, State.INITIALIZING) + .reportStorageNodeState(nodeIndex, State.DOWN); + } + + @Test + public void down_edge_during_init_state_marks_as_unstable_init() { + ClusterFixture fixture = fixtureWithNodeMarkedAsUnstableInit(1); + + final NodeInfo nodeInfo = fixture.cluster.getNodeInfo(new Node(NodeType.STORAGE, 1)); + assertTrue(nodeInfo.recentlyObservedUnstableDuringInit()); + } + + @Test + public void stopping_edge_during_init_does_not_mark_as_unstable_init() { + ClusterFixture fixture = ClusterFixture.forFlatCluster(3).reportStorageNodeState(0, State.INITIALIZING); + fixture.reportStorageNodeState(0, State.STOPPING); + final NodeInfo nodeInfo = fixture.cluster.getNodeInfo(new Node(NodeType.STORAGE, 0)); + + assertFalse(nodeInfo.recentlyObservedUnstableDuringInit()); + } + + /** + * The cluster controller will, after a time of observed stable state, reset the crash + * counter for a given node. This should also reset the unstable init flag to keep it + * from haunting a now stable node. 
+ */ + @Test + public void zeroing_crash_count_resets_unstable_init_flag() { + ClusterFixture fixture = fixtureWithNodeMarkedAsUnstableInit(1); + + final NodeInfo nodeInfo = fixture.cluster.getNodeInfo(new Node(NodeType.STORAGE, 1)); + nodeInfo.setPrematureCrashCount(0); + assertFalse(nodeInfo.recentlyObservedUnstableDuringInit()); + } + + /** + * A non-zero crash count update, on the other hand, implies the node is suffering + * further instabilities and should not clear the unstable init flag. + */ + @Test + public void non_zero_crash_count_update_does_not_reset_unstable_init_flag() { + ClusterFixture fixture = fixtureWithNodeMarkedAsUnstableInit(1); + + final NodeInfo nodeInfo = fixture.cluster.getNodeInfo(new Node(NodeType.STORAGE, 1)); + nodeInfo.setPrematureCrashCount(3); + assertTrue(nodeInfo.recentlyObservedUnstableDuringInit()); + } + + @Test + public void non_zero_crash_count_does_not_implicitly_set_unstable_init_flag() { + ClusterFixture fixture = ClusterFixture.forFlatCluster(3); + + final NodeInfo nodeInfo = fixture.cluster.getNodeInfo(new Node(NodeType.STORAGE, 1)); + nodeInfo.setPrematureCrashCount(1); + assertFalse(nodeInfo.recentlyObservedUnstableDuringInit()); + } + +} diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcServerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcServerTest.java index 2816b75622e..f7f86907205 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcServerTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcServerTest.java @@ -437,13 +437,13 @@ public class RpcServerTest extends FleetControllerTest { { // Configuration change: Remove the previously retired nodes /* TODO: Verify current result: version:23 distributor:7 .0.s:d .1.s:d .2.s:d .3.s:d .4.s:d storage:7 .0.s:m .1.s:m .2.s:m .3.s:m .4.s:m - TODO: Make this work without stopping/disconnecting (see 
SystemStateGenerator.setNodes + TODO: Make this work without stopping/disconnecting (see StateChangeHandler.setNodes Set<ConfiguredNode> configuredNodes = new TreeSet<>(); configuredNodes.add(new ConfiguredNode(5, false)); configuredNodes.add(new ConfiguredNode(6, false)); FleetControllerOptions options = new FleetControllerOptions("mycluster", configuredNodes); options.slobrokConnectionSpecs = this.options.slobrokConnectionSpecs; - this.options.maxInitProgressTime = 30000; + this.options.maxInitProgressTimeMs = 30000; this.options.stableStateTimePeriod = 60000; fleetController.updateOptions(options, 0); for (int i = 0; i < 5*2; i++) { diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateGeneratorTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandlerTest.java index 35118933b42..f591e8efc06 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateGeneratorTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandlerTest.java @@ -6,7 +6,6 @@ import com.yahoo.vdslib.distribution.Distribution; import com.yahoo.vdslib.state.*; import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo; import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler; -import com.yahoo.vespa.clustercontroller.core.listeners.SystemStateListener; import com.yahoo.vespa.clustercontroller.core.mocks.TestEventLog; import com.yahoo.vespa.clustercontroller.core.testutils.LogFormatter; import junit.framework.TestCase; @@ -16,33 +15,16 @@ import java.util.Set; import java.util.TreeSet; import java.util.logging.Logger; -public class SystemStateGeneratorTest extends TestCase { - private static final Logger log = Logger.getLogger(SystemStateGeneratorTest.class.getName()); - class Config { +public class StateChangeHandlerTest extends TestCase { + private static final Logger log = 
Logger.getLogger(StateChangeHandlerTest.class.getName()); + private class Config { int nodeCount = 3; int stableStateTime = 1000 * 60000; int maxSlobrokDisconnectPeriod = 60000; int maxPrematureCrashes = 3; } - class TestSystemStateListener implements SystemStateListener { - LinkedList<ClusterState> states = new LinkedList<>(); - @Override - public void handleNewSystemState(ClusterState state) { - states.add(state); - } - - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("States("); - for (ClusterState state : states) sb.append('\n').append(state.toString()); - sb.append(")"); - return sb.toString(); - } - - } - - class TestNodeStateOrHostInfoChangeHandler implements NodeStateOrHostInfoChangeHandler { + private class TestNodeStateOrHostInfoChangeHandler implements NodeStateOrHostInfoChangeHandler { LinkedList<String> events = new LinkedList<>(); @@ -75,9 +57,9 @@ public class SystemStateGeneratorTest extends TestCase { private Set<ConfiguredNode> configuredNodes = new TreeSet<>(); private Config config; private ContentCluster cluster; - private SystemStateGenerator generator; - private TestSystemStateListener systemStateListener; + private StateChangeHandler nodeStateChangeHandler; private TestNodeStateOrHostInfoChangeHandler nodeStateUpdateListener; + private final ClusterStateGenerator.Params params = new ClusterStateGenerator.Params(); public void setUp() { LogFormatter.initializeLogging(); @@ -88,20 +70,18 @@ public class SystemStateGeneratorTest extends TestCase { this.config = config; for (int i=0; i<config.nodeCount; ++i) configuredNodes.add(new ConfiguredNode(i, false)); cluster = new ContentCluster("testcluster", configuredNodes, distribution, 0, 0.0); - generator = new SystemStateGenerator(clock, eventLog, null); - generator.setNodes(cluster.clusterInfo()); - generator.setStableStateTimePeriod(config.stableStateTime); - generator.setMaxPrematureCrashes(config.maxPrematureCrashes); - 
generator.setMaxSlobrokDisconnectGracePeriod(config.maxSlobrokDisconnectPeriod); - generator.setMinNodesUp(1, 1, 0, 0); - systemStateListener = new TestSystemStateListener(); + nodeStateChangeHandler = new StateChangeHandler(clock, eventLog, null); + params.minStorageNodesUp(1).minDistributorNodesUp(1) + .minRatioOfStorageNodesUp(0.0).minRatioOfDistributorNodesUp(0.0) + .maxPrematureCrashes(config.maxPrematureCrashes) + .transitionTimes(5000) + .cluster(cluster); nodeStateUpdateListener = new TestNodeStateOrHostInfoChangeHandler(); } - private void assertNewClusterStateReceived() { - assertTrue(generator.notifyIfNewSystemState(cluster, systemStateListener)); - assertTrue(systemStateListener.toString(), systemStateListener.states.size() == 1); - systemStateListener.states.clear(); + private ClusterState currentClusterState() { + params.currentTimeInMilllis(clock.getCurrentTimeInMillis()); + return ClusterStateGenerator.generatedStateFrom(params).getClusterState(); } private void startWithStableStateClusterWithNodesUp() { @@ -109,61 +89,55 @@ public class SystemStateGeneratorTest extends TestCase { for (ConfiguredNode i : configuredNodes) { NodeInfo nodeInfo = cluster.clusterInfo().setRpcAddress(new Node(type, i.index()), null); nodeInfo.markRpcAddressLive(); - generator.handleNewReportedNodeState(nodeInfo, new NodeState(type, State.UP), null); + nodeStateChangeHandler.handleNewReportedNodeState( + currentClusterState(), nodeInfo, new NodeState(type, State.UP), null); nodeInfo.setReportedState(new NodeState(type, State.UP), clock.getCurrentTimeInMillis()); } } - assertNewClusterStateReceived(); for (NodeType type : NodeType.getTypes()) { for (ConfiguredNode i : configuredNodes) { Node n = new Node(type, i.index()); - assertEquals(State.UP, generator.getClusterState().getNodeState(n).getState()); + assertEquals(State.UP, currentClusterState().getNodeState(n).getState()); } } clock.advanceTime(config.stableStateTime); } private void markNodeOutOfSlobrok(Node node) { + 
final ClusterState stateBefore = currentClusterState(); log.info("Marking " + node + " out of slobrok"); cluster.getNodeInfo(node).markRpcAddressOutdated(clock); - generator.handleMissingNode(cluster.getNodeInfo(node), nodeStateUpdateListener); - assertTrue(nodeStateUpdateListener.toString(), nodeStateUpdateListener.events.isEmpty()); - nodeStateUpdateListener.events.clear(); + nodeStateChangeHandler.handleMissingNode(stateBefore, cluster.getNodeInfo(node), nodeStateUpdateListener); assertTrue(eventLog.toString(), eventLog.toString().contains("Node is no longer in slobrok")); eventLog.clear(); } private void markNodeBackIntoSlobrok(Node node, State state) { + final ClusterState stateBefore = currentClusterState(); log.info("Marking " + node + " back in slobrok"); cluster.getNodeInfo(node).markRpcAddressLive(); - generator.handleReturnedRpcAddress(cluster.getNodeInfo(node)); - assertEquals(0, nodeStateUpdateListener.events.size()); - assertEquals(0, systemStateListener.states.size()); - generator.handleNewReportedNodeState(cluster.getNodeInfo(node), new NodeState(node.getType(), state), nodeStateUpdateListener); + nodeStateChangeHandler.handleReturnedRpcAddress(cluster.getNodeInfo(node)); + nodeStateChangeHandler.handleNewReportedNodeState( + stateBefore, cluster.getNodeInfo(node), + new NodeState(node.getType(), state), nodeStateUpdateListener); cluster.getNodeInfo(node).setReportedState(new NodeState(node.getType(), state), clock.getCurrentTimeInMillis()); - assertEquals(0, nodeStateUpdateListener.events.size()); - assertEquals(0, systemStateListener.states.size()); } private void verifyClusterStateChanged(Node node, State state) { log.info("Verifying cluster state has been updated for " + node + " to " + state); - assertTrue(generator.notifyIfNewSystemState(cluster, systemStateListener)); - assertEquals(1, systemStateListener.states.size()); - assertEquals(state, systemStateListener.states.get(0).getNodeState(node).getState()); - 
systemStateListener.states.clear(); - assertEquals(state, generator.getClusterState().getNodeState(node).getState()); + assertTrue(nodeStateChangeHandler.stateMayHaveChanged()); + assertEquals(state, currentClusterState().getNodeState(node).getState()); } private void verifyNodeStateAfterTimerWatch(Node node, State state) { log.info("Verifying state of node after timer watch."); - generator.watchTimers(cluster, nodeStateUpdateListener); + nodeStateChangeHandler.watchTimers(cluster, currentClusterState(), nodeStateUpdateListener); assertEquals(0, nodeStateUpdateListener.events.size()); verifyClusterStateChanged(node, state); } private void verifyPrematureCrashCountCleared(Node node) { - assertTrue(generator.watchTimers(cluster, nodeStateUpdateListener)); - assertEquals(0, nodeStateUpdateListener.events.size()); + assertTrue(nodeStateChangeHandler.watchTimers(cluster, currentClusterState(), nodeStateUpdateListener)); assertEquals(0, cluster.getNodeInfo(node).getPrematureCrashCount()); } @@ -175,15 +149,15 @@ public class SystemStateGeneratorTest extends TestCase { log.info("Iteration " + j); assertEquals(0, cluster.getNodeInfo(node).getPrematureCrashCount()); assertEquals(State.UP, cluster.getNodeInfo(node).getWantedState().getState()); - assertEquals(State.UP, generator.getClusterState().getNodeState(node).getState()); + assertEquals(State.UP, currentClusterState().getNodeState(node).getState()); for (int k=0; k<config.maxPrematureCrashes; ++k) { log.info("Premature iteration " + k); markNodeOutOfSlobrok(node); log.info("Passing max disconnect time period. 
Watching timers"); clock.advanceTime(config.maxSlobrokDisconnectPeriod); - verifyNodeStateAfterTimerWatch(node, State.MAINTENANCE); + cluster.getNodeInfo(node).setReportedState(new NodeState(node.getType(), State.DOWN), clock.getCurrentTimeInMillis()); assertEquals(k, cluster.getNodeInfo(node).getPrematureCrashCount()); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java index b94691bb880..c31f80d9b53 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java @@ -8,8 +8,10 @@ import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler; import com.yahoo.vespa.clustercontroller.core.testutils.StateWaiter; import com.yahoo.vespa.clustercontroller.utils.util.NoMetricReporter; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -43,7 +45,7 @@ public class StateChangeTest extends FleetControllerTest { options.minStorageNodesUp, options.minRatioOfStorageNodesUp); NodeStateGatherer stateGatherer = new NodeStateGatherer(timer, timer, eventLog); DatabaseHandler database = new DatabaseHandler(timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer); - SystemStateGenerator stateGenerator = new SystemStateGenerator(timer, eventLog, metricUpdater); + StateChangeHandler stateGenerator = new StateChangeHandler(timer, eventLog, metricUpdater); SystemStateBroadcaster stateBroadcaster = new SystemStateBroadcaster(timer, timer); MasterElectionHandler masterElectionHandler = new MasterElectionHandler(options.fleetControllerIndex, 
options.fleetControllerCount, timer, timer); ctrl = new FleetController(timer, eventLog, cluster, stateGatherer, communicator, null, null, communicator, database, stateGenerator, stateBroadcaster, masterElectionHandler, metricUpdater, options); @@ -109,8 +111,13 @@ public class StateChangeTest extends FleetControllerTest { // Now, fleet controller should have generated a new cluster state. ctrl.tick(); - assertEquals("version:6 distributor:10 .0.s:i .0.i:0.0 .1.s:i .1.i:0.0 .2.s:i .2.i:0.0 .3.s:i .3.i:0.0 .4.s:i .4.i:0.0 .5.s:i .5.i:0.0 .6.s:i .6.i:0.0 .7.s:i .7.i:0.0 .8.s:i .8.i:0.0 .9.s:i .9.i:0.0 storage:10 .0.s:i .0.i:0.9 .1.s:i .1.i:0.9 .2.s:i .2.i:0.9 .3.s:i .3.i:0.9 .4.s:i .4.i:0.9 .5.s:i .5.i:0.9 .6.s:i .6.i:0.9 .7.s:i .7.i:0.9 .8.s:i .8.i:0.9 .9.s:i .9.i:0.9", - ctrl.getSystemState().toString()); + // Regular init progress does not update the cluster state until the node is done initializing (or goes down, + // whichever comes first). + assertEquals("version:6 distributor:10 .0.s:i .0.i:0.0 .1.s:i .1.i:0.0 .2.s:i .2.i:0.0 .3.s:i .3.i:0.0 " + + ".4.s:i .4.i:0.0 .5.s:i .5.i:0.0 .6.s:i .6.i:0.0 .7.s:i .7.i:0.0 .8.s:i .8.i:0.0 " + + ".9.s:i .9.i:0.0 storage:10 .0.s:i .0.i:0.1 .1.s:i .1.i:0.1 .2.s:i .2.i:0.1 .3.s:i .3.i:0.1 " + + ".4.s:i .4.i:0.1 .5.s:i .5.i:0.1 .6.s:i .6.i:0.1 .7.s:i .7.i:0.1 .8.s:i .8.i:0.1 .9.s:i .9.i:0.1", + ctrl.consolidatedClusterState().toString()); timer.advanceTime(options.maxInitProgressTime / 20); ctrl.tick(); @@ -131,24 +138,23 @@ public class StateChangeTest extends FleetControllerTest { assertEquals("version:8 distributor:10 storage:10", ctrl.getSystemState().toString()); - verifyNodeEvents(new Node(NodeType.DISTRIBUTOR, 0), "Event: distributor.0: Now reporting state U\n" + - "Event: distributor.0: Altered node state in cluster state from 'D' to 'U'.\n" + + "Event: distributor.0: Altered node state in cluster state from 'D: Node not seen in slobrok.' 
to 'U'\n" + "Event: distributor.0: Now reporting state I, i 0.00\n" + - "Event: distributor.0: Altered node state in cluster state from 'U' to 'I, i 0.00'.\n" + + "Event: distributor.0: Altered node state in cluster state from 'U' to 'I, i 0.00'\n" + "Event: distributor.0: Now reporting state U\n" + - "Event: distributor.0: Altered node state in cluster state from 'I, i 0.00' to 'U'.\n"); + "Event: distributor.0: Altered node state in cluster state from 'I, i 0.00' to 'U'\n"); verifyNodeEvents(new Node(NodeType.STORAGE, 0), "Event: storage.0: Now reporting state U\n" + - "Event: storage.0: Altered node state in cluster state from 'D' to 'U'.\n" + + "Event: storage.0: Altered node state in cluster state from 'D: Node not seen in slobrok.' to 'U'\n" + "Event: storage.0: Now reporting state I, i 0.00 (ls)\n" + - "Event: storage.0: Altered node state in cluster state from 'U' to 'D: Listing buckets. Progress 0.0 %.'.\n" + + "Event: storage.0: Altered node state in cluster state from 'U' to 'D'\n" + "Event: storage.0: Now reporting state I, i 0.100 (read)\n" + - "Event: storage.0: Altered node state in cluster state from 'D: Listing buckets. Progress 0.0 %.' 
to 'I, i 0.100 (read)'.\n" + + "Event: storage.0: Altered node state in cluster state from 'D' to 'I, i 0.100 (read)'\n" + "Event: storage.0: Now reporting state U\n" + - "Event: storage.0: Altered node state in cluster state from 'I, i 0.900 (read)' to 'U'.\n"); + "Event: storage.0: Altered node state in cluster state from 'I, i 0.100 (read)' to 'U'\n"); } @Test @@ -172,7 +178,6 @@ public class StateChangeTest extends FleetControllerTest { assertEquals("version:4 distributor:10 .0.s:d storage:10", ctrl.getSystemState().toString()); timer.advanceTime(1000); - long distStartTime = timer.getCurrentTimeInMillis() / 1000; ctrl.tick(); @@ -210,23 +215,24 @@ public class StateChangeTest extends FleetControllerTest { verifyNodeEvents(new Node(NodeType.DISTRIBUTOR, 0), "Event: distributor.0: Now reporting state U\n" + - "Event: distributor.0: Altered node state in cluster state from 'D' to 'U'.\n" + + "Event: distributor.0: Altered node state in cluster state from 'D: Node not seen in slobrok.' to 'U'\n" + "Event: distributor.0: Failed to get node state: D: Closed at other end\n" + "Event: distributor.0: Stopped or possibly crashed after 0 ms, which is before stable state time period. 
Premature crash count is now 1.\n" + - "Event: distributor.0: Altered node state in cluster state from 'U' to 'D: Closed at other end'.\n" + + "Event: distributor.0: Altered node state in cluster state from 'U' to 'D: Closed at other end'\n" + "Event: distributor.0: Now reporting state U, t 12345678\n" + - "Event: distributor.0: Altered node state in cluster state from 'D: Closed at other end' to 'U, t 12345678'.\n"); + "Event: distributor.0: Altered node state in cluster state from 'D: Closed at other end' to 'U, t 12345678'\n" + + "Event: distributor.0: Altered node state in cluster state from 'U, t 12345678' to 'U'\n"); verifyNodeEvents(new Node(NodeType.STORAGE, 0), "Event: storage.0: Now reporting state U\n" + - "Event: storage.0: Altered node state in cluster state from 'D' to 'U'.\n" + + "Event: storage.0: Altered node state in cluster state from 'D: Node not seen in slobrok.' to 'U'\n" + "Event: storage.0: Failed to get node state: D: Closed at other end\n" + "Event: storage.0: Stopped or possibly crashed after 1000 ms, which is before stable state time period. Premature crash count is now 1.\n" + - "Event: storage.0: Altered node state in cluster state from 'U' to 'M: Closed at other end'.\n" + + "Event: storage.0: Altered node state in cluster state from 'U' to 'M: Closed at other end'\n" + "Event: storage.0: 5001 milliseconds without contact. 
Marking node down.\n" + - "Event: storage.0: Altered node state in cluster state from 'M: Closed at other end' to 'D: Closed at other end'.\n" + + "Event: storage.0: Altered node state in cluster state from 'M: Closed at other end' to 'D: Closed at other end'\n" + "Event: storage.0: Now reporting state U, t 12345679\n" + - "Event: storage.0: Altered node state in cluster state from 'D: Closed at other end' to 'U, t 12345679'.\n"); + "Event: storage.0: Altered node state in cluster state from 'D: Closed at other end' to 'U, t 12345679'\n"); assertEquals(1, ctrl.getCluster().getNodeInfo(new Node(NodeType.DISTRIBUTOR, 0)).getPrematureCrashCount()); assertEquals(1, ctrl.getCluster().getNodeInfo(new Node(NodeType.STORAGE, 0)).getPrematureCrashCount()); @@ -239,7 +245,7 @@ public class StateChangeTest extends FleetControllerTest { @Test public void testNodeGoingDownAndUpNotifying() throws Exception { - // Same test as above, but node manage to notify why it is going down first. + // Same test as above, but node manages to notify why it is going down first. FleetControllerOptions options = new FleetControllerOptions("mycluster", createNodes(10)); options.nodeStateRequestTimeoutMS = 60 * 60 * 1000; options.maxSlobrokDisconnectGracePeriod = 100000; @@ -291,21 +297,21 @@ public class StateChangeTest extends FleetControllerTest { verifyNodeEvents(new Node(NodeType.DISTRIBUTOR, 0), "Event: distributor.0: Now reporting state U\n" + - "Event: distributor.0: Altered node state in cluster state from 'D' to 'U'.\n" + + "Event: distributor.0: Altered node state in cluster state from 'D: Node not seen in slobrok.' 
to 'U'\n" + "Event: distributor.0: Failed to get node state: D: controlled shutdown\n" + - "Event: distributor.0: Altered node state in cluster state from 'U' to 'D: controlled shutdown'.\n" + + "Event: distributor.0: Altered node state in cluster state from 'U' to 'D: controlled shutdown'\n" + "Event: distributor.0: Now reporting state U\n" + - "Event: distributor.0: Altered node state in cluster state from 'D: controlled shutdown' to 'U'.\n"); + "Event: distributor.0: Altered node state in cluster state from 'D: controlled shutdown' to 'U'\n"); verifyNodeEvents(new Node(NodeType.STORAGE, 0), "Event: storage.0: Now reporting state U\n" + - "Event: storage.0: Altered node state in cluster state from 'D' to 'U'.\n" + + "Event: storage.0: Altered node state in cluster state from 'D: Node not seen in slobrok.' to 'U'\n" + "Event: storage.0: Failed to get node state: D: controlled shutdown\n" + - "Event: storage.0: Altered node state in cluster state from 'U' to 'M: controlled shutdown'.\n" + + "Event: storage.0: Altered node state in cluster state from 'U' to 'M: controlled shutdown'\n" + "Event: storage.0: 5001 milliseconds without contact. 
Marking node down.\n" + - "Event: storage.0: Altered node state in cluster state from 'M: controlled shutdown' to 'D: controlled shutdown'.\n" + + "Event: storage.0: Altered node state in cluster state from 'M: controlled shutdown' to 'D: controlled shutdown'\n" + "Event: storage.0: Now reporting state U\n" + - "Event: storage.0: Altered node state in cluster state from 'D: controlled shutdown' to 'U'.\n"); + "Event: storage.0: Altered node state in cluster state from 'D: controlled shutdown' to 'U'\n"); } @@ -346,7 +352,7 @@ public class StateChangeTest extends FleetControllerTest { verifyNodeEvents(new Node(NodeType.STORAGE, 0), "Event: storage.0: Now reporting state U\n" + - "Event: storage.0: Altered node state in cluster state from 'D' to 'U'.\n" + + "Event: storage.0: Altered node state in cluster state from 'D: Node not seen in slobrok.' to 'U'\n" + "Event: storage.0: Node is no longer in slobrok, but we still have a pending state request.\n"); } @@ -393,15 +399,15 @@ public class StateChangeTest extends FleetControllerTest { verifyNodeEvents(new Node(NodeType.STORAGE, 6), "Event: storage.6: Now reporting state U\n" + - "Event: storage.6: Altered node state in cluster state from 'D' to 'U'.\n" + + "Event: storage.6: Altered node state in cluster state from 'D: Node not seen in slobrok.' to 'U'\n" + "Event: storage.6: Failed to get node state: D: Connection error: Closed at other end\n" + "Event: storage.6: Stopped or possibly crashed after 0 ms, which is before stable state time period. 
Premature crash count is now 1.\n" + - "Event: storage.6: Altered node state in cluster state from 'U' to 'M: Connection error: Closed at other end'.\n" + + "Event: storage.6: Altered node state in cluster state from 'U' to 'M: Connection error: Closed at other end'\n" + "Event: storage.6: Now reporting state I, i 0.00 (ls)\n" + "Event: storage.6: Now reporting state I, i 0.600 (read)\n" + - "Event: storage.6: Altered node state in cluster state from 'M: Connection error: Closed at other end' to 'I, i 0.600 (read)'.\n" + + "Event: storage.6: Altered node state in cluster state from 'M: Connection error: Closed at other end' to 'I, i 0.600 (read)'\n" + "Event: storage.6: Now reporting state U\n" + - "Event: storage.6: Altered node state in cluster state from 'I, i 0.600 (read)' to 'U'.\n"); + "Event: storage.6: Altered node state in cluster state from 'I, i 0.600 (read)' to 'U'\n"); } @Test @@ -453,14 +459,14 @@ public class StateChangeTest extends FleetControllerTest { verifyNodeEvents(new Node(NodeType.STORAGE, 6), "Event: storage.6: Now reporting state U\n" + - "Event: storage.6: Altered node state in cluster state from 'D' to 'R'.\n" + + "Event: storage.6: Altered node state in cluster state from 'D: Node not seen in slobrok.' to 'R'\n" + "Event: storage.6: Failed to get node state: D: Connection error: Closed at other end\n" + "Event: storage.6: Stopped or possibly crashed after 0 ms, which is before stable state time period. 
Premature crash count is now 1.\n" + - "Event: storage.6: Altered node state in cluster state from 'R' to 'M: Connection error: Closed at other end'.\n" + + "Event: storage.6: Altered node state in cluster state from 'R' to 'M: Connection error: Closed at other end'\n" + "Event: storage.6: Now reporting state I, i 0.00 (ls)\n" + "Event: storage.6: Now reporting state I, i 0.600 (read)\n" + "Event: storage.6: Now reporting state U\n" + - "Event: storage.6: Altered node state in cluster state from 'M: Connection error: Closed at other end' to 'R: Connection error: Closed at other end'.\n"); + "Event: storage.6: Altered node state in cluster state from 'M: Connection error: Closed at other end' to 'R'\n"); } @Test @@ -522,7 +528,7 @@ public class StateChangeTest extends FleetControllerTest { ctrl.tick(); - assertEquals("Listing buckets. Progress 0.1 %.", ctrl.getSystemState().getNodeState(new Node(NodeType.STORAGE, 6)).getDescription()); + assertEquals("version:5 distributor:10 storage:10 .6.s:d", ctrl.getSystemState().toString()); communicator.setNodeState(new Node(NodeType.STORAGE, 6), new NodeState(NodeType.STORAGE, State.INITIALIZING).setInitProgress(0.1), ""); @@ -542,16 +548,16 @@ public class StateChangeTest extends FleetControllerTest { verifyNodeEvents(new Node(NodeType.STORAGE, 6), "Event: storage.6: Now reporting state U\n" + - "Event: storage.6: Altered node state in cluster state from 'D' to 'U'.\n" + + "Event: storage.6: Altered node state in cluster state from 'D: Node not seen in slobrok.' to 'U'\n" + "Event: storage.6: Failed to get node state: D: Connection error: Closed at other end\n" + - "Event: storage.6: Altered node state in cluster state from 'U' to 'M: Connection error: Closed at other end'.\n" + + "Event: storage.6: Altered node state in cluster state from 'U' to 'M: Connection error: Closed at other end'\n" + "Event: storage.6: 100000 milliseconds without contact. 
Marking node down.\n" + - "Event: storage.6: Altered node state in cluster state from 'M: Connection error: Closed at other end' to 'D: Connection error: Closed at other end'.\n" + + "Event: storage.6: Altered node state in cluster state from 'M: Connection error: Closed at other end' to 'D: Connection error: Closed at other end'\n" + "Event: storage.6: Now reporting state I, i 0.00100 (ls)\n" + "Event: storage.6: Now reporting state I, i 0.100 (read)\n" + - "Event: storage.6: Altered node state in cluster state from 'D: Listing buckets. Progress 0.1 %.' to 'I, i 0.100 (read)'.\n" + + "Event: storage.6: Altered node state in cluster state from 'D: Connection error: Closed at other end' to 'I, i 0.100 (read)'\n" + "Event: storage.6: Now reporting state U\n" + - "Event: storage.6: Altered node state in cluster state from 'I, i 0.100 (read)' to 'U'.\n"); + "Event: storage.6: Altered node state in cluster state from 'I, i 0.100 (read)' to 'U'\n"); } @Test @@ -613,9 +619,6 @@ public class StateChangeTest extends FleetControllerTest { // Still down since it seemingly crashed during last init. assertEquals("version:7 distributor:10 storage:10 .6.s:d", ctrl.getSystemState().toString()); - assertEquals("Down: 5001 ms without initialize progress. Assuming node has deadlocked.", - ctrl.getSystemState().getNodeState(new Node(NodeType.STORAGE, 6)).toString()); - ctrl.tick(); communicator.setNodeState(new Node(NodeType.STORAGE, 6), State.UP, ""); @@ -626,20 +629,20 @@ public class StateChangeTest extends FleetControllerTest { verifyNodeEvents(new Node(NodeType.STORAGE, 6), "Event: storage.6: Now reporting state U\n" + - "Event: storage.6: Altered node state in cluster state from 'D' to 'U'.\n" + + "Event: storage.6: Altered node state in cluster state from 'D: Node not seen in slobrok.' 
to 'U'\n" + "Event: storage.6: Failed to get node state: D: Connection error: Closed at other end\n" + - "Event: storage.6: Altered node state in cluster state from 'U' to 'M: Connection error: Closed at other end'.\n" + + "Event: storage.6: Altered node state in cluster state from 'U' to 'M: Connection error: Closed at other end'\n" + "Event: storage.6: 1000000 milliseconds without contact. Marking node down.\n" + - "Event: storage.6: Altered node state in cluster state from 'M: Connection error: Closed at other end' to 'D: Connection error: Closed at other end'.\n" + + "Event: storage.6: Altered node state in cluster state from 'M: Connection error: Closed at other end' to 'D: Connection error: Closed at other end'\n" + "Event: storage.6: Now reporting state I, i 0.100 (read)\n" + - "Event: storage.6: Altered node state in cluster state from 'D: Connection error: Closed at other end' to 'I, i 0.100 (read)'.\n" + + "Event: storage.6: Altered node state in cluster state from 'D: Connection error: Closed at other end' to 'I, i 0.100 (read)'\n" + "Event: storage.6: 5001 milliseconds without initialize progress. Marking node down. Premature crash count is now 1.\n" + - "Event: storage.6: Altered node state in cluster state from 'I, i 0.100 (read)' to 'D: 5001 ms without initialize progress. Assuming node has deadlocked.'.\n" + + "Event: storage.6: Altered node state in cluster state from 'I, i 0.100 (read)' to 'D'\n" + "Event: storage.6: Failed to get node state: D: Connection error: Closed at other end\n" + "Event: storage.6: Now reporting state I, i 0.00 (ls)\n" + "Event: storage.6: Now reporting state I, i 0.100 (read)\n" + "Event: storage.6: Now reporting state U\n" + - "Event: storage.6: Altered node state in cluster state from 'D: 5001 ms without initialize progress. Assuming node has deadlocked.' 
to 'U'.\n"); + "Event: storage.6: Altered node state in cluster state from 'D' to 'U'\n"); } @@ -684,9 +687,6 @@ public class StateChangeTest extends FleetControllerTest { ctrl.tick(); assertEquals("version:7 distributor:10 storage:10 .6.s:d", ctrl.getSystemState().toString()); - - String desc = ctrl.getSystemState().getNodeState(new Node(NodeType.STORAGE, 6)).getDescription(); - assertEquals("Got reverse intialize progress. Assuming node have prematurely crashed", desc); } @Test @@ -1132,4 +1132,70 @@ public class StateChangeTest extends FleetControllerTest { } } + @Test + public void consolidated_cluster_state_reflects_node_changes_when_cluster_is_down() throws Exception { + FleetControllerOptions options = new FleetControllerOptions("mycluster", createNodes(10)); + options.maxTransitionTime.put(NodeType.STORAGE, 0); + options.minStorageNodesUp = 10; + options.minDistributorNodesUp = 10; + initialize(options); + + ctrl.tick(); + assertThat(ctrl.consolidatedClusterState().toString(), equalTo("version:3 distributor:10 storage:10")); + + communicator.setNodeState(new Node(NodeType.STORAGE, 2), State.DOWN, "foo"); + ctrl.tick(); + + assertThat(ctrl.consolidatedClusterState().toString(), + equalTo("version:4 cluster:d distributor:10 storage:10 .2.s:d")); + + // After this point, any further node changes while the cluster is still down won't be published. + // This is because cluster state similarity checks are short-circuited if both are Down, as no other parts + // of the state matter. Despite this, REST API access and similar features need up-to-date information, + // and therefore need to get a state which represents the _current_ state rather than the published state. + // The consolidated state offers this by selectively generating the current state on-demand if the + // cluster is down. + communicator.setNodeState(new Node(NodeType.STORAGE, 5), State.DOWN, "bar"); + ctrl.tick(); + + // NOTE: _same_ version, different node state content. 
Overall cluster down-state is still the same. + assertThat(ctrl.consolidatedClusterState().toString(), + equalTo("version:4 cluster:d distributor:10 storage:10 .2.s:d .5.s:d")); + } + + // Related to the above test, watchTimer invocations must receive the _current_ state and not the + // published state. Failure to ensure this would cause events to be fired non-stop, as the effect + // of previous timer invocations (with subsequent state generation) would not be visible. + @Test + public void timer_events_during_cluster_down_observe_most_recent_node_changes() throws Exception { + FleetControllerOptions options = new FleetControllerOptions("mycluster", createNodes(10)); + options.maxTransitionTime.put(NodeType.STORAGE, 1000); + options.minStorageNodesUp = 10; + options.minDistributorNodesUp = 10; + initialize(options); + + ctrl.tick(); + communicator.setNodeState(new Node(NodeType.STORAGE, 2), State.DOWN, "foo"); + timer.advanceTime(500); + ctrl.tick(); + communicator.setNodeState(new Node(NodeType.STORAGE, 3), State.DOWN, "foo"); + ctrl.tick(); + assertThat(ctrl.consolidatedClusterState().toString(), equalTo("version:4 cluster:d distributor:10 storage:10 .2.s:m .3.s:m")); + + // Subsequent timer tick should _not_ trigger additional events. Providing published state + // only would result in "Marking node down" events for node 2 emitted per tick. + for (int i = 0; i < 3; ++i) { + timer.advanceTime(5000); + ctrl.tick(); + } + + verifyNodeEvents(new Node(NodeType.STORAGE, 2), + "Event: storage.2: Now reporting state U\n" + + "Event: storage.2: Altered node state in cluster state from 'D: Node not seen in slobrok.' to 'U'\n" + + "Event: storage.2: Failed to get node state: D: foo\n" + + "Event: storage.2: Stopped or possibly crashed after 500 ms, which is before stable state time period. Premature crash count is now 1.\n" + + "Event: storage.2: Altered node state in cluster state from 'U' to 'M: foo'\n" + + "Event: storage.2: 5000 milliseconds without contact. 
Marking node down.\n"); + } + } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java new file mode 100644 index 00000000000..72f8c9fb8b7 --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateVersionTrackerTest.java @@ -0,0 +1,229 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vdslib.state.ClusterState; +import com.yahoo.vdslib.state.Node; +import com.yahoo.vdslib.state.NodeState; +import com.yahoo.vdslib.state.NodeType; +import com.yahoo.vdslib.state.State; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Optional; + +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +public class StateVersionTrackerTest { + + private static AnnotatedClusterState stateWithoutAnnotations(String stateStr) { + final ClusterState state = ClusterState.stateFromString(stateStr); + return new AnnotatedClusterState(state, Optional.empty(), AnnotatedClusterState.emptyNodeStateReasons()); + } + + private static StateVersionTracker createWithMockedMetrics() { + return new StateVersionTracker(mock(MetricUpdater.class)); + } + + private static void updateAndPromote(final StateVersionTracker versionTracker, + final AnnotatedClusterState state, + final long timeMs) + { + versionTracker.updateLatestCandidateState(state); + versionTracker.promoteCandidateToVersionedState(timeMs); + } + + @Test + public void 
version_is_incremented_when_new_state_is_applied() { + final StateVersionTracker versionTracker = createWithMockedMetrics(); + versionTracker.setVersionRetrievedFromZooKeeper(100); + updateAndPromote(versionTracker, stateWithoutAnnotations("distributor:2 storage:2"), 123); + assertThat(versionTracker.getCurrentVersion(), equalTo(101)); + assertThat(versionTracker.getVersionedClusterState().toString(), equalTo("version:101 distributor:2 storage:2")); + } + + @Test + public void version_is_1_upon_construction() { + final StateVersionTracker versionTracker = createWithMockedMetrics(); + assertThat(versionTracker.getCurrentVersion(), equalTo(1)); + } + + @Test + public void set_current_version_caps_lowest_version_to_1() { + final StateVersionTracker versionTracker = createWithMockedMetrics(); + versionTracker.setVersionRetrievedFromZooKeeper(0); + assertThat(versionTracker.getCurrentVersion(), equalTo(1)); + } + + @Test + public void new_version_from_zk_predicate_initially_false() { + final StateVersionTracker versionTracker = createWithMockedMetrics(); + assertThat(versionTracker.hasReceivedNewVersionFromZooKeeper(), is(false)); + } + + @Test + public void new_version_from_zk_predicate_true_after_setting_zk_version() { + final StateVersionTracker versionTracker = createWithMockedMetrics(); + versionTracker.setVersionRetrievedFromZooKeeper(5); + assertThat(versionTracker.hasReceivedNewVersionFromZooKeeper(), is(true)); + } + + @Test + public void new_version_from_zk_predicate_false_after_applying_higher_version() { + final StateVersionTracker versionTracker = createWithMockedMetrics(); + versionTracker.setVersionRetrievedFromZooKeeper(5); + updateAndPromote(versionTracker, stateWithoutAnnotations("distributor:2 storage:2"), 123); + assertThat(versionTracker.hasReceivedNewVersionFromZooKeeper(), is(false)); + } + + @Test + public void exposed_states_are_empty_upon_construction() { + final StateVersionTracker versionTracker = createWithMockedMetrics(); + 
assertThat(versionTracker.getVersionedClusterState().toString(), equalTo("")); + assertThat(versionTracker.getAnnotatedVersionedClusterState().getClusterState().toString(), equalTo("")); + } + + @Test + public void diff_from_initial_state_implies_changed_state() { + final StateVersionTracker versionTracker = createWithMockedMetrics(); + versionTracker.updateLatestCandidateState(stateWithoutAnnotations("cluster:d")); + assertTrue(versionTracker.candidateChangedEnoughFromCurrentToWarrantPublish()); + } + + private static boolean stateChangedBetween(String fromState, String toState) { + final StateVersionTracker versionTracker = createWithMockedMetrics(); + updateAndPromote(versionTracker, stateWithoutAnnotations(fromState), 123); + versionTracker.updateLatestCandidateState(stateWithoutAnnotations(toState)); + return versionTracker.candidateChangedEnoughFromCurrentToWarrantPublish(); + } + + @Test + public void version_mismatch_not_counted_as_changed_state() { + assertFalse(stateChangedBetween("distributor:2 storage:2", "distributor:2 storage:2")); + } + + @Test + public void different_distributor_node_count_implies_changed_state() { + assertTrue(stateChangedBetween("distributor:2 storage:2", "distributor:3 storage:2")); + assertTrue(stateChangedBetween("distributor:3 storage:2", "distributor:2 storage:2")); + } + + @Test + public void different_storage_node_count_implies_changed_state() { + assertTrue(stateChangedBetween("distributor:2 storage:2", "distributor:2 storage:3")); + assertTrue(stateChangedBetween("distributor:2 storage:3", "distributor:2 storage:2")); + } + + @Test + public void different_distributor_node_state_implies_changed_state() { + assertTrue(stateChangedBetween("distributor:2 storage:2", "distributor:2 .0.s:d storage:2")); + assertTrue(stateChangedBetween("distributor:2 .0.s:d storage:2", "distributor:2 storage:2")); + } + + @Test + public void different_storage_node_state_implies_changed_state() { + assertTrue(stateChangedBetween("distributor:2 
storage:2", "distributor:2 storage:2 .0.s:d")); + assertTrue(stateChangedBetween("distributor:2 storage:2 .0.s:d", "distributor:2 storage:2")); + } + + @Test + public void lowest_observed_distribution_bit_is_initially_16() { + final StateVersionTracker versionTracker = createWithMockedMetrics(); + assertThat(versionTracker.getLowestObservedDistributionBits(), equalTo(16)); + } + + @Test + public void lowest_observed_distribution_bit_is_tracked_across_states() { + final StateVersionTracker versionTracker = createWithMockedMetrics(); + updateAndPromote(versionTracker, stateWithoutAnnotations("bits:15 distributor:2 storage:2"), 100); + assertThat(versionTracker.getLowestObservedDistributionBits(), equalTo(15)); + + updateAndPromote(versionTracker, stateWithoutAnnotations("bits:17 distributor:2 storage:2"), 200); + assertThat(versionTracker.getLowestObservedDistributionBits(), equalTo(15)); + + updateAndPromote(versionTracker, stateWithoutAnnotations("bits:14 distributor:2 storage:2"), 300); + assertThat(versionTracker.getLowestObservedDistributionBits(), equalTo(14)); + } + + // For similarity purposes, only the cluster-wide bits matter, not the individual node state + // min used bits. The former is derived from the latter, but the latter is not visible in the + // published state (but _is_ visible in the internal ClusterState structures). 
+ @Test + public void per_node_min_bits_changes_are_not_considered_different() { + final StateVersionTracker versionTracker = createWithMockedMetrics(); + final AnnotatedClusterState stateWithMinBits = stateWithoutAnnotations("distributor:2 storage:2"); + stateWithMinBits.getClusterState().setNodeState( + new Node(NodeType.STORAGE, 0), + new NodeState(NodeType.STORAGE, State.UP).setMinUsedBits(15)); + updateAndPromote(versionTracker, stateWithMinBits, 123); + versionTracker.updateLatestCandidateState(stateWithoutAnnotations("distributor:2 storage:2")); + assertFalse(versionTracker.candidateChangedEnoughFromCurrentToWarrantPublish()); + } + + @Test + public void state_history_is_initially_empty() { + final StateVersionTracker versionTracker = createWithMockedMetrics(); + assertTrue(versionTracker.getClusterStateHistory().isEmpty()); + } + + private static ClusterStateHistoryEntry historyEntry(final String state, final long time) { + return new ClusterStateHistoryEntry(ClusterState.stateFromString(state), time); + } + + @Test + public void applying_state_adds_to_cluster_state_history() { + final StateVersionTracker versionTracker = createWithMockedMetrics(); + updateAndPromote(versionTracker, stateWithoutAnnotations("distributor:2 storage:2") ,100); + updateAndPromote(versionTracker, stateWithoutAnnotations("distributor:3 storage:3"), 200); + updateAndPromote(versionTracker, stateWithoutAnnotations("distributor:4 storage:4"), 300); + + // Note: newest entry first + assertThat(versionTracker.getClusterStateHistory(), + equalTo(Arrays.asList( + historyEntry("version:4 distributor:4 storage:4", 300), + historyEntry("version:3 distributor:3 storage:3", 200), + historyEntry("version:2 distributor:2 storage:2", 100)))); + } + + @Test + public void old_states_pruned_when_state_history_limit_reached() { + final StateVersionTracker versionTracker = createWithMockedMetrics(); + versionTracker.setMaxHistoryEntryCount(2); + + updateAndPromote(versionTracker, 
stateWithoutAnnotations("distributor:2 storage:2") ,100); + updateAndPromote(versionTracker, stateWithoutAnnotations("distributor:3 storage:3"), 200); + updateAndPromote(versionTracker, stateWithoutAnnotations("distributor:4 storage:4"), 300); + + assertThat(versionTracker.getClusterStateHistory(), + equalTo(Arrays.asList( + historyEntry("version:4 distributor:4 storage:4", 300), + historyEntry("version:3 distributor:3 storage:3", 200)))); + + updateAndPromote(versionTracker, stateWithoutAnnotations("distributor:5 storage:5"), 400); + + assertThat(versionTracker.getClusterStateHistory(), + equalTo(Arrays.asList( + historyEntry("version:5 distributor:5 storage:5", 400), + historyEntry("version:4 distributor:4 storage:4", 300)))); + } + + @Test + public void can_get_latest_non_published_candidate_state() { + final StateVersionTracker versionTracker = createWithMockedMetrics(); + + AnnotatedClusterState candidate = stateWithoutAnnotations("distributor:2 storage:2"); + versionTracker.updateLatestCandidateState(candidate); + assertThat(versionTracker.getLatestCandidateState(), equalTo(candidate)); + + candidate = stateWithoutAnnotations("distributor:3 storage:3"); + versionTracker.updateLatestCandidateState(candidate); + assertThat(versionTracker.getLatestCandidateState(), equalTo(candidate)); + } + +} diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/matchers/ClusterEventWithDescription.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/matchers/ClusterEventWithDescription.java new file mode 100644 index 00000000000..111a2c63144 --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/matchers/ClusterEventWithDescription.java @@ -0,0 +1,40 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+package com.yahoo.vespa.clustercontroller.core.matchers; + +import com.yahoo.vespa.clustercontroller.core.ClusterEvent; +import com.yahoo.vespa.clustercontroller.core.NodeEvent; +import org.hamcrest.Description; +import org.hamcrest.Factory; +import org.mockito.ArgumentMatcher; + +public class ClusterEventWithDescription extends ArgumentMatcher<ClusterEvent> { + private final String expected; + + public ClusterEventWithDescription(String expected) { + this.expected = expected; + } + + @Override + public boolean matches(Object o) { + if (!(o instanceof ClusterEvent)) { + return false; + } + return expected.equals(((ClusterEvent) o).getDescription()); + } + + @Override + public void describeTo(Description description) { + description.appendText(String.format("ClusterEvent with description '%s'", expected)); + } + + @Override + public void describeMismatch(Object item, Description description) { + ClusterEvent other = (ClusterEvent)item; + description.appendText(String.format("got description '%s'", other.getDescription())); + } + + @Factory + public static ClusterEventWithDescription clusterEventWithDescription(String description) { + return new ClusterEventWithDescription(description); + } +} diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/matchers/EventForNode.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/matchers/EventForNode.java new file mode 100644 index 00000000000..1f2372dea29 --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/matchers/EventForNode.java @@ -0,0 +1,37 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+package com.yahoo.vespa.clustercontroller.core.matchers; + +import com.yahoo.vdslib.state.Node; +import com.yahoo.vespa.clustercontroller.core.NodeEvent; +import org.hamcrest.Description; +import org.hamcrest.Factory; +import org.mockito.ArgumentMatcher; + +public class EventForNode extends ArgumentMatcher<NodeEvent> { + private final Node expected; + + EventForNode(Node expected) { + this.expected = expected; + } + + @Override + public boolean matches(Object o) { + return ((NodeEvent)o).getNode().getNode().equals(expected); + } + + @Override + public void describeTo(Description description) { + description.appendText(String.format("NodeEvent for node %s", expected)); + } + + @Override + public void describeMismatch(Object item, Description description) { + NodeEvent other = (NodeEvent)item; + description.appendText(String.format("got node %s", other.getNode().getNode())); + } + + @Factory + public static EventForNode eventForNode(Node expected) { + return new EventForNode(expected); + } +} diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/matchers/EventTimeIs.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/matchers/EventTimeIs.java new file mode 100644 index 00000000000..c99505d28ee --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/matchers/EventTimeIs.java @@ -0,0 +1,40 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+package com.yahoo.vespa.clustercontroller.core.matchers; + +import com.yahoo.vespa.clustercontroller.core.Event; +import org.hamcrest.Description; +import org.hamcrest.Factory; +import org.mockito.ArgumentMatcher; + +public class EventTimeIs extends ArgumentMatcher<Event> { + private final long expected; + + public EventTimeIs(long expected) { + this.expected = expected; + } + + @Override + public boolean matches(Object o) { + if (!(o instanceof Event)) { + return false; + } + return expected == ((Event)o).getTimeMs(); + } + + @Override + public void describeTo(Description description) { + description.appendText(String.format("Event with time %d", expected)); + } + + @Override + public void describeMismatch(Object item, Description description) { + Event other = (Event)item; + description.appendText(String.format("event time is %d", other.getTimeMs())); + } + + @Factory + public static EventTimeIs eventTimeIs(long time) { + return new EventTimeIs(time); + } +} + diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/matchers/EventTypeIs.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/matchers/EventTypeIs.java new file mode 100644 index 00000000000..5430bc5d8a3 --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/matchers/EventTypeIs.java @@ -0,0 +1,27 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+package com.yahoo.vespa.clustercontroller.core.matchers; + +import com.yahoo.vespa.clustercontroller.core.NodeEvent; +import org.hamcrest.Factory; +import org.mockito.ArgumentMatcher; + +public class EventTypeIs extends ArgumentMatcher<NodeEvent> { + private final NodeEvent.Type expected; + + public EventTypeIs(NodeEvent.Type expected) { + this.expected = expected; + } + + @Override + public boolean matches(Object o) { + if (!(o instanceof NodeEvent)) { + return false; + } + return expected.equals(((NodeEvent)o).getType()); + } + + @Factory + public static EventTypeIs eventTypeIs(NodeEvent.Type type) { + return new EventTypeIs(type); + } +} diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/matchers/HasStateReasonForNode.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/matchers/HasStateReasonForNode.java new file mode 100644 index 00000000000..a147b9af466 --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/matchers/HasStateReasonForNode.java @@ -0,0 +1,49 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+package com.yahoo.vespa.clustercontroller.core.matchers; + +import com.yahoo.vdslib.state.Node; +import com.yahoo.vespa.clustercontroller.core.NodeStateReason; +import org.hamcrest.Description; +import org.hamcrest.Factory; +import org.mockito.ArgumentMatcher; + +import java.util.Map; + +public class HasStateReasonForNode extends ArgumentMatcher<Map<Node, NodeStateReason>> { + private final Node node; + private final NodeStateReason expected; + + public HasStateReasonForNode(Node node, NodeStateReason expected) { + this.node = node; + this.expected = expected; + } + + @Override + public boolean matches(Object o) { + if (o == null || !(o instanceof Map)) { + return false; + } + return expected == ((Map)o).get(node); + } + + @Override + public void describeTo(Description description) { + description.appendText(String.format("has node state reason %s", expected.toString())); + } + + @Override + public void describeMismatch(Object item, Description description) { + @SuppressWarnings("unchecked") + Map<Node, NodeStateReason> other = (Map<Node, NodeStateReason>)item; + if (other.containsKey(node)) { + description.appendText(String.format("has reason %s", other.get(node).toString())); + } else { + description.appendText("has no entry for node"); + } + } + + @Factory + public static HasStateReasonForNode hasStateReasonForNode(Node node, NodeStateReason reason) { + return new HasStateReasonForNode(node, reason); + } +} diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/matchers/NodeEventWithDescription.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/matchers/NodeEventWithDescription.java new file mode 100644 index 00000000000..5ac89030c23 --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/matchers/NodeEventWithDescription.java @@ -0,0 +1,39 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+package com.yahoo.vespa.clustercontroller.core.matchers; + +import com.yahoo.vespa.clustercontroller.core.NodeEvent; +import org.hamcrest.Description; +import org.hamcrest.Factory; +import org.mockito.ArgumentMatcher; + +public class NodeEventWithDescription extends ArgumentMatcher<NodeEvent> { + private final String expected; + + public NodeEventWithDescription(String expected) { + this.expected = expected; + } + + @Override + public boolean matches(Object o) { + if (!(o instanceof NodeEvent)) { + return false; + } + return expected.equals(((NodeEvent) o).getDescription()); + } + + @Override + public void describeTo(Description description) { + description.appendText(String.format("NodeEvent with description '%s'", expected)); + } + + @Override + public void describeMismatch(Object item, Description description) { + NodeEvent other = (NodeEvent)item; + description.appendText(String.format("got description '%s'", other.getDescription())); + } + + @Factory + public static NodeEventWithDescription nodeEventWithDescription(String description) { + return new NodeEventWithDescription(description); + } +} diff --git a/vdslib/src/main/java/com/yahoo/vdslib/state/ClusterState.java b/vdslib/src/main/java/com/yahoo/vdslib/state/ClusterState.java index b3d572e48ae..d70b55c66a2 100644 --- a/vdslib/src/main/java/com/yahoo/vdslib/state/ClusterState.java +++ b/vdslib/src/main/java/com/yahoo/vdslib/state/ClusterState.java @@ -11,6 +11,9 @@ import java.util.*; */ public class ClusterState implements Cloneable { + private static final NodeState DEFAULT_STORAGE_UP_NODE_STATE = new NodeState(NodeType.STORAGE, State.UP); + private static final NodeState DEFAULT_DISTRIBUTOR_UP_NODE_STATE = new NodeState(NodeType.DISTRIBUTOR, State.UP); + private int version = 0; private State state = State.DOWN; // nodeStates maps each of the non-up nodes that have an index <= the node count for its type. 
@@ -30,6 +33,22 @@ public class ClusterState implements Cloneable { deserialize(serialized); } + /** + * Parse a given cluster state string into a returned ClusterState instance, wrapping any + * parse exceptions in a RuntimeException. + */ + public static ClusterState stateFromString(final String stateStr) { + try { + return new ClusterState(stateStr); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static ClusterState emptyState() { + return stateFromString(""); + } + public ClusterState clone() { try{ ClusterState state = (ClusterState) super.clone(); @@ -61,22 +80,81 @@ public class ClusterState implements Cloneable { return true; } + @FunctionalInterface + private interface NodeStateCmp { + boolean similar(NodeType nodeType, NodeState lhs, NodeState rhs); + } + public boolean similarTo(Object o) { if (!(o instanceof ClusterState)) { return false; } - ClusterState other = (ClusterState) o; + final ClusterState other = (ClusterState) o; - if (state.equals(State.DOWN) && other.state.equals(State.DOWN)) return true; // both down, means equal (why??) - if (version != other.version || !state.equals(other.state)) return false; - if (distributionBits != other.distributionBits) return false; - if ( ! nodeCount.equals(other.nodeCount)) return false; + return similarToImpl(other, this::normalizedNodeStateSimilarTo); + } + + public boolean similarToIgnoringInitProgress(final ClusterState other) { + return similarToImpl(other, this::normalizedNodeStateSimilarToIgnoringInitProgress); + } - for (Map.Entry<Node, NodeState> nodeStateEntry : nodeStates.entrySet()) { - NodeState otherNodeState = other.nodeStates.get(nodeStateEntry.getKey()); - if (otherNodeState == null || ! otherNodeState.similarTo(nodeStateEntry.getValue())) return false; + private boolean similarToImpl(final ClusterState other, final NodeStateCmp nodeStateCmp) { + // Two cluster states are considered similar if they are both down. 
When clusters + // are down, their individual node states do not matter to ideal state computations + // and content nodes therefore do not need to observe them. + if (state.equals(State.DOWN) && other.state.equals(State.DOWN)) { + return true; + } + if (!metaInformationSimilarTo(other)) { + return false; + } + // TODO verify behavior of C++ impl against this + for (Node node : unionNodeSetWith(other.nodeStates.keySet())) { + final NodeState lhs = nodeStates.get(node); + final NodeState rhs = other.nodeStates.get(node); + if (!nodeStateCmp.similar(node.getType(), lhs, rhs)) { + return false; + } } return true; } + private Set<Node> unionNodeSetWith(final Set<Node> otherNodes) { + final Set<Node> unionNodeSet = new TreeSet<Node>(nodeStates.keySet()); + unionNodeSet.addAll(otherNodes); + return unionNodeSet; + } + + private boolean metaInformationSimilarTo(final ClusterState other) { + if (version != other.version || !state.equals(other.state)) { + return false; + } + if (distributionBits != other.distributionBits) { + return false; + } + return nodeCount.equals(other.nodeCount); + } + + private boolean normalizedNodeStateSimilarTo(final NodeType nodeType, final NodeState lhs, final NodeState rhs) { + final NodeState lhsNormalized = (lhs != null ? lhs : defaultUpNodeState(nodeType)); + final NodeState rhsNormalized = (rhs != null ? rhs : defaultUpNodeState(nodeType)); + + return lhsNormalized.similarTo(rhsNormalized); + } + + private boolean normalizedNodeStateSimilarToIgnoringInitProgress( + final NodeType nodeType, final NodeState lhs, final NodeState rhs) + { + final NodeState lhsNormalized = (lhs != null ? lhs : defaultUpNodeState(nodeType)); + final NodeState rhsNormalized = (rhs != null ? rhs : defaultUpNodeState(nodeType)); + + return lhsNormalized.similarToIgnoringInitProgress(rhsNormalized); + } + + private static NodeState defaultUpNodeState(final NodeType nodeType) { + return nodeType == NodeType.STORAGE + ? 
DEFAULT_STORAGE_UP_NODE_STATE + : DEFAULT_DISTRIBUTOR_UP_NODE_STATE; + } + /** * Fleet controller marks states that are actually sent out to nodes as official states. Only fleetcontroller * should set this to official, and only just before sending it out. This state is currently not serialized with @@ -97,7 +175,7 @@ public class ClusterState implements Cloneable { public void addNodeState() throws ParseException { if (!empty) { NodeState ns = NodeState.deserialize(node.getType(), sb.toString()); - if (!ns.equals(new NodeState(node.getType(), State.UP))) { + if (!ns.equals(defaultUpNodeState(node.getType()))) { nodeStates.put(node, ns); } if (nodeCount.get(node.getType().ordinal()) <= node.getIndex()) { diff --git a/vdslib/src/main/java/com/yahoo/vdslib/state/NodeState.java b/vdslib/src/main/java/com/yahoo/vdslib/state/NodeState.java index 8c31938dfaf..15c929fe49d 100644 --- a/vdslib/src/main/java/com/yahoo/vdslib/state/NodeState.java +++ b/vdslib/src/main/java/com/yahoo/vdslib/state/NodeState.java @@ -112,17 +112,27 @@ public class NodeState implements Cloneable { * Cluster state will check for that. */ public boolean similarTo(Object o) { - if (!(o instanceof NodeState)) { return false; } - NodeState other = (NodeState) o; + if (!(o instanceof NodeState)) { + return false; + } + return similarToImpl((NodeState)o, true); + } + + public boolean similarToIgnoringInitProgress(final NodeState other) { + return similarToImpl(other, false); + } + private boolean similarToImpl(final NodeState other, boolean considerInitProgress) { if (state != other.state) return false; if (Math.abs(capacity - other.capacity) > 0.0000000001) return false; if (Math.abs(reliability - other.reliability) > 0.0000000001) return false; if (startTimestamp != other.startTimestamp) return false; // Init progress on different sides of the init progress limit boundary is not similar. 
- if (type.equals(NodeType.STORAGE) - && initProgress < getListingBucketsInitProgressLimit() ^ other.initProgress < getListingBucketsInitProgressLimit()) + if (considerInitProgress + && type.equals(NodeType.STORAGE) + && (initProgress < getListingBucketsInitProgressLimit() + ^ other.initProgress < getListingBucketsInitProgressLimit())) { return false; } diff --git a/vdslib/src/test/java/com/yahoo/vdslib/state/ClusterStateTestCase.java b/vdslib/src/test/java/com/yahoo/vdslib/state/ClusterStateTestCase.java index c058a7c9919..0d06fcc6faa 100644 --- a/vdslib/src/test/java/com/yahoo/vdslib/state/ClusterStateTestCase.java +++ b/vdslib/src/test/java/com/yahoo/vdslib/state/ClusterStateTestCase.java @@ -1,10 +1,18 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vdslib.state; +import org.junit.Test; + import java.text.ParseException; +import java.util.function.BiFunction; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; -public class ClusterStateTestCase extends junit.framework.TestCase { +public class ClusterStateTestCase{ + @Test public void testSetNodeState() throws ParseException { ClusterState state = new ClusterState(""); assertEquals("", state.toString()); @@ -22,6 +30,7 @@ public class ClusterStateTestCase extends junit.framework.TestCase { assertEquals("distributor:5 .0.s:d .2.s:d .3.s:d storage:1 .0.d:4 .0.d.1.s:d", state.toString()); } + @Test public void testClone() throws ParseException { ClusterState state = new ClusterState(""); state.setNodeState(new Node(NodeType.DISTRIBUTOR, 1), new NodeState(NodeType.DISTRIBUTOR, State.UP).setDescription("available")); @@ -31,8 +40,9 @@ public class ClusterStateTestCase extends junit.framework.TestCase { assertEquals(state.toString(true), other.toString(true)); assertEquals(state.toString(false), other.toString(false)); assertEquals(state, other); - 
} + } + @Test public void testEquals() throws ParseException { ClusterState state = new ClusterState(""); @@ -55,6 +65,7 @@ public class ClusterStateTestCase extends junit.framework.TestCase { ClusterState state2 = new ClusterState("distributor:3 .1.s:d .2.s:m storage:3 .1.s:i .2.s:m"); assertFalse(state1.equals(state2)); assertFalse(state1.similarTo(state2)); + assertFalse(state1.similarToIgnoringInitProgress(state2)); } { @@ -62,6 +73,7 @@ public class ClusterStateTestCase extends junit.framework.TestCase { ClusterState state2 = new ClusterState("cluster:d version:1 bits:20 distributor:1 storage:1 .0.s:d"); assertFalse(state1.equals(state2)); assertTrue(state1.similarTo(state2)); + assertTrue(state1.similarToIgnoringInitProgress(state2)); } { @@ -69,6 +81,7 @@ public class ClusterStateTestCase extends junit.framework.TestCase { ClusterState state2 = new ClusterState("distributor:3 storage:3"); assertFalse(state1.equals(state2)); assertFalse(state1.similarTo(state2)); + assertFalse(state1.similarToIgnoringInitProgress(state2)); } assertFalse(state.equals("class not instance of ClusterState")); @@ -78,6 +91,92 @@ public class ClusterStateTestCase extends junit.framework.TestCase { assertTrue(state.similarTo(state)); } + private static ClusterState stateFromString(final String stateStr) { + try { + return new ClusterState(stateStr); + } catch (ParseException e) { + throw new RuntimeException(e); + } + } + + private void do_test_differing_storage_node_sets(BiFunction<ClusterState, ClusterState, Boolean> cmp) { + final ClusterState a = stateFromString("distributor:3 storage:3 .0.s:d"); + final ClusterState b = stateFromString("distributor:3 storage:3"); + assertFalse(cmp.apply(a, b)); + assertFalse(cmp.apply(b, a)); + assertTrue(cmp.apply(a, a)); + assertTrue(cmp.apply(b, b)); + } + + private void do_test_differing_distributor_node_sets(BiFunction<ClusterState, ClusterState, Boolean> cmp) { + final ClusterState a = stateFromString("distributor:3 .0.s:d storage:3"); + 
final ClusterState b = stateFromString("distributor:3 storage:3"); + assertFalse(cmp.apply(a, b)); + assertFalse(cmp.apply(b, a)); + assertTrue(cmp.apply(a, a)); + assertTrue(cmp.apply(b, b)); + } + + @Test + public void similarity_check_considers_differing_distributor_node_state_sets() { + do_test_differing_distributor_node_sets((a, b) -> a.similarTo(b)); + } + + @Test + public void similarity_check_considers_differing_storage_node_state_sets() { + do_test_differing_storage_node_sets((a, b) -> a.similarTo(b)); + } + + @Test + public void structural_similarity_check_considers_differing_distributor_node_state_sets() { + do_test_differing_distributor_node_sets((a, b) -> a.similarToIgnoringInitProgress(b)); + } + + @Test + public void init_progress_ignoring_similarity_check_considers_differing_storage_node_state_sets() { + do_test_differing_storage_node_sets((a, b) -> a.similarToIgnoringInitProgress(b)); + } + + private void do_test_similarity_for_down_cluster_state(BiFunction<ClusterState, ClusterState, Boolean> cmp) { + final ClusterState a = stateFromString("cluster:d distributor:3 .0.s:d storage:3 .2:s:d"); + final ClusterState b = stateFromString("cluster:d distributor:3 storage:3 .1:s:d"); + assertTrue(cmp.apply(a, b)); + assertTrue(cmp.apply(b, a)); + } + + @Test + public void similarity_check_considers_differing_down_cluster_states_similar() { + do_test_similarity_for_down_cluster_state((a, b) -> a.similarTo(b)); + } + + @Test + public void init_progress_ignoring__similarity_check_considers_differing_down_cluster_states_similar() { + do_test_similarity_for_down_cluster_state((a, b) -> a.similarToIgnoringInitProgress(b)); + } + + // If we naively only look at the NodeState sets in the ClusterState instances to be + // compared, we might get false positives. If state A has a NodeState(Up, minBits 15) + // while state B has NodeState(Up, minBits 16), the latter will be pruned away from the + // NodeState set because it's got a "default" Up state. 
The two states are still semantically + // similar, and should be returned as such. But their state sets technically differ. + @Test + public void similarity_check_does_not_consider_per_storage_node_min_bits() { + final ClusterState a = stateFromString("distributor:4 storage:4"); + final ClusterState b = stateFromString("distributor:4 storage:4"); + b.setNodeState(new Node(NodeType.STORAGE, 1), new NodeState(NodeType.STORAGE, State.UP).setMinUsedBits(15)); + assertTrue(a.similarTo(b)); + assertTrue(b.similarTo(a)); + } + + @Test + public void init_progress_ignoring_similarity_check_does_in_fact_ignore_init_progress() { + final ClusterState a = stateFromString("distributor:3 storage:3 .0.i:0.01 .1.i:0.1 .2.i:0.9"); + final ClusterState b = stateFromString("distributor:3 storage:3 .0.i:0.2 .1.i:0.5 .2.i:0.99"); + assertTrue(a.similarToIgnoringInitProgress(b)); + assertTrue(b.similarToIgnoringInitProgress(a)); + } + + @Test public void testTextDiff() throws ParseException { ClusterState state1 = new ClusterState("distributor:9 storage:4"); ClusterState state2 = new ClusterState("distributor:7 storage:6"); @@ -94,6 +193,7 @@ public class ClusterStateTestCase extends junit.framework.TestCase { assertEquals("version: 123 => 0, bits: 16 => 21, official: false => true, storage: [2: [Initializing => Up, disks: 2 => 0, description: Booting => ], 4: Down => Up, 5: Down => Up], distributor: [7: Up => Down, 8: Up => Down]", state1.getTextualDifference(state2)); } + @Test public void testHtmlDiff() throws ParseException { ClusterState state1 = new ClusterState("distributor:9 storage:4"); ClusterState state2 = new ClusterState("distributor:7 storage:6"); @@ -133,7 +233,7 @@ public class ClusterStateTestCase extends junit.framework.TestCase { "]", state1.getHtmlDifference(state2)); } - + @Test public void testParser() throws ParseException { ClusterState state = new ClusterState("distributor:2 storage:17 .2.s:d .13.s:r m:cluster\\x20message"); assertEquals("cluster message", 
state.getDescription()); @@ -191,17 +291,20 @@ public class ClusterStateTestCase extends junit.framework.TestCase { } catch (Exception e) {} } + @Test public void testCapacityExponential() throws ParseException { ClusterState state = new ClusterState("distributor:27 storage:170 .2.s:d .13.c:3E-8 .13.s:r"); - assertEquals(3E-8, state.getNodeState(new Node(NodeType.STORAGE, 13)).getCapacity()); + assertEquals(3E-8, state.getNodeState(new Node(NodeType.STORAGE, 13)).getCapacity(), 1E-8); } + @Test public void testCapacityExponentialCpp() throws ParseException { ClusterState state = new ClusterState("distributor:27 storage:170 .2.s:d .13.c:3e-08 .13.s:r"); - assertEquals(3E-8, state.getNodeState(new Node(NodeType.STORAGE, 13)).getCapacity()); + assertEquals(3E-8, state.getNodeState(new Node(NodeType.STORAGE, 13)).getCapacity(), 1E-8); } + @Test public void testSetState() throws ParseException { ClusterState state = new ClusterState("distributor:2 storage:2"); state.setNodeState(new Node(NodeType.DISTRIBUTOR, 0), new NodeState(NodeType.DISTRIBUTOR, State.DOWN)); @@ -209,6 +312,7 @@ public class ClusterStateTestCase extends junit.framework.TestCase { assertEquals("distributor:2 .0.s:d storage:2", state.toString()); } + @Test public void testVersionAndClusterStates() throws ParseException { ClusterState state = new ClusterState("version:4 cluster:i distributor:2 .1.s:i storage:2 .0.s:i .0.i:0.345"); assertEquals(4, state.getVersion()); @@ -220,6 +324,7 @@ public class ClusterStateTestCase extends junit.framework.TestCase { assertEquals("version:5 cluster:d bits:12 distributor:2 .1.s:i .1.i:1.0 storage:2 .0.s:i .0.i:0.345", state.toString()); } + @Test public void testNotRemovingCommentedDownNodesAtEnd() throws ParseException { ClusterState state = new ClusterState(""); state.setNodeState(new Node(NodeType.DISTRIBUTOR, 0), new NodeState(NodeType.DISTRIBUTOR, State.UP)); @@ -234,6 +339,7 @@ public class ClusterStateTestCase extends junit.framework.TestCase { 
assertEquals("distributor:1 storage:2", state.toString(false)); } + @Test public void testWhitespace() throws ParseException { ClusterState state = new ClusterState("distributor:2\n .1.t:3\nstorage:2\n\t.0.s:i \r\f.1.s:m"); assertEquals(2, state.getNodeCount(NodeType.DISTRIBUTOR)); @@ -243,4 +349,22 @@ public class ClusterStateTestCase extends junit.framework.TestCase { assertEquals(new NodeState(NodeType.STORAGE, State.INITIALIZING), state.getNodeState(new Node(NodeType.STORAGE, 0))); assertEquals(new NodeState(NodeType.STORAGE, State.MAINTENANCE), state.getNodeState(new Node(NodeType.STORAGE, 1))); } + + @Test + public void empty_state_factory_method_returns_empty_state() { + final ClusterState state = ClusterState.emptyState(); + assertEquals("", state.toString()); + } + + @Test + public void state_from_string_factory_method_returns_cluster_state_constructed_from_input() { + final String stateStr = "version:123 distributor:2 storage:2"; + final ClusterState state = ClusterState.stateFromString(stateStr); + assertEquals(stateStr, state.toString()); + } + + @Test(expected=RuntimeException.class) + public void state_from_string_factory_method_throws_runtime_exception_on_parse_failure() { + ClusterState.stateFromString("fraggle rock"); + } } diff --git a/vdslib/src/test/java/com/yahoo/vdslib/state/NodeStateTestCase.java b/vdslib/src/test/java/com/yahoo/vdslib/state/NodeStateTestCase.java index 63137a92c7b..9362838b63c 100644 --- a/vdslib/src/test/java/com/yahoo/vdslib/state/NodeStateTestCase.java +++ b/vdslib/src/test/java/com/yahoo/vdslib/state/NodeStateTestCase.java @@ -165,6 +165,12 @@ public class NodeStateTestCase extends junit.framework.TestCase { assertFalse(ns2.similarTo(ns3)); assertTrue(ns3.similarTo(ns4)); + assertTrue(ns1.similarToIgnoringInitProgress(ns2)); + assertTrue(ns1.similarToIgnoringInitProgress(ns3)); + assertTrue(ns3.similarToIgnoringInitProgress(ns1)); + assertTrue(ns1.similarToIgnoringInitProgress(ns4)); + 
assertTrue(ns2.similarToIgnoringInitProgress(ns4)); + assertFalse(ns1.equals(ns2)); assertFalse(ns2.equals(ns3)); assertFalse(ns3.equals(ns4)); @@ -176,6 +182,7 @@ public class NodeStateTestCase extends junit.framework.TestCase { NodeState ns1 = new NodeState(NodeType.STORAGE, State.UP).setMinUsedBits(16); NodeState ns2 = new NodeState(NodeType.STORAGE, State.UP).setMinUsedBits(18); assertTrue(ns1.similarTo(ns2)); + assertTrue(ns1.similarToIgnoringInitProgress(ns2)); assertFalse(ns1.equals(ns2)); } { @@ -184,12 +191,14 @@ public class NodeStateTestCase extends junit.framework.TestCase { assertEquals(ns, ns2Disks); assertEquals(ns2Disks, ns); assertTrue(ns.similarTo(ns2Disks)); + assertTrue(ns.similarToIgnoringInitProgress(ns2Disks)); assertTrue(ns2Disks.similarTo(ns)); ns2Disks.getDiskState(0).setState(State.DOWN); assertFalse(ns.equals(ns2Disks)); assertFalse(ns2Disks.equals(ns)); assertFalse(ns.similarTo(ns2Disks)); + assertFalse(ns.similarToIgnoringInitProgress(ns2Disks)); assertFalse(ns2Disks.similarTo(ns)); } } |