diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-19 13:25:20 +0100 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-20 16:56:58 +0100 |
commit | 030f5a27899cb6e3ec09cf1dcb0cc433a871db89 (patch) | |
tree | 8f2b6eb5e619fbfb8f779ebd96999e177e1ae533 /clustercontroller-core | |
parent | c3650cea23c1297e0db87b1bef9005dedda518d2 (diff) |
Add explicit tests of `SystemStateBroadcaster` behavior
Diffstat (limited to 'clustercontroller-core')
2 files changed, 177 insertions, 14 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java index 8f306036301..84d8accd7f4 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java @@ -274,9 +274,8 @@ public class SystemStateBroadcaster { return false; } - if ((lastStateVersionBundleAcked != clusterStateBundle.getVersion()) - || !dbContext.getFleetController().getOptions().enableTwoPhaseClusterStateActivation) { - return false; // Not yet received bundle ACK from all nodes; wait. + if (!clusterStateBundle.deferredActivation() || !allDistributorsHaveAckedSentClusterStateBundle()) { + return false; } var recipients = resolveStateActivationSendSet(dbContext); @@ -289,6 +288,10 @@ public class SystemStateBroadcaster { return !recipients.isEmpty(); } + private boolean allDistributorsHaveAckedSentClusterStateBundle() { + return (lastStateVersionBundleAcked == clusterStateBundle.getVersion()); + } + public int lastClusterStateVersionInSync() { return lastClusterStateVersionConverged; } private static boolean nodeNeedsToObserveStartupTimestamps(NodeInfo node) { diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java index 40175cd7bc9..f4afb08de7c 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java @@ -6,12 +6,16 @@ import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler; import com.yahoo.vespa.clustercontroller.core.listeners.NodeAddedOrRemovedListener; import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler; import org.junit.Test; +import org.mockito.ArgumentCaptor; import java.util.stream.Stream; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; public class SystemStateBroadcasterTest { @@ -21,6 +25,8 @@ public class SystemStateBroadcasterTest { final Object monitor = new Object(); SystemStateBroadcaster broadcaster = new SystemStateBroadcaster(timer, monitor); Communicator mockCommunicator = mock(Communicator.class); + DatabaseHandler mockDatabaseHandler = mock(DatabaseHandler.class); + FleetController mockFleetController = mock(FleetController.class); void simulateNodePartitionedAwaySilently(ClusterFixture cf) { cf.cluster().getNodeInfo(Node.ofStorage(0)).setStartTimestamp(600); @@ -32,6 +38,18 @@ public class SystemStateBroadcasterTest { cf.cluster().getNodeInfo(Node.ofDistributor(0)).setReportedState(new NodeState(NodeType.DISTRIBUTOR, State.DOWN).setStartTimestamp(500), 2000); cf.cluster().getNodeInfo(Node.ofDistributor(0)).setReportedState(new NodeState(NodeType.DISTRIBUTOR, State.UP).setStartTimestamp(500), 3000); } + + void simulateBroadcastTick(ClusterFixture cf) { + broadcaster.processResponses(); + broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), mockCommunicator); + try { + broadcaster.checkIfClusterStateIsAckedByAllDistributors( + mockDatabaseHandler, dbContextFrom(cf.cluster()), mockFleetController); + } catch (Exception e) { + throw new RuntimeException(e); + } + broadcaster.broadcastStateActivationsIfRequired(dbContextFrom(cf.cluster()), mockCommunicator); // nope! + } } private static DatabaseHandler.Context dbContextFrom(ContentCluster cluster) { @@ -123,21 +141,163 @@ public class SystemStateBroadcasterTest { verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any()); } + private class MockSetClusterStateRequest extends SetClusterStateRequest { + public MockSetClusterStateRequest(NodeInfo nodeInfo, int clusterStateVersion) { + super(nodeInfo, clusterStateVersion); + } + } + + private class MockActivateClusterStateVersionRequest extends ActivateClusterStateVersionRequest { + public MockActivateClusterStateVersionRequest(NodeInfo nodeInfo, int systemStateVersion) { + super(nodeInfo, systemStateVersion); + } + } + + private void respondToSetClusterStateBundle(NodeInfo nodeInfo, + ClusterStateBundle stateBundle, + Communicator.Waiter<SetClusterStateRequest> waiter) { + // Have to patch in that we've actually sent the bundle in the first place... + nodeInfo.setClusterStateVersionBundleSent(stateBundle.getBaselineClusterState()); + + var req = new MockSetClusterStateRequest(nodeInfo, stateBundle.getVersion()); + req.setReply(new ClusterStateVersionSpecificRequest.Reply()); + waiter.done(req); + } + + private void respondToActivateClusterStateVersion(NodeInfo nodeInfo, + ClusterStateBundle stateBundle, + Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) { + // Have to patch in that we've actually sent the bundle in the first place... + nodeInfo.setClusterStateVersionActivationSent(stateBundle.getVersion()); + + var req = new MockActivateClusterStateVersionRequest(nodeInfo, stateBundle.getVersion()); + req.setReply(new ClusterStateVersionSpecificRequest.Reply()); + waiter.done(req); + } + + private static class StateActivationFixture extends Fixture { + ClusterStateBundle stateBundle; + ClusterFixture cf; + + final ArgumentCaptor<Communicator.Waiter> d0Waiter; + final ArgumentCaptor<Communicator.Waiter> d1Waiter; + + private StateActivationFixture(boolean enableDeferred) { + super(); + stateBundle = ClusterStateBundleUtil + .makeBundleBuilder("version:123 distributor:2 storage:2") + .deferredActivation(enableDeferred) + .deriveAndBuild(); + cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); + broadcaster.handleNewClusterStates(stateBundle); + broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), mockCommunicator); + + d0Waiter = ArgumentCaptor.forClass(Communicator.Waiter.class); + d1Waiter = ArgumentCaptor.forClass(Communicator.Waiter.class); + } + + void expectSetSystemStateInvocationsToBothDistributors() { + clusterNodeInfos(cf.cluster(), Node.ofDistributor(0), Node.ofDistributor(1)).forEach(nodeInfo -> { + verify(mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), + (nodeInfo.getNodeIndex() == 0 ? d0Waiter : d1Waiter).capture()); + }); + } + + static StateActivationFixture withTwoPhaseEnabled() { + return new StateActivationFixture(true); + } + + static StateActivationFixture withTwoPhaseDisabled() { + return new StateActivationFixture(false); + } + } + @Test public void activation_not_sent_before_all_distributors_have_acked_state_bundle() { - Fixture f = new Fixture(); - ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2"); - ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); - f.broadcaster.handleNewClusterStates(stateBundle); - f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator); + var f = StateActivationFixture.withTwoPhaseEnabled(); + var cf = f.cf; + + f.expectSetSystemStateInvocationsToBothDistributors(); + f.simulateBroadcastTick(cf); + + // Respond from distributor 0, but not yet from distributor 1 + respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), f.stateBundle, f.d0Waiter.getValue()); + f.simulateBroadcastTick(cf); + + // No activations should be sent yet + cf.cluster().getNodeInfo().forEach(nodeInfo -> { + verify(f.mockCommunicator, times(0)).activateClusterStateVersion( + eq(123), eq(nodeInfo), any()); + }); + assertNull(f.broadcaster.getLastClusterStateBundleConverged()); + + respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, f.d1Waiter.getValue()); + f.simulateBroadcastTick(cf); + + // Activation should now be sent to _all_ nodes (distributor and storage) + cf.cluster().getNodeInfo().forEach(nodeInfo -> { + verify(f.mockCommunicator).activateClusterStateVersion(eq(123), eq(nodeInfo), any()); + }); + // But not converged yet, as activations have not been ACKed + assertNull(f.broadcaster.getLastClusterStateBundleConverged()); + } + + @Test + public void state_bundle_not_considered_converged_until_activation_acked_by_all_distributors() { + var f = StateActivationFixture.withTwoPhaseEnabled(); + var cf = f.cf; + + f.expectSetSystemStateInvocationsToBothDistributors(); + f.simulateBroadcastTick(cf); + // ACK state bundle from both distributors + respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), f.stateBundle, f.d0Waiter.getValue()); + respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, f.d1Waiter.getValue()); - // TODO + f.simulateBroadcastTick(cf); + + final var d0ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class); + final var d1ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class); + + clusterNodeInfos(cf.cluster(), Node.ofDistributor(0), Node.ofDistributor(1)).forEach(nodeInfo -> { + verify(f.mockCommunicator).activateClusterStateVersion(eq(123), eq(nodeInfo), + (nodeInfo.getNodeIndex() == 0 ? d0ActivateWaiter : d1ActivateWaiter).capture()); + }); + + respondToActivateClusterStateVersion(cf.cluster.getNodeInfo(Node.ofDistributor(0)), + f.stateBundle, d0ActivateWaiter.getValue()); + f.simulateBroadcastTick(cf); + + assertNull(f.broadcaster.getLastClusterStateBundleConverged()); // Not yet converged + + respondToActivateClusterStateVersion(cf.cluster.getNodeInfo(Node.ofDistributor(1)), + f.stateBundle, d1ActivateWaiter.getValue()); + f.simulateBroadcastTick(cf); + + // Finally, all distributors have ACKed the version! State is marked as converged. + assertEquals(f.stateBundle, f.broadcaster.getLastClusterStateBundleConverged()); + } + + @Test + public void activation_not_sent_if_deferred_activation_is_disabled_in_state_bundle() { + var f = StateActivationFixture.withTwoPhaseDisabled(); + var cf = f.cf; + + f.expectSetSystemStateInvocationsToBothDistributors(); + f.simulateBroadcastTick(cf); + // ACK state bundle from both distributors + respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), f.stateBundle, f.d0Waiter.getValue()); + respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, f.d1Waiter.getValue()); + f.simulateBroadcastTick(cf); + + // At this point the cluster state shall be considered converged. + assertEquals(f.stateBundle, f.broadcaster.getLastClusterStateBundleConverged()); + + // No activations shall have been sent. + clusterNodeInfos(cf.cluster(), Node.ofDistributor(0), Node.ofDistributor(1)).forEach(nodeInfo -> { + verify(f.mockCommunicator, times(0)).activateClusterStateVersion(eq(123), eq(nodeInfo), any()); + }); } - /* - TODO test - - activation not sent before distributors have acked - - activation not sent if two phase activation is disabled - */ + // TODO FleetControllerOptions change default 2phase option to false and override tests...? Sounds safer! } |