diff options
Diffstat (limited to 'clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java')
-rw-r--r-- | clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java | 47 |
1 files changed, 43 insertions, 4 deletions
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java index 98ad19b954a..a7c7845fea9 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java @@ -11,6 +11,7 @@ import org.mockito.ArgumentCaptor; import java.util.stream.Stream; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -157,7 +158,7 @@ public class SystemStateBroadcasterTest { ClusterStateBundle stateBundle, Communicator.Waiter<SetClusterStateRequest> waiter) { // Have to patch in that we've actually sent the bundle in the first place... - nodeInfo.setClusterStateVersionBundleSent(stateBundle.getBaselineClusterState()); + nodeInfo.setClusterStateVersionBundleSent(stateBundle); var req = new MockSetClusterStateRequest(nodeInfo, stateBundle.getVersion()); req.setReply(new ClusterStateVersionSpecificRequest.Reply()); @@ -166,15 +167,22 @@ public class SystemStateBroadcasterTest { private void respondToActivateClusterStateVersion(NodeInfo nodeInfo, ClusterStateBundle stateBundle, + int actualVersion, Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) { // Have to patch in that we've actually sent the bundle in the first place... nodeInfo.setClusterStateVersionActivationSent(stateBundle.getVersion()); var req = new MockActivateClusterStateVersionRequest(nodeInfo, stateBundle.getVersion()); - req.setReply(new ClusterStateVersionSpecificRequest.Reply()); + req.setReply(ClusterStateVersionSpecificRequest.Reply.withActualVersion(actualVersion)); waiter.done(req); } + private void respondToActivateClusterStateVersion(NodeInfo nodeInfo, + ClusterStateBundle stateBundle, + Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) { + respondToActivateClusterStateVersion(nodeInfo, stateBundle, stateBundle.getVersion(), waiter); + } + private static class StateActivationFixture extends Fixture { ClusterStateBundle stateBundle; ClusterFixture cf; @@ -226,8 +234,7 @@ public class SystemStateBroadcasterTest { // No activations should be sent yet cf.cluster().getNodeInfo().forEach(nodeInfo -> { - verify(f.mockCommunicator, times(0)).activateClusterStateVersion( - eq(123), eq(nodeInfo), any()); + verify(f.mockCommunicator, times(0)).activateClusterStateVersion(eq(123), eq(nodeInfo), any()); }); assertNull(f.broadcaster.getLastClusterStateBundleConverged()); @@ -298,4 +305,36 @@ public class SystemStateBroadcasterTest { }); } + @Test + public void activation_convergence_considers_actual_version_returned_from_node() { + var f = StateActivationFixture.withTwoPhaseEnabled(); + var cf = f.cf; + + f.expectSetSystemStateInvocationsToBothDistributors(); + f.simulateBroadcastTick(cf); + // ACK state bundle from both distributors + respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), f.stateBundle, f.d0Waiter.getValue()); + respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, f.d1Waiter.getValue()); + + f.simulateBroadcastTick(cf); + + final var d0ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class); + final var d1ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class); + + clusterNodeInfos(cf.cluster(), Node.ofDistributor(0), Node.ofDistributor(1)).forEach(nodeInfo -> { + verify(f.mockCommunicator).activateClusterStateVersion(eq(123), eq(nodeInfo), + (nodeInfo.getNodeIndex() == 0 ? d0ActivateWaiter : d1ActivateWaiter).capture()); + }); + + respondToActivateClusterStateVersion(cf.cluster.getNodeInfo(Node.ofDistributor(0)), + f.stateBundle, d0ActivateWaiter.getValue()); + // Distributor 1 reports higher actual version, should not cause this version to be + // considered converged since it's not an exact version match. + respondToActivateClusterStateVersion(cf.cluster.getNodeInfo(Node.ofDistributor(1)), + f.stateBundle, 124, d1ActivateWaiter.getValue()); + f.simulateBroadcastTick(cf); + + assertNull(f.broadcaster.getLastClusterStateBundleConverged()); + } + } |