author     Tor Brede Vekterli <vekterli@yahoo-inc.com>    2016-10-05 11:30:50 +0200
committer  GitHub <noreply@github.com>                    2016-10-05 11:30:50 +0200
commit     cf687abd43e57e52afe0a56df727bc0a95621da1 (patch)
tree       44c8bd4df3e1d4d36436d4ba62a2eff7cfafe606 /clustercontroller-core/src/main/java
parent     7a0243a1e6bcbbfb672ff7933635b9ab0d607474 (diff)
Rewrite and refactor core cluster controller state generation logic
Cluster controller will now generate the new cluster state on-demand in a "pure functional" way instead of conditionally patching a working state over time. This makes understanding (and changing) the state generation logic vastly easier than it previously was.
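In rough outline, the new flow builds a parameter object from configuration plus the current cluster and time, and derives a fresh annotated state from it whenever something may have changed. The snippet below is an illustrative sketch, not code from this commit: it uses the ClusterStateGenerator, Params and AnnotatedClusterState types introduced in the diff, while 'options', 'cluster' and 'timer' stand in for objects the calling FleetController already holds.

    // Illustrative sketch of the on-demand, pure-functional generation pattern.
    // 'options' (FleetControllerOptions), 'cluster' (ContentCluster) and 'timer'
    // are assumed to be available to the caller, as they are in FleetController.
    ClusterStateGenerator.Params params = ClusterStateGenerator.Params.fromOptions(options)
            .cluster(cluster)  // fromOptions() deliberately does not set the cluster
            .currentTimeInMillis(timer.getCurrentTimeInMillis());
    AnnotatedClusterState candidate = ClusterStateGenerator.generatedStateFrom(params);
    // Assigning a cluster state version is intentionally left to the caller
    // (handled by the new StateVersionTracker).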
Diffstat (limited to 'clustercontroller-core/src/main/java')
-rw-r--r--  clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java | 69
-rw-r--r--  clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateGenerator.java | 345
-rw-r--r--  clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateHistoryEntry.java | 46
-rw-r--r--  clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateReason.java | 15
-rw-r--r--  clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateView.java | 4
-rw-r--r--  clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/EventDiffCalculator.java | 143
-rw-r--r--  clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java | 197
-rw-r--r--  clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/GroupAvailabilityCalculator.java | 4
-rw-r--r--  clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java | 4
-rw-r--r--  clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeEvent.java | 4
-rw-r--r--  clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java | 32
-rw-r--r--  clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeStateReason.java | 10
-rw-r--r--  clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java | 530
-rw-r--r--  clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java | 140
-rw-r--r--  clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateGenerator.java | 941
-rw-r--r--  clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java | 2
-rw-r--r--  clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/MasterDataGatherer.java | 4
-rw-r--r--  clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/ClusterStateRequestHandler.java | 10
-rw-r--r--  clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java | 25
-rw-r--r--  clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/StaticResourceRequestHandler.java | 66
20 files changed, 1509 insertions, 1082 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java
new file mode 100644
index 00000000000..05a66ddbf2b
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/AnnotatedClusterState.java
@@ -0,0 +1,69 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.vdslib.state.ClusterState;
+import com.yahoo.vdslib.state.Node;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+public class AnnotatedClusterState {
+ private final ClusterState clusterState;
+ private final Map<Node, NodeStateReason> nodeStateReasons;
+ private final Optional<ClusterStateReason> clusterStateReason;
+
+ public AnnotatedClusterState(ClusterState clusterState,
+ Optional<ClusterStateReason> clusterStateReason,
+ Map<Node, NodeStateReason> nodeStateReasons)
+ {
+ this.clusterState = clusterState;
+ this.clusterStateReason = clusterStateReason;
+ this.nodeStateReasons = nodeStateReasons;
+ }
+
+ public static AnnotatedClusterState emptyState() {
+ return new AnnotatedClusterState(ClusterState.emptyState(), Optional.empty(), emptyNodeStateReasons());
+ }
+
+ static Map<Node, NodeStateReason> emptyNodeStateReasons() {
+ return Collections.emptyMap();
+ }
+
+ public ClusterState getClusterState() {
+ return clusterState;
+ }
+
+ public Map<Node, NodeStateReason> getNodeStateReasons() {
+ return Collections.unmodifiableMap(nodeStateReasons);
+ }
+
+ public Optional<ClusterStateReason> getClusterStateReason() {
+ return clusterStateReason;
+ }
+
+ @Override
+ public String toString() {
+ return clusterState.toString();
+ }
+
+ public String toString(boolean verbose) {
+ return clusterState.toString(verbose);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ AnnotatedClusterState that = (AnnotatedClusterState) o;
+ return Objects.equals(clusterState, that.clusterState) &&
+ Objects.equals(nodeStateReasons, that.nodeStateReasons) &&
+ Objects.equals(clusterStateReason, that.clusterStateReason);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(clusterState, nodeStateReasons, clusterStateReason);
+ }
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateGenerator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateGenerator.java
new file mode 100644
index 00000000000..e6fbed71153
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateGenerator.java
@@ -0,0 +1,345 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.vdslib.state.ClusterState;
+import com.yahoo.vdslib.state.Node;
+import com.yahoo.vdslib.state.NodeState;
+import com.yahoo.vdslib.state.NodeType;
+import com.yahoo.vdslib.state.State;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Pure functional cluster state generator which deterministically constructs a full
+ * cluster state given the state of the content cluster, a set of cluster controller
+ * configuration parameters and the current time.
+ *
+ * State version tracking is considered orthogonal to state generation. Therefore,
+ * cluster state version is _not_ set here; its incrementing must be handled by the
+ * caller.
+ */
+public class ClusterStateGenerator {
+
+ static class Params {
+ public ContentCluster cluster;
+ public Map<NodeType, Integer> transitionTimes;
+ public long currentTimeInMillis = 0;
+ public int maxPrematureCrashes = 0;
+ public int minStorageNodesUp = 1;
+ public int minDistributorNodesUp = 1;
+ public double minRatioOfStorageNodesUp = 0.0;
+ public double minRatioOfDistributorNodesUp = 0.0;
+ public double minNodeRatioPerGroup = 0.0;
+ public int idealDistributionBits = 16;
+ public int highestObservedDistributionBitCount = 16;
+ public int lowestObservedDistributionBitCount = 16;
+ public int maxInitProgressTimeMs = 5000;
+
+ Params() {
+ this.transitionTimes = buildTransitionTimeMap(0, 0);
+ }
+
+ // FIXME de-dupe
+ static Map<NodeType, Integer> buildTransitionTimeMap(int distributorTransitionTimeMs, int storageTransitionTimeMs) {
+ Map<NodeType, Integer> maxTransitionTime = new TreeMap<>();
+ maxTransitionTime.put(NodeType.DISTRIBUTOR, distributorTransitionTimeMs);
+ maxTransitionTime.put(NodeType.STORAGE, storageTransitionTimeMs);
+ return maxTransitionTime;
+ }
+
+ Params cluster(ContentCluster cluster) {
+ this.cluster = cluster;
+ return this;
+ }
+ Params maxInitProgressTime(int maxTimeMs) {
+ this.maxInitProgressTimeMs = maxTimeMs;
+ return this;
+ }
+ Params transitionTimes(int timeMs) {
+ this.transitionTimes = buildTransitionTimeMap(timeMs, timeMs);
+ return this;
+ }
+ Params transitionTimes(Map<NodeType, Integer> timesMs) {
+ this.transitionTimes = timesMs;
+ return this;
+ }
+ Params currentTimeInMillis(long currentTimeMs) {
+ this.currentTimeInMillis = currentTimeMs;
+ return this;
+ }
+ Params maxPrematureCrashes(int count) {
+ this.maxPrematureCrashes = count;
+ return this;
+ }
+ Params minStorageNodesUp(int nodes) {
+ this.minStorageNodesUp = nodes;
+ return this;
+ }
+ Params minDistributorNodesUp(int nodes) {
+ this.minDistributorNodesUp = nodes;
+ return this;
+ }
+ Params minRatioOfStorageNodesUp(double minRatio) {
+ this.minRatioOfStorageNodesUp = minRatio;
+ return this;
+ }
+ Params minRatioOfDistributorNodesUp(double minRatio) {
+ this.minRatioOfDistributorNodesUp = minRatio;
+ return this;
+ }
+ Params minNodeRatioPerGroup(double minRatio) {
+ this.minNodeRatioPerGroup = minRatio;
+ return this;
+ }
+ Params idealDistributionBits(int distributionBits) {
+ this.idealDistributionBits = distributionBits;
+ return this;
+ }
+ Params highestObservedDistributionBitCount(int bitCount) {
+ this.highestObservedDistributionBitCount = bitCount;
+ return this;
+ }
+ Params lowestObservedDistributionBitCount(int bitCount) {
+ this.lowestObservedDistributionBitCount = bitCount;
+ return this;
+ }
+
+ /**
+ * Infer parameters from controller options. Important: this does _not_ set the cluster;
+ * it must be set explicitly on the returned parameter object afterwards, before that
+ * object is used to compute states.
+ */
+ static Params fromOptions(FleetControllerOptions opts) {
+ return new Params()
+ .maxPrematureCrashes(opts.maxPrematureCrashes)
+ .minStorageNodesUp(opts.minStorageNodesUp)
+ .minDistributorNodesUp(opts.minDistributorNodesUp)
+ .minRatioOfStorageNodesUp(opts.minRatioOfStorageNodesUp)
+ .minRatioOfDistributorNodesUp(opts.minRatioOfDistributorNodesUp)
+ .minNodeRatioPerGroup(opts.minNodeRatioPerGroup)
+ .idealDistributionBits(opts.distributionBits)
+ .transitionTimes(opts.maxTransitionTime);
+ }
+ }
+
+ static AnnotatedClusterState generatedStateFrom(final Params params) {
+ final ContentCluster cluster = params.cluster;
+ final ClusterState workingState = ClusterState.emptyState();
+ final Map<Node, NodeStateReason> nodeStateReasons = new HashMap<>();
+
+ for (final NodeInfo nodeInfo : cluster.getNodeInfo()) {
+ final NodeState nodeState = computeEffectiveNodeState(nodeInfo, params);
+ workingState.setNodeState(nodeInfo.getNode(), nodeState);
+ }
+
+ takeDownGroupsWithTooLowAvailability(workingState, nodeStateReasons, params);
+
+ final Optional<ClusterStateReason> reasonToBeDown = clusterDownReason(workingState, params);
+ if (reasonToBeDown.isPresent()) {
+ workingState.setClusterState(State.DOWN);
+ }
+ workingState.setDistributionBits(inferDistributionBitCount(cluster, workingState, params));
+
+ return new AnnotatedClusterState(workingState, reasonToBeDown, nodeStateReasons);
+ }
+
+ private static boolean nodeIsConsideredTooUnstable(final NodeInfo nodeInfo, final Params params) {
+ return (params.maxPrematureCrashes != 0
+ && nodeInfo.getPrematureCrashCount() > params.maxPrematureCrashes);
+ }
+
+ private static void applyWantedStateToBaselineState(final NodeState baseline, final NodeState wanted) {
+ // Only copy state and description from Wanted state; this preserves auxiliary
+ // information such as disk states and startup timestamp.
+ baseline.setState(wanted.getState());
+ baseline.setDescription(wanted.getDescription());
+ }
+
+ private static NodeState computeEffectiveNodeState(final NodeInfo nodeInfo, final Params params) {
+ final NodeState reported = nodeInfo.getReportedState();
+ final NodeState wanted = nodeInfo.getWantedState();
+ final NodeState baseline = reported.clone();
+
+ if (nodeIsConsideredTooUnstable(nodeInfo, params)) {
+ baseline.setState(State.DOWN);
+ }
+ if (startupTimestampAlreadyObservedByAllNodes(nodeInfo, baseline)) {
+ baseline.setStartTimestamp(0);
+ }
+ if (nodeInfo.isStorage()) {
+ applyStorageSpecificStateTransforms(nodeInfo, params, reported, wanted, baseline);
+ }
+ if (baseline.above(wanted)) {
+ applyWantedStateToBaselineState(baseline, wanted);
+ }
+
+ return baseline;
+ }
+
+ private static void applyStorageSpecificStateTransforms(NodeInfo nodeInfo, Params params, NodeState reported,
+ NodeState wanted, NodeState baseline)
+ {
+ if (reported.getState() == State.INITIALIZING) {
+ if (timedOutWithoutNewInitProgress(reported, nodeInfo, params)
+ || shouldForceInitToDown(reported)
+ || nodeInfo.recentlyObservedUnstableDuringInit())
+ {
+ baseline.setState(State.DOWN);
+ }
+ if (shouldForceInitToMaintenance(reported, wanted)) {
+ baseline.setState(State.MAINTENANCE);
+ }
+ }
+ // TODO ensure that maintenance cannot override Down for any other cases
+ if (withinTemporalMaintenancePeriod(nodeInfo, baseline, params) && wanted.getState() != State.DOWN) {
+ baseline.setState(State.MAINTENANCE);
+ }
+ }
+
+ // TODO remove notion of init timeout progress? Seems redundant when we've already got RPC timeouts
+ private static boolean timedOutWithoutNewInitProgress(final NodeState reported, final NodeInfo nodeInfo, final Params params) {
+ if (reported.getState() != State.INITIALIZING) {
+ return false;
+ }
+ if (params.maxInitProgressTimeMs <= 0) {
+ return false; // No upper bound for max init time; auto-down for all intents and purposes disabled.
+ }
+ return nodeInfo.getInitProgressTime() + params.maxInitProgressTimeMs <= params.currentTimeInMillis;
+ }
+
+ // Init while listing buckets should be treated as Down, as distributors expect a storage node
+ // in Init mode to have a bucket set readily available. Clients also expect a node in Init to
+ // be able to receive operations.
+ // Precondition: reported.getState() == State.INITIALIZING
+ private static boolean shouldForceInitToDown(final NodeState reported) {
+ return reported.getInitProgress() <= NodeState.getListingBucketsInitProgressLimit() + 0.00001;
+ }
+
+ // Special case: since each node is published with a single state, if we let a Retired node
+ // be published with Initializing, it'd start receiving feed and merges. Avoid this by
+ // having it be in maintenance instead for the duration of the init period.
+ private static boolean shouldForceInitToMaintenance(final NodeState reported, final NodeState wanted) {
+ return reported.getState() == State.INITIALIZING && wanted.getState() == State.RETIRED;
+ }
+
+ private static boolean startupTimestampAlreadyObservedByAllNodes(final NodeInfo nodeInfo, final NodeState baseline) {
+ return baseline.getStartTimestamp() == nodeInfo.getStartTimestamp(); // TODO rename NodeInfo getter/setter
+ }
+
+ /**
+ * Determines whether a given storage node should be implicitly set to a
+ * maintenance state despite its reported state being Down. This predominantly
+ * covers the case where contact has just been lost with a node but we do not
+ * want to mark it Down just yet ("yet" being a configurable amount of time;
+ * see params.transitionTimes). This prevents common node restart/upgrade
+ * scenarios from triggering redistribution and data replication that would be
+ * useless work if the node comes back up immediately afterwards.
+ *
+ * Only makes sense to call for storage nodes, since distributors do not support
+ * being in maintenance mode.
+ */
+ private static boolean withinTemporalMaintenancePeriod(final NodeInfo nodeInfo,
+ final NodeState baseline,
+ final Params params)
+ {
+ final Integer transitionTime = params.transitionTimes.get(nodeInfo.getNode().getType());
+ if (transitionTime == 0 || !baseline.getState().oneOf("sd")) {
+ return false;
+ }
+ return nodeInfo.getTransitionTime() + transitionTime > params.currentTimeInMillis;
+ }
+
+ private static void takeDownGroupsWithTooLowAvailability(final ClusterState workingState,
+ Map<Node, NodeStateReason> nodeStateReasons,
+ final Params params)
+ {
+ final GroupAvailabilityCalculator calc = new GroupAvailabilityCalculator.Builder()
+ .withMinNodeRatioPerGroup(params.minNodeRatioPerGroup)
+ .withDistribution(params.cluster.getDistribution())
+ .build();
+ final Set<Integer> nodesToTakeDown = calc.nodesThatShouldBeDown(workingState);
+
+ for (Integer idx : nodesToTakeDown) {
+ final Node node = storageNode(idx);
+ final NodeState newState = new NodeState(NodeType.STORAGE, State.DOWN);
+ newState.setDescription("group node availability below configured threshold");
+ workingState.setNodeState(node, newState);
+ nodeStateReasons.put(node, NodeStateReason.GROUP_IS_DOWN);
+ }
+ }
+
+ private static Node storageNode(int index) {
+ return new Node(NodeType.STORAGE, index);
+ }
+
+ // TODO we'll want to explicitly persist a distribution bit count lower bound in ZooKeeper
+ // and ensure we never go below it (this is _not_ the case today). Nodes whose min used bits
+ // are lower than this will just have to start splitting in the background before being
+ // allowed to join the cluster.
+
+ private static int inferDistributionBitCount(final ContentCluster cluster,
+ final ClusterState state,
+ final Params params)
+ {
+ int bitCount = params.idealDistributionBits;
+ final Optional<Integer> minBits = cluster.getConfiguredNodes().values().stream()
+ .map(configuredNode -> cluster.getNodeInfo(storageNode(configuredNode.index())))
+ .filter(node -> state.getNodeState(node.getNode()).getState().oneOf("iur"))
+ .map(nodeInfo -> nodeInfo.getReportedState().getMinUsedBits())
+ .min(Integer::compare);
+
+ if (minBits.isPresent() && minBits.get() < bitCount) {
+ bitCount = minBits.get();
+ }
+ if (bitCount > params.lowestObservedDistributionBitCount && bitCount < params.idealDistributionBits) {
+ bitCount = params.lowestObservedDistributionBitCount;
+ }
+
+ return bitCount;
+ }
+
+ private static boolean nodeStateIsConsideredAvailable(final NodeState ns) {
+ return (ns.getState() == State.UP
+ || ns.getState() == State.RETIRED
+ || ns.getState() == State.INITIALIZING);
+ }
+
+ private static long countAvailableNodesOfType(final NodeType type,
+ final ContentCluster cluster,
+ final ClusterState state)
+ {
+ return cluster.getConfiguredNodes().values().stream()
+ .map(node -> state.getNodeState(new Node(type, node.index())))
+ .filter(ClusterStateGenerator::nodeStateIsConsideredAvailable)
+ .count();
+ }
+
+ private static Optional<ClusterStateReason> clusterDownReason(final ClusterState state, final Params params) {
+ final ContentCluster cluster = params.cluster;
+
+ final long upStorageCount = countAvailableNodesOfType(NodeType.STORAGE, cluster, state);
+ final long upDistributorCount = countAvailableNodesOfType(NodeType.DISTRIBUTOR, cluster, state);
+ // There's a 1-1 relationship between distributors and storage nodes, so we don't need to
+ // keep track of separate node counts when computing availability ratios.
+ final long nodeCount = cluster.getConfiguredNodes().size();
+
+ if (upStorageCount < params.minStorageNodesUp) {
+ return Optional.of(ClusterStateReason.TOO_FEW_STORAGE_NODES_AVAILABLE);
+ }
+ if (upDistributorCount < params.minDistributorNodesUp) {
+ return Optional.of(ClusterStateReason.TOO_FEW_DISTRIBUTOR_NODES_AVAILABLE);
+ }
+ if (params.minRatioOfStorageNodesUp * nodeCount > upStorageCount) {
+ return Optional.of(ClusterStateReason.TOO_LOW_AVAILABLE_STORAGE_NODE_RATIO);
+ }
+ if (params.minRatioOfDistributorNodesUp * nodeCount > upDistributorCount) {
+ return Optional.of(ClusterStateReason.TOO_LOW_AVAILABLE_DISTRIBUTOR_NODE_RATIO);
+ }
+ return Optional.empty();
+ }
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateHistoryEntry.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateHistoryEntry.java
new file mode 100644
index 00000000000..3963fcaa45b
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateHistoryEntry.java
@@ -0,0 +1,46 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.vdslib.state.ClusterState;
+
+import java.util.Objects;
+
+public class ClusterStateHistoryEntry {
+
+ private final ClusterState state;
+ private final long time;
+
+ ClusterStateHistoryEntry(final ClusterState state, final long time) {
+ this.state = state;
+ this.time = time;
+ }
+
+ public ClusterState state() {
+ return state;
+ }
+
+ public long time() {
+ return time;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ClusterStateHistoryEntry that = (ClusterStateHistoryEntry) o;
+ return time == that.time &&
+ Objects.equals(state, that.state);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(state, time);
+ }
+
+ // String representation only used for test expectation failures and debugging output.
+ // Actual status page history entry rendering emits formatted date/time.
+ public String toString() {
+ return String.format("state '%s' at time %d", state, time);
+ }
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateReason.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateReason.java
new file mode 100644
index 00000000000..3557ed1ceb8
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateReason.java
@@ -0,0 +1,15 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+/**
+ * Explicit reasons for why a cluster has been assigned a particular global state.
+ * This only includes reasons that cannot be directly inferred from diffing
+ * two cluster states; e.g. distribution bit changes aren't listed here because
+ * they are obvious from direct inspection.
+ */
+public enum ClusterStateReason {
+ TOO_FEW_STORAGE_NODES_AVAILABLE,
+ TOO_FEW_DISTRIBUTOR_NODES_AVAILABLE,
+ TOO_LOW_AVAILABLE_STORAGE_NODE_RATIO,
+ TOO_LOW_AVAILABLE_DISTRIBUTOR_NODE_RATIO,
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateView.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateView.java
index 328acfb4dbe..644d6b28b05 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateView.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateView.java
@@ -41,6 +41,10 @@ public class ClusterStateView {
return new ClusterStateView(clusterState, createNewAggregator(clusterState, metricUpdater), metricUpdater);
}
+ public static ClusterStateView create(final ClusterState clusterState, final MetricUpdater metricUpdater) {
+ return new ClusterStateView(clusterState, createNewAggregator(clusterState, metricUpdater), metricUpdater);
+ }
+
private static ClusterStatsAggregator createNewAggregator(ClusterState clusterState, MetricUpdater metricUpdater) {
Set<Integer> upDistributors = getIndicesOfUpNodes(clusterState, NodeType.DISTRIBUTOR);
Set<Integer> upStorageNodes = getIndicesOfUpNodes(clusterState, NodeType.STORAGE);
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/EventDiffCalculator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/EventDiffCalculator.java
new file mode 100644
index 00000000000..2e5d99f2e67
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/EventDiffCalculator.java
@@ -0,0 +1,143 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.vdslib.distribution.ConfiguredNode;
+import com.yahoo.vdslib.state.ClusterState;
+import com.yahoo.vdslib.state.Node;
+import com.yahoo.vdslib.state.NodeState;
+import com.yahoo.vdslib.state.NodeType;
+import com.yahoo.vdslib.state.State;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Responsible for inferring the difference between two cluster states and their
+ * state annotations and producing a set of events that describe the changes between
+ * the two. Diffing the states directly provides a clear picture of _what_ has changed,
+ * while the annotations are generally required to explain _why_ the changes happened
+ * in the first place.
+ *
+ * Events are primarily used for administrative/user visibility into what's happening
+ * in the cluster and are output to the Vespa log as well as kept in a circular history
+ * buffer per node and for the cluster as a whole.
+ */
+public class EventDiffCalculator {
+
+ static class Params {
+ ContentCluster cluster;
+ AnnotatedClusterState fromState;
+ AnnotatedClusterState toState;
+ long currentTime;
+
+ public Params cluster(ContentCluster cluster) {
+ this.cluster = cluster;
+ return this;
+ }
+ public Params fromState(AnnotatedClusterState clusterState) {
+ this.fromState = clusterState;
+ return this;
+ }
+ public Params toState(AnnotatedClusterState clusterState) {
+ this.toState = clusterState;
+ return this;
+ }
+ public Params currentTimeMs(long time) {
+ this.currentTime = time;
+ return this;
+ }
+ }
+
+ public static List<Event> computeEventDiff(final Params params) {
+ final List<Event> events = new ArrayList<>();
+
+ emitPerNodeDiffEvents(params, events);
+ emitWholeClusterDiffEvent(params, events);
+ return events;
+ }
+
+ private static ClusterEvent createClusterEvent(String description, Params params) {
+ return new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE, description, params.currentTime);
+ }
+
+ private static boolean clusterDownBecause(final Params params, ClusterStateReason wantedReason) {
+ final Optional<ClusterStateReason> actualReason = params.toState.getClusterStateReason();
+ return actualReason.isPresent() && actualReason.get().equals(wantedReason);
+ }
+
+ private static void emitWholeClusterDiffEvent(final Params params, final List<Event> events) {
+ final ClusterState fromState = params.fromState.getClusterState();
+ final ClusterState toState = params.toState.getClusterState();
+
+ if (clusterHasTransitionedToUpState(fromState, toState)) {
+ events.add(createClusterEvent("Enough nodes available for system to become up", params));
+ } else if (clusterHasTransitionedToDownState(fromState, toState)) {
+ if (clusterDownBecause(params, ClusterStateReason.TOO_FEW_STORAGE_NODES_AVAILABLE)) {
+ events.add(createClusterEvent("Too few storage nodes available in cluster. Setting cluster state down", params));
+ } else if (clusterDownBecause(params, ClusterStateReason.TOO_FEW_DISTRIBUTOR_NODES_AVAILABLE)) {
+ events.add(createClusterEvent("Too few distributor nodes available in cluster. Setting cluster state down", params));
+ } else if (clusterDownBecause(params, ClusterStateReason.TOO_LOW_AVAILABLE_STORAGE_NODE_RATIO)) {
+ events.add(createClusterEvent("Too low ratio of available storage nodes. Setting cluster state down", params));
+ } else if (clusterDownBecause(params, ClusterStateReason.TOO_LOW_AVAILABLE_DISTRIBUTOR_NODE_RATIO)) {
+ events.add(createClusterEvent("Too low ratio of available distributor nodes. Setting cluster state down", params));
+ } else {
+ events.add(createClusterEvent("Cluster is down", params));
+ }
+ }
+ }
+
+ private static NodeEvent createNodeEvent(NodeInfo nodeInfo, String description, Params params) {
+ return new NodeEvent(nodeInfo, description, NodeEvent.Type.CURRENT, params.currentTime);
+ }
+
+ private static void emitPerNodeDiffEvents(final Params params, final List<Event> events) {
+ final ContentCluster cluster = params.cluster;
+ final ClusterState fromState = params.fromState.getClusterState();
+ final ClusterState toState = params.toState.getClusterState();
+
+ for (ConfiguredNode node : cluster.getConfiguredNodes().values()) {
+ for (NodeType nodeType : NodeType.getTypes()) {
+ final Node n = new Node(nodeType, node.index());
+ emitSingleNodeEvents(params, events, cluster, fromState, toState, n);
+ }
+ }
+ }
+
+ private static void emitSingleNodeEvents(Params params, List<Event> events, ContentCluster cluster, ClusterState fromState, ClusterState toState, Node n) {
+ final NodeState nodeFrom = fromState.getNodeState(n);
+ final NodeState nodeTo = toState.getNodeState(n);
+ if (!nodeTo.equals(nodeFrom)) {
+ final NodeInfo info = cluster.getNodeInfo(n);
+ events.add(createNodeEvent(info, String.format("Altered node state in cluster state from '%s' to '%s'",
+ nodeFrom.toString(true), nodeTo.toString(true)), params));
+
+ NodeStateReason prevReason = params.fromState.getNodeStateReasons().get(n);
+ NodeStateReason currReason = params.toState.getNodeStateReasons().get(n);
+ if (isGroupDownEdge(prevReason, currReason)) {
+ events.add(createNodeEvent(info, "Group node availability is below configured threshold", params));
+ } else if (isGroupUpEdge(prevReason, currReason)) {
+ events.add(createNodeEvent(info, "Group node availability has been restored", params));
+ }
+ }
+ }
+
+ private static boolean isGroupUpEdge(NodeStateReason prevReason, NodeStateReason currReason) {
+ return prevReason == NodeStateReason.GROUP_IS_DOWN && currReason != NodeStateReason.GROUP_IS_DOWN;
+ }
+
+ private static boolean isGroupDownEdge(NodeStateReason prevReason, NodeStateReason currReason) {
+ return prevReason != NodeStateReason.GROUP_IS_DOWN && currReason == NodeStateReason.GROUP_IS_DOWN;
+ }
+
+ private static boolean clusterHasTransitionedToUpState(ClusterState prevState, ClusterState currentState) {
+ return prevState.getClusterState() != State.UP && currentState.getClusterState() == State.UP;
+ }
+
+ private static boolean clusterHasTransitionedToDownState(ClusterState prevState, ClusterState currentState) {
+ return prevState.getClusterState() != State.DOWN && currentState.getClusterState() == State.DOWN;
+ }
+
+ public static Params params() { return new Params(); }
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index ceeeddf49fa..b21cae4ed71 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -7,6 +7,7 @@ import com.yahoo.vdslib.distribution.ConfiguredNode;
import com.yahoo.vdslib.state.ClusterState;
import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeState;
+import com.yahoo.vdslib.state.State;
import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler;
import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo;
import com.yahoo.vespa.clustercontroller.core.listeners.*;
@@ -37,8 +38,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
private final ContentCluster cluster;
private final Communicator communicator;
private final NodeStateGatherer stateGatherer;
- private final SystemStateGenerator systemStateGenerator;
+ private final StateChangeHandler stateChangeHandler;
private final SystemStateBroadcaster systemStateBroadcaster;
+ private final StateVersionTracker stateVersionTracker;
private final StatusPageServerInterface statusPageServer;
private final RpcServer rpcServer;
private final DatabaseHandler database;
@@ -59,7 +61,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
private final List<com.yahoo.vdslib.state.ClusterState> newStates = new ArrayList<>();
private long configGeneration = -1;
private long nextConfigGeneration = -1;
- private List<RemoteClusterControllerTask> remoteTasks = new ArrayList<>();
+ private Queue<RemoteClusterControllerTask> remoteTasks = new LinkedList<>();
private final MetricUpdater metricUpdater;
private boolean isMaster = false;
@@ -69,7 +71,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
private final RunDataExtractor dataExtractor = new RunDataExtractor() {
@Override
- public com.yahoo.vdslib.state.ClusterState getLatestClusterState() { return systemStateGenerator.getClusterState(); }
+ public com.yahoo.vdslib.state.ClusterState getLatestClusterState() { return stateVersionTracker.getVersionedClusterState(); }
@Override
public FleetControllerOptions getOptions() { return options; }
@Override
@@ -87,7 +89,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
RpcServer server,
NodeLookup nodeLookup,
DatabaseHandler database,
- SystemStateGenerator systemStateGenerator,
+ StateChangeHandler stateChangeHandler,
SystemStateBroadcaster systemStateBroadcaster,
MasterElectionHandler masterElectionHandler,
MetricUpdater metricUpdater,
@@ -103,8 +105,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
this.communicator = communicator;
this.database = database;
this.stateGatherer = nodeStateGatherer;
- this.systemStateGenerator = systemStateGenerator;
+ this.stateChangeHandler = stateChangeHandler;
this.systemStateBroadcaster = systemStateBroadcaster;
+ this.stateVersionTracker = new StateVersionTracker(metricUpdater);
this.metricUpdater = metricUpdater;
this.statusPageServer = statusPage;
@@ -120,12 +123,12 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
new NodeHealthRequestHandler(dataExtractor));
this.statusRequestRouter.addHandler(
"^/clusterstate",
- new ClusterStateRequestHandler(systemStateGenerator));
+ new ClusterStateRequestHandler(stateVersionTracker));
this.statusRequestRouter.addHandler(
"^/$",
new LegacyIndexPageRequestHandler(
timer, options.showLocalSystemStatesInEventLog, cluster,
- masterElectionHandler, systemStateGenerator,
+ masterElectionHandler, stateVersionTracker,
eventLog, timer.getCurrentTimeInMillis(), dataExtractor));
propagateOptions();
@@ -169,7 +172,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
options.nodeStateRequestRoundTripTimeMaxSeconds);
DatabaseHandler database = new DatabaseHandler(timer, options.zooKeeperServerAddress, options.fleetControllerIndex, timer);
NodeLookup lookUp = new SlobrokClient(timer);
- SystemStateGenerator stateGenerator = new SystemStateGenerator(timer, log, metricUpdater);
+ StateChangeHandler stateGenerator = new StateChangeHandler(timer, log, metricUpdater);
SystemStateBroadcaster stateBroadcaster = new SystemStateBroadcaster(timer, timer);
MasterElectionHandler masterElectionHandler = new MasterElectionHandler(options.fleetControllerIndex, options.fleetControllerCount, timer, timer);
FleetController controller = new FleetController(
@@ -246,7 +249,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
public com.yahoo.vdslib.state.ClusterState getSystemState() {
synchronized(monitor) {
- return systemStateGenerator.getClusterState();
+ return stateVersionTracker.getVersionedClusterState();
}
}
@@ -299,41 +302,41 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
@Override
public void handleNewNodeState(NodeInfo node, NodeState newState) {
verifyInControllerThread();
- systemStateGenerator.handleNewReportedNodeState(node, newState, this);
+ stateChangeHandler.handleNewReportedNodeState(stateVersionTracker.getVersionedClusterState(), node, newState, this);
}
@Override
public void handleNewWantedNodeState(NodeInfo node, NodeState newState) {
verifyInControllerThread();
wantedStateChanged = true;
- systemStateGenerator.proposeNewNodeState(node, newState);
+ stateChangeHandler.proposeNewNodeState(stateVersionTracker.getVersionedClusterState(), node, newState);
}
@Override
public void handleUpdatedHostInfo(NodeInfo nodeInfo, HostInfo newHostInfo) {
verifyInControllerThread();
- systemStateGenerator.handleUpdatedHostInfo(nodeInfo, newHostInfo);
+ stateVersionTracker.handleUpdatedHostInfo(stateChangeHandler.getHostnames(), nodeInfo, newHostInfo);
}
@Override
public void handleNewNode(NodeInfo node) {
verifyInControllerThread();
- systemStateGenerator.handleNewNode(node);
+ stateChangeHandler.handleNewNode(node);
}
@Override
public void handleMissingNode(NodeInfo node) {
verifyInControllerThread();
- systemStateGenerator.handleMissingNode(node, this);
+ stateChangeHandler.handleMissingNode(stateVersionTracker.getVersionedClusterState(), node, this);
}
@Override
public void handleNewRpcAddress(NodeInfo node) {
verifyInControllerThread();
- systemStateGenerator.handleNewRpcAddress(node);
+ stateChangeHandler.handleNewRpcAddress(node);
}
@Override
public void handleReturnedRpcAddress(NodeInfo node) {
verifyInControllerThread();
- systemStateGenerator.handleReturnedRpcAddress(node);
+ stateChangeHandler.handleReturnedRpcAddress(node);
}
public void handleNewSystemState(com.yahoo.vdslib.state.ClusterState state) {
@@ -370,7 +373,9 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
/** Called when all distributors have acked newest cluster state version. */
public void handleAllDistributorsInSync(DatabaseHandler database, DatabaseHandler.Context context) throws InterruptedException {
- systemStateGenerator.handleAllDistributorsInSync(database, context);
+ Set<ConfiguredNode> nodes = new HashSet<>(cluster.clusterInfo().getConfiguredNodes().values());
+ stateChangeHandler.handleAllDistributorsInSync(
+ stateVersionTracker.getVersionedClusterState(), nodes, database, context);
}
private boolean changesConfiguredNodeSet(Collection<ConfiguredNode> newNodes) {
@@ -409,17 +414,11 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
database.setZooKeeperSessionTimeout(options.zooKeeperSessionTimeout);
stateGatherer.setMaxSlobrokDisconnectGracePeriod(options.maxSlobrokDisconnectGracePeriod);
stateGatherer.setNodeStateRequestTimeout(options.nodeStateRequestTimeoutMS);
- systemStateGenerator.setNodes(cluster.clusterInfo());
- systemStateGenerator.setMaxTransitionTime(options.maxTransitionTime);
- systemStateGenerator.setMaxInitProgressTime(options.maxInitProgressTime);
- systemStateGenerator.setMaxPrematureCrashes(options.maxPrematureCrashes);
- systemStateGenerator.setStableStateTimePeriod(options.stableStateTimePeriod);
- systemStateGenerator.setMinNodesUp(options.minDistributorNodesUp, options.minStorageNodesUp,
- options.minRatioOfDistributorNodesUp, options.minRatioOfStorageNodesUp);
- systemStateGenerator.setMinNodeRatioPerGroup(options.minNodeRatioPerGroup);
- systemStateGenerator.setMaxSlobrokDisconnectGracePeriod(options.maxSlobrokDisconnectGracePeriod);
- systemStateGenerator.setDistributionBits(options.distributionBits);
- systemStateGenerator.setDistribution(options.storageDistribution);
+
+ // TODO: remove as many temporal parameter dependencies as possible here. There is currently duplication of state.
+ stateChangeHandler.reconfigureFromOptions(options);
+ stateChangeHandler.setStateChangedFlag(); // Always trigger state recomputation after reconfig
+
masterElectionHandler.setFleetControllerCount(options.fleetControllerCount);
masterElectionHandler.setMasterZooKeeperCooldownPeriod(options.masterZooKeeperCooldownPeriod);
@@ -491,7 +490,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
didWork = database.doNextZooKeeperTask(databaseContext);
didWork |= updateMasterElectionState();
didWork |= handleLeadershipEdgeTransitions();
- systemStateGenerator.setMaster(isMaster);
+ stateChangeHandler.setMaster(isMaster);
// Process zero or more getNodeState responses that we have received.
didWork |= stateGatherer.processResponses(this);
@@ -510,10 +509,10 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
didWork |= processAnyPendingStatusPageRequest();
if (rpcServer != null) {
- didWork |= rpcServer.handleRpcRequests(cluster, systemStateGenerator.getClusterState(), this, this);
+ didWork |= rpcServer.handleRpcRequests(cluster, consolidatedClusterState(), this, this);
}
- processAllQueuedRemoteTasks();
+ didWork |= processNextQueuedRemoteTask();
processingCycle = false;
++cycleCount;
@@ -606,25 +605,52 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
}
}
- private void processAllQueuedRemoteTasks() {
+ private boolean processNextQueuedRemoteTask() {
if ( ! remoteTasks.isEmpty()) {
- RemoteClusterControllerTask.Context context = new RemoteClusterControllerTask.Context();
- context.cluster = cluster;
- context.currentState = systemStateGenerator.getConsolidatedClusterState();
- context.masterInfo = masterElectionHandler;
- context.nodeStateOrHostInfoChangeHandler = this;
- context.nodeAddedOrRemovedListener = this;
- for (RemoteClusterControllerTask task : remoteTasks) {
- log.finest("Processing remote task " + task.getClass().getName());
- task.doRemoteFleetControllerTask(context);
- task.notifyCompleted();
- log.finest("Done processing remote task " + task.getClass().getName());
- }
- log.fine("Completed processing remote tasks");
- remoteTasks.clear();
+ final RemoteClusterControllerTask.Context context = createRemoteTaskProcessingContext();
+ final RemoteClusterControllerTask task = remoteTasks.poll();
+ log.finest("Processing remote task " + task.getClass().getName());
+ task.doRemoteFleetControllerTask(context);
+ task.notifyCompleted();
+ log.finest("Done processing remote task " + task.getClass().getName());
+ return true;
}
+ return false;
}
+ private RemoteClusterControllerTask.Context createRemoteTaskProcessingContext() {
+ final RemoteClusterControllerTask.Context context = new RemoteClusterControllerTask.Context();
+ context.cluster = cluster;
+ context.currentState = consolidatedClusterState();
+ context.masterInfo = masterElectionHandler;
+ context.nodeStateOrHostInfoChangeHandler = this;
+ context.nodeAddedOrRemovedListener = this;
+ return context;
+ }
+
+ /**
+ * A "consolidated" cluster state is guaranteed to have up-to-date information on which nodes are
+ * up or down even when the whole cluster is down. The regular, published cluster state is not
+ * normally updated to reflect node events when the cluster is down.
+ */
+ ClusterState consolidatedClusterState() {
+ final ClusterState publishedState = stateVersionTracker.getVersionedClusterState();
+ if (publishedState.getClusterState() == State.UP) {
+ return publishedState; // Short-circuit; already represents latest node state
+ }
+ // Latest candidate state contains the most up to date state information, even if it may not
+ // have been published yet.
+ final ClusterState current = stateVersionTracker.getLatestCandidateState().getClusterState().clone();
+ current.setVersion(publishedState.getVersion());
+ return current;
+ }
+
+ /*
+ System test observations:
+ - a node that stops normally (U -> S) then goes down erroneously triggers premature crash handling
+ - long time before content node state convergence (though this seems to be the case for legacy impl as well)
+ */
+
private boolean resyncLocallyCachedState() throws InterruptedException {
boolean didWork = false;
// Let non-master state gatherers update wanted states once in a while, so states generated and shown are close to valid.
@@ -637,31 +663,99 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
// Send getNodeState requests to zero or more nodes.
didWork |= stateGatherer.sendMessages(cluster, communicator, this);
- didWork |= systemStateGenerator.watchTimers(cluster, this);
- didWork |= systemStateGenerator.notifyIfNewSystemState(cluster, this);
+ // Important: timer events must use a consolidated state, or they might trigger edge events multiple times.
+ didWork |= stateChangeHandler.watchTimers(cluster, consolidatedClusterState(), this);
+
+ didWork |= recomputeClusterStateIfRequired();
if ( ! isStateGatherer) {
if ( ! isMaster) {
eventLog.add(new ClusterEvent(ClusterEvent.Type.MASTER_ELECTION, "This node just became node state gatherer as we are fleetcontroller master candidate.", timer.getCurrentTimeInMillis()));
// Update versions to use so what is shown is closer to what is reality on the master
- systemStateGenerator.setLatestSystemStateVersion(database.getLatestSystemStateVersion());
+ stateVersionTracker.setVersionRetrievedFromZooKeeper(database.getLatestSystemStateVersion());
}
}
isStateGatherer = true;
return didWork;
}
+ private boolean recomputeClusterStateIfRequired() {
+ if (mustRecomputeCandidateClusterState()) {
+ stateChangeHandler.unsetStateChangedFlag();
+ final AnnotatedClusterState candidate = computeCurrentAnnotatedState();
+ stateVersionTracker.updateLatestCandidateState(candidate);
+
+ if (stateVersionTracker.candidateChangedEnoughFromCurrentToWarrantPublish()
+ || stateVersionTracker.hasReceivedNewVersionFromZooKeeper())
+ {
+ final long timeNowMs = timer.getCurrentTimeInMillis();
+ final AnnotatedClusterState before = stateVersionTracker.getAnnotatedVersionedClusterState();
+
+ stateVersionTracker.promoteCandidateToVersionedState(timeNowMs);
+ emitEventsForAlteredStateEdges(before, stateVersionTracker.getAnnotatedVersionedClusterState(), timeNowMs);
+ handleNewSystemState(stateVersionTracker.getVersionedClusterState());
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private AnnotatedClusterState computeCurrentAnnotatedState() {
+ ClusterStateGenerator.Params params = ClusterStateGenerator.Params.fromOptions(options);
+ params.currentTimeInMillis(timer.getCurrentTimeInMillis())
+ .cluster(cluster)
+ .lowestObservedDistributionBitCount(stateVersionTracker.getLowestObservedDistributionBits());
+ return ClusterStateGenerator.generatedStateFrom(params);
+ }
+
+ private void emitEventsForAlteredStateEdges(final AnnotatedClusterState fromState,
+ final AnnotatedClusterState toState,
+ final long timeNowMs) {
+ final List<Event> deltaEvents = EventDiffCalculator.computeEventDiff(
+ EventDiffCalculator.params()
+ .cluster(cluster)
+ .fromState(fromState)
+ .toState(toState)
+ .currentTimeMs(timeNowMs));
+ for (Event event : deltaEvents) {
+ eventLog.add(event, isMaster);
+ }
+
+ emitStateAppliedEvents(timeNowMs, fromState.getClusterState(), toState.getClusterState());
+ }
+
+ private void emitStateAppliedEvents(long timeNowMs, ClusterState fromClusterState, ClusterState toClusterState) {
+ eventLog.add(new ClusterEvent(
+ ClusterEvent.Type.SYSTEMSTATE,
+ "New cluster state version " + toClusterState.getVersion() + ". Change from last: " +
+ fromClusterState.getTextualDifference(toClusterState),
+ timeNowMs), isMaster);
+
+ if (toClusterState.getDistributionBitCount() != fromClusterState.getDistributionBitCount()) {
+ eventLog.add(new ClusterEvent(
+ ClusterEvent.Type.SYSTEMSTATE,
+ "Altering distribution bits in system from "
+ + fromClusterState.getDistributionBitCount() + " to " +
+ toClusterState.getDistributionBitCount(),
+ timeNowMs), isMaster);
+ }
+ }
+
+ private boolean mustRecomputeCandidateClusterState() {
+ return stateChangeHandler.stateMayHaveChanged() || stateVersionTracker.hasReceivedNewVersionFromZooKeeper();
+ }
+
private boolean handleLeadershipEdgeTransitions() throws InterruptedException {
boolean didWork = false;
if (masterElectionHandler.isMaster()) {
if ( ! isMaster) {
metricUpdater.becameMaster();
// If we just became master, restore wanted states from database
- systemStateGenerator.setLatestSystemStateVersion(database.getLatestSystemStateVersion());
+ stateVersionTracker.setVersionRetrievedFromZooKeeper(database.getLatestSystemStateVersion());
didWork = database.loadStartTimestamps(cluster);
didWork |= database.loadWantedStates(databaseContext);
eventLog.add(new ClusterEvent(ClusterEvent.Type.MASTER_ELECTION, "This node just became fleetcontroller master. Bumped version to "
- + systemStateGenerator.getClusterState().getVersion() + " to be in line.", timer.getCurrentTimeInMillis()));
+ + stateVersionTracker.getCurrentVersion() + " to be in line.", timer.getCurrentTimeInMillis()));
long currentTime = timer.getCurrentTimeInMillis();
firstAllowedStateBroadcast = currentTime + options.minTimeBeforeFirstSystemStateBroadcast;
log.log(LogLevel.DEBUG, "At time " + currentTime + " we set first system state broadcast time to be "
@@ -693,6 +787,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
} catch (InterruptedException e) {
log.log(LogLevel.DEBUG, "Event thread stopped by interrupt exception: " + e);
} catch (Throwable t) {
+ t.printStackTrace();
log.log(LogLevel.ERROR, "Fatal error killed fleet controller", t);
synchronized (monitor) { running = false; }
System.exit(1);
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/GroupAvailabilityCalculator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/GroupAvailabilityCalculator.java
index 74b15b61ac3..e24e5f6914e 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/GroupAvailabilityCalculator.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/GroupAvailabilityCalculator.java
@@ -10,6 +10,7 @@ import com.yahoo.vdslib.state.NodeState;
import com.yahoo.vdslib.state.NodeType;
import com.yahoo.vdslib.state.State;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Stream;
@@ -105,6 +106,9 @@ class GroupAvailabilityCalculator {
}
public Set<Integer> nodesThatShouldBeDown(ClusterState state) {
+ if (distribution == null) { // FIXME: for tests that don't set distribution properly!
+ return Collections.emptySet();
+ }
if (isFlatCluster(distribution.getRootGroup())) {
// Implicit group takedown only applies to hierarchic cluster setups.
return new HashSet<>();
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java
index 6c48bdf12d0..1a48b088ca3 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MasterElectionHandler.java
@@ -240,7 +240,7 @@ public class MasterElectionHandler implements MasterInterface {
.append(".</p>");
} else if (masterGoneFromZooKeeperTime + masterZooKeeperCooldownPeriod > timer.getCurrentTimeInMillis()) {
long time = timer.getCurrentTimeInMillis() - masterGoneFromZooKeeperTime;
- sb.append("<p>There is currently no master. Only " + (time / 1000) + " seconds have past since")
+ sb.append("<p>There is currently no master. Only " + (time / 1000) + " seconds have passed since")
.append(" old master disappeared. At least " + (masterZooKeeperCooldownPeriod / 1000) + " must pass")
.append(" before electing new master unless all possible master candidates are online.</p>");
}
@@ -249,7 +249,7 @@ public class MasterElectionHandler implements MasterInterface {
sb.append("<p>As we are number ").append(nextInLineCount)
.append(" in line for taking over as master, we're gathering state from nodes.</p>");
sb.append("<p><font color=\"red\">As we are not the master, we don't know about nodes current system state"
- + " or wanted states, so some statistics below are a bit incorrect. Look at status page on master "
+ + " or wanted states, so some statistics below may be stale. Look at status page on master "
+ "for updated data.</font></p>");
}
if (index * 2 > totalCount) {
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeEvent.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeEvent.java
index d9d83c705b1..944cbd02082 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeEvent.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeEvent.java
@@ -45,4 +45,8 @@ public class NodeEvent implements Event {
public String getCategory() {
return type.toString();
}
+
+ public Type getType() {
+ return type;
+ }
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
index c261a4bb194..87a32e1e088 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
@@ -35,6 +35,18 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
/** Whether this node has been configured to be retired and should therefore always return retired as its wanted state */
private boolean configuredRetired;
+ /**
+ * Node has been observed transitioning from Init to Down at least once during the last "premature crash count"
+ * period. Gets reset whenever the crash count is reset to zero after a period of stability.
+ *
+ * Flag can also be explicitly toggled by external code, such as if a reported node state
+ * handler discovers "reverse" init progress. This indicates a "silent" down edge and should be
+ * handled as such.
+ *
+ * It is an explicit choice that we only do this on an edge to Down (and not Stopping). Stopping implies
+ * an administrative action, not that the node itself is unstable.
+ */
+ private boolean recentlyObservedUnstableDuringInit;
/** The time we set the current state last. */
private long nextAttemptTime;
@@ -97,6 +109,7 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
this.version = getLatestVersion();
this.connectionVersion = getLatestVersion();
this.configuredRetired = configuredRetired;
+ this.recentlyObservedUnstableDuringInit = false;
this.rpcAddress = rpcAddress;
this.lastSeenInSlobrok = null;
this.nextAttemptTime = 0;
@@ -132,7 +145,17 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
public int getConnectionAttemptCount() { return connectionAttemptCount; }
+ public boolean recentlyObservedUnstableDuringInit() {
+ return recentlyObservedUnstableDuringInit;
+ }
+ public void setRecentlyObservedUnstableDuringInit(boolean unstable) {
+ recentlyObservedUnstableDuringInit = unstable;
+ }
+
public void setPrematureCrashCount(int count) {
+ if (count == 0) {
+ recentlyObservedUnstableDuringInit = false;
+ }
if (prematureCrashCount != count) {
prematureCrashCount = count;
log.log(LogLevel.DEBUG, "Premature crash count on " + toString() + " set to " + count);
@@ -213,6 +236,7 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
public ContentCluster getCluster() { return cluster; }
/** Returns true if the node is currently registered in slobrok */
+ // FIXME why is this called "isRpcAddressOutdated" then???
public boolean isRpcAddressOutdated() { return lastSeenInSlobrok != null; }
public Long getRpcAddressOutdatedTimestamp() { return lastSeenInSlobrok; }
@@ -277,8 +301,10 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
if (state.getState().equals(State.DOWN) && !reportedState.getState().oneOf("d")) {
downStableStateTime = time;
log.log(LogLevel.DEBUG, "Down stable state on " + toString() + " altered to " + time);
- }
- else if (state.getState().equals(State.UP) && !reportedState.getState().oneOf("u")) {
+ if (reportedState.getState() == State.INITIALIZING) {
+ recentlyObservedUnstableDuringInit = true;
+ }
+ } else if (state.getState().equals(State.UP) && !reportedState.getState().oneOf("u")) {
upStableStateTime = time;
log.log(LogLevel.DEBUG, "Up stable state on " + toString() + " altered to " + time);
}
@@ -403,7 +429,7 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
public void setSystemStateVersionSent(ClusterState state) {
if (state == null) throw new Error("Should not clear info for last version sent");
if (systemStateVersionSent.containsKey(state.getVersion())) {
- throw new IllegalStateException("We have already sent cluster state version " + version + " to " + node);
+ throw new IllegalStateException("We have already sent cluster state version " + state.getVersion() + " to " + node);
}
systemStateVersionSent.put(state.getVersion(), state);
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeStateReason.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeStateReason.java
new file mode 100644
index 00000000000..da338626d5d
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeStateReason.java
@@ -0,0 +1,10 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+public enum NodeStateReason {
+ // FIXME some of these reasons may be unnecessary as they are reported implicitly by reported/wanted state changes
+ NODE_TOO_UNSTABLE,
+ WITHIN_MAINTENANCE_GRACE_PERIOD,
+ FORCED_INTO_MAINTENANCE,
+ GROUP_IS_DOWN
+}
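
NodeStateReason is a plain enum meant to annotate why a node ended up in a given generated state (it is carried per node by AnnotatedClusterState elsewhere in this change set). One hedged way such a reason could be derived from per-node observations is sketched below; the helper and its parameters are hypothetical, only the enum constants come from the file above:

    import java.util.Optional;

    // Hypothetical helper -- the actual mapping from observed conditions to reasons
    // belongs to ClusterStateGenerator/EventDiffCalculator, not shown in this hunk.
    class NodeStateReasonSketch {
        static Optional<NodeStateReason> reasonFor(boolean groupTakenDown,
                                                   boolean withinMaintenanceGracePeriod,
                                                   int prematureCrashCount,
                                                   int maxPrematureCrashes) {
            if (groupTakenDown) {
                return Optional.of(NodeStateReason.GROUP_IS_DOWN);
            }
            if (withinMaintenanceGracePeriod) {
                return Optional.of(NodeStateReason.WITHIN_MAINTENANCE_GRACE_PERIOD);
            }
            if (prematureCrashCount > maxPrematureCrashes) {
                return Optional.of(NodeStateReason.NODE_TOO_UNSTABLE);
            }
            return Optional.empty();
        }
    }
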
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java
new file mode 100644
index 00000000000..83ba274c422
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateChangeHandler.java
@@ -0,0 +1,530 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.jrt.Spec;
+import com.yahoo.log.LogLevel;
+import com.yahoo.vdslib.distribution.ConfiguredNode;
+import com.yahoo.vdslib.state.*;
+import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler;
+import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler;
+
+import java.util.*;
+import java.util.logging.Logger;
+
+/**
+ * This class gets node state updates and timer events and uses these to decide
+ * whether a new cluster state should be generated.
+ *
+ * TODO refactor logic out into smaller, separate components. Still state duplication
+ * between ClusterStateGenerator and StateChangeHandler, especially for temporal
+ * state transition configuration parameters.
+ */
+public class StateChangeHandler {
+
+ private static Logger log = Logger.getLogger(StateChangeHandler.class.getName());
+
+ private final Timer timer;
+ private final EventLogInterface eventLog;
+ private boolean stateMayHaveChanged = false;
+ private boolean isMaster = false;
+
+ private Map<NodeType, Integer> maxTransitionTime = new TreeMap<>();
+ private int maxInitProgressTime = 5000;
+ private int maxPrematureCrashes = 4;
+ private long stableStateTimePeriod = 60 * 60 * 1000;
+ private Map<Integer, String> hostnames = new HashMap<>();
+ private int maxSlobrokDisconnectGracePeriod = 1000;
+ private static final boolean disableUnstableNodes = true;
+
+ /**
+ * @param metricUpdater may be null, in which case no metrics will be recorded.
+ */
+ public StateChangeHandler(Timer timer, EventLogInterface eventLog, MetricUpdater metricUpdater) {
+ this.timer = timer;
+ this.eventLog = eventLog;
+ maxTransitionTime.put(NodeType.DISTRIBUTOR, 5000);
+ maxTransitionTime.put(NodeType.STORAGE, 5000);
+ }
+
+ public void handleAllDistributorsInSync(final ClusterState currentState,
+ final Set<ConfiguredNode> nodes,
+ final DatabaseHandler database,
+ final DatabaseHandler.Context dbContext) throws InterruptedException {
+ int startTimestampsReset = 0;
+ log.log(LogLevel.DEBUG, String.format("handleAllDistributorsInSync invoked for state version %d", currentState.getVersion()));
+ for (NodeType nodeType : NodeType.getTypes()) {
+ for (ConfiguredNode configuredNode : nodes) {
+ final Node node = new Node(nodeType, configuredNode.index());
+ final NodeInfo nodeInfo = dbContext.getCluster().getNodeInfo(node);
+ final NodeState nodeState = currentState.getNodeState(node);
+ if (nodeInfo != null && nodeState != null) {
+ if (nodeState.getStartTimestamp() > nodeInfo.getStartTimestamp()) {
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, String.format("Storing away new start timestamp for node %s (%d)",
+ node, nodeState.getStartTimestamp()));
+ }
+ nodeInfo.setStartTimestamp(nodeState.getStartTimestamp());
+ }
+ if (nodeState.getStartTimestamp() > 0) {
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, String.format("Resetting timestamp in cluster state for node %s", node));
+ }
+ ++startTimestampsReset;
+ }
+ } else if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, node + ": " +
+ (nodeInfo == null ? "null" : nodeInfo.getStartTimestamp()) + ", " +
+ (nodeState == null ? "null" : nodeState.getStartTimestamp()));
+ }
+ }
+ }
+ if (startTimestampsReset > 0) {
+ eventLog.add(new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE, "Reset " + startTimestampsReset +
+ " start timestamps as all available distributors have seen newest cluster state.",
+ timer.getCurrentTimeInMillis()));
+ stateMayHaveChanged = true;
+ database.saveStartTimestamps(dbContext);
+ } else {
+ log.log(LogLevel.DEBUG, "Found no start timestamps to reset in cluster state.");
+ }
+ }
+
+ public boolean stateMayHaveChanged() {
+ return stateMayHaveChanged;
+ }
+
+ public void setStateChangedFlag() { stateMayHaveChanged = true; }
+ public void unsetStateChangedFlag() {
+ stateMayHaveChanged = false;
+ }
+
+ public void setMaster(boolean isMaster) {
+ this.isMaster = isMaster;
+ }
+
+ public void setMaxTransitionTime(Map<NodeType, Integer> map) { maxTransitionTime = map; }
+ public void setMaxInitProgressTime(int millisecs) { maxInitProgressTime = millisecs; }
+ public void setMaxSlobrokDisconnectGracePeriod(int millisecs) {
+ maxSlobrokDisconnectGracePeriod = millisecs;
+ }
+ public void setStableStateTimePeriod(long millisecs) { stableStateTimePeriod = millisecs; }
+ public void setMaxPrematureCrashes(int count) { maxPrematureCrashes = count; }
+
+ // TODO nodeListener is only used via updateNodeInfoFromReportedState -> handlePrematureCrash
+ // TODO this will recursively invoke proposeNewNodeState, which will presumably (i.e. hopefully) be a no-op...
+ public void handleNewReportedNodeState(final ClusterState currentClusterState,
+ final NodeInfo node,
+ final NodeState reportedState,
+ final NodeStateOrHostInfoChangeHandler nodeListener)
+ {
+ final NodeState currentState = currentClusterState.getNodeState(node.getNode());
+ final LogLevel level = (currentState.equals(reportedState) && node.getVersion() == 0) ? LogLevel.SPAM : LogLevel.DEBUG;
+ if (log.isLoggable(level)) {
+ log.log(level, String.format("Got nodestate reply from %s: %s (Current state is %s)",
+ node, node.getReportedState().getTextualDifference(reportedState), currentState.toString(true)));
+ }
+ final long currentTime = timer.getCurrentTimeInMillis();
+
+ if (reportedState.getState().equals(State.DOWN)) {
+ node.setTimeOfFirstFailingConnectionAttempt(currentTime);
+ }
+
+ // *** LOGGING ONLY
+ if ( ! reportedState.similarTo(node.getReportedState())) {
+ if (reportedState.getState().equals(State.DOWN)) {
+ eventLog.addNodeOnlyEvent(new NodeEvent(node, "Failed to get node state: " + reportedState.toString(true), NodeEvent.Type.REPORTED, currentTime), LogLevel.INFO);
+ } else {
+ eventLog.addNodeOnlyEvent(new NodeEvent(node, "Now reporting state " + reportedState.toString(true), NodeEvent.Type.REPORTED, currentTime), LogLevel.DEBUG);
+ }
+ }
+
+ if (reportedState.equals(node.getReportedState()) && ! reportedState.getState().equals(State.INITIALIZING)) {
+ return;
+ }
+
+ updateNodeInfoFromReportedState(node, currentState, reportedState, nodeListener);
+
+ if (reportedState.getMinUsedBits() != currentState.getMinUsedBits()) {
+ final int oldCount = currentState.getMinUsedBits();
+ final int newCount = reportedState.getMinUsedBits();
+ log.log(LogLevel.DEBUG,
+ String.format("Altering node state to reflect that min distribution bit count has changed from %d to %d",
+ oldCount, newCount));
+ eventLog.add(new NodeEvent(node, String.format("Altered min distribution bit count from %d to %d", oldCount, newCount),
+ NodeEvent.Type.CURRENT, currentTime), isMaster);
+ } else if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, String.format("Not altering state of %s in cluster state because new state is too similar: %s",
+ node, currentState.getTextualDifference(reportedState)));
+ }
+
+ stateMayHaveChanged = true;
+ }
+
+ public void handleNewNode(NodeInfo node) {
+ setHostName(node);
+ String message = "Found new node " + node + " in slobrok at " + node.getRpcAddress();
+ eventLog.add(new NodeEvent(node, message, NodeEvent.Type.REPORTED, timer.getCurrentTimeInMillis()), isMaster);
+ }
+
+ public void handleMissingNode(final ClusterState currentClusterState,
+ final NodeInfo node,
+ final NodeStateOrHostInfoChangeHandler nodeListener)
+ {
+ removeHostName(node);
+
+ final long timeNow = timer.getCurrentTimeInMillis();
+
+ if (node.getLatestNodeStateRequestTime() != null) {
+ eventLog.add(new NodeEvent(node, "Node is no longer in slobrok, but we still have a pending state request.", NodeEvent.Type.REPORTED, timeNow), isMaster);
+ } else {
+ eventLog.add(new NodeEvent(node, "Node is no longer in slobrok. No pending state request to node.", NodeEvent.Type.REPORTED, timeNow), isMaster);
+ }
+
+ if (node.getReportedState().getState().equals(State.STOPPING)) {
+ log.log(LogLevel.DEBUG, "Node " + node.getNode() + " is no longer in slobrok. Was in stopping state, so assuming it has shut down normally. Setting node down");
+ NodeState ns = node.getReportedState().clone();
+ ns.setState(State.DOWN);
+ handleNewReportedNodeState(currentClusterState, node, ns.clone(), nodeListener);
+ } else {
+ log.log(LogLevel.DEBUG, "Node " + node.getNode() + " no longer in slobrok was in state " + node.getReportedState() + ". Waiting to see if it reappears in slobrok");
+ }
+
+ stateMayHaveChanged = true;
+ }
+
+ /**
+ * Propose a new state for a node. This may happen due to an administrator action, orchestration, or
+ * a configuration change.
+ *
+ * If the newly proposed state differs from the state the node currently has in the system,
+ * a cluster state regeneration will be triggered.
+ */
+ public void proposeNewNodeState(final ClusterState currentClusterState, final NodeInfo node, final NodeState proposedState) {
+ final NodeState currentState = currentClusterState.getNodeState(node.getNode());
+ final NodeState currentReported = node.getReportedState();
+
+ if (currentState.getState().equals(proposedState.getState())) {
+ return;
+ }
+ stateMayHaveChanged = true;
+
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, String.format("Got new wanted nodestate for %s: %s", node, currentState.getTextualDifference(proposedState)));
+ }
+ // Should be checked earlier before state was set in cluster
+ assert(proposedState.getState().validWantedNodeState(node.getNode().getType()));
+ long timeNow = timer.getCurrentTimeInMillis();
+ if (proposedState.above(currentReported)) {
+ eventLog.add(new NodeEvent(node, String.format("Wanted state %s, but we cannot force node into that " +
+ "state yet as it is currently in %s", proposedState, currentReported),
+ NodeEvent.Type.REPORTED, timeNow), isMaster);
+ return;
+ }
+ if ( ! proposedState.similarTo(currentState)) {
+ eventLog.add(new NodeEvent(node, String.format("Node state set to %s.", proposedState),
+ NodeEvent.Type.WANTED, timeNow), isMaster);
+ }
+ }
+
+ public void handleNewRpcAddress(NodeInfo node) {
+ setHostName(node);
+ String message = "Node " + node + " has a new address in slobrok: " + node.getRpcAddress();
+ eventLog.add(new NodeEvent(node, message, NodeEvent.Type.REPORTED, timer.getCurrentTimeInMillis()), isMaster);
+ }
+
+ public void handleReturnedRpcAddress(NodeInfo node) {
+ setHostName(node);
+ String message = "Node got back into slobrok with same address as before: " + node.getRpcAddress();
+ eventLog.add(new NodeEvent(node, message, NodeEvent.Type.REPORTED, timer.getCurrentTimeInMillis()), isMaster);
+ }
+
+ private void setHostName(NodeInfo node) {
+ String rpcAddress = node.getRpcAddress();
+ if (rpcAddress == null) {
+ // This may happen if we haven't seen the node in Slobrok yet.
+ return;
+ }
+
+ Spec address = new Spec(rpcAddress);
+ if (address.malformed()) {
+ return;
+ }
+
+ hostnames.put(node.getNodeIndex(), address.host());
+ }
+
+ void reconfigureFromOptions(FleetControllerOptions options) {
+ setMaxPrematureCrashes(options.maxPrematureCrashes);
+ setStableStateTimePeriod(options.stableStateTimePeriod);
+ setMaxInitProgressTime(options.maxInitProgressTime);
+ setMaxSlobrokDisconnectGracePeriod(options.maxSlobrokDisconnectGracePeriod);
+ setMaxTransitionTime(options.maxTransitionTime);
+ }
+
+ private void removeHostName(NodeInfo node) {
+ hostnames.remove(node.getNodeIndex());
+ }
+
+ Map<Integer, String> getHostnames() {
+ return Collections.unmodifiableMap(hostnames);
+ }
+
+ // TODO too many hidden behavior dependencies between this and the actually
+ // generated cluster state. Still a bit of a mine field...
+ // TODO remove all node state mutation from this function entirely in favor of ClusterStateGenerator!
+ // `--> this will require adding more event edges and premature crash handling to it. Which is fine.
+ public boolean watchTimers(final ContentCluster cluster,
+ final ClusterState currentClusterState,
+ final NodeStateOrHostInfoChangeHandler nodeListener)
+ {
+ boolean triggeredAnyTimers = false;
+ final long currentTime = timer.getCurrentTimeInMillis();
+
+ for(NodeInfo node : cluster.getNodeInfo()) {
+ triggeredAnyTimers |= handleTimeDependentOpsForNode(currentClusterState, nodeListener, currentTime, node);
+ }
+
+ if (triggeredAnyTimers) {
+ stateMayHaveChanged = true;
+ }
+ return triggeredAnyTimers;
+ }
+
+ private boolean handleTimeDependentOpsForNode(final ClusterState currentClusterState,
+ final NodeStateOrHostInfoChangeHandler nodeListener,
+ final long currentTime,
+ final NodeInfo node)
+ {
+ final NodeState currentStateInSystem = currentClusterState.getNodeState(node.getNode());
+ final NodeState lastReportedState = node.getReportedState();
+        boolean triggeredAnyTimers = reportDownIfOutdatedSlobrokNode(
+                currentClusterState, nodeListener, currentTime, node, lastReportedState);
+
+ if (nodeStillUnavailableAfterTransitionTimeExceeded(
+ currentTime, node, currentStateInSystem, lastReportedState))
+ {
+ eventLog.add(new NodeEvent(node, String.format(
+ "%d milliseconds without contact. Marking node down.",
+ currentTime - node.getTransitionTime()),
+ NodeEvent.Type.CURRENT, currentTime), isMaster);
+ triggeredAnyTimers = true;
+ }
+
+ if (nodeInitProgressHasTimedOut(currentTime, node, currentStateInSystem, lastReportedState)) {
+ eventLog.add(new NodeEvent(node, String.format(
+ "%d milliseconds without initialize progress. Marking node down. " +
+ "Premature crash count is now %d.",
+ currentTime - node.getInitProgressTime(),
+ node.getPrematureCrashCount() + 1),
+ NodeEvent.Type.CURRENT, currentTime), isMaster);
+ handlePrematureCrash(node, nodeListener);
+ triggeredAnyTimers = true;
+ }
+
+ if (mayResetCrashCounterOnStableUpNode(currentTime, node, lastReportedState)) {
+ node.setPrematureCrashCount(0);
+ log.log(LogLevel.DEBUG, "Resetting premature crash count on node " + node + " as it has been up for a long time.");
+ triggeredAnyTimers = true;
+ } else if (mayResetCrashCounterOnStableDownNode(currentTime, node, lastReportedState)) {
+ node.setPrematureCrashCount(0);
+ log.log(LogLevel.DEBUG, "Resetting premature crash count on node " + node + " as it has been down for a long time.");
+ triggeredAnyTimers = true;
+ }
+
+ return triggeredAnyTimers;
+ }
+
+ private boolean nodeInitProgressHasTimedOut(long currentTime, NodeInfo node, NodeState currentStateInSystem, NodeState lastReportedState) {
+ return !currentStateInSystem.getState().equals(State.DOWN)
+ && node.getWantedState().above(new NodeState(node.getNode().getType(), State.DOWN))
+ && lastReportedState.getState().equals(State.INITIALIZING)
+ && maxInitProgressTime != 0
+ && node.getInitProgressTime() + maxInitProgressTime <= currentTime
+ && node.getNode().getType().equals(NodeType.STORAGE);
+ }
+
+ private boolean mayResetCrashCounterOnStableDownNode(long currentTime, NodeInfo node, NodeState lastReportedState) {
+ return node.getDownStableStateTime() + stableStateTimePeriod <= currentTime
+ && lastReportedState.getState().equals(State.DOWN)
+ && node.getPrematureCrashCount() <= maxPrematureCrashes
+ && node.getPrematureCrashCount() != 0;
+ }
+
+ private boolean mayResetCrashCounterOnStableUpNode(long currentTime, NodeInfo node, NodeState lastReportedState) {
+ return node.getUpStableStateTime() + stableStateTimePeriod <= currentTime
+ && lastReportedState.getState().equals(State.UP)
+ && node.getPrematureCrashCount() <= maxPrematureCrashes
+ && node.getPrematureCrashCount() != 0;
+ }
+
+ private boolean nodeStillUnavailableAfterTransitionTimeExceeded(
+ long currentTime,
+ NodeInfo node,
+ NodeState currentStateInSystem,
+ NodeState lastReportedState)
+ {
+ return currentStateInSystem.getState().equals(State.MAINTENANCE)
+ && node.getWantedState().above(new NodeState(node.getNode().getType(), State.DOWN))
+ && (lastReportedState.getState().equals(State.DOWN) || node.isRpcAddressOutdated())
+ && node.getTransitionTime() + maxTransitionTime.get(node.getNode().getType()) < currentTime;
+ }
+
+ private boolean reportDownIfOutdatedSlobrokNode(ClusterState currentClusterState,
+ NodeStateOrHostInfoChangeHandler nodeListener,
+ long currentTime,
+ NodeInfo node,
+ NodeState lastReportedState)
+ {
+ if (node.isRpcAddressOutdated()
+ && !lastReportedState.getState().equals(State.DOWN)
+ && node.getRpcAddressOutdatedTimestamp() + maxSlobrokDisconnectGracePeriod <= currentTime)
+ {
+ final String desc = String.format(
+ "Set node down as it has been out of slobrok for %d ms which " +
+ "is more than the max limit of %d ms.",
+ currentTime - node.getRpcAddressOutdatedTimestamp(),
+ maxSlobrokDisconnectGracePeriod);
+ node.abortCurrentNodeStateRequests();
+ NodeState state = lastReportedState.clone();
+ state.setState(State.DOWN);
+ if (!state.hasDescription()) {
+ state.setDescription(desc);
+ }
+ eventLog.add(new NodeEvent(node, desc, NodeEvent.Type.CURRENT, currentTime), isMaster);
+ handleNewReportedNodeState(currentClusterState, node, state.clone(), nodeListener);
+ node.setReportedState(state, currentTime);
+ return true;
+ }
+ return false;
+ }
+
+ private boolean isControlledShutdown(NodeState state) {
+ return (state.getState() == State.STOPPING
+ && (state.getDescription().contains("Received signal 15 (SIGTERM - Termination signal)")
+ || state.getDescription().contains("controlled shutdown")));
+ }
+
+ /**
+ * Modify a node's cross-state information in the cluster based on a newly arrived reported state.
+ *
+ * @param node the node we are computing the state of
+ * @param currentState the current state of the node
+ * @param reportedState the new state reported by (or, in the case of down - inferred from) the node
+ * @param nodeListener this listener is notified for some of the system state changes that this will return
+ */
+ private void updateNodeInfoFromReportedState(final NodeInfo node,
+ final NodeState currentState,
+ final NodeState reportedState,
+ final NodeStateOrHostInfoChangeHandler nodeListener) {
+ final long timeNow = timer.getCurrentTimeInMillis();
+ if (log.isLoggable(LogLevel.DEBUG)) {
+ log.log(LogLevel.DEBUG, String.format("Finding new cluster state entry for %s switching state %s",
+ node, currentState.getTextualDifference(reportedState)));
+ }
+
+ if (handleReportedNodeCrashEdge(node, currentState, reportedState, nodeListener, timeNow)) {
+ return;
+ }
+ if (initializationProgressHasIncreased(currentState, reportedState)) {
+ node.setInitProgressTime(timeNow);
+ if (log.isLoggable(LogLevel.SPAM)) {
+ log.log(LogLevel.SPAM, "Reset initialize timer on " + node + " to " + node.getInitProgressTime());
+ }
+ }
+ if (handleImplicitCrashEdgeFromReverseInitProgress(node, currentState, reportedState, nodeListener, timeNow)) {
+ return;
+ }
+ markNodeUnstableIfDownEdgeDuringInit(node, currentState, reportedState, nodeListener, timeNow);
+ }
+
+ // If we go down while initializing, mark node unstable, such that we don't mark it initializing again before it is up.
+ private void markNodeUnstableIfDownEdgeDuringInit(final NodeInfo node,
+ final NodeState currentState,
+ final NodeState reportedState,
+ final NodeStateOrHostInfoChangeHandler nodeListener,
+ final long timeNow) {
+ if (currentState.getState().equals(State.INITIALIZING)
+ && reportedState.getState().oneOf("ds")
+ && !isControlledShutdown(reportedState))
+ {
+ eventLog.add(new NodeEvent(node, String.format("Stop or crash during initialization. " +
+ "Premature crash count is now %d.", node.getPrematureCrashCount() + 1),
+ NodeEvent.Type.CURRENT, timeNow), isMaster);
+ handlePrematureCrash(node, nodeListener);
+ }
+ }
+
+ // TODO do we need this when we have startup timestamps? at least it's unit tested.
+ // TODO this seems fairly contrived...
+ // If we get reverse initialize progress, mark node unstable, such that we don't mark it initializing again before it is up.
+ private boolean handleImplicitCrashEdgeFromReverseInitProgress(final NodeInfo node,
+ final NodeState currentState,
+ final NodeState reportedState,
+ final NodeStateOrHostInfoChangeHandler nodeListener,
+ final long timeNow) {
+ if (currentState.getState().equals(State.INITIALIZING) &&
+ (reportedState.getState().equals(State.INITIALIZING) && reportedState.getInitProgress() < currentState.getInitProgress()))
+ {
+ eventLog.add(new NodeEvent(node, String.format(
+ "Stop or crash during initialization detected from reverse initializing progress." +
+ " Progress was %g but is now %g. Premature crash count is now %d.",
+ currentState.getInitProgress(), reportedState.getInitProgress(),
+ node.getPrematureCrashCount() + 1),
+ NodeEvent.Type.CURRENT, timeNow), isMaster);
+ node.setRecentlyObservedUnstableDuringInit(true);
+ handlePrematureCrash(node, nodeListener);
+ return true;
+ }
+ return false;
+ }
+
+ private boolean handleReportedNodeCrashEdge(NodeInfo node, NodeState currentState,
+ NodeState reportedState, NodeStateOrHostInfoChangeHandler nodeListener,
+ long timeNow) {
+ if (nodeUpToDownEdge(node, currentState, reportedState)) {
+ node.setTransitionTime(timeNow);
+ if (node.getUpStableStateTime() + stableStateTimePeriod > timeNow && !isControlledShutdown(reportedState)) {
+ log.log(LogLevel.DEBUG, "Stable state: " + node.getUpStableStateTime() + " + " + stableStateTimePeriod + " > " + timeNow);
+ eventLog.add(new NodeEvent(node,
+ String.format("Stopped or possibly crashed after %d ms, which is before " +
+ "stable state time period. Premature crash count is now %d.",
+ timeNow - node.getUpStableStateTime(), node.getPrematureCrashCount() + 1),
+ NodeEvent.Type.CURRENT,
+ timeNow), isMaster);
+ if (handlePrematureCrash(node, nodeListener)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private boolean initializationProgressHasIncreased(NodeState currentState, NodeState reportedState) {
+ return reportedState.getState().equals(State.INITIALIZING) &&
+ (!currentState.getState().equals(State.INITIALIZING) ||
+ reportedState.getInitProgress() > currentState.getInitProgress());
+ }
+
+ private boolean nodeUpToDownEdge(NodeInfo node, NodeState currentState, NodeState reportedState) {
+ return currentState.getState().oneOf("ur") && reportedState.getState().oneOf("dis")
+ && (node.getWantedState().getState().equals(State.RETIRED) || !reportedState.getState().equals(State.INITIALIZING));
+ }
+
+ private boolean handlePrematureCrash(NodeInfo node, NodeStateOrHostInfoChangeHandler changeListener) {
+ node.setPrematureCrashCount(node.getPrematureCrashCount() + 1);
+ if (disableUnstableNodes && node.getPrematureCrashCount() > maxPrematureCrashes) {
+ NodeState wantedState = new NodeState(node.getNode().getType(), State.DOWN)
+ .setDescription("Disabled by fleet controller as it prematurely shut down " + node.getPrematureCrashCount() + " times in a row");
+ NodeState oldState = node.getWantedState();
+ node.setWantedState(wantedState);
+ if ( ! oldState.equals(wantedState)) {
+ changeListener.handleNewWantedNodeState(node, wantedState);
+ }
+ return true;
+ }
+ return false;
+ }
+
+}
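
handlePrematureCrash() above increments a per-node crash counter and, once it exceeds maxPrematureCrashes (4 by default in this class), forces the node's wanted state to Down with an explanatory description. A compact, hedged model of just the threshold behavior follows; names are illustrative, and the real method additionally sets the wanted state and notifies the NodeStateOrHostInfoChangeHandler:

    class PrematureCrashPolicySketch {
        private final int maxPrematureCrashes;
        private int prematureCrashCount = 0;

        PrematureCrashPolicySketch(int maxPrematureCrashes) {
            this.maxPrematureCrashes = maxPrematureCrashes;
        }

        // Returns true when the node should be administratively forced Down,
        // mirroring the disableUnstableNodes branch in handlePrematureCrash().
        boolean registerPrematureCrash() {
            ++prematureCrashCount;
            return prematureCrashCount > maxPrematureCrashes;
        }

        // Mirrors mayResetCrashCounterOnStableUpNode()/mayResetCrashCounterOnStableDownNode():
        // once a node has been stably up or down for stableStateTimePeriod, the counter clears.
        void onStablePeriodElapsed() {
            prematureCrashCount = 0;
        }
    }

With the default threshold of 4, the fifth premature crash in a row is what trips the policy.
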
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java
new file mode 100644
index 00000000000..f5a67ca9434
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/StateVersionTracker.java
@@ -0,0 +1,140 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.vdslib.state.ClusterState;
+import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Keeps track of the active cluster state and handles the transition edges between
+ * one state to the next. In particular, it ensures that states have strictly increasing
+ * version numbers.
+ *
+ * Wraps ClusterStateView to ensure its knowledge of available nodes stays up to date.
+ */
+public class StateVersionTracker {
+
+ // We always increment the version _before_ publishing, so the effective first cluster
+ // state version when starting from 1 will be 2. This matches legacy behavior and a bunch
+ // of existing tests expect it.
+ private int currentVersion = 1;
+ private int lastZooKeeperVersion = 0;
+
+ // The lowest published distribution bit count for the lifetime of this controller.
+ // TODO this mirrors legacy behavior, but should be moved into stable ZK state.
+ private int lowestObservedDistributionBits = 16;
+
+ private ClusterState currentUnversionedState = ClusterState.emptyState();
+ private AnnotatedClusterState latestCandidateState = AnnotatedClusterState.emptyState();
+ private AnnotatedClusterState currentClusterState = latestCandidateState;
+
+ private final MetricUpdater metricUpdater;
+ private ClusterStateView clusterStateView;
+
+ private final LinkedList<ClusterStateHistoryEntry> clusterStateHistory = new LinkedList<>();
+ private int maxHistoryEntryCount = 50;
+
+ StateVersionTracker(final MetricUpdater metricUpdater) {
+ this.metricUpdater = metricUpdater;
+ clusterStateView = ClusterStateView.create(currentUnversionedState, metricUpdater);
+ }
+
+ void setVersionRetrievedFromZooKeeper(final int version) {
+ this.currentVersion = Math.max(1, version);
+ this.lastZooKeeperVersion = this.currentVersion;
+ }
+
+ /**
+ * Sets limit on how many cluster states can be kept in the in-memory queue. Once
+ * the list exceeds this limit, the oldest state is repeatedly removed until the limit
+ * is no longer exceeded.
+ *
+ * Takes effect upon the next invocation of promoteCandidateToVersionedState().
+ */
+ void setMaxHistoryEntryCount(final int maxHistoryEntryCount) {
+ this.maxHistoryEntryCount = maxHistoryEntryCount;
+ }
+
+ int getCurrentVersion() {
+ return this.currentVersion;
+ }
+
+ boolean hasReceivedNewVersionFromZooKeeper() {
+ return currentVersion <= lastZooKeeperVersion;
+ }
+
+ int getLowestObservedDistributionBits() {
+ return lowestObservedDistributionBits;
+ }
+
+ AnnotatedClusterState getAnnotatedVersionedClusterState() {
+ return currentClusterState;
+ }
+
+ public ClusterState getVersionedClusterState() {
+ return currentClusterState.getClusterState();
+ }
+
+ public void updateLatestCandidateState(final AnnotatedClusterState candidate) {
+ assert(latestCandidateState.getClusterState().getVersion() == 0);
+ latestCandidateState = candidate;
+ }
+
+ /**
+ * Returns the last state provided to updateLatestCandidateState, which _may or may not_ be
+ * a published state. The primary use case for this function is a caller that is interested in
+ * changes that may not be reflected in the published state. The best example of this would
+ * be node state changes when a cluster is marked as Down.
+ */
+ public AnnotatedClusterState getLatestCandidateState() {
+ return latestCandidateState;
+ }
+
+ public List<ClusterStateHistoryEntry> getClusterStateHistory() {
+ return Collections.unmodifiableList(clusterStateHistory);
+ }
+
+ boolean candidateChangedEnoughFromCurrentToWarrantPublish() {
+ return !currentUnversionedState.similarToIgnoringInitProgress(latestCandidateState.getClusterState());
+ }
+
+ void promoteCandidateToVersionedState(final long currentTimeMs) {
+ final int newVersion = currentVersion + 1;
+ updateStatesForNewVersion(latestCandidateState, newVersion);
+ currentVersion = newVersion;
+
+ recordCurrentStateInHistoryAtTime(currentTimeMs);
+ }
+
+ private void updateStatesForNewVersion(final AnnotatedClusterState newState, final int newVersion) {
+ currentClusterState = new AnnotatedClusterState(
+ newState.getClusterState().clone(), // Because we mutate version below
+ newState.getClusterStateReason(),
+ newState.getNodeStateReasons());
+ currentClusterState.getClusterState().setVersion(newVersion);
+ currentUnversionedState = newState.getClusterState().clone();
+ lowestObservedDistributionBits = Math.min(
+ lowestObservedDistributionBits,
+ newState.getClusterState().getDistributionBitCount());
+ // TODO should this take place in updateLatestCandidateState instead? I.e. does it require a consolidated state?
+ clusterStateView = ClusterStateView.create(currentClusterState.getClusterState(), metricUpdater);
+ }
+
+ private void recordCurrentStateInHistoryAtTime(final long currentTimeMs) {
+ clusterStateHistory.addFirst(new ClusterStateHistoryEntry(
+ currentClusterState.getClusterState(), currentTimeMs));
+ while (clusterStateHistory.size() > maxHistoryEntryCount) {
+ clusterStateHistory.removeLast();
+ }
+ }
+
+ void handleUpdatedHostInfo(final Map<Integer, String> hostnames, final NodeInfo node, final HostInfo hostInfo) {
+ // TODO the wiring here isn't unit tested. Need mockable integration points.
+ clusterStateView.handleUpdatedHostInfo(hostnames, node, hostInfo);
+ }
+
+}
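
The intended flow for StateVersionTracker is: feed it the generator's candidate via updateLatestCandidateState(), ask candidateChangedEnoughFromCurrentToWarrantPublish() whether the change is significant, and only then promoteCandidateToVersionedState(). A hedged usage sketch is shown below; it assumes a caller in the same package (several methods are package-private), and how the candidate itself is produced is outside this file:

    package com.yahoo.vespa.clustercontroller.core;

    // Illustrative caller only; FleetController owns the real orchestration of this flow.
    class StateVersionTrackerUsageSketch {
        static void maybePublish(StateVersionTracker tracker, AnnotatedClusterState candidate, long nowMs) {
            tracker.updateLatestCandidateState(candidate);
            if (tracker.candidateChangedEnoughFromCurrentToWarrantPublish()) {
                // The version is bumped before publishing, so a fresh tracker
                // (currentVersion == 1) publishes its first state as version 2.
                tracker.promoteCandidateToVersionedState(nowMs);
                // getVersionedClusterState() now reflects the new version and the state
                // has been recorded at the head of getClusterStateHistory().
            }
        }
    }
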
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateGenerator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateGenerator.java
deleted file mode 100644
index 7edff399633..00000000000
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateGenerator.java
+++ /dev/null
@@ -1,941 +0,0 @@
-// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.clustercontroller.core;
-
-import com.yahoo.jrt.Spec;
-import com.yahoo.log.LogLevel;
-import com.yahoo.vdslib.distribution.ConfiguredNode;
-import com.yahoo.vdslib.distribution.Distribution;
-import com.yahoo.vdslib.state.*;
-import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler;
-import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo;
-import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler;
-import com.yahoo.vespa.clustercontroller.core.listeners.SystemStateListener;
-
-import java.util.*;
-import java.util.logging.Logger;
-import java.text.ParseException;
-import java.util.stream.Collectors;
-
-/**
- * This class get node state updates and uses them to decide the cluster state.
- */
-// TODO: Remove all current state from this and make it rely on state from ClusterInfo instead
-// TODO: Do this ASAP! SystemStateGenerator should ideally behave as a pure function!
-public class SystemStateGenerator {
-
- private static Logger log = Logger.getLogger(SystemStateGenerator.class.getName());
-
- private final Timer timer;
- private final EventLogInterface eventLog;
- private ClusterStateView currentClusterStateView;
- private ClusterStateView nextClusterStateView;
- private Distribution distribution;
- private boolean nextStateViewChanged = false;
- private boolean isMaster = false;
-
- private Map<NodeType, Integer> maxTransitionTime = new TreeMap<>();
- private int maxInitProgressTime = 5000;
- private int maxPrematureCrashes = 4;
- private long stableStateTimePeriod = 60 * 60 * 1000;
- private static final int maxHistorySize = 50;
- private Set<ConfiguredNode> nodes;
- private Map<Integer, String> hostnames = new HashMap<>();
- private int minDistributorNodesUp = 1;
- private int minStorageNodesUp = 1;
- private double minRatioOfDistributorNodesUp = 0.50;
- private double minRatioOfStorageNodesUp = 0.50;
- private double minNodeRatioPerGroup = 0.0;
- private int maxSlobrokDisconnectGracePeriod = 1000;
- private int idealDistributionBits = 16;
- private static final boolean disableUnstableNodes = true;
-
- private final LinkedList<SystemStateHistoryEntry> systemStateHistory = new LinkedList<>();
-
- /**
- * @param metricUpdater may be null, in which case no metrics will be recorded.
- */
- public SystemStateGenerator(Timer timer, EventLogInterface eventLog, MetricUpdater metricUpdater) {
- try {
- currentClusterStateView = ClusterStateView.create("", metricUpdater);
- nextClusterStateView = ClusterStateView.create("", metricUpdater);
- } catch (ParseException e) {
- throw new RuntimeException("Parsing empty string should always work");
- }
- this.timer = timer;
- this.eventLog = eventLog;
- maxTransitionTime.put(NodeType.DISTRIBUTOR, 5000);
- maxTransitionTime.put(NodeType.STORAGE, 5000);
- }
-
- public void handleAllDistributorsInSync(DatabaseHandler database,
- DatabaseHandler.Context dbContext) throws InterruptedException {
- int startTimestampsReset = 0;
- for (NodeType nodeType : NodeType.getTypes()) {
- for (ConfiguredNode configuredNode : nodes) {
- Node node = new Node(nodeType, configuredNode.index());
- NodeInfo nodeInfo = dbContext.getCluster().getNodeInfo(node);
- NodeState nodeState = nextClusterStateView.getClusterState().getNodeState(node);
- if (nodeInfo != null && nodeState != null) {
- if (nodeState.getStartTimestamp() > nodeInfo.getStartTimestamp()) {
- log.log(LogLevel.DEBUG, "Storing away new start timestamp for node " + node);
- nodeInfo.setStartTimestamp(nodeState.getStartTimestamp());
- }
- if (nodeState.getStartTimestamp() > 0) {
- log.log(LogLevel.DEBUG, "Resetting timestamp in cluster state for node " + node);
- nodeState.setStartTimestamp(0);
- nextClusterStateView.getClusterState().setNodeState(node, nodeState);
- ++startTimestampsReset;
- }
- } else {
- log.log(LogLevel.DEBUG, node + ": " +
- (nodeInfo == null ? "null" : nodeInfo.getStartTimestamp()) + ", " +
- (nodeState == null ? "null" : nodeState.getStartTimestamp()));
- }
- }
- }
- if (startTimestampsReset > 0) {
- eventLog.add(new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE, "Reset " + startTimestampsReset +
- " start timestamps as all available distributors have seen newest cluster state.", timer.getCurrentTimeInMillis()));
- nextStateViewChanged = true;
- database.saveStartTimestamps(dbContext);
- } else {
- log.log(LogLevel.DEBUG, "Found no start timestamps to reset in cluster state.");
- }
- }
-
- public void setMaxTransitionTime(Map<NodeType, Integer> map) { maxTransitionTime = map; }
- public void setMaxInitProgressTime(int millisecs) { maxInitProgressTime = millisecs; }
- public void setMaxPrematureCrashes(int count) { maxPrematureCrashes = count; }
- public void setStableStateTimePeriod(long millisecs) { stableStateTimePeriod = millisecs; }
-
- public ClusterStateView currentClusterStateView() { return currentClusterStateView; }
-
- /** Returns an immutable list of the historical states this has generated */
- public List<SystemStateHistoryEntry> systemStateHistory() {
- return Collections.unmodifiableList(systemStateHistory);
- }
-
- public void setMinNodesUp(int minDistNodes, int minStorNodes, double minDistRatio, double minStorRatio) {
- minDistributorNodesUp = minDistNodes;
- minStorageNodesUp = minStorNodes;
- minRatioOfDistributorNodesUp = minDistRatio;
- minRatioOfStorageNodesUp = minStorRatio;
- nextStateViewChanged = true;
- }
-
- public void setMinNodeRatioPerGroup(double upRatio) {
- this.minNodeRatioPerGroup = upRatio;
- nextStateViewChanged = true;
- }
-
- /** Sets the nodes of this and attempts to keep the node state in sync */
- public void setNodes(ClusterInfo newClusterInfo) {
- this.nodes = new HashSet<>(newClusterInfo.getConfiguredNodes().values());
-
- for (ConfiguredNode node : this.nodes) {
- NodeInfo newNodeInfo = newClusterInfo.getStorageNodeInfo(node.index());
- NodeState currentState = currentClusterStateView.getClusterState().getNodeState(new Node(NodeType.STORAGE, node.index()));
- if (currentState.getState() == State.RETIRED || currentState.getState() == State.UP) { // then correct to configured state
- proposeNewNodeState(newNodeInfo, new NodeState(NodeType.STORAGE, node.retired() ? State.RETIRED : State.UP));
- }
- }
-
- // Ensure that any nodes that have been removed from the config are also
- // promptly removed from the next (and subsequent) generated cluster states.
- pruneAllNodesNotContainedInConfig();
-
- nextStateViewChanged = true;
- }
-
- private void pruneAllNodesNotContainedInConfig() {
- Set<Integer> configuredIndices = this.nodes.stream().map(ConfiguredNode::index).collect(Collectors.toSet());
- final ClusterState candidateNextState = nextClusterStateView.getClusterState();
- pruneNodesNotContainedInConfig(candidateNextState, configuredIndices, NodeType.DISTRIBUTOR);
- pruneNodesNotContainedInConfig(candidateNextState, configuredIndices, NodeType.STORAGE);
- }
-
- public void setDistribution(Distribution distribution) {
- this.distribution = distribution;
- nextStateViewChanged = true;
- }
-
- public void setMaster(boolean isMaster) {
- this.isMaster = isMaster;
- }
- public void setMaxSlobrokDisconnectGracePeriod(int millisecs) { maxSlobrokDisconnectGracePeriod = millisecs; }
-
- public void setDistributionBits(int bits) {
- if (bits == idealDistributionBits) return;
- idealDistributionBits = bits;
- int currentDistributionBits = calculateMinDistributionBitCount();
- if (currentDistributionBits != nextClusterStateView.getClusterState().getDistributionBitCount()) {
- nextClusterStateView.getClusterState().setDistributionBits(currentDistributionBits);
- nextStateViewChanged = true;
- }
- }
-
- public int getDistributionBits() { return idealDistributionBits; }
-
- public int calculateMinDistributionBitCount() {
- int currentDistributionBits = idealDistributionBits;
- int minNode = -1;
- for (ConfiguredNode node : nodes) {
- NodeState ns = nextClusterStateView.getClusterState().getNodeState(new Node(NodeType.STORAGE, node.index()));
- if (ns.getState().oneOf("iur")) {
- if (ns.getMinUsedBits() < currentDistributionBits) {
- currentDistributionBits = ns.getMinUsedBits();
- minNode = node.index();
- }
- }
- }
- if (minNode == -1) {
- log.log(LogLevel.DEBUG, "Distribution bit count should still be default as all available nodes have at least split to " + idealDistributionBits + " bits");
- } else {
- log.log(LogLevel.DEBUG, "Distribution bit count is limited to " + currentDistributionBits + " due to storage node " + minNode);
- }
- return currentDistributionBits;
- }
-
- public ClusterState getClusterState() { return currentClusterStateView.getClusterState(); }
-
- /**
- * Return the current cluster state, but if the cluster is down, modify the node states with the
- * actual node states from the temporary next state.
- */
- public ClusterState getConsolidatedClusterState() {
- ClusterState currentState = currentClusterStateView.getClusterState();
- if (currentState.getClusterState().equals(State.UP)) {
- return currentState;
- }
-
- ClusterState nextState = nextClusterStateView.getClusterState();
- if (!currentState.getClusterState().equals(nextState.getClusterState())) {
- log.warning("Expected current cluster state object to have same global state as the under creation instance.");
- }
- ClusterState state = nextState.clone();
- state.setVersion(currentState.getVersion());
- state.setOfficial(false);
- return state;
- }
-
- private Optional<Event> getDownDueToTooFewNodesEvent(ClusterState nextClusterState) {
- int upStorageCount = 0, upDistributorCount = 0;
- int dcount = nodes.size();
- int scount = nodes.size();
- for (NodeType type : NodeType.getTypes()) {
- for (ConfiguredNode node : nodes) {
- NodeState ns = nextClusterState.getNodeState(new Node(type, node.index()));
- if (ns.getState() == State.UP || ns.getState() == State.RETIRED || ns.getState() == State.INITIALIZING) {
- if (type.equals(NodeType.STORAGE))
- ++upStorageCount;
- else
- ++upDistributorCount;
- }
- }
- }
-
- long timeNow = timer.getCurrentTimeInMillis();
- if (upStorageCount < minStorageNodesUp) {
- return Optional.of(new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE,
- "Less than " + minStorageNodesUp + " storage nodes available (" + upStorageCount + "). Setting cluster state down.",
- timeNow));
- }
- if (upDistributorCount < minDistributorNodesUp) {
- return Optional.of(new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE,
- "Less than " + minDistributorNodesUp + " distributor nodes available (" + upDistributorCount + "). Setting cluster state down.",
- timeNow));
- }
- if (minRatioOfStorageNodesUp * scount > upStorageCount) {
- return Optional.of(new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE,
- "Less than " + (100 * minRatioOfStorageNodesUp) + " % of storage nodes are available ("
- + upStorageCount + "/" + scount + "). Setting cluster state down.",
- timeNow));
- }
- if (minRatioOfDistributorNodesUp * dcount > upDistributorCount) {
- return Optional.of(new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE,
- "Less than " + (100 * minRatioOfDistributorNodesUp) + " % of distributor nodes are available ("
- + upDistributorCount + "/" + dcount + "). Setting cluster state down.",
- timeNow));
- }
- return Optional.empty();
- }
-
- private static Node storageNode(int index) {
- return new Node(NodeType.STORAGE, index);
- }
-
- private void performImplicitStorageNodeStateTransitions(ClusterState candidateState, ContentCluster cluster) {
- if (distribution == null) {
- return; // FIXME due to tests that don't bother setting distr config! Never happens in prod.
- }
- // First clear the states of any nodes that according to reported/wanted state alone
- // should have their states cleared. We might still take these down again based on the
- // decisions of the group availability calculator, but this way we ensure that groups
- // that no longer should be down will have their nodes implicitly made available again.
- // TODO this will be void once SystemStateGenerator has been rewritten to be stateless.
- final Set<Integer> clearedNodes = clearDownStateForStorageNodesThatCanBeUp(candidateState, cluster);
-
- final GroupAvailabilityCalculator calc = new GroupAvailabilityCalculator.Builder()
- .withMinNodeRatioPerGroup(minNodeRatioPerGroup)
- .withDistribution(distribution)
- .build();
- final Set<Integer> nodesToTakeDown = calc.nodesThatShouldBeDown(candidateState);
- markNodesAsDownDueToGroupUnavailability(cluster, candidateState, nodesToTakeDown, clearedNodes);
-
- clearedNodes.removeAll(nodesToTakeDown);
- logEventsForNodesThatWereTakenUp(clearedNodes, cluster);
- }
-
- private void logEventsForNodesThatWereTakenUp(Set<Integer> newlyUpNodes, ContentCluster cluster) {
- newlyUpNodes.forEach(i -> {
- final NodeInfo info = cluster.getNodeInfo(storageNode(i)); // Should always be non-null here.
- // TODO the fact that this only happens for group up events is implementation specific
- // should generalize this if we get other such events.
- eventLog.addNodeOnlyEvent(new NodeEvent(info,
- "Group availability restored; taking node back up",
- NodeEvent.Type.CURRENT, timer.getCurrentTimeInMillis()), LogLevel.INFO);
- });
- }
-
- private void markNodesAsDownDueToGroupUnavailability(ContentCluster cluster,
- ClusterState candidateState,
- Set<Integer> nodesToTakeDown,
- Set<Integer> clearedNodes)
- {
- for (Integer idx : nodesToTakeDown) {
- final Node node = storageNode(idx);
- NodeState newState = new NodeState(NodeType.STORAGE, State.DOWN);
- newState.setDescription("group node availability below configured threshold");
- candidateState.setNodeState(node, newState);
-
- logNodeGroupDownEdgeEventOnce(clearedNodes, node, cluster);
- }
- }
-
- private void logNodeGroupDownEdgeEventOnce(Set<Integer> clearedNodes, Node node, ContentCluster cluster) {
- final NodeInfo nodeInfo = cluster.getNodeInfo(node);
- // If clearedNodes contains the index it means we're just re-downing a node
- // that was previously down. If this is the case, we'd cause a duplicate
- // event if we logged it now as well.
- if (nodeInfo != null && !clearedNodes.contains(node.getIndex())) {
- eventLog.addNodeOnlyEvent(new NodeEvent(nodeInfo,
- "Setting node down as the total availability of its group is " +
- "below the configured threshold",
- NodeEvent.Type.CURRENT, timer.getCurrentTimeInMillis()), LogLevel.INFO);
- }
- }
-
- private NodeState baselineNodeState(NodeInfo info) {
- NodeState reported = info.getReportedState();
- NodeState wanted = info.getWantedState();
-
- final NodeState baseline = reported.clone();
- if (wanted.getState() != State.UP) {
- baseline.setDescription(wanted.getDescription());
- if (reported.above(wanted)) {
- baseline.setState(wanted.getState());
- }
- }
- // Don't reintroduce start timestamp to the node's state if it has already been
- // observed by all distributors. This matches how handleNewReportedNodeState() sets timestamps.
- // TODO make timestamp semantics clearer. Non-obvious what the two different timestamp stores imply.
- // For posterity: reported.getStartTimestamp() is the start timestamp the node itself has stated.
- // info.getStartTimestamp() is the timestamp written as having been observed by all distributors
- // (which is done in handleAllDistributorsInSync()).
- if (reported.getStartTimestamp() <= info.getStartTimestamp()) {
- baseline.setStartTimestamp(0);
- }
-
- return baseline;
- }
-
- // Returns set of nodes whose state was cleared
- private Set<Integer> clearDownStateForStorageNodesThatCanBeUp(
- ClusterState candidateState, ContentCluster cluster)
- {
- final int nodeCount = candidateState.getNodeCount(NodeType.STORAGE);
- final Set<Integer> clearedNodes = new HashSet<>();
- for (int i = 0; i < nodeCount; ++i) {
- final Node node = storageNode(i);
- final NodeInfo info = cluster.getNodeInfo(node);
- final NodeState currentState = candidateState.getNodeState(node);
- if (mayClearCurrentNodeState(currentState, info)) {
- candidateState.setNodeState(node, baselineNodeState(info));
- clearedNodes.add(i);
- }
- }
- return clearedNodes;
- }
-
- private boolean mayClearCurrentNodeState(NodeState currentState, NodeInfo info) {
- if (currentState.getState() != State.DOWN) {
- return false;
- }
- if (info == null) {
- // Nothing known about node in cluster info; we definitely don't want it
- // to be taken up at this point.
- return false;
- }
- // There exists an edge in watchTimers where a node in Maintenance is implicitly
- // transitioned into Down without being Down in either reported or wanted states
- // iff isRpcAddressOutdated() is true. To avoid getting into an edge where we
- // inadvertently clear this state because its reported/wanted states seem fine,
- // we must also check if that particular edge could have happened. I.e. whether
- // the node's RPC address is marked as outdated.
- // It also makes sense in general to not allow taking a node back up automatically
- // if its RPC connectivity appears to be bad.
- if (info.isRpcAddressOutdated()) {
- return false;
- }
- // Rationale: we can only enter this statement if the _current_ (generated) state
- // of the node is Down. Aside from the group take-down logic, there should not exist
- // any other edges in the cluster controller state transition logic where a node
- // may be set Down while both its reported state and wanted state imply that a better
- // state should already have been chosen. Consequently we allow the node to have its
- // Down-state cleared.
- return (info.getReportedState().getState() != State.DOWN
- && !info.getWantedState().getState().oneOf("d"));
- }
-
- private ClusterStateView createNextVersionOfClusterStateView(ContentCluster cluster) {
- // If you change this method, see *) in notifyIfNewSystemState
- ClusterStateView candidateClusterStateView = nextClusterStateView.cloneForNewState();
- ClusterState candidateClusterState = candidateClusterStateView.getClusterState();
-
- int currentDistributionBits = calculateMinDistributionBitCount();
- if (currentDistributionBits != nextClusterStateView.getClusterState().getDistributionBitCount()) {
- candidateClusterState.setDistributionBits(currentDistributionBits);
- }
- performImplicitStorageNodeStateTransitions(candidateClusterState, cluster);
-
- return candidateClusterStateView;
- }
-
- private void pruneNodesNotContainedInConfig(ClusterState candidateClusterState,
- Set<Integer> configuredIndices,
- NodeType nodeType)
- {
- final int nodeCount = candidateClusterState.getNodeCount(nodeType);
- for (int i = 0; i < nodeCount; ++i) {
- final Node node = new Node(nodeType, i);
- final NodeState currentState = candidateClusterState.getNodeState(node);
- if (!configuredIndices.contains(i) && !currentState.getState().equals(State.DOWN)) {
- log.log(LogLevel.INFO, "Removing node " + node + " from state as it is no longer present in config");
- candidateClusterState.setNodeState(node, new NodeState(nodeType, State.DOWN));
- }
- }
- }
-
- private void recordNewClusterStateHasBeenChosen(
- ClusterState currentClusterState, ClusterState newClusterState, Event clusterEvent) {
- long timeNow = timer.getCurrentTimeInMillis();
-
- if (!currentClusterState.getClusterState().equals(State.UP) &&
- newClusterState.getClusterState().equals(State.UP)) {
- eventLog.add(new ClusterEvent(ClusterEvent.Type.SYSTEMSTATE,
- "Enough nodes available for system to become up.", timeNow), isMaster);
- } else if (currentClusterState.getClusterState().equals(State.UP) &&
- ! newClusterState.getClusterState().equals(State.UP)) {
- assert(clusterEvent != null);
- eventLog.add(clusterEvent, isMaster);
- }
-
- if (newClusterState.getDistributionBitCount() != currentClusterState.getDistributionBitCount()) {
- eventLog.add(new ClusterEvent(
- ClusterEvent.Type.SYSTEMSTATE,
- "Altering distribution bits in system from "
- + currentClusterState.getDistributionBitCount() + " to " +
- currentClusterState.getDistributionBitCount(),
- timeNow), isMaster);
- }
-
- eventLog.add(new ClusterEvent(
- ClusterEvent.Type.SYSTEMSTATE,
- "New cluster state version " + newClusterState.getVersion() + ". Change from last: " +
- currentClusterState.getTextualDifference(newClusterState),
- timeNow), isMaster);
-
- log.log(LogLevel.DEBUG, "Created new cluster state version: " + newClusterState.toString(true));
- systemStateHistory.addFirst(new SystemStateHistoryEntry(newClusterState, timeNow));
- if (systemStateHistory.size() > maxHistorySize) {
- systemStateHistory.removeLast();
- }
- }
-
- private void mergeIntoNextClusterState(ClusterState sourceState) {
- final ClusterState nextState = nextClusterStateView.getClusterState();
- final int nodeCount = sourceState.getNodeCount(NodeType.STORAGE);
- for (int i = 0; i < nodeCount; ++i) {
- final Node node = storageNode(i);
- final NodeState stateInSource = sourceState.getNodeState(node);
- final NodeState stateInTarget = nextState.getNodeState(node);
- if (stateInSource.getState() != stateInTarget.getState()) {
- nextState.setNodeState(node, stateInSource);
- }
- }
- }
-
- public boolean notifyIfNewSystemState(ContentCluster cluster, SystemStateListener stateListener) {
- if ( ! nextStateViewChanged) return false;
-
- ClusterStateView newClusterStateView = createNextVersionOfClusterStateView(cluster);
-
- ClusterState newClusterState = newClusterStateView.getClusterState();
- // Creating the next version of the state may implicitly take down nodes, so our checks
- // for taking the entire cluster down must happen _after_ this
- Optional<Event> clusterDown = getDownDueToTooFewNodesEvent(newClusterState);
- newClusterState.setClusterState(clusterDown.isPresent() ? State.DOWN : State.UP);
-
- if (newClusterState.similarTo(currentClusterStateView.getClusterState())) {
- log.log(LogLevel.DEBUG,
- "State hasn't changed enough to warrant new cluster state. Not creating new state: " +
- currentClusterStateView.getClusterState().getTextualDifference(newClusterState));
- return false;
- }
-
- // Update the version of newClusterState now. This cannot be done prior to similarTo(),
- // since it makes the cluster states different. From now on, the new cluster state is immutable.
- newClusterState.setVersion(currentClusterStateView.getClusterState().getVersion() + 1);
-
- recordNewClusterStateHasBeenChosen(currentClusterStateView.getClusterState(),
- newClusterStateView.getClusterState(), clusterDown.orElse(null));
-
- // *) Ensure next state is still up to date.
- // This should make nextClusterStateView a deep-copy of currentClusterStateView.
- // If more than the distribution bits and state are deep-copied in
- // createNextVersionOfClusterStateView(), we need to add corresponding statements here.
- // This seems like a hack...
- nextClusterStateView.getClusterState().setDistributionBits(newClusterState.getDistributionBitCount());
- nextClusterStateView.getClusterState().setClusterState(newClusterState.getClusterState());
- mergeIntoNextClusterState(newClusterState);
-
- currentClusterStateView = newClusterStateView;
- nextStateViewChanged = false;
-
- stateListener.handleNewSystemState(currentClusterStateView.getClusterState());
-
- return true;
- }
-
- public void setLatestSystemStateVersion(int version) {
- currentClusterStateView.getClusterState().setVersion(Math.max(1, version));
- nextStateViewChanged = true;
- }
-
- private void setNodeState(NodeInfo node, NodeState newState) {
- NodeState oldState = nextClusterStateView.getClusterState().getNodeState(node.getNode());
-
- // Correct UP to RETIRED if the node wants to be retired
- if (newState.above(node.getWantedState()))
- newState.setState(node.getWantedState().getState());
-
- // Keep old description if a new one is not set and we're not going up or in initializing mode
- if ( ! newState.getState().oneOf("ui") && oldState.hasDescription()) {
- newState.setDescription(oldState.getDescription());
- }
-
- // Keep disk information if not set in new state
- if (newState.getDiskCount() == 0 && oldState.getDiskCount() != 0) {
- newState.setDiskCount(oldState.getDiskCount());
- for (int i=0; i<oldState.getDiskCount(); ++i) {
- newState.setDiskState(i, oldState.getDiskState(i));
- }
- }
- if (newState.equals(oldState)) {
- return;
- }
-
- eventLog.add(new NodeEvent(node, "Altered node state in cluster state from '" + oldState.toString(true)
- + "' to '" + newState.toString(true) + "'.",
- NodeEvent.Type.CURRENT, timer.getCurrentTimeInMillis()), isMaster);
- nextClusterStateView.getClusterState().setNodeState(node.getNode(), newState);
- nextStateViewChanged = true;
- }
-
- public void handleNewReportedNodeState(NodeInfo node, NodeState reportedState, NodeStateOrHostInfoChangeHandler nodeListener) {
- ClusterState nextState = nextClusterStateView.getClusterState();
- NodeState currentState = nextState.getNodeState(node.getNode());
- log.log(currentState.equals(reportedState) && node.getVersion() == 0 ? LogLevel.SPAM : LogLevel.DEBUG,
- "Got nodestate reply from " + node + ": "
- + node.getReportedState().getTextualDifference(reportedState) + " (Current state is " + currentState.toString(true) + ")");
- long currentTime = timer.getCurrentTimeInMillis();
- if (reportedState.getState().equals(State.DOWN)) {
- node.setTimeOfFirstFailingConnectionAttempt(currentTime);
- }
- if ( ! reportedState.similarTo(node.getReportedState())) {
- if (reportedState.getState().equals(State.DOWN)) {
- eventLog.addNodeOnlyEvent(new NodeEvent(node, "Failed to get node state: " + reportedState.toString(true), NodeEvent.Type.REPORTED, currentTime), LogLevel.INFO);
- } else {
- eventLog.addNodeOnlyEvent(new NodeEvent(node, "Now reporting state " + reportedState.toString(true), NodeEvent.Type.REPORTED, currentTime), LogLevel.DEBUG);
- }
- }
- if (reportedState.equals(node.getReportedState()) && ! reportedState.getState().equals(State.INITIALIZING))
- return;
-
- NodeState alteredState = decideNodeStateGivenReportedState(node, currentState, reportedState, nodeListener);
- if (alteredState != null) {
- ClusterState clusterState = currentClusterStateView.getClusterState();
-
- if (alteredState.above(node.getWantedState())) {
- log.log(LogLevel.DEBUG, "Cannot set node in state " + alteredState.getState() + " when wanted state is " + node.getWantedState());
- alteredState.setState(node.getWantedState().getState());
- }
- if (reportedState.getStartTimestamp() > node.getStartTimestamp()) {
- alteredState.setStartTimestamp(reportedState.getStartTimestamp());
- } else {
- alteredState.setStartTimestamp(0);
- }
- if (!alteredState.similarTo(currentState)) {
- setNodeState(node, alteredState);
- } else if (!alteredState.equals(currentState)) {
- if (currentState.getState().equals(State.INITIALIZING) && alteredState.getState().equals(State.INITIALIZING) &&
- Math.abs(currentState.getInitProgress() - alteredState.getInitProgress()) > 0.000000001)
- {
- log.log(LogLevel.DEBUG, "Only silently updating init progress for " + node + " in cluster state because new "
- + "state is too similar to tag new version: " + currentState.getTextualDifference(alteredState));
- currentState.setInitProgress(alteredState.getInitProgress());
- nextState.setNodeState(node.getNode(), currentState);
-
- NodeState currentNodeState = clusterState.getNodeState(node.getNode());
- if (currentNodeState.getState().equals(State.INITIALIZING)) {
- currentNodeState.setInitProgress(alteredState.getInitProgress());
- clusterState.setNodeState(node.getNode(), currentNodeState);
- }
- } else if (alteredState.getMinUsedBits() != currentState.getMinUsedBits()) {
- log.log(LogLevel.DEBUG, "Altering node state to reflect that min distribution bit count have changed from "
- + currentState.getMinUsedBits() + " to " + alteredState.getMinUsedBits());
- int oldCount = currentState.getMinUsedBits();
- currentState.setMinUsedBits(alteredState.getMinUsedBits());
- nextState.setNodeState(node.getNode(), currentState);
- int minDistBits = calculateMinDistributionBitCount();
- if (minDistBits < nextState.getDistributionBitCount()
- || (nextState.getDistributionBitCount() < this.idealDistributionBits && minDistBits >= this.idealDistributionBits))
- {
- // If this will actually affect global cluster state.
- eventLog.add(new NodeEvent(node, "Altered min distribution bit count from " + oldCount
- + " to " + currentState.getMinUsedBits() + ". Updated cluster state.", NodeEvent.Type.CURRENT, currentTime), isMaster);
- nextStateViewChanged = true;
- } else {
- log.log(LogLevel.DEBUG, "Altered min distribution bit count from " + oldCount
- + " to " + currentState.getMinUsedBits() + ". No effect for cluster state with ideal " + this.idealDistributionBits
- + ", new " + minDistBits + ", old " + nextState.getDistributionBitCount() + " though.");
- clusterState.setNodeState(node.getNode(), currentState);
- }
- } else {
- log.log(LogLevel.DEBUG, "Not altering state of " + node + " in cluster state because new state is too similar: "
- + currentState.getTextualDifference(alteredState));
- }
- } else if (alteredState.getDescription().contains("Listing buckets")) {
- currentState.setDescription(alteredState.getDescription());
- nextState.setNodeState(node.getNode(), currentState);
- NodeState currentNodeState = clusterState.getNodeState(node.getNode());
- currentNodeState.setDescription(alteredState.getDescription());
- clusterState.setNodeState(node.getNode(), currentNodeState);
- }
- }
- }
-
- public void handleNewNode(NodeInfo node) {
- setHostName(node);
- String message = "Found new node " + node + " in slobrok at " + node.getRpcAddress();
- eventLog.add(new NodeEvent(node, message, NodeEvent.Type.REPORTED, timer.getCurrentTimeInMillis()), isMaster);
- }
-
- public void handleMissingNode(NodeInfo node, NodeStateOrHostInfoChangeHandler nodeListener) {
- removeHostName(node);
-
- long timeNow = timer.getCurrentTimeInMillis();
-
- if (node.getLatestNodeStateRequestTime() != null) {
- eventLog.add(new NodeEvent(node, "Node is no longer in slobrok, but we still have a pending state request.", NodeEvent.Type.REPORTED, timeNow), isMaster);
- } else {
- eventLog.add(new NodeEvent(node, "Node is no longer in slobrok. No pending state request to node.", NodeEvent.Type.REPORTED, timeNow), isMaster);
- }
- if (node.getReportedState().getState().equals(State.STOPPING)) {
- log.log(LogLevel.DEBUG, "Node " + node.getNode() + " is no longer in slobrok. Was in stopping state, so assuming it has shut down normally. Setting node down");
- NodeState ns = node.getReportedState().clone();
- ns.setState(State.DOWN);
- handleNewReportedNodeState(node, ns.clone(), nodeListener);
- node.setReportedState(ns, timer.getCurrentTimeInMillis()); // Must reset it to null to get connection attempts counted
- } else {
- log.log(LogLevel.DEBUG, "Node " + node.getNode() + " no longer in slobrok was in state " + node.getReportedState() + ". Waiting to see if it reappears in slobrok");
- }
- }
-
- /**
- * Propose a new state for a node. This may happen due to an administrator action, orchestration, or
- * a configuration change.
- */
- public void proposeNewNodeState(NodeInfo node, NodeState proposedState) {
- NodeState currentState = nextClusterStateView.getClusterState().getNodeState(node.getNode());
- NodeState currentReported = node.getReportedState(); // TODO: Is there a reason to have both this and the above?
-
- NodeState newCurrentState = currentReported.clone();
-
- newCurrentState.setState(proposedState.getState()).setDescription(proposedState.getDescription());
-
- if (currentState.getState().equals(newCurrentState.getState())) return;
-
- log.log(LogLevel.DEBUG, "Got new wanted nodestate for " + node + ": " + currentState.getTextualDifference(proposedState));
- // Should be checked earlier before state was set in cluster
- assert(newCurrentState.getState().validWantedNodeState(node.getNode().getType()));
- long timeNow = timer.getCurrentTimeInMillis();
- if (newCurrentState.above(currentReported)) {
- eventLog.add(new NodeEvent(node, "Wanted state " + newCurrentState + ", but we cannot force node into that state yet as it is currently in " + currentReported, NodeEvent.Type.REPORTED, timeNow), isMaster);
- return;
- }
- if ( ! newCurrentState.similarTo(currentState)) {
- eventLog.add(new NodeEvent(node, "Node state set to " + newCurrentState + ".", NodeEvent.Type.WANTED, timeNow), isMaster);
- }
- setNodeState(node, newCurrentState);
- }
-
- public void handleNewRpcAddress(NodeInfo node) {
- setHostName(node);
- String message = "Node " + node + " has a new address in slobrok: " + node.getRpcAddress();
- eventLog.add(new NodeEvent(node, message, NodeEvent.Type.REPORTED, timer.getCurrentTimeInMillis()), isMaster);
- }
-
- public void handleReturnedRpcAddress(NodeInfo node) {
- setHostName(node);
- String message = "Node got back into slobrok with same address as before: " + node.getRpcAddress();
- eventLog.add(new NodeEvent(node, message, NodeEvent.Type.REPORTED, timer.getCurrentTimeInMillis()), isMaster);
- }
-
- private void setHostName(NodeInfo node) {
- String rpcAddress = node.getRpcAddress();
- if (rpcAddress == null) {
- // This may happen if we haven't seen the node in Slobrok yet.
- return;
- }
-
- Spec address = new Spec(rpcAddress);
- if (address.malformed()) {
- return;
- }
-
- hostnames.put(node.getNodeIndex(), address.host());
- }
-
- private void removeHostName(NodeInfo node) {
- hostnames.remove(node.getNodeIndex());
- }
-
- public boolean watchTimers(ContentCluster cluster, NodeStateOrHostInfoChangeHandler nodeListener) {
- boolean triggeredAnyTimers = false;
- long currentTime = timer.getCurrentTimeInMillis();
- for(NodeInfo node : cluster.getNodeInfo()) {
- NodeState currentStateInSystem = nextClusterStateView.getClusterState().getNodeState(node.getNode());
- NodeState lastReportedState = node.getReportedState();
-
- // If we haven't had slobrok contact in a given amount of time and node is still not considered down,
- // mark it down.
- if (node.isRpcAddressOutdated()
- && !lastReportedState.getState().equals(State.DOWN)
- && node.getRpcAddressOutdatedTimestamp() + maxSlobrokDisconnectGracePeriod <= currentTime)
- {
- StringBuilder sb = new StringBuilder().append("Set node down as it has been out of slobrok for ")
- .append(currentTime - node.getRpcAddressOutdatedTimestamp()).append(" ms which is more than the max limit of ")
- .append(maxSlobrokDisconnectGracePeriod).append(" ms.");
- node.abortCurrentNodeStateRequests();
- NodeState state = lastReportedState.clone();
- state.setState(State.DOWN);
- if (!state.hasDescription()) state.setDescription(sb.toString());
- eventLog.add(new NodeEvent(node, sb.toString(), NodeEvent.Type.CURRENT, currentTime), isMaster);
- handleNewReportedNodeState(node, state.clone(), nodeListener);
- node.setReportedState(state, currentTime);
- triggeredAnyTimers = true;
- }
-
- // If node is still unavailable after transition time, mark it down
- if (currentStateInSystem.getState().equals(State.MAINTENANCE)
- && ( ! nextStateViewChanged || ! this.nextClusterStateView.getClusterState().getNodeState(node.getNode()).getState().equals(State.DOWN))
- && node.getWantedState().above(new NodeState(node.getNode().getType(), State.DOWN))
- && (lastReportedState.getState().equals(State.DOWN) || node.isRpcAddressOutdated())
- && node.getTransitionTime() + maxTransitionTime.get(node.getNode().getType()) < currentTime)
- {
- eventLog.add(new NodeEvent(node, (currentTime - node.getTransitionTime())
- + " milliseconds without contact. Marking node down.", NodeEvent.Type.CURRENT, currentTime), isMaster);
- NodeState newState = new NodeState(node.getNode().getType(), State.DOWN).setDescription(
- (currentTime - node.getTransitionTime()) + " ms without contact. Too long to keep in maintenance. Marking node down");
- // Keep old description if there is one as it is likely closer to the cause of the problem
- if (currentStateInSystem.hasDescription()) newState.setDescription(currentStateInSystem.getDescription());
- setNodeState(node, newState);
- triggeredAnyTimers = true;
- }
-
- // If node hasn't increased its initializing progress within initprogresstime, mark it down.
- if (!currentStateInSystem.getState().equals(State.DOWN)
- && node.getWantedState().above(new NodeState(node.getNode().getType(), State.DOWN))
- && lastReportedState.getState().equals(State.INITIALIZING)
- && maxInitProgressTime != 0
- && node.getInitProgressTime() + maxInitProgressTime <= currentTime
- && node.getNode().getType().equals(NodeType.STORAGE))
- {
- eventLog.add(new NodeEvent(node, (currentTime - node.getInitProgressTime()) + " milliseconds "
- + "without initialize progress. Marking node down."
- + " Premature crash count is now " + (node.getPrematureCrashCount() + 1) + ".", NodeEvent.Type.CURRENT, currentTime), isMaster);
- NodeState newState = new NodeState(node.getNode().getType(), State.DOWN).setDescription(
- (currentTime - node.getInitProgressTime()) + " ms without initialize progress. Assuming node has deadlocked.");
- setNodeState(node, newState);
- handlePrematureCrash(node, nodeListener);
- triggeredAnyTimers = true;
- }
- if (node.getUpStableStateTime() + stableStateTimePeriod <= currentTime
- && lastReportedState.getState().equals(State.UP)
- && node.getPrematureCrashCount() <= maxPrematureCrashes
- && node.getPrematureCrashCount() != 0)
- {
- node.setPrematureCrashCount(0);
- log.log(LogLevel.DEBUG, "Resetting premature crash count on node " + node + " as it has been up for a long time.");
- triggeredAnyTimers = true;
- } else if (node.getDownStableStateTime() + stableStateTimePeriod <= currentTime
- && lastReportedState.getState().equals(State.DOWN)
- && node.getPrematureCrashCount() <= maxPrematureCrashes
- && node.getPrematureCrashCount() != 0)
- {
- node.setPrematureCrashCount(0);
- log.log(LogLevel.DEBUG, "Resetting premature crash count on node " + node + " as it has been down for a long time.");
- triggeredAnyTimers = true;
- }
- }
- return triggeredAnyTimers;
- }
-
- private boolean isControlledShutdown(NodeState state) {
- return (state.getState() == State.STOPPING && (state.getDescription().contains("Received signal 15 (SIGTERM - Termination signal)")
- || state.getDescription().contains("controlled shutdown")));
- }
-
- /**
- * Decide the state assigned to a new node given the state it reported
- *
- * @param node the node we are computing the state of
- * @param currentState the current state of the node
- * @param reportedState the new state reported by the node (or, in the case of DOWN, inferred on its behalf)
- * @param nodeListener this listener is notified for some of the system state changes that this will return
- * @return the new node state, or null to keep the node's current state
- */
- private NodeState decideNodeStateGivenReportedState(NodeInfo node, NodeState currentState, NodeState reportedState,
- NodeStateOrHostInfoChangeHandler nodeListener) {
- long timeNow = timer.getCurrentTimeInMillis();
-
- log.log(LogLevel.DEBUG, "Finding new cluster state entry for " + node + " switching state " + currentState.getTextualDifference(reportedState));
-
- // Set nodes in maintenance if 1) down, or 2) initializing but set retired, to avoid migrating data
- // to the retired node while it is initializing
- if (currentState.getState().oneOf("ur") && reportedState.getState().oneOf("dis")
- && (node.getWantedState().getState().equals(State.RETIRED) || !reportedState.getState().equals(State.INITIALIZING)))
- {
- long currentTime = timer.getCurrentTimeInMillis();
- node.setTransitionTime(currentTime);
- if (node.getUpStableStateTime() + stableStateTimePeriod > currentTime && !isControlledShutdown(reportedState)) {
- log.log(LogLevel.DEBUG, "Stable state: " + node.getUpStableStateTime() + " + " + stableStateTimePeriod + " > " + currentTime);
- eventLog.add(new NodeEvent(node,
- "Stopped or possibly crashed after " + (currentTime - node.getUpStableStateTime())
- + " ms, which is before stable state time period."
- + " Premature crash count is now " + (node.getPrematureCrashCount() + 1) + ".",
- NodeEvent.Type.CURRENT,
- timeNow), isMaster);
- if (handlePrematureCrash(node, nodeListener)) return null;
- }
- if (maxTransitionTime.get(node.getNode().getType()) != 0) {
- return new NodeState(node.getNode().getType(), State.MAINTENANCE).setDescription(reportedState.getDescription());
- }
- }
-
- // If we got increasing initialization progress, reset initialize timer
- if (reportedState.getState().equals(State.INITIALIZING) &&
- (!currentState.getState().equals(State.INITIALIZING) ||
- reportedState.getInitProgress() > currentState.getInitProgress()))
- {
- node.setInitProgressTime(timer.getCurrentTimeInMillis());
- log.log(LogLevel.DEBUG, "Reset initialize timer on " + node + " to " + node.getInitProgressTime());
- }
-
- // If we get reverse initialize progress, mark node unstable, such that we don't mark it initializing again before it is up.
- if (currentState.getState().equals(State.INITIALIZING) &&
- (reportedState.getState().equals(State.INITIALIZING) && reportedState.getInitProgress() < currentState.getInitProgress()))
- {
- eventLog.add(new NodeEvent(node, "Stop or crash during initialization detected from reverse initializing progress."
- + " Progress was " + currentState.getInitProgress() + " but is now " + reportedState.getInitProgress() + "."
- + " Premature crash count is now " + (node.getPrematureCrashCount() + 1) + ".",
- NodeEvent.Type.CURRENT, timeNow), isMaster);
- return (handlePrematureCrash(node, nodeListener) ? null : new NodeState(node.getNode().getType(), State.DOWN).setDescription(
- "Got reverse intialize progress. Assuming node have prematurely crashed"));
- }
-
- // If we go down while initializing, mark node unstable, such that we don't mark it initializing again before it is up.
- if (currentState.getState().equals(State.INITIALIZING) && reportedState.getState().oneOf("ds") && !isControlledShutdown(reportedState))
- {
- eventLog.add(new NodeEvent(node, "Stop or crash during initialization."
- + " Premature crash count is now " + (node.getPrematureCrashCount() + 1) + ".",
- NodeEvent.Type.CURRENT, timeNow), isMaster);
- return (handlePrematureCrash(node, nodeListener) ? null : new NodeState(node.getNode().getType(), State.DOWN).setDescription(reportedState.getDescription()));
- }
-
- // Ignore further unavailable states when node is set in maintenance
- if (currentState.getState().equals(State.MAINTENANCE) && reportedState.getState().oneOf("dis"))
- {
- if (node.getWantedState().getState().equals(State.RETIRED) || !reportedState.getState().equals(State.INITIALIZING)
- || reportedState.getInitProgress() <= NodeState.getListingBucketsInitProgressLimit() + 0.00001) {
- log.log(LogLevel.DEBUG, "Ignoring down and initializing reports while in maintenance mode on " + node + ".");
- return null;
- }
- }
-
- // Hide initializing state if node has been unstable. (Not for distributors as these own buckets while initializing)
- if ((currentState.getState().equals(State.DOWN) || currentState.getState().equals(State.UP)) &&
- reportedState.getState().equals(State.INITIALIZING) && node.getPrematureCrashCount() > 0 &&
- !node.isDistributor())
- {
- log.log(LogLevel.DEBUG, "Not setting " + node + " initializing again as it crashed prematurely earlier.");
- return new NodeState(node.getNode().getType(), State.DOWN).setDescription("Not setting node back up as it failed prematurely at last attempt");
- }
- // Hide initializing state in cluster state if initialize progress is so low that we haven't listed buckets yet
- if (!node.isDistributor() && reportedState.getState().equals(State.INITIALIZING) &&
- reportedState.getInitProgress() <= NodeState.getListingBucketsInitProgressLimit() + 0.00001)
- {
- log.log(LogLevel.DEBUG, "Not setting " + node + " initializing in cluster state quite yet, as initializing progress still indicate it is listing buckets.");
- return new NodeState(node.getNode().getType(), State.DOWN).setDescription("Listing buckets. Progress " + (100 * reportedState.getInitProgress()) + " %.");
- }
- return reportedState.clone();
- }
-
- public boolean handlePrematureCrash(NodeInfo node, NodeStateOrHostInfoChangeHandler changeListener) {
- node.setPrematureCrashCount(node.getPrematureCrashCount() + 1);
- if (disableUnstableNodes && node.getPrematureCrashCount() > maxPrematureCrashes) {
- NodeState wantedState = new NodeState(node.getNode().getType(), State.DOWN)
- .setDescription("Disabled by fleet controller as it prematurely shut down " + node.getPrematureCrashCount() + " times in a row");
- NodeState oldState = node.getWantedState();
- node.setWantedState(wantedState);
- if ( ! oldState.equals(wantedState)) {
- changeListener.handleNewWantedNodeState(node, wantedState);
- }
- return true;
- }
- return false;
- }
-
- public void handleUpdatedHostInfo(NodeInfo nodeInfo, HostInfo hostInfo) {
- // Only pass the host info to the latest cluster state view.
- currentClusterStateView.handleUpdatedHostInfo(hostnames, nodeInfo, hostInfo);
- }
-
- public class SystemStateHistoryEntry {
-
- private final ClusterState state;
- private final long time;
-
- SystemStateHistoryEntry(ClusterState state, long time) {
- this.state = state;
- this.time = time;
- }
-
- public ClusterState state() { return state; }
-
- public long time() { return time; }
-
- }
-
-}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java
index a21ed994d5d..c4e7c6897e1 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/DatabaseHandler.java
@@ -248,6 +248,8 @@ public class DatabaseHandler {
log.log(LogLevel.DEBUG, "Fleetcontroller " + nodeIndex
+ ": Attempting to store last system state version " + pendingStore.lastSystemStateVersion
+ " into zookeeper.");
+ // TODO guard version write with a CaS predicated on the version we last read/wrote.
+ // TODO Drop leadership status if there is a mismatch, as it implies we're racing with another leader.
if (database.storeLatestSystemStateVersion(pendingStore.lastSystemStateVersion)) {
currentlyStored.lastSystemStateVersion = pendingStore.lastSystemStateVersion;
pendingStore.lastSystemStateVersion = null;
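
The two TODO lines added above describe a compare-and-swap guard that this patch does not yet implement. Purely as an illustration of that idea (this is not the project's Database/DatabaseHandler API; the class name and znode path below are made up for the example), a version write predicated on the last observed ZNode version could look roughly like this with the plain ZooKeeper client:

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

import java.nio.charset.StandardCharsets;

class CasVersionWriter {
    private final ZooKeeper zk;          // assumed to be an already-connected session
    private final String versionZnode;   // illustrative path, e.g. "<zooKeeperRoot>latestversion"
    private int lastKnownZnodeVersion = -1;

    CasVersionWriter(ZooKeeper zk, String versionZnode) {
        this.zk = zk;
        this.versionZnode = versionZnode;
    }

    /** Reads the current value and remembers the ZNode version for later CaS writes. */
    long readLatestSystemStateVersion() throws KeeperException, InterruptedException {
        Stat stat = new Stat();
        byte[] data = zk.getData(versionZnode, false, stat);
        lastKnownZnodeVersion = stat.getVersion();
        return Long.parseLong(new String(data, StandardCharsets.UTF_8));
    }

    /** Returns true if the write succeeded, false if someone else wrote since our last read. */
    boolean storeLatestSystemStateVersion(long version) throws KeeperException, InterruptedException {
        byte[] data = Long.toString(version).getBytes(StandardCharsets.UTF_8);
        try {
            // setData with an explicit expected version acts as the compare-and-swap.
            Stat stat = zk.setData(versionZnode, data, lastKnownZnodeVersion);
            lastKnownZnodeVersion = stat.getVersion();
            return true;
        } catch (KeeperException.BadVersionException raceLost) {
            // Version mismatch implies a competing writer, likely another elected master.
            return false;
        }
    }
}

A false return from the write would then be the caller's cue to relinquish mastership rather than retry blindly, matching the intent of the second TODO.
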
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/MasterDataGatherer.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/MasterDataGatherer.java
index cd9c66d18f0..f952f842151 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/MasterDataGatherer.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/database/MasterDataGatherer.java
@@ -51,7 +51,7 @@ public class MasterDataGatherer {
public void process(WatchedEvent watchedEvent) {
switch (watchedEvent.getType()) {
case NodeChildrenChanged: // Fleetcontrollers have either connected or disconnected to ZooKeeper
- log.log(LogLevel.INFO, "Fleetcontroller " + nodeIndex + ": A change occured in the list of registered fleetcontrollers. Requesting new information");
+ log.log(LogLevel.INFO, "Fleetcontroller " + nodeIndex + ": A change occurred in the list of registered fleetcontrollers. Requesting new information");
session.getChildren(zooKeeperRoot + "indexes", this, childListener, null);
break;
case NodeDataChanged: // A fleetcontroller has changed what node it is voting for
@@ -160,7 +160,7 @@ public class MasterDataGatherer {
}
}
- /** Calling restart, ignores what we currently know and starts another circly. Typically called after reconnecting to ZooKeeperServer. */
+ /** Calling restart ignores what we currently know and starts another cycle. Typically called after reconnecting to ZooKeeperServer. */
public void restart() {
synchronized (nextMasterData) {
masterData = new TreeMap<Integer, Integer>();
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/ClusterStateRequestHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/ClusterStateRequestHandler.java
index 6de9205bbe3..9428370faf5 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/ClusterStateRequestHandler.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/ClusterStateRequestHandler.java
@@ -2,19 +2,19 @@
package com.yahoo.vespa.clustercontroller.core.status;
import com.yahoo.vdslib.state.ClusterState;
+import com.yahoo.vespa.clustercontroller.core.StateVersionTracker;
import com.yahoo.vespa.clustercontroller.core.status.statuspage.StatusPageResponse;
import com.yahoo.vespa.clustercontroller.core.status.statuspage.StatusPageServer;
-import com.yahoo.vespa.clustercontroller.core.SystemStateGenerator;
public class ClusterStateRequestHandler implements StatusPageServer.RequestHandler {
- private final SystemStateGenerator systemStateGenerator;
+ private final StateVersionTracker stateVersionTracker;
- public ClusterStateRequestHandler(SystemStateGenerator systemStateGenerator) {
- this.systemStateGenerator = systemStateGenerator;
+ public ClusterStateRequestHandler(StateVersionTracker stateVersionTracker) {
+ this.stateVersionTracker = stateVersionTracker;
}
@Override
public StatusPageResponse handle(StatusPageServer.HttpRequest request) {
- ClusterState cs = systemStateGenerator.getClusterState();
+ ClusterState cs = stateVersionTracker.getVersionedClusterState();
StatusPageResponse response = new StatusPageResponse();
response.setContentType("text/plain");
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java
index 85db0ac0ef9..ec75ba3532d 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/LegacyIndexPageRequestHandler.java
@@ -17,21 +17,22 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa
private final Timer timer;
private final ContentCluster cluster;
private final MasterElectionHandler masterElectionHandler;
- private final SystemStateGenerator systemStateGenerator;
+ private final StateVersionTracker stateVersionTracker;
private final EventLog eventLog;
private final long startedTime;
private final RunDataExtractor data;
private boolean showLocalSystemStatesInLog = true;
public LegacyIndexPageRequestHandler(Timer timer, boolean showLocalSystemStatesInLog, ContentCluster cluster,
- MasterElectionHandler masterElectionHandler, SystemStateGenerator systemStateGenerator,
+ MasterElectionHandler masterElectionHandler,
+ StateVersionTracker stateVersionTracker,
EventLog eventLog, long startedTime, RunDataExtractor data)
{
this.timer = timer;
this.showLocalSystemStatesInLog = showLocalSystemStatesInLog;
this.cluster = cluster;
this.masterElectionHandler = masterElectionHandler;
- this.systemStateGenerator = systemStateGenerator;
+ this.stateVersionTracker = stateVersionTracker;
this.eventLog = eventLog;
this.startedTime = startedTime;
this.data = data;
@@ -63,7 +64,7 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa
new VdsClusterHtmlRendrer(),
content,
timer,
- systemStateGenerator.getClusterState(),
+ stateVersionTracker.getVersionedClusterState(),
data.getOptions().storageDistribution,
data.getOptions(),
eventLog,
@@ -71,7 +72,7 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa
// Overview of current config
data.getOptions().writeHtmlState(content, request);
// Current cluster state and cluster state history
- writeHtmlState(systemStateGenerator, content, request);
+ writeHtmlState(stateVersionTracker, content, request);
} else {
// Overview of current config
data.getOptions().writeHtmlState(content, request);
@@ -84,7 +85,7 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa
return response;
}
- public void writeHtmlState(SystemStateGenerator systemStateGenerator, StringBuilder sb, StatusPageServer.HttpRequest request) {
+ public void writeHtmlState(StateVersionTracker stateVersionTracker, StringBuilder sb, StatusPageServer.HttpRequest request) {
boolean showLocal = showLocalSystemStatesInLog;
if (request.hasQueryParameter("showlocal")) {
showLocal = true;
@@ -93,9 +94,9 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa
}
sb.append("<h2 id=\"clusterstates\">Cluster states</h2>\n")
- .append("<p>Current cluster state:<br><code>").append(systemStateGenerator.currentClusterStateView().toString()).append("</code></p>\n");
+ .append("<p>Current cluster state:<br><code>").append(stateVersionTracker.getVersionedClusterState().toString()).append("</code></p>\n");
- if ( ! systemStateGenerator.systemStateHistory().isEmpty()) {
+ if ( ! stateVersionTracker.getClusterStateHistory().isEmpty()) {
TimeZone tz = TimeZone.getTimeZone("UTC");
sb.append("<h3 id=\"clusterstatehistory\">Cluster state history</h3>\n");
if (showLocal) {
@@ -106,10 +107,10 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa
.append(" <th>Cluster state</th>\n")
.append("</tr>\n");
// Write cluster state history in reverse order (newest on top)
- Iterator<SystemStateGenerator.SystemStateHistoryEntry> stateIterator = systemStateGenerator.systemStateHistory().iterator();
- SystemStateGenerator.SystemStateHistoryEntry current = null;
+ Iterator<ClusterStateHistoryEntry> stateIterator = stateVersionTracker.getClusterStateHistory().iterator();
+ ClusterStateHistoryEntry current = null;
while (stateIterator.hasNext()) {
- SystemStateGenerator.SystemStateHistoryEntry nextEntry = stateIterator.next();
+ ClusterStateHistoryEntry nextEntry = stateIterator.next();
if (nextEntry.state().isOfficial() || showLocal) {
if (current != null) writeClusterStateEntry(current, nextEntry, sb, tz);
current = nextEntry;
@@ -120,7 +121,7 @@ public class LegacyIndexPageRequestHandler implements StatusPageServer.RequestHa
}
}
- private void writeClusterStateEntry(SystemStateGenerator.SystemStateHistoryEntry entry, SystemStateGenerator.SystemStateHistoryEntry last, StringBuilder sb, TimeZone tz) {
+ private void writeClusterStateEntry(ClusterStateHistoryEntry entry, ClusterStateHistoryEntry last, StringBuilder sb, TimeZone tz) {
sb.append("<tr><td>").append(RealTimer.printDate(entry.time(), tz))
.append("</td><td>").append(entry.state().isOfficial() ? "" : "<font color=\"grey\">");
sb.append(entry.state());
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/StaticResourceRequestHandler.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/StaticResourceRequestHandler.java
deleted file mode 100644
index fa8128753f6..00000000000
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/StaticResourceRequestHandler.java
+++ /dev/null
@@ -1,66 +0,0 @@
-// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-package com.yahoo.vespa.clustercontroller.core.status;
-
-import com.yahoo.vespa.clustercontroller.core.status.statuspage.StatusPageResponse;
-import com.yahoo.vespa.clustercontroller.core.status.statuspage.StatusPageServer;
-
-import java.io.ByteArrayOutputStream;
-import java.io.InputStream;
-import java.io.IOException;
-
-/**
- * HTTP request handler for serving a single JAR resource as if it were
- * a regular file hosted on the server. Always serves the content verbatim
- * (i.e. as a byte stream), specifying a Content-Type provided when creating
- * the handler.
- *
- * @author <a href="mailto:vekterli@yahoo-inc.com">Tor Brede Vekterli</a>
- * @since 5.28
- */
-public class StaticResourceRequestHandler implements StatusPageServer.RequestHandler {
- private final byte[] resourceData;
- private final String contentType;
-
- public StaticResourceRequestHandler(String resourcePath,
- String contentType)
- throws IOException
- {
- this.resourceData = loadResource(resourcePath);
- this.contentType = contentType;
- }
-
- private byte[] loadResource(String resourcePath) throws IOException {
- InputStream resourceStream = getClass().getClassLoader().getResourceAsStream(resourcePath);
- if (resourceStream == null) {
- throw new IOException("No resource with path '" + resourcePath + "' could be found");
- }
- return readStreamData(resourceStream);
- }
-
- @Override
- public StatusPageResponse handle(StatusPageServer.HttpRequest request) {
- final StatusPageResponse response = new StatusPageResponse();
- response.setClientCachingEnabled(true);
- response.setContentType(contentType);
- try {
- response.getOutputStream().write(resourceData);
- } catch (IOException e) {
- response.setResponseCode(StatusPageResponse.ResponseCode.INTERNAL_SERVER_ERROR);
- }
- return response;
- }
-
- private byte[] readStreamData(InputStream resourceStream) throws IOException {
- final byte[] buf = new byte[4096];
- final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
- while (true) {
- int read = resourceStream.read(buf);
- if (read < 0) {
- break;
- }
- outputStream.write(buf, 0, read);
- }
- outputStream.close();
- return outputStream.toByteArray();
- }
-}