diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-26 12:51:28 +0100 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-26 12:51:28 +0100 |
commit | 239a3c7977af8a2fdb17d66cbf986a400a75617a (patch) | |
tree | de3f37047ca465e0d7e2b892c8ff7e87de85a13c /clustercontroller-core/src/test/java/com | |
parent | e4eb8366f7143562afff346441c787325dd1119c (diff) |
Address code review feedback for cluster controller changes
Diffstat (limited to 'clustercontroller-core/src/test/java/com')
2 files changed, 30 insertions, 33 deletions
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java index d8994ff52df..bd68f0fa343 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java @@ -307,6 +307,10 @@ public class DummyVdsNode { public ClusterStateBundle getClusterStateBundle() { synchronized(timer) { + // In a two-phase state activation scenario, bundles are added to `clusterStateBundles` _before_ + // the version has been activated. Since we want this method to only return _activated_ bundles + // we filter out versions that are not yet activated. In a non two-phase scenario the activated + // version is implicitly the same as the most recently received bundle, so the filter is a no-op. return clusterStateBundles.stream() .filter(b -> b.getVersion() <= activatedClusterStateVersion) .findFirst() // Most recent cluster state bundle first in list diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java index b18a57d2e85..f99df6a25b2 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java @@ -49,7 +49,7 @@ public class SystemStateBroadcasterTest { } catch (Exception e) { throw new RuntimeException(e); } - broadcaster.broadcastStateActivationsIfRequired(dbContextFrom(cf.cluster()), mockCommunicator); // nope! + broadcaster.broadcastStateActivationsIfRequired(dbContextFrom(cf.cluster()), mockCommunicator); } } @@ -142,21 +142,21 @@ public class SystemStateBroadcasterTest { verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any()); } - private class MockSetClusterStateRequest extends SetClusterStateRequest { + private static class MockSetClusterStateRequest extends SetClusterStateRequest { public MockSetClusterStateRequest(NodeInfo nodeInfo, int clusterStateVersion) { super(nodeInfo, clusterStateVersion); } } - private class MockActivateClusterStateVersionRequest extends ActivateClusterStateVersionRequest { + private static class MockActivateClusterStateVersionRequest extends ActivateClusterStateVersionRequest { public MockActivateClusterStateVersionRequest(NodeInfo nodeInfo, int systemStateVersion) { super(nodeInfo, systemStateVersion); } } - private void respondToSetClusterStateBundle(NodeInfo nodeInfo, - ClusterStateBundle stateBundle, - Communicator.Waiter<SetClusterStateRequest> waiter) { + private static void respondToSetClusterStateBundle(NodeInfo nodeInfo, + ClusterStateBundle stateBundle, + Communicator.Waiter<SetClusterStateRequest> waiter) { // Have to patch in that we've actually sent the bundle in the first place... nodeInfo.setClusterStateVersionBundleSent(stateBundle); @@ -165,10 +165,10 @@ public class SystemStateBroadcasterTest { waiter.done(req); } - private void respondToActivateClusterStateVersion(NodeInfo nodeInfo, - ClusterStateBundle stateBundle, - int actualVersion, - Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) { + private static void respondToActivateClusterStateVersion(NodeInfo nodeInfo, + ClusterStateBundle stateBundle, + int actualVersion, + Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) { // Have to patch in that we've actually sent the bundle in the first place... nodeInfo.setClusterStateVersionActivationSent(stateBundle.getVersion()); @@ -177,9 +177,9 @@ public class SystemStateBroadcasterTest { waiter.done(req); } - private void respondToActivateClusterStateVersion(NodeInfo nodeInfo, - ClusterStateBundle stateBundle, - Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) { + private static void respondToActivateClusterStateVersion(NodeInfo nodeInfo, + ClusterStateBundle stateBundle, + Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) { respondToActivateClusterStateVersion(nodeInfo, stateBundle, stateBundle.getVersion(), waiter); } @@ -214,6 +214,16 @@ public class SystemStateBroadcasterTest { }); } + @SuppressWarnings("unchecked") // Type erasure of Waiter in mocked argument capture + void ackStateBundleFromBothDistributors() { + expectSetSystemStateInvocationsToBothDistributors(); + simulateBroadcastTick(cf); + + respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), stateBundle, d0Waiter.getValue()); + respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), stateBundle, d1Waiter.getValue()); + simulateBroadcastTick(cf); + } + static StateActivationFixture withTwoPhaseEnabled() { return new StateActivationFixture(true); } @@ -259,13 +269,7 @@ public class SystemStateBroadcasterTest { var f = StateActivationFixture.withTwoPhaseEnabled(); var cf = f.cf; - f.expectSetSystemStateInvocationsToBothDistributors(); - f.simulateBroadcastTick(cf); - // ACK state bundle from both distributors - respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), f.stateBundle, f.d0Waiter.getValue()); - respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, f.d1Waiter.getValue()); - - f.simulateBroadcastTick(cf); + f.ackStateBundleFromBothDistributors(); final var d0ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class); final var d1ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class); @@ -295,12 +299,7 @@ public class SystemStateBroadcasterTest { var f = StateActivationFixture.withTwoPhaseDisabled(); var cf = f.cf; - f.expectSetSystemStateInvocationsToBothDistributors(); - f.simulateBroadcastTick(cf); - // ACK state bundle from both distributors - respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), f.stateBundle, f.d0Waiter.getValue()); - respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, f.d1Waiter.getValue()); - f.simulateBroadcastTick(cf); + f.ackStateBundleFromBothDistributors(); // At this point the cluster state shall be considered converged. assertEquals(f.stateBundle, f.broadcaster.getLastClusterStateBundleConverged()); @@ -317,13 +316,7 @@ public class SystemStateBroadcasterTest { var f = StateActivationFixture.withTwoPhaseEnabled(); var cf = f.cf; - f.expectSetSystemStateInvocationsToBothDistributors(); - f.simulateBroadcastTick(cf); - // ACK state bundle from both distributors - respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), f.stateBundle, f.d0Waiter.getValue()); - respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, f.d1Waiter.getValue()); - - f.simulateBroadcastTick(cf); + f.ackStateBundleFromBothDistributors(); final var d0ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class); final var d1ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class); |