diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-26 12:51:28 +0100 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-26 12:51:28 +0100 |
commit | 239a3c7977af8a2fdb17d66cbf986a400a75617a (patch) | |
tree | de3f37047ca465e0d7e2b892c8ff7e87de85a13c /clustercontroller-core | |
parent | e4eb8366f7143562afff346441c787325dd1119c (diff) |
Address code review feedback for cluster controller changes
Diffstat (limited to 'clustercontroller-core')
3 files changed, 42 insertions, 36 deletions
```diff
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index 7898e90f77a..5ecb57a1c76 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -100,8 +100,13 @@ public class SystemStateBroadcaster {
                             "implicitly treating state %d as activated on node", info, version));
                 }
             } else if (reply.getActualVersion() != version) {
-                log.log(LogLevel.DEBUG, () -> String.format("Activation of version %d did not take effect, node %s " +
-                        "reports it has an actual pending version of %d", version, info, reply.getActualVersion()));
+                boolean nodeOk = nodeReportsSelfAsAvailable(info);
+                // Avoid spamming the logs since this will happen on all resends until (presumably) the controller
+                // loses election status.
+                // TODO this should trigger a loss of current controller's leadership!
+                reportNodeError(nodeOk, info, String.format("Activation of version %d did not take effect, node %s " +
+                        "reports it has an actual pending version of %d. Racing with another controller?",
+                        version, info, reply.getActualVersion()));
                 success = false;
             } else {
                 log.log(LogLevel.DEBUG, () -> String.format("Node %s reports successful activation of state " +
@@ -114,6 +119,10 @@ public class SystemStateBroadcaster {
         activateClusterStateVersionReplies.clear();
     }
 
+    private static boolean nodeReportsSelfAsAvailable(NodeInfo info) {
+        return info.getReportedState().getState().oneOf("uir");
+    }
+
     private void processSetClusterStateResponses() {
         for (SetClusterStateRequest req : setClusterStateReplies) {
             NodeInfo info = req.getNodeInfo();
@@ -123,7 +132,7 @@ public class SystemStateBroadcaster {
                 info.setClusterStateBundleVersionAcknowledged(version, false);
                 if (req.getReply().getReturnCode() != Communicator.TRANSIENT_ERROR) {
                     if (info.getNewestSystemStateVersionSent() == version) {
-                        boolean nodeOk = info.getReportedState().getState().oneOf("uir");
+                        boolean nodeOk = nodeReportsSelfAsAvailable(info);
                         reportNodeError(nodeOk, info,
                                 String.format("Got error response %d: %s from %s setdistributionstates request.",
                                         req.getReply().getReturnCode(), req.getReply().getReturnMessage(), info));
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
index d8994ff52df..bd68f0fa343 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
@@ -307,6 +307,10 @@ public class DummyVdsNode {
 
     public ClusterStateBundle getClusterStateBundle() {
         synchronized(timer) {
+            // In a two-phase state activation scenario, bundles are added to `clusterStateBundles` _before_
+            // the version has been activated. Since we want this method to only return _activated_ bundles
+            // we filter out versions that are not yet activated. In a non two-phase scenario the activated
+            // version is implicitly the same as the most recently received bundle, so the filter is a no-op.
             return clusterStateBundles.stream()
                     .filter(b -> b.getVersion() <= activatedClusterStateVersion)
                     .findFirst() // Most recent cluster state bundle first in list
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
index b18a57d2e85..f99df6a25b2 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
@@ -49,7 +49,7 @@ public class SystemStateBroadcasterTest {
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
-            broadcaster.broadcastStateActivationsIfRequired(dbContextFrom(cf.cluster()), mockCommunicator); // nope!
+            broadcaster.broadcastStateActivationsIfRequired(dbContextFrom(cf.cluster()), mockCommunicator);
         }
     }
 
@@ -142,21 +142,21 @@ public class SystemStateBroadcasterTest {
         verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any());
     }
 
-    private class MockSetClusterStateRequest extends SetClusterStateRequest {
+    private static class MockSetClusterStateRequest extends SetClusterStateRequest {
         public MockSetClusterStateRequest(NodeInfo nodeInfo, int clusterStateVersion) {
             super(nodeInfo, clusterStateVersion);
         }
     }
 
-    private class MockActivateClusterStateVersionRequest extends ActivateClusterStateVersionRequest {
+    private static class MockActivateClusterStateVersionRequest extends ActivateClusterStateVersionRequest {
         public MockActivateClusterStateVersionRequest(NodeInfo nodeInfo, int systemStateVersion) {
             super(nodeInfo, systemStateVersion);
         }
     }
 
-    private void respondToSetClusterStateBundle(NodeInfo nodeInfo,
-                                                ClusterStateBundle stateBundle,
-                                                Communicator.Waiter<SetClusterStateRequest> waiter) {
+    private static void respondToSetClusterStateBundle(NodeInfo nodeInfo,
+                                                       ClusterStateBundle stateBundle,
+                                                       Communicator.Waiter<SetClusterStateRequest> waiter) {
         // Have to patch in that we've actually sent the bundle in the first place...
         nodeInfo.setClusterStateVersionBundleSent(stateBundle);
 
@@ -165,10 +165,10 @@ public class SystemStateBroadcasterTest {
         waiter.done(req);
     }
 
-    private void respondToActivateClusterStateVersion(NodeInfo nodeInfo,
-                                                      ClusterStateBundle stateBundle,
-                                                      int actualVersion,
-                                                      Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) {
+    private static void respondToActivateClusterStateVersion(NodeInfo nodeInfo,
+                                                             ClusterStateBundle stateBundle,
+                                                             int actualVersion,
+                                                             Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) {
         // Have to patch in that we've actually sent the bundle in the first place...
         nodeInfo.setClusterStateVersionActivationSent(stateBundle.getVersion());
 
@@ -177,9 +177,9 @@ public class SystemStateBroadcasterTest {
         waiter.done(req);
     }
 
-    private void respondToActivateClusterStateVersion(NodeInfo nodeInfo,
-                                                      ClusterStateBundle stateBundle,
-                                                      Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) {
+    private static void respondToActivateClusterStateVersion(NodeInfo nodeInfo,
+                                                             ClusterStateBundle stateBundle,
+                                                             Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) {
         respondToActivateClusterStateVersion(nodeInfo, stateBundle, stateBundle.getVersion(), waiter);
     }
 
@@ -214,6 +214,16 @@ public class SystemStateBroadcasterTest {
             });
         }
 
+        @SuppressWarnings("unchecked") // Type erasure of Waiter in mocked argument capture
+        void ackStateBundleFromBothDistributors() {
+            expectSetSystemStateInvocationsToBothDistributors();
+            simulateBroadcastTick(cf);
+
+            respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), stateBundle, d0Waiter.getValue());
+            respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), stateBundle, d1Waiter.getValue());
+            simulateBroadcastTick(cf);
+        }
+
         static StateActivationFixture withTwoPhaseEnabled() {
             return new StateActivationFixture(true);
         }
@@ -259,13 +269,7 @@ public class SystemStateBroadcasterTest {
         var f = StateActivationFixture.withTwoPhaseEnabled();
         var cf = f.cf;
 
-        f.expectSetSystemStateInvocationsToBothDistributors();
-        f.simulateBroadcastTick(cf);
-        // ACK state bundle from both distributors
-        respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), f.stateBundle, f.d0Waiter.getValue());
-        respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, f.d1Waiter.getValue());
-
-        f.simulateBroadcastTick(cf);
+        f.ackStateBundleFromBothDistributors();
 
         final var d0ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class);
         final var d1ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class);
@@ -295,12 +299,7 @@ public class SystemStateBroadcasterTest {
         var f = StateActivationFixture.withTwoPhaseDisabled();
         var cf = f.cf;
 
-        f.expectSetSystemStateInvocationsToBothDistributors();
-        f.simulateBroadcastTick(cf);
-        // ACK state bundle from both distributors
-        respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), f.stateBundle, f.d0Waiter.getValue());
-        respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, f.d1Waiter.getValue());
-        f.simulateBroadcastTick(cf);
+        f.ackStateBundleFromBothDistributors();
 
         // At this point the cluster state shall be considered converged.
         assertEquals(f.stateBundle, f.broadcaster.getLastClusterStateBundleConverged());
@@ -317,13 +316,7 @@ public class SystemStateBroadcasterTest {
         var f = StateActivationFixture.withTwoPhaseEnabled();
         var cf = f.cf;
 
-        f.expectSetSystemStateInvocationsToBothDistributors();
-        f.simulateBroadcastTick(cf);
-        // ACK state bundle from both distributors
-        respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), f.stateBundle, f.d0Waiter.getValue());
-        respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, f.d1Waiter.getValue());
-
-        f.simulateBroadcastTick(cf);
+        f.ackStateBundleFromBothDistributors();
 
         final var d0ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class);
         final var d1ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class);
```