aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-core
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-03-21 14:52:41 +0100
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-03-21 15:17:33 +0100
commitb84a65296f8b63d7dbc6ad72bf9de7aa724ca4f4 (patch)
treef3ea495507e20126f6eecf7dc7be2141a546800c /clustercontroller-core
parent9c7c9ceecbb58f1ee82d212a6347bd5ec676490d (diff)
Activation reply processing must inspect actual version returned
Version mismatches in backend do not return explicit RPC errors, so actual vs. desired versions must be checked in order to avoid potentially spurious activation of other versions. Also do some minor code cleanup.
Diffstat (limited to 'clustercontroller-core')
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java3
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java19
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java4
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java31
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java14
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java3
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java7
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java2
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java4
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java6
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java7
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java1
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java47
13 files changed, 113 insertions, 35 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java
index b57004eedce..26d63f7ba60 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java
@@ -1,6 +1,9 @@
// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;
+/**
+ * Wrapper for a cluster state activation request towards a single node.
+ */
public class ActivateClusterStateVersionRequest extends ClusterStateVersionSpecificRequest {
public ActivateClusterStateVersionRequest(NodeInfo nodeInfo, int systemStateVersion) {
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java
index fda6ec19752..9e8abc0608e 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java
@@ -1,6 +1,10 @@
// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;
+/**
+ * Base class for distributor/content node node RPC requests that are bound
+ * to a particular cluster state version.
+ */
public abstract class ClusterStateVersionSpecificRequest {
private final NodeInfo nodeInfo;
@@ -24,6 +28,7 @@ public abstract class ClusterStateVersionSpecificRequest {
final int returnCode;
final String returnMessage;
+ final int actualVersion;
public Reply() {
this(0, null);
@@ -32,6 +37,17 @@ public abstract class ClusterStateVersionSpecificRequest {
public Reply(int returnCode, String returnMessage) {
this.returnCode = returnCode;
this.returnMessage = returnMessage;
+ this.actualVersion = -1;
+ }
+
+ private Reply(int actualVersion) {
+ this.returnCode = 0;
+ this.returnMessage = null;
+ this.actualVersion = actualVersion;
+ }
+
+ public static Reply withActualVersion(int version) {
+ return new Reply(version);
}
/** Returns whether this is an error response */
@@ -43,6 +59,9 @@ public abstract class ClusterStateVersionSpecificRequest {
/** Returns the message returned, or null if none */
public String getReturnMessage() { return returnMessage; }
+ /** Returns actual cluster state version active on node, or -1 if reply does not contain this information */
+ public int getActualVersion() { return actualVersion; }
+
}
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index 791315bb3d1..ba35243c14d 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -688,7 +688,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
if ( ! newStates.isEmpty()) {
synchronized (systemStateListeners) {
for (ClusterStateBundle stateBundle : newStates) {
- for(SystemStateListener listener : systemStateListeners) {
+ for (SystemStateListener listener : systemStateListeners) {
listener.handleNewPublishedState(stateBundle);
}
}
@@ -698,7 +698,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
if ( ! convergedStates.isEmpty()) {
synchronized (systemStateListeners) {
for (ClusterStateBundle stateBundle : convergedStates) {
- for(SystemStateListener listener : systemStateListeners) {
+ for (SystemStateListener listener : systemStateListeners) {
listener.handleStateConvergedInCluster(stateBundle);
}
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
index 58e2fa14d4f..82d13e2d9ef 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
@@ -77,9 +77,9 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
*/
private int version;
- // Mapping of cluster state version -> cluster state instance
- private TreeMap<Integer, ClusterState> clusterStateVersionBundleSent = new TreeMap<>();
- private ClusterState clusterStateVersionBundleAcknowledged;
+ // Mapping of cluster state version -> cluster state bundle instance
+ private TreeMap<Integer, ClusterStateBundle> clusterStateVersionBundleSent = new TreeMap<>();
+ private ClusterStateBundle clusterStateVersionBundleAcknowledged;
private int clusterStateVersionActivationSent = -1;
private int clusterStateVersionActivationAcked = -1;
@@ -420,7 +420,7 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
if (clusterStateVersionBundleSent.isEmpty()) {
return null;
}
- return clusterStateVersionBundleSent.lastEntry().getValue();
+ return clusterStateVersionBundleSent.lastEntry().getValue().getBaselineClusterState();
}
public int getNewestSystemStateVersionSent() {
ClusterState last = getNewestSystemStateSent();
@@ -430,14 +430,14 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
public int getClusterStateVersionBundleAcknowledged() {
return (clusterStateVersionBundleAcknowledged == null ? -1 : clusterStateVersionBundleAcknowledged.getVersion());
}
- public void setClusterStateVersionBundleSent(ClusterState state) {
- if (state == null) {
+ public void setClusterStateVersionBundleSent(ClusterStateBundle stateBundle) {
+ if (stateBundle == null) {
throw new Error("Should not clear info for last version sent");
}
- if (clusterStateVersionBundleSent.containsKey(state.getVersion())) {
- throw new IllegalStateException("We have already sent cluster state version " + state.getVersion() + " to " + node);
+ if (clusterStateVersionBundleSent.containsKey(stateBundle.getVersion())) {
+ throw new IllegalStateException("We have already sent cluster state version " + stateBundle.getVersion() + " to " + node);
}
- clusterStateVersionBundleSent.put(state.getVersion(), state);
+ clusterStateVersionBundleSent.put(stateBundle.getVersion(), stateBundle);
}
public void setClusterStateBundleVersionAcknowledged(Integer version, boolean success) {
if (version == null) {
@@ -446,14 +446,15 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
if (!clusterStateVersionBundleSent.containsKey(version)) {
throw new IllegalStateException("Got response for cluster state " + version + " which is not tracked as pending for node " + node);
}
- ClusterState state = clusterStateVersionBundleSent.remove(version);
- if (success && (clusterStateVersionBundleAcknowledged == null || clusterStateVersionBundleAcknowledged.getVersion() < state.getVersion())) {
- clusterStateVersionBundleAcknowledged = state;
+ var stateBundle = clusterStateVersionBundleSent.remove(version);
+ if (success && (clusterStateVersionBundleAcknowledged == null || clusterStateVersionBundleAcknowledged.getVersion() < stateBundle.getVersion())) {
+ clusterStateVersionBundleAcknowledged = stateBundle;
if (wentDownWithStartTime != 0
- && (wentDownAtClusterState == null || wentDownAtClusterState.getVersion() < state.getVersion())
- && !state.getNodeState(node).getState().oneOf("dsm"))
+ && (wentDownAtClusterState == null || wentDownAtClusterState.getVersion() < stateBundle.getVersion())
+ && !stateBundle.getBaselineClusterState().getNodeState(node).getState().oneOf("dsm"))
{
- log.log(LogLevel.DEBUG, "Clearing going down timestamp of node " + node + " after receiving ack of cluster state " + state);
+ log.log(LogLevel.DEBUG, () -> String.format("Clearing going down timestamp of node %s after " +
+ "receiving ack of cluster state bundle %s", node, stateBundle));
wentDownWithStartTime = 0;
}
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index 84d8accd7f4..7898e90f77a 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -87,17 +87,25 @@ public class SystemStateBroadcaster {
NodeInfo info = req.getNodeInfo();
int version = req.getClusterStateVersion();
boolean success = true;
- if (req.getReply().isError()) {
+ var reply = req.getReply();
+ if (reply.isError()) {
// NO_SUCH_METHOD implies node is on a version that does not understand explicit activations
// and it has already merrily started using the state version. Treat as if it had been ACKed.
- if (req.getReply().getReturnCode() != ErrorCode.NO_SUCH_METHOD) {
+ if (reply.getReturnCode() != ErrorCode.NO_SUCH_METHOD) {
log.log(LogLevel.DEBUG, () -> String.format("Activation NACK for node %s with version %d, message %s",
- info, version, req.getReply().getReturnMessage()));
+ info, version, reply.getReturnMessage()));
success = false;
} else {
log.log(LogLevel.DEBUG, () -> String.format("Node %s did not understand state activation RPC; " +
"implicitly treating state %d as activated on node", info, version));
}
+ } else if (reply.getActualVersion() != version) {
+ log.log(LogLevel.DEBUG, () -> String.format("Activation of version %d did not take effect, node %s " +
+ "reports it has an actual pending version of %d", version, info, reply.getActualVersion()));
+ success = false;
+ } else {
+ log.log(LogLevel.DEBUG, () -> String.format("Node %s reports successful activation of state " +
+ "version %d", info, version));
}
info.setSystemStateVersionActivationAcked(version, success);
// TODO we currently don't invoke reportNodeError here.. We assume that node errors will be reported
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java
index 32d4046c181..c2f48ccf589 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java
@@ -5,6 +5,9 @@ import com.yahoo.jrt.Request;
import com.yahoo.vespa.clustercontroller.core.ActivateClusterStateVersionRequest;
import com.yahoo.vespa.clustercontroller.core.NodeInfo;
+/**
+ * FRT RPC state implementation of a single cluster state activation request.
+ */
public class RPCActivateClusterStateVersionRequest extends ActivateClusterStateVersionRequest {
Request request;
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java
index c5b1da0cb66..175a0b50cd6 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java
@@ -9,6 +9,10 @@ import com.yahoo.vespa.clustercontroller.core.Communicator;
import com.yahoo.vespa.clustercontroller.core.NodeInfo;
import com.yahoo.vespa.clustercontroller.core.Timer;
+/**
+ * Binds together the reply received for a particular cluster state activation RPC and
+ * the cluster controller-internal callback handler which expects to receive it.
+ */
public class RPCActivateClusterStateVersionWaiter implements RequestWaiter {
private final Communicator.Waiter<ActivateClusterStateVersionRequest> waiter;
@@ -29,7 +33,8 @@ public class RPCActivateClusterStateVersionWaiter implements RequestWaiter {
} else if (!req.checkReturnTypes("i")) {
return new ActivateClusterStateVersionRequest.Reply(ErrorCode.BAD_REPLY, "Got RPC response with invalid return types from " + info);
}
- return new ActivateClusterStateVersionRequest.Reply();
+ int actualVersion = req.returnValues().get(0).asInt32();
+ return ActivateClusterStateVersionRequest.Reply.withActualVersion(actualVersion);
}
@Override
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
index f5629bda343..c3c5c9e3b98 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
@@ -162,7 +162,7 @@ public class RPCCommunicator implements Communicator {
waiter.setRequest(stateRequest);
connection.invokeAsync(req, 60, waiter);
- node.setClusterStateVersionBundleSent(baselineState);
+ node.setClusterStateVersionBundleSent(stateBundle);
}
@Override
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java
index 127211fc911..339d305e823 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java
@@ -154,9 +154,9 @@ public class ClusterStateBundleTest {
public void simple_bundle_without_derived_states_propagates_deferred_activation_flag() {
var bundle = ClusterStateBundle
.builder(annotatedStateOf("distributor:2 storage:2"))
- .deferredActivation(false) // defaults to true
+ .deferredActivation(true) // defaults to false
.deriveAndBuild();
- assertFalse(bundle.deferredActivation());
+ assertTrue(bundle.deferredActivation());
}
@Test
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
index 185fdc0c7ca..8314839336e 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
@@ -24,7 +24,7 @@ public class DummyCommunicator implements Communicator, NodeLookup {
this.shouldDeferDistributorClusterStateAcks = shouldDeferDistributorClusterStateAcks;
}
- public class DummyGetNodeStateRequest extends GetNodeStateRequest {
+ class DummyGetNodeStateRequest extends GetNodeStateRequest {
Waiter<GetNodeStateRequest> waiter;
public DummyGetNodeStateRequest(NodeInfo nodeInfo, Waiter<GetNodeStateRequest> waiter) {
@@ -97,7 +97,7 @@ public class DummyCommunicator implements Communicator, NodeLookup {
public void setSystemState(ClusterStateBundle stateBundle, NodeInfo node, Waiter<SetClusterStateRequest> waiter) {
ClusterState baselineState = stateBundle.getBaselineClusterState();
DummySetClusterStateRequest req = new DummySetClusterStateRequest(node, baselineState);
- node.setClusterStateVersionBundleSent(baselineState);
+ node.setClusterStateVersionBundleSent(stateBundle);
req.setReply(new SetClusterStateRequest.Reply());
if (node.isStorage() || !shouldDeferDistributorClusterStateAcks) {
waiter.done(req);
@@ -109,7 +109,7 @@ public class DummyCommunicator implements Communicator, NodeLookup {
@Override
public void activateClusterStateVersion(int clusterStateVersion, NodeInfo node, Waiter<ActivateClusterStateVersionRequest> waiter) {
var req = new DummyActivateClusterStateVersionRequest(node, clusterStateVersion);
- req.setReply(new ActivateClusterStateVersionRequest.Reply());
+ req.setReply(ActivateClusterStateVersionRequest.Reply.withActualVersion(clusterStateVersion));
waiter.done(req);
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
index caa3fbcd6cd..d8994ff52df 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
@@ -580,11 +580,12 @@ public class DummyVdsNode {
synchronized(timer) {
int actualVersion = getLatestSystemStateVersion().orElse(0);
req.returnValues().add(new Int32Value(actualVersion));
- if (activateVersion != actualVersion) {
- req.setError(ErrorCode.METHOD_FAILED, "State version mismatch");
- } else {
+ if (activateVersion == actualVersion) {
activatedClusterStateVersion = activateVersion;
timer.notifyAll();
+ } else {
+ log.log(LogLevel.DEBUG, () -> String.format("Dummy node %s: got a mismatching activation (request version %d, " +
+ "actual %d), not marking version as active", this, activateVersion, actualVersion));
}
}
log.log(LogLevel.DEBUG, "Dummy node " + this + ": Activating cluster state version " + activateVersion);
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
index 4c040798eb0..ca246b3549f 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
@@ -1534,7 +1534,6 @@ public class StateChangeTest extends FleetControllerTest {
public void cluster_state_ack_is_not_dependent_on_state_send_grace_period() throws Exception {
FleetControllerOptions options = defaultOptions();
options.minTimeBetweenNewSystemStates = 10_000;
- //options.enableTwoPhaseClusterStateActivation = false; ////
RemoteTaskFixture fixture = createFixtureWith(options);
// Have to increment timer here to be able to send state generated by the scheduled task
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
index 98ad19b954a..a7c7845fea9 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
@@ -11,6 +11,7 @@ import org.mockito.ArgumentCaptor;
import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
@@ -157,7 +158,7 @@ public class SystemStateBroadcasterTest {
ClusterStateBundle stateBundle,
Communicator.Waiter<SetClusterStateRequest> waiter) {
// Have to patch in that we've actually sent the bundle in the first place...
- nodeInfo.setClusterStateVersionBundleSent(stateBundle.getBaselineClusterState());
+ nodeInfo.setClusterStateVersionBundleSent(stateBundle);
var req = new MockSetClusterStateRequest(nodeInfo, stateBundle.getVersion());
req.setReply(new ClusterStateVersionSpecificRequest.Reply());
@@ -166,15 +167,22 @@ public class SystemStateBroadcasterTest {
private void respondToActivateClusterStateVersion(NodeInfo nodeInfo,
ClusterStateBundle stateBundle,
+ int actualVersion,
Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) {
// Have to patch in that we've actually sent the bundle in the first place...
nodeInfo.setClusterStateVersionActivationSent(stateBundle.getVersion());
var req = new MockActivateClusterStateVersionRequest(nodeInfo, stateBundle.getVersion());
- req.setReply(new ClusterStateVersionSpecificRequest.Reply());
+ req.setReply(ClusterStateVersionSpecificRequest.Reply.withActualVersion(actualVersion));
waiter.done(req);
}
+ private void respondToActivateClusterStateVersion(NodeInfo nodeInfo,
+ ClusterStateBundle stateBundle,
+ Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) {
+ respondToActivateClusterStateVersion(nodeInfo, stateBundle, stateBundle.getVersion(), waiter);
+ }
+
private static class StateActivationFixture extends Fixture {
ClusterStateBundle stateBundle;
ClusterFixture cf;
@@ -226,8 +234,7 @@ public class SystemStateBroadcasterTest {
// No activations should be sent yet
cf.cluster().getNodeInfo().forEach(nodeInfo -> {
- verify(f.mockCommunicator, times(0)).activateClusterStateVersion(
- eq(123), eq(nodeInfo), any());
+ verify(f.mockCommunicator, times(0)).activateClusterStateVersion(eq(123), eq(nodeInfo), any());
});
assertNull(f.broadcaster.getLastClusterStateBundleConverged());
@@ -298,4 +305,36 @@ public class SystemStateBroadcasterTest {
});
}
+ @Test
+ public void activation_convergence_considers_actual_version_returned_from_node() {
+ var f = StateActivationFixture.withTwoPhaseEnabled();
+ var cf = f.cf;
+
+ f.expectSetSystemStateInvocationsToBothDistributors();
+ f.simulateBroadcastTick(cf);
+ // ACK state bundle from both distributors
+ respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), f.stateBundle, f.d0Waiter.getValue());
+ respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, f.d1Waiter.getValue());
+
+ f.simulateBroadcastTick(cf);
+
+ final var d0ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class);
+ final var d1ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class);
+
+ clusterNodeInfos(cf.cluster(), Node.ofDistributor(0), Node.ofDistributor(1)).forEach(nodeInfo -> {
+ verify(f.mockCommunicator).activateClusterStateVersion(eq(123), eq(nodeInfo),
+ (nodeInfo.getNodeIndex() == 0 ? d0ActivateWaiter : d1ActivateWaiter).capture());
+ });
+
+ respondToActivateClusterStateVersion(cf.cluster.getNodeInfo(Node.ofDistributor(0)),
+ f.stateBundle, d0ActivateWaiter.getValue());
+ // Distributor 1 reports higher actual version, should not cause this version to be
+ // considered converged since it's not an exact version match.
+ respondToActivateClusterStateVersion(cf.cluster.getNodeInfo(Node.ofDistributor(1)),
+ f.stateBundle, 124, d1ActivateWaiter.getValue());
+ f.simulateBroadcastTick(cf);
+
+ assertNull(f.broadcaster.getLastClusterStateBundleConverged());
+ }
+
}