aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-core/src
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@vespa.ai>2024-06-06 11:34:34 +0000
committerTor Brede Vekterli <vekterli@vespa.ai>2024-06-06 12:16:01 +0000
commitfa44b589802425c2d2dd5e7c0d19a7d4d3f3a0e9 (patch)
treec16aae15a079ad33b24dbb54f30acf37e8f27096 /clustercontroller-core/src
parent6ef5c172d1ab4ff880f0ae7daca5bc5f23f43fa1 (diff)
Add metric for number of nodes not converged to the latest cluster state version
There is a grace period (currently hard-coded to 30s) between the time a new state version is published and before nodes are counted as not converging. This state is not sticky for a given node, so if the state keeps changing constantly it's possible that we under- count the number of non-converging nodes. But this can be improved over time; for now it's primarily a source from which we can generate alerts, as failures to converge can manifest itself as other failures in the content cluster. Also move out of sync ratio computations to the periodic metric update functionality and reset it if the current controller is not the believed cluster leader. Start moving some CC timer code to use `Instant` instead `long`s with implicit units.
Diffstat (limited to 'clustercontroller-core/src')
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java28
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MetricUpdater.java29
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RealTimer.java6
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java7
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Timer.java8
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ContentClusterHtmlRendererTest.java5
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FakeTimer.java6
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java2
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MetricReporterTest.java81
9 files changed, 147 insertions, 25 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index 3f7214c31e2..5cffa4957c6 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -139,7 +139,7 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta
public static FleetController create(FleetControllerOptions options, MetricReporter metricReporter) throws Exception {
var context = new FleetControllerContextImpl(options);
var timer = new RealTimer();
- var metricUpdater = new MetricUpdater(metricReporter, options.fleetControllerIndex(), options.clusterName());
+ var metricUpdater = new MetricUpdater(metricReporter, timer, options.fleetControllerIndex(), options.clusterName());
var log = new EventLog(timer, metricUpdater);
var cluster = new ContentCluster(options);
var stateGatherer = new NodeStateGatherer(timer, timer, log);
@@ -348,7 +348,8 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta
ClusterState baselineState = stateBundle.getBaselineClusterState();
newStates.add(stateBundle);
metricUpdater.updateClusterStateMetrics(cluster, baselineState,
- ResourceUsageStats.calculateFrom(cluster.getNodeInfos(), options.clusterFeedBlockLimit(), stateBundle.getFeedBlock()));
+ ResourceUsageStats.calculateFrom(cluster.getNodeInfos(), options.clusterFeedBlockLimit(), stateBundle.getFeedBlock()),
+ systemStateBroadcaster.getLastStateBroadcastTimePoint());
lastMetricUpdateCycleCount = cycleCount;
systemStateBroadcaster.handleNewClusterStates(stateBundle);
// Iff master, always store new version in ZooKeeper _before_ publishing to any
@@ -365,12 +366,20 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta
private boolean maybePublishOldMetrics() {
verifyInControllerThread();
- if (isMaster() && cycleCount > 300 + lastMetricUpdateCycleCount) {
- ClusterStateBundle stateBundle = stateVersionTracker.getVersionedClusterStateBundle();
- ClusterState baselineState = stateBundle.getBaselineClusterState();
- metricUpdater.updateClusterStateMetrics(cluster, baselineState,
- ResourceUsageStats.calculateFrom(cluster.getNodeInfos(), options.clusterFeedBlockLimit(), stateBundle.getFeedBlock()));
- lastMetricUpdateCycleCount = cycleCount;
+ if (cycleCount > 300 + lastMetricUpdateCycleCount) {
+ if (isMaster()) {
+ updateMasterClusterSyncMetrics();
+ ClusterStateBundle stateBundle = stateVersionTracker.getVersionedClusterStateBundle();
+ ClusterState baselineState = stateBundle.getBaselineClusterState();
+ metricUpdater.updateClusterStateMetrics(cluster, baselineState,
+ ResourceUsageStats.calculateFrom(cluster.getNodeInfos(), options.clusterFeedBlockLimit(), stateBundle.getFeedBlock()),
+ systemStateBroadcaster.getLastStateBroadcastTimePoint());
+ lastMetricUpdateCycleCount = cycleCount;
+ } else {
+ // If we're not the master we don't have any authoritative information about
+ // how out of sync the cluster nodes are, so reset the metric.
+ metricUpdater.updateClusterBucketsOutOfSyncRatio(0);
+ }
return true;
} else {
return false;
@@ -542,7 +551,6 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta
didWork |= metricUpdater.forWork("processNextQueuedRemoteTask", this::processNextQueuedRemoteTask);
didWork |= metricUpdater.forWork("completeSatisfiedVersionDependentTasks", this::completeSatisfiedVersionDependentTasks);
didWork |= metricUpdater.forWork("maybePublishOldMetrics", this::maybePublishOldMetrics);
- updateClusterSyncMetrics();
processingCycle = false;
++cycleCount;
@@ -564,7 +572,7 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta
}
}
- private void updateClusterSyncMetrics() {
+ private void updateMasterClusterSyncMetrics() {
var stats = stateVersionTracker.getAggregatedClusterStats().getAggregatedStats();
if (stats.hasUpdatesFromAllDistributors()) {
GlobalBucketSyncStatsCalculator.clusterBucketsOutOfSyncRatio(stats.getGlobalStats())
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MetricUpdater.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MetricUpdater.java
index d149d4043e4..d72ede7199e 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MetricUpdater.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/MetricUpdater.java
@@ -10,6 +10,7 @@ import com.yahoo.vespa.clustercontroller.utils.util.ComponentMetricReporter;
import com.yahoo.vespa.clustercontroller.utils.util.MetricReporter;
import java.time.Duration;
+import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
@@ -19,17 +20,27 @@ import java.util.function.BooleanSupplier;
public class MetricUpdater {
private final ComponentMetricReporter metricReporter;
+ private final Timer timer;
+ // Publishing and converging on a cluster state version is never instant nor atomic, but
+ // it usually completes within a few seconds. If convergence does not happen for more than
+ // 30 seconds, it's a sign something has stalled.
+ private Duration stateVersionConvergenceGracePeriod = Duration.ofSeconds(30);
- public MetricUpdater(MetricReporter metricReporter, int controllerIndex, String clusterName) {
+ public MetricUpdater(MetricReporter metricReporter, Timer timer, int controllerIndex, String clusterName) {
this.metricReporter = new ComponentMetricReporter(metricReporter, "cluster-controller.");
this.metricReporter.addDimension("controller-index", String.valueOf(controllerIndex));
this.metricReporter.addDimension("clusterid", clusterName);
+ this.timer = timer;
}
public MetricReporter.Context createContext(Map<String, String> dimensions) {
return metricReporter.createContext(dimensions);
}
+ public void setStateVersionConvergenceGracePeriod(Duration gracePeriod) {
+ stateVersionConvergenceGracePeriod = gracePeriod;
+ }
+
private static int nodesInAvailableState(Map<State, Integer> nodeCounts) {
return nodeCounts.getOrDefault(State.INITIALIZING, 0)
+ nodeCounts.getOrDefault(State.RETIRED, 0)
@@ -39,10 +50,13 @@ public class MetricUpdater {
+ nodeCounts.getOrDefault(State.MAINTENANCE, 0);
}
- public void updateClusterStateMetrics(ContentCluster cluster, ClusterState state, ResourceUsageStats resourceUsage) {
+ public void updateClusterStateMetrics(ContentCluster cluster, ClusterState state,
+ ResourceUsageStats resourceUsage, Instant lastStateBroadcastTimePoint) {
Map<String, String> dimensions = new HashMap<>();
dimensions.put("cluster", cluster.getName());
dimensions.put("clusterid", cluster.getName());
+ Instant now = timer.getCurrentWallClockTime();
+ boolean convergenceDeadlinePassed = lastStateBroadcastTimePoint.plus(stateVersionConvergenceGracePeriod).isBefore(now);
for (NodeType type : NodeType.getTypes()) {
dimensions.put("node-type", type.toString().toLowerCase());
MetricReporter.Context context = createContext(dimensions);
@@ -50,10 +64,18 @@ public class MetricUpdater {
for (State s : State.values()) {
nodeCounts.put(s, 0);
}
+ int nodesNotConverged = 0;
for (Integer i : cluster.getConfiguredNodes().keySet()) {
- NodeState s = state.getNodeState(new Node(type, i));
+ var node = new Node(type, i);
+ NodeState s = state.getNodeState(node);
Integer count = nodeCounts.get(s.getState());
nodeCounts.put(s.getState(), count + 1);
+ var info = cluster.getNodeInfo(node);
+ if (info != null && convergenceDeadlinePassed && s.getState().oneOf("uir")) {
+ if (info.getClusterStateVersionBundleAcknowledged() != state.getVersion()) {
+ nodesNotConverged++;
+ }
+ }
}
for (State s : State.values()) {
String name = s.toString().toLowerCase() + ".count";
@@ -63,6 +85,7 @@ public class MetricUpdater {
final int availableNodes = nodesInAvailableState(nodeCounts);
final int totalNodes = Math.max(cluster.getConfiguredNodes().size(), 1); // Assumes 1-1 between distributor and storage
metricReporter.set("available-nodes.ratio", (double)availableNodes / totalNodes, context);
+ metricReporter.set("nodes-not-converged", nodesNotConverged, context);
}
dimensions.remove("node-type");
MetricReporter.Context context = createContext(dimensions);
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RealTimer.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RealTimer.java
index b4563c09b66..482b40381df 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RealTimer.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RealTimer.java
@@ -1,6 +1,7 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;
+import java.time.Instant;
import java.util.Calendar;
import java.util.Locale;
import java.util.TimeZone;
@@ -10,8 +11,9 @@ import java.util.TimeZone;
*/
public class RealTimer implements Timer {
- public long getCurrentTimeInMillis() {
- return System.currentTimeMillis();
+ @Override
+ public Instant getCurrentWallClockTime() {
+ return Instant.now();
}
public static String printDuration(long time) {
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index c74a846fe30..0337e187b8e 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -8,6 +8,7 @@ import com.yahoo.vdslib.state.NodeState;
import com.yahoo.vdslib.state.State;
import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler;
+import java.time.Instant;
import java.util.logging.Level;
import java.util.LinkedList;
import java.util.List;
@@ -29,6 +30,7 @@ public class SystemStateBroadcaster {
private final static long minTimeBetweenNodeErrorLogging = 10 * 60 * 1000;
private final Map<Node, Long> lastErrorReported = new TreeMap<>();
+ private Instant lastStateBroadcastTimePoint = Instant.EPOCH;
private int lastOfficialStateVersion = -1;
private int lastStateVersionBundleAcked = 0;
private int lastClusterStateVersionConverged = 0;
@@ -45,6 +47,7 @@ public class SystemStateBroadcaster {
public void handleNewClusterStates(ClusterStateBundle state) {
clusterStateBundle = state;
+ lastStateBroadcastTimePoint = Instant.ofEpochMilli(timer.getCurrentTimeInMillis());
}
public ClusterState getClusterState() {
@@ -67,6 +70,10 @@ public class SystemStateBroadcaster {
return lastClusterStateBundleConverged;
}
+ public Instant getLastStateBroadcastTimePoint() {
+ return lastStateBroadcastTimePoint;
+ }
+
private void reportNodeError(boolean nodeOk, NodeInfo info, String message) {
long time = timer.getCurrentTimeInMillis();
Long lastReported = lastErrorReported.get(info.getNode());
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Timer.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Timer.java
index 6c7da15b1a5..eaa60b3d675 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Timer.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Timer.java
@@ -1,12 +1,18 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;
+import java.time.Instant;
+
/**
* Interface used to get time. This is separated into its own class, such that unit tests can fake timing to do timing related
* tests without relying on the speed of the unit test processing.
*/
public interface Timer {
- long getCurrentTimeInMillis();
+ Instant getCurrentWallClockTime();
+
+ default long getCurrentTimeInMillis() {
+ return getCurrentWallClockTime().toEpochMilli();
+ }
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ContentClusterHtmlRendererTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ContentClusterHtmlRendererTest.java
index 8048e77b05c..af9e2104828 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ContentClusterHtmlRendererTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ContentClusterHtmlRendererTest.java
@@ -30,8 +30,9 @@ public class ContentClusterHtmlRendererTest {
ClusterStateBundle stateBundle = ClusterStateBundle.ofBaselineOnly(
AnnotatedClusterState.withoutAnnotations(
ClusterState.stateFromString("version:34633 bits:24 distributor:211 storage:211")));
- var metricUpdater = new MetricUpdater(new NoMetricReporter(), 0, clusterName);
- EventLog eventLog = new EventLog(new FakeTimer(), metricUpdater);
+ var timer = new FakeTimer();
+ var metricUpdater = new MetricUpdater(new NoMetricReporter(), timer, 0, clusterName);
+ EventLog eventLog = new EventLog(timer, metricUpdater);
VdsClusterHtmlRenderer.Table table = renderer.createNewClusterHtmlTable(clusterName, slobrokGeneration);
ContentCluster contentCluster = mock(ContentCluster.class);
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FakeTimer.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FakeTimer.java
index 9146b2812a9..1cefac311d3 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FakeTimer.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FakeTimer.java
@@ -1,6 +1,7 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;
+import java.time.Instant;
import java.util.logging.Level;
import com.yahoo.vespa.clustercontroller.core.testutils.LogFormatter;
@@ -22,8 +23,9 @@ public class FakeTimer implements Timer {
// Don't start at zero. Clock users may initialize a 'last run' entry with 0, and we want first time to always look like a timeout
private long currentTime = (long) 30 * 365 * 24 * 60 * 60 * 1000;
- public synchronized long getCurrentTimeInMillis() {
- return currentTime;
+ @Override
+ public synchronized Instant getCurrentWallClockTime() {
+ return Instant.ofEpochMilli(currentTime);
}
public synchronized void advanceTime(long time) {
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java
index 3b26e3b6965..688d82e5ebb 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java
@@ -120,7 +120,7 @@ public abstract class FleetControllerTest implements Waiter {
RpcServer rpcServer,
boolean start) {
waiter = createWaiter(timer);
- var metricUpdater = new MetricUpdater(new NoMetricReporter(), options.fleetControllerIndex(), options.clusterName());
+ var metricUpdater = new MetricUpdater(new NoMetricReporter(), timer, options.fleetControllerIndex(), options.clusterName());
var log = new EventLog(timer, metricUpdater);
var cluster = new ContentCluster(options.clusterName(), options.nodes(), options.storageDistribution());
var stateGatherer = new NodeStateGatherer(timer, timer, log);
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MetricReporterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MetricReporterTest.java
index 7175aefa97c..5eeb4c55387 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MetricReporterTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MetricReporterTest.java
@@ -2,10 +2,13 @@
package com.yahoo.vespa.clustercontroller.core;
import com.yahoo.vdslib.state.ClusterState;
+import com.yahoo.vdslib.state.Node;
import com.yahoo.vespa.clustercontroller.core.matchers.HasMetricContext;
import com.yahoo.vespa.clustercontroller.utils.util.MetricReporter;
import org.junit.jupiter.api.Test;
+import java.time.Duration;
+import java.time.Instant;
import java.util.Map;
import static com.yahoo.vespa.clustercontroller.core.matchers.HasMetricContext.hasMetricContext;
@@ -25,7 +28,8 @@ public class MetricReporterTest {
private static class Fixture {
final MetricReporter mockReporter = mock(MetricReporter.class);
- final MetricUpdater metricUpdater = new MetricUpdater(mockReporter, 0, CLUSTER_NAME);
+ final FakeTimer timer = new FakeTimer();
+ final MetricUpdater metricUpdater = new MetricUpdater(mockReporter, timer, 0, CLUSTER_NAME);
final ClusterFixture clusterFixture;
Fixture() {
@@ -61,7 +65,7 @@ public class MetricReporterTest {
Fixture f = new Fixture();
f.metricUpdater.updateClusterStateMetrics(f.clusterFixture.cluster(),
ClusterState.stateFromString("distributor:10 .1.s:d storage:9 .1.s:d .2.s:m .4.s:d"),
- new ResourceUsageStats());
+ new ResourceUsageStats(), Instant.ofEpochMilli(12345));
verify(f.mockReporter).set(eq("cluster-controller.up.count"), eq(9),
argThat(hasMetricContext(withNodeTypeDimension("distributor"))));
@@ -78,7 +82,7 @@ public class MetricReporterTest {
private void doTestRatiosInState(String clusterState, double distributorRatio, double storageRatio) {
Fixture f = new Fixture();
f.metricUpdater.updateClusterStateMetrics(f.clusterFixture.cluster(), ClusterState.stateFromString(clusterState),
- new ResourceUsageStats());
+ new ResourceUsageStats(), Instant.ofEpochMilli(12345));
verify(f.mockReporter).set(eq("cluster-controller.available-nodes.ratio"),
doubleThat(closeTo(distributorRatio, 0.0001)),
@@ -115,7 +119,7 @@ public class MetricReporterTest {
Fixture f = new Fixture();
f.metricUpdater.updateClusterStateMetrics(f.clusterFixture.cluster(),
ClusterState.stateFromString("distributor:10 storage:10"),
- new ResourceUsageStats(0.5, 0.6, 5, 0.7, 0.8));
+ new ResourceUsageStats(0.5, 0.6, 5, 0.7, 0.8), Instant.ofEpochMilli(12345));
verify(f.mockReporter).set(eq("cluster-controller.resource_usage.max_disk_utilization"),
doubleThat(closeTo(0.5, 0.0001)),
@@ -138,4 +142,73 @@ public class MetricReporterTest {
argThat(hasMetricContext(withClusterDimension())));
}
+ private static class ConvergenceFixture extends Fixture {
+
+ String stateString;
+ Instant stateBroadcastTime;
+
+ ConvergenceFixture(String stateString) {
+ super(5);
+ this.stateString = stateString;
+ setUpFixturePendingVersions();
+
+ metricUpdater.setStateVersionConvergenceGracePeriod(Duration.ofSeconds(10));
+ stateBroadcastTime = timer.getCurrentWallClockTime();
+ }
+
+ // Sets pending state versions for 5 distributors and storage nodes:
+ // - distributor: 2 converged, 3 not converged
+ // - storage: 3 converged, 2 not converged
+ private void setUpFixturePendingVersions() {
+ var pendingBundle = ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(
+ ClusterState.stateFromString(stateString)));
+ for (int i = 0; i < 5; ++i) {
+ clusterFixture.cluster().getNodeInfo(Node.ofDistributor(i)).setClusterStateVersionBundleSent(pendingBundle);
+ clusterFixture.cluster().getNodeInfo(Node.ofStorage(i)).setClusterStateVersionBundleSent(pendingBundle);
+ }
+ clusterFixture.cluster().getNodeInfo(Node.ofDistributor(0)).setClusterStateBundleVersionAcknowledged(100, false); // NACK
+ clusterFixture.cluster().getNodeInfo(Node.ofDistributor(1)).setClusterStateBundleVersionAcknowledged(100, true);
+ clusterFixture.cluster().getNodeInfo(Node.ofDistributor(4)).setClusterStateBundleVersionAcknowledged(100, true);
+ // Heard nothing from distributors {2, 3} yet.
+ clusterFixture.cluster().getNodeInfo(Node.ofStorage(0)).setClusterStateBundleVersionAcknowledged(100, true);
+ clusterFixture.cluster().getNodeInfo(Node.ofStorage(1)).setClusterStateBundleVersionAcknowledged(100, true);
+ clusterFixture.cluster().getNodeInfo(Node.ofStorage(2)).setClusterStateBundleVersionAcknowledged(100, true);
+ // Heard nothing from storage {3, 4} yet.
+ }
+
+ void advanceTimeAndVerifyMetrics(Duration delta, int expectedDistr, int expectedStorage) {
+ timer.advanceTime(delta.toMillis());
+ metricUpdater.updateClusterStateMetrics(clusterFixture.cluster(),
+ ClusterState.stateFromString(stateString),
+ new ResourceUsageStats(), stateBroadcastTime);
+
+ verify(mockReporter).set(eq("cluster-controller.nodes-not-converged"), eq(expectedDistr),
+ argThat(hasMetricContext(withNodeTypeDimension("distributor"))));
+ verify(mockReporter).set(eq("cluster-controller.nodes-not-converged"), eq(expectedStorage),
+ argThat(hasMetricContext(withNodeTypeDimension("storage"))));
+ }
+
+ }
+
+ @Test
+ void nodes_not_converged_metric_not_incremented_when_within_grace_period() {
+ var f = new ConvergenceFixture("version:100 distributor:5 storage:5");
+ // 9 seconds passed since state broadcast; should not tag nodes as not converged
+ f.advanceTimeAndVerifyMetrics(Duration.ofMillis(9000), 0, 0);
+ }
+
+ @Test
+ void nodes_not_converged_metric_incremented_when_outside_grace_period() {
+ var f = new ConvergenceFixture("version:100 distributor:5 storage:5");
+ // 10+ seconds passed since state broadcast; _should_ tag nodes as not converged
+ f.advanceTimeAndVerifyMetrics(Duration.ofMillis(10001), 3, 2);
+ }
+
+ @Test
+ void only_count_nodes_in_available_states_as_non_converging() {
+ var f = new ConvergenceFixture("version:100 distributor:5 .0.s:d .2.s:d .3.s:d storage:5 .3.s:m .4.s:d");
+ // Should not count non-converged nodes, as they are not in an available state
+ f.advanceTimeAndVerifyMetrics(Duration.ofMillis(10001), 0, 0);
+ }
+
}