summaryrefslogtreecommitdiffstats
path: root/clustercontroller-core
diff options
context:
space:
mode:
Diffstat (limited to 'clustercontroller-core')
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java3
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java19
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java4
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java31
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java14
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java3
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java7
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java2
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java4
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java6
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java7
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java1
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java47
13 files changed, 113 insertions, 35 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java
index b57004eedce..26d63f7ba60 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java
@@ -1,6 +1,9 @@
// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;
+/**
+ * Wrapper for a cluster state activation request towards a single node.
+ */
public class ActivateClusterStateVersionRequest extends ClusterStateVersionSpecificRequest {
public ActivateClusterStateVersionRequest(NodeInfo nodeInfo, int systemStateVersion) {
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java
index fda6ec19752..9e8abc0608e 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java
@@ -1,6 +1,10 @@
// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;
+/**
+ * Base class for distributor/content node node RPC requests that are bound
+ * to a particular cluster state version.
+ */
public abstract class ClusterStateVersionSpecificRequest {
private final NodeInfo nodeInfo;
@@ -24,6 +28,7 @@ public abstract class ClusterStateVersionSpecificRequest {
final int returnCode;
final String returnMessage;
+ final int actualVersion;
public Reply() {
this(0, null);
@@ -32,6 +37,17 @@ public abstract class ClusterStateVersionSpecificRequest {
public Reply(int returnCode, String returnMessage) {
this.returnCode = returnCode;
this.returnMessage = returnMessage;
+ this.actualVersion = -1;
+ }
+
+ private Reply(int actualVersion) {
+ this.returnCode = 0;
+ this.returnMessage = null;
+ this.actualVersion = actualVersion;
+ }
+
+ public static Reply withActualVersion(int version) {
+ return new Reply(version);
}
/** Returns whether this is an error response */
@@ -43,6 +59,9 @@ public abstract class ClusterStateVersionSpecificRequest {
/** Returns the message returned, or null if none */
public String getReturnMessage() { return returnMessage; }
+ /** Returns actual cluster state version active on node, or -1 if reply does not contain this information */
+ public int getActualVersion() { return actualVersion; }
+
}
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index 791315bb3d1..ba35243c14d 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -688,7 +688,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
if ( ! newStates.isEmpty()) {
synchronized (systemStateListeners) {
for (ClusterStateBundle stateBundle : newStates) {
- for(SystemStateListener listener : systemStateListeners) {
+ for (SystemStateListener listener : systemStateListeners) {
listener.handleNewPublishedState(stateBundle);
}
}
@@ -698,7 +698,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
if ( ! convergedStates.isEmpty()) {
synchronized (systemStateListeners) {
for (ClusterStateBundle stateBundle : convergedStates) {
- for(SystemStateListener listener : systemStateListeners) {
+ for (SystemStateListener listener : systemStateListeners) {
listener.handleStateConvergedInCluster(stateBundle);
}
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
index 58e2fa14d4f..82d13e2d9ef 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
@@ -77,9 +77,9 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
*/
private int version;
- // Mapping of cluster state version -> cluster state instance
- private TreeMap<Integer, ClusterState> clusterStateVersionBundleSent = new TreeMap<>();
- private ClusterState clusterStateVersionBundleAcknowledged;
+ // Mapping of cluster state version -> cluster state bundle instance
+ private TreeMap<Integer, ClusterStateBundle> clusterStateVersionBundleSent = new TreeMap<>();
+ private ClusterStateBundle clusterStateVersionBundleAcknowledged;
private int clusterStateVersionActivationSent = -1;
private int clusterStateVersionActivationAcked = -1;
@@ -420,7 +420,7 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
if (clusterStateVersionBundleSent.isEmpty()) {
return null;
}
- return clusterStateVersionBundleSent.lastEntry().getValue();
+ return clusterStateVersionBundleSent.lastEntry().getValue().getBaselineClusterState();
}
public int getNewestSystemStateVersionSent() {
ClusterState last = getNewestSystemStateSent();
@@ -430,14 +430,14 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
public int getClusterStateVersionBundleAcknowledged() {
return (clusterStateVersionBundleAcknowledged == null ? -1 : clusterStateVersionBundleAcknowledged.getVersion());
}
- public void setClusterStateVersionBundleSent(ClusterState state) {
- if (state == null) {
+ public void setClusterStateVersionBundleSent(ClusterStateBundle stateBundle) {
+ if (stateBundle == null) {
throw new Error("Should not clear info for last version sent");
}
- if (clusterStateVersionBundleSent.containsKey(state.getVersion())) {
- throw new IllegalStateException("We have already sent cluster state version " + state.getVersion() + " to " + node);
+ if (clusterStateVersionBundleSent.containsKey(stateBundle.getVersion())) {
+ throw new IllegalStateException("We have already sent cluster state version " + stateBundle.getVersion() + " to " + node);
}
- clusterStateVersionBundleSent.put(state.getVersion(), state);
+ clusterStateVersionBundleSent.put(stateBundle.getVersion(), stateBundle);
}
public void setClusterStateBundleVersionAcknowledged(Integer version, boolean success) {
if (version == null) {
@@ -446,14 +446,15 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
if (!clusterStateVersionBundleSent.containsKey(version)) {
throw new IllegalStateException("Got response for cluster state " + version + " which is not tracked as pending for node " + node);
}
- ClusterState state = clusterStateVersionBundleSent.remove(version);
- if (success && (clusterStateVersionBundleAcknowledged == null || clusterStateVersionBundleAcknowledged.getVersion() < state.getVersion())) {
- clusterStateVersionBundleAcknowledged = state;
+ var stateBundle = clusterStateVersionBundleSent.remove(version);
+ if (success && (clusterStateVersionBundleAcknowledged == null || clusterStateVersionBundleAcknowledged.getVersion() < stateBundle.getVersion())) {
+ clusterStateVersionBundleAcknowledged = stateBundle;
if (wentDownWithStartTime != 0
- && (wentDownAtClusterState == null || wentDownAtClusterState.getVersion() < state.getVersion())
- && !state.getNodeState(node).getState().oneOf("dsm"))
+ && (wentDownAtClusterState == null || wentDownAtClusterState.getVersion() < stateBundle.getVersion())
+ && !stateBundle.getBaselineClusterState().getNodeState(node).getState().oneOf("dsm"))
{
- log.log(LogLevel.DEBUG, "Clearing going down timestamp of node " + node + " after receiving ack of cluster state " + state);
+ log.log(LogLevel.DEBUG, () -> String.format("Clearing going down timestamp of node %s after " +
+ "receiving ack of cluster state bundle %s", node, stateBundle));
wentDownWithStartTime = 0;
}
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index 84d8accd7f4..7898e90f77a 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -87,17 +87,25 @@ public class SystemStateBroadcaster {
NodeInfo info = req.getNodeInfo();
int version = req.getClusterStateVersion();
boolean success = true;
- if (req.getReply().isError()) {
+ var reply = req.getReply();
+ if (reply.isError()) {
// NO_SUCH_METHOD implies node is on a version that does not understand explicit activations
// and it has already merrily started using the state version. Treat as if it had been ACKed.
- if (req.getReply().getReturnCode() != ErrorCode.NO_SUCH_METHOD) {
+ if (reply.getReturnCode() != ErrorCode.NO_SUCH_METHOD) {
log.log(LogLevel.DEBUG, () -> String.format("Activation NACK for node %s with version %d, message %s",
- info, version, req.getReply().getReturnMessage()));
+ info, version, reply.getReturnMessage()));
success = false;
} else {
log.log(LogLevel.DEBUG, () -> String.format("Node %s did not understand state activation RPC; " +
"implicitly treating state %d as activated on node", info, version));
}
+ } else if (reply.getActualVersion() != version) {
+ log.log(LogLevel.DEBUG, () -> String.format("Activation of version %d did not take effect, node %s " +
+ "reports it has an actual pending version of %d", version, info, reply.getActualVersion()));
+ success = false;
+ } else {
+ log.log(LogLevel.DEBUG, () -> String.format("Node %s reports successful activation of state " +
+ "version %d", info, version));
}
info.setSystemStateVersionActivationAcked(version, success);
// TODO we currently don't invoke reportNodeError here.. We assume that node errors will be reported
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java
index 32d4046c181..c2f48ccf589 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java
@@ -5,6 +5,9 @@ import com.yahoo.jrt.Request;
import com.yahoo.vespa.clustercontroller.core.ActivateClusterStateVersionRequest;
import com.yahoo.vespa.clustercontroller.core.NodeInfo;
+/**
+ * FRT RPC state implementation of a single cluster state activation request.
+ */
public class RPCActivateClusterStateVersionRequest extends ActivateClusterStateVersionRequest {
Request request;
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java
index c5b1da0cb66..175a0b50cd6 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java
@@ -9,6 +9,10 @@ import com.yahoo.vespa.clustercontroller.core.Communicator;
import com.yahoo.vespa.clustercontroller.core.NodeInfo;
import com.yahoo.vespa.clustercontroller.core.Timer;
+/**
+ * Binds together the reply received for a particular cluster state activation RPC and
+ * the cluster controller-internal callback handler which expects to receive it.
+ */
public class RPCActivateClusterStateVersionWaiter implements RequestWaiter {
private final Communicator.Waiter<ActivateClusterStateVersionRequest> waiter;
@@ -29,7 +33,8 @@ public class RPCActivateClusterStateVersionWaiter implements RequestWaiter {
} else if (!req.checkReturnTypes("i")) {
return new ActivateClusterStateVersionRequest.Reply(ErrorCode.BAD_REPLY, "Got RPC response with invalid return types from " + info);
}
- return new ActivateClusterStateVersionRequest.Reply();
+ int actualVersion = req.returnValues().get(0).asInt32();
+ return ActivateClusterStateVersionRequest.Reply.withActualVersion(actualVersion);
}
@Override
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
index f5629bda343..c3c5c9e3b98 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
@@ -162,7 +162,7 @@ public class RPCCommunicator implements Communicator {
waiter.setRequest(stateRequest);
connection.invokeAsync(req, 60, waiter);
- node.setClusterStateVersionBundleSent(baselineState);
+ node.setClusterStateVersionBundleSent(stateBundle);
}
@Override
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java
index 127211fc911..339d305e823 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java
@@ -154,9 +154,9 @@ public class ClusterStateBundleTest {
public void simple_bundle_without_derived_states_propagates_deferred_activation_flag() {
var bundle = ClusterStateBundle
.builder(annotatedStateOf("distributor:2 storage:2"))
- .deferredActivation(false) // defaults to true
+ .deferredActivation(true) // defaults to false
.deriveAndBuild();
- assertFalse(bundle.deferredActivation());
+ assertTrue(bundle.deferredActivation());
}
@Test
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
index 185fdc0c7ca..8314839336e 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
@@ -24,7 +24,7 @@ public class DummyCommunicator implements Communicator, NodeLookup {
this.shouldDeferDistributorClusterStateAcks = shouldDeferDistributorClusterStateAcks;
}
- public class DummyGetNodeStateRequest extends GetNodeStateRequest {
+ class DummyGetNodeStateRequest extends GetNodeStateRequest {
Waiter<GetNodeStateRequest> waiter;
public DummyGetNodeStateRequest(NodeInfo nodeInfo, Waiter<GetNodeStateRequest> waiter) {
@@ -97,7 +97,7 @@ public class DummyCommunicator implements Communicator, NodeLookup {
public void setSystemState(ClusterStateBundle stateBundle, NodeInfo node, Waiter<SetClusterStateRequest> waiter) {
ClusterState baselineState = stateBundle.getBaselineClusterState();
DummySetClusterStateRequest req = new DummySetClusterStateRequest(node, baselineState);
- node.setClusterStateVersionBundleSent(baselineState);
+ node.setClusterStateVersionBundleSent(stateBundle);
req.setReply(new SetClusterStateRequest.Reply());
if (node.isStorage() || !shouldDeferDistributorClusterStateAcks) {
waiter.done(req);
@@ -109,7 +109,7 @@ public class DummyCommunicator implements Communicator, NodeLookup {
@Override
public void activateClusterStateVersion(int clusterStateVersion, NodeInfo node, Waiter<ActivateClusterStateVersionRequest> waiter) {
var req = new DummyActivateClusterStateVersionRequest(node, clusterStateVersion);
- req.setReply(new ActivateClusterStateVersionRequest.Reply());
+ req.setReply(ActivateClusterStateVersionRequest.Reply.withActualVersion(clusterStateVersion));
waiter.done(req);
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
index caa3fbcd6cd..d8994ff52df 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
@@ -580,11 +580,12 @@ public class DummyVdsNode {
synchronized(timer) {
int actualVersion = getLatestSystemStateVersion().orElse(0);
req.returnValues().add(new Int32Value(actualVersion));
- if (activateVersion != actualVersion) {
- req.setError(ErrorCode.METHOD_FAILED, "State version mismatch");
- } else {
+ if (activateVersion == actualVersion) {
activatedClusterStateVersion = activateVersion;
timer.notifyAll();
+ } else {
+ log.log(LogLevel.DEBUG, () -> String.format("Dummy node %s: got a mismatching activation (request version %d, " +
+ "actual %d), not marking version as active", this, activateVersion, actualVersion));
}
}
log.log(LogLevel.DEBUG, "Dummy node " + this + ": Activating cluster state version " + activateVersion);
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
index 4c040798eb0..ca246b3549f 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
@@ -1534,7 +1534,6 @@ public class StateChangeTest extends FleetControllerTest {
public void cluster_state_ack_is_not_dependent_on_state_send_grace_period() throws Exception {
FleetControllerOptions options = defaultOptions();
options.minTimeBetweenNewSystemStates = 10_000;
- //options.enableTwoPhaseClusterStateActivation = false; ////
RemoteTaskFixture fixture = createFixtureWith(options);
// Have to increment timer here to be able to send state generated by the scheduled task
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
index 98ad19b954a..a7c7845fea9 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
@@ -11,6 +11,7 @@ import org.mockito.ArgumentCaptor;
import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
@@ -157,7 +158,7 @@ public class SystemStateBroadcasterTest {
ClusterStateBundle stateBundle,
Communicator.Waiter<SetClusterStateRequest> waiter) {
// Have to patch in that we've actually sent the bundle in the first place...
- nodeInfo.setClusterStateVersionBundleSent(stateBundle.getBaselineClusterState());
+ nodeInfo.setClusterStateVersionBundleSent(stateBundle);
var req = new MockSetClusterStateRequest(nodeInfo, stateBundle.getVersion());
req.setReply(new ClusterStateVersionSpecificRequest.Reply());
@@ -166,15 +167,22 @@ public class SystemStateBroadcasterTest {
private void respondToActivateClusterStateVersion(NodeInfo nodeInfo,
ClusterStateBundle stateBundle,
+ int actualVersion,
Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) {
// Have to patch in that we've actually sent the bundle in the first place...
nodeInfo.setClusterStateVersionActivationSent(stateBundle.getVersion());
var req = new MockActivateClusterStateVersionRequest(nodeInfo, stateBundle.getVersion());
- req.setReply(new ClusterStateVersionSpecificRequest.Reply());
+ req.setReply(ClusterStateVersionSpecificRequest.Reply.withActualVersion(actualVersion));
waiter.done(req);
}
+ private void respondToActivateClusterStateVersion(NodeInfo nodeInfo,
+ ClusterStateBundle stateBundle,
+ Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) {
+ respondToActivateClusterStateVersion(nodeInfo, stateBundle, stateBundle.getVersion(), waiter);
+ }
+
private static class StateActivationFixture extends Fixture {
ClusterStateBundle stateBundle;
ClusterFixture cf;
@@ -226,8 +234,7 @@ public class SystemStateBroadcasterTest {
// No activations should be sent yet
cf.cluster().getNodeInfo().forEach(nodeInfo -> {
- verify(f.mockCommunicator, times(0)).activateClusterStateVersion(
- eq(123), eq(nodeInfo), any());
+ verify(f.mockCommunicator, times(0)).activateClusterStateVersion(eq(123), eq(nodeInfo), any());
});
assertNull(f.broadcaster.getLastClusterStateBundleConverged());
@@ -298,4 +305,36 @@ public class SystemStateBroadcasterTest {
});
}
+ @Test
+ public void activation_convergence_considers_actual_version_returned_from_node() {
+ var f = StateActivationFixture.withTwoPhaseEnabled();
+ var cf = f.cf;
+
+ f.expectSetSystemStateInvocationsToBothDistributors();
+ f.simulateBroadcastTick(cf);
+ // ACK state bundle from both distributors
+ respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), f.stateBundle, f.d0Waiter.getValue());
+ respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, f.d1Waiter.getValue());
+
+ f.simulateBroadcastTick(cf);
+
+ final var d0ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class);
+ final var d1ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class);
+
+ clusterNodeInfos(cf.cluster(), Node.ofDistributor(0), Node.ofDistributor(1)).forEach(nodeInfo -> {
+ verify(f.mockCommunicator).activateClusterStateVersion(eq(123), eq(nodeInfo),
+ (nodeInfo.getNodeIndex() == 0 ? d0ActivateWaiter : d1ActivateWaiter).capture());
+ });
+
+ respondToActivateClusterStateVersion(cf.cluster.getNodeInfo(Node.ofDistributor(0)),
+ f.stateBundle, d0ActivateWaiter.getValue());
+ // Distributor 1 reports higher actual version, should not cause this version to be
+ // considered converged since it's not an exact version match.
+ respondToActivateClusterStateVersion(cf.cluster.getNodeInfo(Node.ofDistributor(1)),
+ f.stateBundle, 124, d1ActivateWaiter.getValue());
+ f.simulateBroadcastTick(cf);
+
+ assertNull(f.broadcaster.getLastClusterStateBundleConverged());
+ }
+
}