summaryrefslogtreecommitdiffstats
path: root/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
diff options
context:
space:
mode:
Diffstat (limited to 'clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java')
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java47
1 files changed, 43 insertions, 4 deletions
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
index 98ad19b954a..a7c7845fea9 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
@@ -11,6 +11,7 @@ import org.mockito.ArgumentCaptor;
import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
@@ -157,7 +158,7 @@ public class SystemStateBroadcasterTest {
ClusterStateBundle stateBundle,
Communicator.Waiter<SetClusterStateRequest> waiter) {
// Have to patch in that we've actually sent the bundle in the first place...
- nodeInfo.setClusterStateVersionBundleSent(stateBundle.getBaselineClusterState());
+ nodeInfo.setClusterStateVersionBundleSent(stateBundle);
var req = new MockSetClusterStateRequest(nodeInfo, stateBundle.getVersion());
req.setReply(new ClusterStateVersionSpecificRequest.Reply());
@@ -166,15 +167,22 @@ public class SystemStateBroadcasterTest {
private void respondToActivateClusterStateVersion(NodeInfo nodeInfo,
ClusterStateBundle stateBundle,
+ int actualVersion,
Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) {
// Have to patch in that we've actually sent the bundle in the first place...
nodeInfo.setClusterStateVersionActivationSent(stateBundle.getVersion());
var req = new MockActivateClusterStateVersionRequest(nodeInfo, stateBundle.getVersion());
- req.setReply(new ClusterStateVersionSpecificRequest.Reply());
+ req.setReply(ClusterStateVersionSpecificRequest.Reply.withActualVersion(actualVersion));
waiter.done(req);
}
+ private void respondToActivateClusterStateVersion(NodeInfo nodeInfo,
+ ClusterStateBundle stateBundle,
+ Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) {
+ respondToActivateClusterStateVersion(nodeInfo, stateBundle, stateBundle.getVersion(), waiter);
+ }
+
private static class StateActivationFixture extends Fixture {
ClusterStateBundle stateBundle;
ClusterFixture cf;
@@ -226,8 +234,7 @@ public class SystemStateBroadcasterTest {
// No activations should be sent yet
cf.cluster().getNodeInfo().forEach(nodeInfo -> {
- verify(f.mockCommunicator, times(0)).activateClusterStateVersion(
- eq(123), eq(nodeInfo), any());
+ verify(f.mockCommunicator, times(0)).activateClusterStateVersion(eq(123), eq(nodeInfo), any());
});
assertNull(f.broadcaster.getLastClusterStateBundleConverged());
@@ -298,4 +305,36 @@ public class SystemStateBroadcasterTest {
});
}
+ @Test
+ public void activation_convergence_considers_actual_version_returned_from_node() {
+ var f = StateActivationFixture.withTwoPhaseEnabled();
+ var cf = f.cf;
+
+ f.expectSetSystemStateInvocationsToBothDistributors();
+ f.simulateBroadcastTick(cf);
+ // ACK state bundle from both distributors
+ respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), f.stateBundle, f.d0Waiter.getValue());
+ respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, f.d1Waiter.getValue());
+
+ f.simulateBroadcastTick(cf);
+
+ final var d0ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class);
+ final var d1ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class);
+
+ clusterNodeInfos(cf.cluster(), Node.ofDistributor(0), Node.ofDistributor(1)).forEach(nodeInfo -> {
+ verify(f.mockCommunicator).activateClusterStateVersion(eq(123), eq(nodeInfo),
+ (nodeInfo.getNodeIndex() == 0 ? d0ActivateWaiter : d1ActivateWaiter).capture());
+ });
+
+ respondToActivateClusterStateVersion(cf.cluster.getNodeInfo(Node.ofDistributor(0)),
+ f.stateBundle, d0ActivateWaiter.getValue());
+ // Distributor 1 reports higher actual version, should not cause this version to be
+ // considered converged since it's not an exact version match.
+ respondToActivateClusterStateVersion(cf.cluster.getNodeInfo(Node.ofDistributor(1)),
+ f.stateBundle, 124, d1ActivateWaiter.getValue());
+ f.simulateBroadcastTick(cf);
+
+ assertNull(f.broadcaster.getLastClusterStateBundleConverged());
+ }
+
}