From b84a65296f8b63d7dbc6ad72bf9de7aa724ca4f4 Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Thu, 21 Mar 2019 14:52:41 +0100 Subject: Activation reply processing must inspect actual version returned Version mismatches in backend do not return explicit RPC errors, so actual vs. desired versions must be checked in order to avoid potentially spurious activation of other versions. Also do some minor code cleanup. --- .../core/ClusterStateBundleTest.java | 4 +- .../clustercontroller/core/DummyCommunicator.java | 6 +-- .../vespa/clustercontroller/core/DummyVdsNode.java | 7 ++-- .../clustercontroller/core/StateChangeTest.java | 1 - .../core/SystemStateBroadcasterTest.java | 47 ++++++++++++++++++++-- 5 files changed, 52 insertions(+), 13 deletions(-) (limited to 'clustercontroller-core/src/test/java/com/yahoo') diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java index 127211fc911..339d305e823 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java @@ -154,9 +154,9 @@ public class ClusterStateBundleTest { public void simple_bundle_without_derived_states_propagates_deferred_activation_flag() { var bundle = ClusterStateBundle .builder(annotatedStateOf("distributor:2 storage:2")) - .deferredActivation(false) // defaults to true + .deferredActivation(true) // defaults to false .deriveAndBuild(); - assertFalse(bundle.deferredActivation()); + assertTrue(bundle.deferredActivation()); } @Test diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java index 185fdc0c7ca..8314839336e 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java @@ -24,7 +24,7 @@ public class DummyCommunicator implements Communicator, NodeLookup { this.shouldDeferDistributorClusterStateAcks = shouldDeferDistributorClusterStateAcks; } - public class DummyGetNodeStateRequest extends GetNodeStateRequest { + class DummyGetNodeStateRequest extends GetNodeStateRequest { Waiter waiter; public DummyGetNodeStateRequest(NodeInfo nodeInfo, Waiter waiter) { @@ -97,7 +97,7 @@ public class DummyCommunicator implements Communicator, NodeLookup { public void setSystemState(ClusterStateBundle stateBundle, NodeInfo node, Waiter waiter) { ClusterState baselineState = stateBundle.getBaselineClusterState(); DummySetClusterStateRequest req = new DummySetClusterStateRequest(node, baselineState); - node.setClusterStateVersionBundleSent(baselineState); + node.setClusterStateVersionBundleSent(stateBundle); req.setReply(new SetClusterStateRequest.Reply()); if (node.isStorage() || !shouldDeferDistributorClusterStateAcks) { waiter.done(req); @@ -109,7 +109,7 @@ public class DummyCommunicator implements Communicator, NodeLookup { @Override public void activateClusterStateVersion(int clusterStateVersion, NodeInfo node, Waiter waiter) { var req = new DummyActivateClusterStateVersionRequest(node, clusterStateVersion); - req.setReply(new ActivateClusterStateVersionRequest.Reply()); + req.setReply(ActivateClusterStateVersionRequest.Reply.withActualVersion(clusterStateVersion)); waiter.done(req); } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java index caa3fbcd6cd..d8994ff52df 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java @@ -580,11 +580,12 @@ public class DummyVdsNode { synchronized(timer) { int actualVersion = getLatestSystemStateVersion().orElse(0); req.returnValues().add(new Int32Value(actualVersion)); - if (activateVersion != actualVersion) { - req.setError(ErrorCode.METHOD_FAILED, "State version mismatch"); - } else { + if (activateVersion == actualVersion) { activatedClusterStateVersion = activateVersion; timer.notifyAll(); + } else { + log.log(LogLevel.DEBUG, () -> String.format("Dummy node %s: got a mismatching activation (request version %d, " + + "actual %d), not marking version as active", this, activateVersion, actualVersion)); } } log.log(LogLevel.DEBUG, "Dummy node " + this + ": Activating cluster state version " + activateVersion); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java index 4c040798eb0..ca246b3549f 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java @@ -1534,7 +1534,6 @@ public class StateChangeTest extends FleetControllerTest { public void cluster_state_ack_is_not_dependent_on_state_send_grace_period() throws Exception { FleetControllerOptions options = defaultOptions(); options.minTimeBetweenNewSystemStates = 10_000; - //options.enableTwoPhaseClusterStateActivation = false; //// RemoteTaskFixture fixture = createFixtureWith(options); // Have to increment timer here to be able to send state generated by the scheduled task diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java index 98ad19b954a..a7c7845fea9 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java @@ -11,6 +11,7 @@ import org.mockito.ArgumentCaptor; import java.util.stream.Stream; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -157,7 +158,7 @@ public class SystemStateBroadcasterTest { ClusterStateBundle stateBundle, Communicator.Waiter waiter) { // Have to patch in that we've actually sent the bundle in the first place... - nodeInfo.setClusterStateVersionBundleSent(stateBundle.getBaselineClusterState()); + nodeInfo.setClusterStateVersionBundleSent(stateBundle); var req = new MockSetClusterStateRequest(nodeInfo, stateBundle.getVersion()); req.setReply(new ClusterStateVersionSpecificRequest.Reply()); @@ -166,15 +167,22 @@ public class SystemStateBroadcasterTest { private void respondToActivateClusterStateVersion(NodeInfo nodeInfo, ClusterStateBundle stateBundle, + int actualVersion, Communicator.Waiter waiter) { // Have to patch in that we've actually sent the bundle in the first place... nodeInfo.setClusterStateVersionActivationSent(stateBundle.getVersion()); var req = new MockActivateClusterStateVersionRequest(nodeInfo, stateBundle.getVersion()); - req.setReply(new ClusterStateVersionSpecificRequest.Reply()); + req.setReply(ClusterStateVersionSpecificRequest.Reply.withActualVersion(actualVersion)); waiter.done(req); } + private void respondToActivateClusterStateVersion(NodeInfo nodeInfo, + ClusterStateBundle stateBundle, + Communicator.Waiter waiter) { + respondToActivateClusterStateVersion(nodeInfo, stateBundle, stateBundle.getVersion(), waiter); + } + private static class StateActivationFixture extends Fixture { ClusterStateBundle stateBundle; ClusterFixture cf; @@ -226,8 +234,7 @@ public class SystemStateBroadcasterTest { // No activations should be sent yet cf.cluster().getNodeInfo().forEach(nodeInfo -> { - verify(f.mockCommunicator, times(0)).activateClusterStateVersion( - eq(123), eq(nodeInfo), any()); + verify(f.mockCommunicator, times(0)).activateClusterStateVersion(eq(123), eq(nodeInfo), any()); }); assertNull(f.broadcaster.getLastClusterStateBundleConverged()); @@ -298,4 +305,36 @@ public class SystemStateBroadcasterTest { }); } + @Test + public void activation_convergence_considers_actual_version_returned_from_node() { + var f = StateActivationFixture.withTwoPhaseEnabled(); + var cf = f.cf; + + f.expectSetSystemStateInvocationsToBothDistributors(); + f.simulateBroadcastTick(cf); + // ACK state bundle from both distributors + respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), f.stateBundle, f.d0Waiter.getValue()); + respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, f.d1Waiter.getValue()); + + f.simulateBroadcastTick(cf); + + final var d0ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class); + final var d1ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class); + + clusterNodeInfos(cf.cluster(), Node.ofDistributor(0), Node.ofDistributor(1)).forEach(nodeInfo -> { + verify(f.mockCommunicator).activateClusterStateVersion(eq(123), eq(nodeInfo), + (nodeInfo.getNodeIndex() == 0 ? d0ActivateWaiter : d1ActivateWaiter).capture()); + }); + + respondToActivateClusterStateVersion(cf.cluster.getNodeInfo(Node.ofDistributor(0)), + f.stateBundle, d0ActivateWaiter.getValue()); + // Distributor 1 reports higher actual version, should not cause this version to be + // considered converged since it's not an exact version match. + respondToActivateClusterStateVersion(cf.cluster.getNodeInfo(Node.ofDistributor(1)), + f.stateBundle, 124, d1ActivateWaiter.getValue()); + f.simulateBroadcastTick(cf); + + assertNull(f.broadcaster.getLastClusterStateBundleConverged()); + } + } -- cgit v1.2.3