about summary refs log tree commit diff stats
path: root/clustercontroller-core
diff options
context:
space:
mode:
author Tor Brede Vekterli <vekterli@verizonmedia.com> 2019-03-26 12:51:28 +0100
committer Tor Brede Vekterli <vekterli@verizonmedia.com> 2019-03-26 12:51:28 +0100
commit 239a3c7977af8a2fdb17d66cbf986a400a75617a (patch)
tree de3f37047ca465e0d7e2b892c8ff7e87de85a13c /clustercontroller-core
parent e4eb8366f7143562afff346441c787325dd1119c (diff)
Address code review feedback for cluster controller changes
Diffstat (limited to 'clustercontroller-core')
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java 15
-rw-r--r-- clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java 4
-rw-r--r-- clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java 59
3 files changed, 42 insertions, 36 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index 7898e90f77a..5ecb57a1c76 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -100,8 +100,13 @@ public class SystemStateBroadcaster {
"implicitly treating state %d as activated on node", info, version));
}
} else if (reply.getActualVersion() != version) {
- log.log(LogLevel.DEBUG, () -> String.format("Activation of version %d did not take effect, node %s " +
- "reports it has an actual pending version of %d", version, info, reply.getActualVersion()));
+ boolean nodeOk = nodeReportsSelfAsAvailable(info);
+ // Avoid spamming the logs since this will happen on all resends until (presumably) the controller
+ // loses election status.
+ // TODO this should trigger a loss of current controller's leadership!
+ reportNodeError(nodeOk, info, String.format("Activation of version %d did not take effect, node %s " +
+ "reports it has an actual pending version of %d. Racing with another controller?",
+ version, info, reply.getActualVersion()));
success = false;
} else {
log.log(LogLevel.DEBUG, () -> String.format("Node %s reports successful activation of state " +
@@ -114,6 +119,10 @@ public class SystemStateBroadcaster {
activateClusterStateVersionReplies.clear();
}
+ private static boolean nodeReportsSelfAsAvailable(NodeInfo info) {
+ return info.getReportedState().getState().oneOf("uir");
+ }
+
private void processSetClusterStateResponses() {
for (SetClusterStateRequest req : setClusterStateReplies) {
NodeInfo info = req.getNodeInfo();
@@ -123,7 +132,7 @@ public class SystemStateBroadcaster {
info.setClusterStateBundleVersionAcknowledged(version, false);
if (req.getReply().getReturnCode() != Communicator.TRANSIENT_ERROR) {
if (info.getNewestSystemStateVersionSent() == version) {
- boolean nodeOk = info.getReportedState().getState().oneOf("uir");
+ boolean nodeOk = nodeReportsSelfAsAvailable(info);
reportNodeError(nodeOk, info,
String.format("Got error response %d: %s from %s setdistributionstates request.",
req.getReply().getReturnCode(), req.getReply().getReturnMessage(), info));
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
index d8994ff52df..bd68f0fa343 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
@@ -307,6 +307,10 @@ public class DummyVdsNode {
public ClusterStateBundle getClusterStateBundle() {
synchronized(timer) {
+ // In a two-phase state activation scenario, bundles are added to `clusterStateBundles` _before_
+ // the version has been activated. Since we want this method to only return _activated_ bundles
+ // we filter out versions that are not yet activated. In a non two-phase scenario the activated
+ // version is implicitly the same as the most recently received bundle, so the filter is a no-op.
return clusterStateBundles.stream()
.filter(b -> b.getVersion() <= activatedClusterStateVersion)
.findFirst() // Most recent cluster state bundle first in list
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
index b18a57d2e85..f99df6a25b2 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
@@ -49,7 +49,7 @@ public class SystemStateBroadcasterTest {
} catch (Exception e) {
throw new RuntimeException(e);
}
- broadcaster.broadcastStateActivationsIfRequired(dbContextFrom(cf.cluster()), mockCommunicator); // nope!
+ broadcaster.broadcastStateActivationsIfRequired(dbContextFrom(cf.cluster()), mockCommunicator);
}
}
@@ -142,21 +142,21 @@ public class SystemStateBroadcasterTest {
verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any());
}
- private class MockSetClusterStateRequest extends SetClusterStateRequest {
+ private static class MockSetClusterStateRequest extends SetClusterStateRequest {
public MockSetClusterStateRequest(NodeInfo nodeInfo, int clusterStateVersion) {
super(nodeInfo, clusterStateVersion);
}
}
- private class MockActivateClusterStateVersionRequest extends ActivateClusterStateVersionRequest {
+ private static class MockActivateClusterStateVersionRequest extends ActivateClusterStateVersionRequest {
public MockActivateClusterStateVersionRequest(NodeInfo nodeInfo, int systemStateVersion) {
super(nodeInfo, systemStateVersion);
}
}
- private void respondToSetClusterStateBundle(NodeInfo nodeInfo,
- ClusterStateBundle stateBundle,
- Communicator.Waiter<SetClusterStateRequest> waiter) {
+ private static void respondToSetClusterStateBundle(NodeInfo nodeInfo,
+ ClusterStateBundle stateBundle,
+ Communicator.Waiter<SetClusterStateRequest> waiter) {
// Have to patch in that we've actually sent the bundle in the first place...
nodeInfo.setClusterStateVersionBundleSent(stateBundle);
@@ -165,10 +165,10 @@ public class SystemStateBroadcasterTest {
waiter.done(req);
}
- private void respondToActivateClusterStateVersion(NodeInfo nodeInfo,
- ClusterStateBundle stateBundle,
- int actualVersion,
- Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) {
+ private static void respondToActivateClusterStateVersion(NodeInfo nodeInfo,
+ ClusterStateBundle stateBundle,
+ int actualVersion,
+ Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) {
// Have to patch in that we've actually sent the bundle in the first place...
nodeInfo.setClusterStateVersionActivationSent(stateBundle.getVersion());
@@ -177,9 +177,9 @@ public class SystemStateBroadcasterTest {
waiter.done(req);
}
- private void respondToActivateClusterStateVersion(NodeInfo nodeInfo,
- ClusterStateBundle stateBundle,
- Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) {
+ private static void respondToActivateClusterStateVersion(NodeInfo nodeInfo,
+ ClusterStateBundle stateBundle,
+ Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) {
respondToActivateClusterStateVersion(nodeInfo, stateBundle, stateBundle.getVersion(), waiter);
}
@@ -214,6 +214,16 @@ public class SystemStateBroadcasterTest {
});
}
+ @SuppressWarnings("unchecked") // Type erasure of Waiter in mocked argument capture
+ void ackStateBundleFromBothDistributors() {
+ expectSetSystemStateInvocationsToBothDistributors();
+ simulateBroadcastTick(cf);
+
+ respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), stateBundle, d0Waiter.getValue());
+ respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), stateBundle, d1Waiter.getValue());
+ simulateBroadcastTick(cf);
+ }
+
static StateActivationFixture withTwoPhaseEnabled() {
return new StateActivationFixture(true);
}
@@ -259,13 +269,7 @@ public class SystemStateBroadcasterTest {
var f = StateActivationFixture.withTwoPhaseEnabled();
var cf = f.cf;
- f.expectSetSystemStateInvocationsToBothDistributors();
- f.simulateBroadcastTick(cf);
- // ACK state bundle from both distributors
- respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), f.stateBundle, f.d0Waiter.getValue());
- respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, f.d1Waiter.getValue());
-
- f.simulateBroadcastTick(cf);
+ f.ackStateBundleFromBothDistributors();
final var d0ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class);
final var d1ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class);
@@ -295,12 +299,7 @@ public class SystemStateBroadcasterTest {
var f = StateActivationFixture.withTwoPhaseDisabled();
var cf = f.cf;
- f.expectSetSystemStateInvocationsToBothDistributors();
- f.simulateBroadcastTick(cf);
- // ACK state bundle from both distributors
- respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), f.stateBundle, f.d0Waiter.getValue());
- respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, f.d1Waiter.getValue());
- f.simulateBroadcastTick(cf);
+ f.ackStateBundleFromBothDistributors();
// At this point the cluster state shall be considered converged.
assertEquals(f.stateBundle, f.broadcaster.getLastClusterStateBundleConverged());
@@ -317,13 +316,7 @@ public class SystemStateBroadcasterTest {
var f = StateActivationFixture.withTwoPhaseEnabled();
var cf = f.cf;
- f.expectSetSystemStateInvocationsToBothDistributors();
- f.simulateBroadcastTick(cf);
- // ACK state bundle from both distributors
- respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), f.stateBundle, f.d0Waiter.getValue());
- respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, f.d1Waiter.getValue());
-
- f.simulateBroadcastTick(cf);
+ f.ackStateBundleFromBothDistributors();
final var d0ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class);
final var d1ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class);