summaryrefslogtreecommitdiffstats
path: root/clustercontroller-core
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-03-19 13:25:20 +0100
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-03-20 16:56:58 +0100
commit030f5a27899cb6e3ec09cf1dcb0cc433a871db89 (patch)
tree8f2b6eb5e619fbfb8f779ebd96999e177e1ae533 /clustercontroller-core
parentc3650cea23c1297e0db87b1bef9005dedda518d2 (diff)
Add explicit tests of `SystemStateBroadcaster` behavior
Diffstat (limited to 'clustercontroller-core')
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java9
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java182
2 files changed, 177 insertions, 14 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index 8f306036301..84d8accd7f4 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -274,9 +274,8 @@ public class SystemStateBroadcaster {
return false;
}
- if ((lastStateVersionBundleAcked != clusterStateBundle.getVersion())
- || !dbContext.getFleetController().getOptions().enableTwoPhaseClusterStateActivation) {
- return false; // Not yet received bundle ACK from all nodes; wait.
+ if (!clusterStateBundle.deferredActivation() || !allDistributorsHaveAckedSentClusterStateBundle()) {
+ return false;
}
var recipients = resolveStateActivationSendSet(dbContext);
@@ -289,6 +288,10 @@ public class SystemStateBroadcaster {
return !recipients.isEmpty();
}
+ private boolean allDistributorsHaveAckedSentClusterStateBundle() {
+ return (lastStateVersionBundleAcked == clusterStateBundle.getVersion());
+ }
+
public int lastClusterStateVersionInSync() { return lastClusterStateVersionConverged; }
private static boolean nodeNeedsToObserveStartupTimestamps(NodeInfo node) {
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
index 40175cd7bc9..f4afb08de7c 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
@@ -6,12 +6,16 @@ import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler;
import com.yahoo.vespa.clustercontroller.core.listeners.NodeAddedOrRemovedListener;
import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
import java.util.stream.Stream;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
public class SystemStateBroadcasterTest {
@@ -21,6 +25,8 @@ public class SystemStateBroadcasterTest {
final Object monitor = new Object();
SystemStateBroadcaster broadcaster = new SystemStateBroadcaster(timer, monitor);
Communicator mockCommunicator = mock(Communicator.class);
+ DatabaseHandler mockDatabaseHandler = mock(DatabaseHandler.class);
+ FleetController mockFleetController = mock(FleetController.class);
void simulateNodePartitionedAwaySilently(ClusterFixture cf) {
cf.cluster().getNodeInfo(Node.ofStorage(0)).setStartTimestamp(600);
@@ -32,6 +38,18 @@ public class SystemStateBroadcasterTest {
cf.cluster().getNodeInfo(Node.ofDistributor(0)).setReportedState(new NodeState(NodeType.DISTRIBUTOR, State.DOWN).setStartTimestamp(500), 2000);
cf.cluster().getNodeInfo(Node.ofDistributor(0)).setReportedState(new NodeState(NodeType.DISTRIBUTOR, State.UP).setStartTimestamp(500), 3000);
}
+
+ void simulateBroadcastTick(ClusterFixture cf) {
+ broadcaster.processResponses();
+ broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), mockCommunicator);
+ try {
+ broadcaster.checkIfClusterStateIsAckedByAllDistributors(
+ mockDatabaseHandler, dbContextFrom(cf.cluster()), mockFleetController);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ broadcaster.broadcastStateActivationsIfRequired(dbContextFrom(cf.cluster()), mockCommunicator); // nope!
+ }
}
private static DatabaseHandler.Context dbContextFrom(ContentCluster cluster) {
@@ -123,21 +141,163 @@ public class SystemStateBroadcasterTest {
verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any());
}
+ private class MockSetClusterStateRequest extends SetClusterStateRequest {
+ public MockSetClusterStateRequest(NodeInfo nodeInfo, int clusterStateVersion) {
+ super(nodeInfo, clusterStateVersion);
+ }
+ }
+
+ private class MockActivateClusterStateVersionRequest extends ActivateClusterStateVersionRequest {
+ public MockActivateClusterStateVersionRequest(NodeInfo nodeInfo, int systemStateVersion) {
+ super(nodeInfo, systemStateVersion);
+ }
+ }
+
+ private void respondToSetClusterStateBundle(NodeInfo nodeInfo,
+ ClusterStateBundle stateBundle,
+ Communicator.Waiter<SetClusterStateRequest> waiter) {
+ // Have to patch in that we've actually sent the bundle in the first place...
+ nodeInfo.setClusterStateVersionBundleSent(stateBundle.getBaselineClusterState());
+
+ var req = new MockSetClusterStateRequest(nodeInfo, stateBundle.getVersion());
+ req.setReply(new ClusterStateVersionSpecificRequest.Reply());
+ waiter.done(req);
+ }
+
+ private void respondToActivateClusterStateVersion(NodeInfo nodeInfo,
+ ClusterStateBundle stateBundle,
+ Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) {
+ // Have to patch in that we've actually sent the bundle in the first place...
+ nodeInfo.setClusterStateVersionActivationSent(stateBundle.getVersion());
+
+ var req = new MockActivateClusterStateVersionRequest(nodeInfo, stateBundle.getVersion());
+ req.setReply(new ClusterStateVersionSpecificRequest.Reply());
+ waiter.done(req);
+ }
+
+ private static class StateActivationFixture extends Fixture {
+ ClusterStateBundle stateBundle;
+ ClusterFixture cf;
+
+ final ArgumentCaptor<Communicator.Waiter> d0Waiter;
+ final ArgumentCaptor<Communicator.Waiter> d1Waiter;
+
+ private StateActivationFixture(boolean enableDeferred) {
+ super();
+ stateBundle = ClusterStateBundleUtil
+ .makeBundleBuilder("version:123 distributor:2 storage:2")
+ .deferredActivation(enableDeferred)
+ .deriveAndBuild();
+ cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
+ broadcaster.handleNewClusterStates(stateBundle);
+ broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), mockCommunicator);
+
+ d0Waiter = ArgumentCaptor.forClass(Communicator.Waiter.class);
+ d1Waiter = ArgumentCaptor.forClass(Communicator.Waiter.class);
+ }
+
+ void expectSetSystemStateInvocationsToBothDistributors() {
+ clusterNodeInfos(cf.cluster(), Node.ofDistributor(0), Node.ofDistributor(1)).forEach(nodeInfo -> {
+ verify(mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo),
+ (nodeInfo.getNodeIndex() == 0 ? d0Waiter : d1Waiter).capture());
+ });
+ }
+
+ static StateActivationFixture withTwoPhaseEnabled() {
+ return new StateActivationFixture(true);
+ }
+
+ static StateActivationFixture withTwoPhaseDisabled() {
+ return new StateActivationFixture(false);
+ }
+ }
+
@Test
public void activation_not_sent_before_all_distributors_have_acked_state_bundle() {
- Fixture f = new Fixture();
- ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2");
- ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
- f.broadcaster.handleNewClusterStates(stateBundle);
- f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator);
+ var f = StateActivationFixture.withTwoPhaseEnabled();
+ var cf = f.cf;
+
+ f.expectSetSystemStateInvocationsToBothDistributors();
+ f.simulateBroadcastTick(cf);
+
+ // Respond from distributor 0, but not yet from distributor 1
+ respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), f.stateBundle, f.d0Waiter.getValue());
+ f.simulateBroadcastTick(cf);
+
+ // No activations should be sent yet
+ cf.cluster().getNodeInfo().forEach(nodeInfo -> {
+ verify(f.mockCommunicator, times(0)).activateClusterStateVersion(
+ eq(123), eq(nodeInfo), any());
+ });
+ assertNull(f.broadcaster.getLastClusterStateBundleConverged());
+
+ respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, f.d1Waiter.getValue());
+ f.simulateBroadcastTick(cf);
+
+ // Activation should now be sent to _all_ nodes (distributor and storage)
+ cf.cluster().getNodeInfo().forEach(nodeInfo -> {
+ verify(f.mockCommunicator).activateClusterStateVersion(eq(123), eq(nodeInfo), any());
+ });
+ // But not converged yet, as activations have not been ACKed
+ assertNull(f.broadcaster.getLastClusterStateBundleConverged());
+ }
+
+ @Test
+ public void state_bundle_not_considered_converged_until_activation_acked_by_all_distributors() {
+ var f = StateActivationFixture.withTwoPhaseEnabled();
+ var cf = f.cf;
+
+ f.expectSetSystemStateInvocationsToBothDistributors();
+ f.simulateBroadcastTick(cf);
+ // ACK state bundle from both distributors
+ respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), f.stateBundle, f.d0Waiter.getValue());
+ respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, f.d1Waiter.getValue());
- // TODO
+ f.simulateBroadcastTick(cf);
+
+ final var d0ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class);
+ final var d1ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class);
+
+ clusterNodeInfos(cf.cluster(), Node.ofDistributor(0), Node.ofDistributor(1)).forEach(nodeInfo -> {
+ verify(f.mockCommunicator).activateClusterStateVersion(eq(123), eq(nodeInfo),
+ (nodeInfo.getNodeIndex() == 0 ? d0ActivateWaiter : d1ActivateWaiter).capture());
+ });
+
+ respondToActivateClusterStateVersion(cf.cluster.getNodeInfo(Node.ofDistributor(0)),
+ f.stateBundle, d0ActivateWaiter.getValue());
+ f.simulateBroadcastTick(cf);
+
+ assertNull(f.broadcaster.getLastClusterStateBundleConverged()); // Not yet converged
+
+ respondToActivateClusterStateVersion(cf.cluster.getNodeInfo(Node.ofDistributor(1)),
+ f.stateBundle, d1ActivateWaiter.getValue());
+ f.simulateBroadcastTick(cf);
+
+ // Finally, all distributors have ACKed the version! State is marked as converged.
+ assertEquals(f.stateBundle, f.broadcaster.getLastClusterStateBundleConverged());
+ }
+
+ @Test
+ public void activation_not_sent_if_deferred_activation_is_disabled_in_state_bundle() {
+ var f = StateActivationFixture.withTwoPhaseDisabled();
+ var cf = f.cf;
+
+ f.expectSetSystemStateInvocationsToBothDistributors();
+ f.simulateBroadcastTick(cf);
+ // ACK state bundle from both distributors
+ respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), f.stateBundle, f.d0Waiter.getValue());
+ respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, f.d1Waiter.getValue());
+ f.simulateBroadcastTick(cf);
+
+ // At this point the cluster state shall be considered converged.
+ assertEquals(f.stateBundle, f.broadcaster.getLastClusterStateBundleConverged());
+
+ // No activations shall have been sent.
+ clusterNodeInfos(cf.cluster(), Node.ofDistributor(0), Node.ofDistributor(1)).forEach(nodeInfo -> {
+ verify(f.mockCommunicator, times(0)).activateClusterStateVersion(eq(123), eq(nodeInfo), any());
+ });
}
- /*
- TODO test
- - activation not sent before distributors have acked
- - activation not sent if two phase activation is disabled
- */
+ // TODO FleetControllerOptions change default 2phase option to false and override tests...? Sounds safer!
}