summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2021-02-26 15:23:16 +0100
committerTor Brede Vekterli <vekterli@verizonmedia.com>2021-02-26 15:23:16 +0100
commitfcf30d54d977cf37c12b1ef984d2729fb5dfd843 (patch)
treeac9b6cb42036d6bd84ca516d0e806b79d8cf2905
parent20627141c1f57c0c199193b07abce885508152f3 (diff)
Pass current ZK-persisted version directly to broadcast method instead of indirectly
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java10
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java14
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java37
3 files changed, 22 insertions, 39 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index 52a17fe88ab..b47846bd321 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -407,10 +407,6 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
try {
database.saveLatestSystemStateVersion(databaseContext, stateBundle.getVersion());
database.saveLatestClusterStateBundle(databaseContext, stateBundle);
- // It's possible that due to transient ZK quorum errors, we were not able to actually
- // fully store the state. Gate state broadcasting on the actually synchronously ACKed
- // stored state instead of what's _hopefully_ stored.
- systemStateBroadcaster.setLastClusterStateVersionWrittenToZooKeeper(database.getLastKnownStateBundleVersionWrittenBySelf());
} catch (InterruptedException e) {
// Rethrow as RuntimeException to propagate exception up to main thread method.
// Don't want to hide failures to write cluster state version.
@@ -583,9 +579,6 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
synchronized (monitor) {
boolean didWork;
didWork = database.doNextZooKeeperTask(databaseContext);
- // In case of delayed ZK writes caused by intermittent failures, make sure to tag any newly written versions.
- systemStateBroadcaster.setLastClusterStateVersionWrittenToZooKeeper(
- database.getLastKnownStateBundleVersionWrittenBySelf());
didWork |= updateMasterElectionState();
didWork |= handleLeadershipEdgeTransitions();
stateChangeHandler.setMaster(isMaster);
@@ -702,7 +695,8 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
// Reset timer to only see warning once.
firstAllowedStateBroadcast = currentTime;
}
- sentAny = systemStateBroadcaster.broadcastNewStateBundleIfRequired(databaseContext, communicator);
+ sentAny = systemStateBroadcaster.broadcastNewStateBundleIfRequired(
+ databaseContext, communicator, database.getLastKnownStateBundleVersionWrittenBySelf());
if (sentAny) {
// FIXME won't this inhibit resending to unresponsive nodes?
nextStateSendTime = currentTime + options.minTimeBetweenNewSystemStates;
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index 9fefd0e6a12..5a1399c8f29 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -28,7 +28,6 @@ public class SystemStateBroadcaster {
private int lastStateVersionBundleAcked = 0;
private int lastClusterStateVersionConverged = 0;
- private int lastClusterStateVersionWrittenToZooKeeper = 0;
private ClusterStateBundle lastClusterStateBundleConverged;
private final SetClusterStateWaiter setClusterStateWaiter = new SetClusterStateWaiter();
@@ -256,19 +255,12 @@ public class SystemStateBroadcaster {
return lastClusterStateVersionConverged == clusterStateBundle.getVersion();
}
- public void setLastClusterStateVersionWrittenToZooKeeper(int version) {
- lastClusterStateVersionWrittenToZooKeeper = version;
- }
-
- private boolean currentStateVersionHasBeenWrittenToZooKeeper() {
+ public boolean broadcastNewStateBundleIfRequired(DatabaseHandler.Context dbContext, Communicator communicator,
+ int lastClusterStateVersionWrittenToZooKeeper) {
if (clusterStateBundle == null) {
return false;
}
- return clusterStateBundle.getVersion() == lastClusterStateVersionWrittenToZooKeeper;
- }
-
- public boolean broadcastNewStateBundleIfRequired(DatabaseHandler.Context dbContext, Communicator communicator) {
- if (!currentStateVersionHasBeenWrittenToZooKeeper()) {
+ if (clusterStateBundle.getVersion() != lastClusterStateVersionWrittenToZooKeeper) {
return false;
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
index c49cc77aabb..bb1c32638a2 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
@@ -42,9 +42,9 @@ public class SystemStateBroadcasterTest {
cf.cluster().getNodeInfo(Node.ofDistributor(0)).setReportedState(new NodeState(NodeType.DISTRIBUTOR, State.UP).setStartTimestamp(500), 3000);
}
- void simulateBroadcastTick(ClusterFixture cf) {
+ void simulateBroadcastTick(ClusterFixture cf, int stateVersion) {
broadcaster.processResponses();
- broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), mockCommunicator);
+ broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), mockCommunicator, stateVersion);
try {
broadcaster.checkIfClusterStateIsAckedByAllDistributors(
mockDatabaseHandler, dbContextFrom(cf.cluster()), mockFleetController);
@@ -89,7 +89,7 @@ public class SystemStateBroadcasterTest {
ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2");
ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
f.broadcaster.handleNewClusterStates(stateBundle);
- f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator);
+ f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator, 0);
cf.cluster().getNodeInfo().forEach(nodeInfo -> verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any()));
}
@@ -100,7 +100,7 @@ public class SystemStateBroadcasterTest {
ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
f.simulateNodePartitionedAwaySilently(cf);
f.broadcaster.handleNewClusterStates(stateBundle);
- f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator);
+ f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator, 0);
clusterNodeInfos(cf.cluster(), Node.ofDistributor(1), Node.ofStorage(0), Node.ofStorage(1)).forEach(nodeInfo -> {
// Only distributor 0 should observe startup timestamps
@@ -118,7 +118,7 @@ public class SystemStateBroadcasterTest {
StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2"));
ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
f.broadcaster.handleNewClusterStates(stateBundle);
- f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator);
+ f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator, 0);
cf.cluster().getNodeInfo().forEach(nodeInfo -> verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any()));
}
@@ -132,7 +132,7 @@ public class SystemStateBroadcasterTest {
ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
f.simulateNodePartitionedAwaySilently(cf);
f.broadcaster.handleNewClusterStates(stateBundle);
- f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator);
+ f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator, 0);
clusterNodeInfos(cf.cluster(), Node.ofDistributor(1), Node.ofStorage(0), Node.ofStorage(1)).forEach(nodeInfo -> {
// Only distributor 0 should observe startup timestamps
@@ -150,8 +150,7 @@ public class SystemStateBroadcasterTest {
ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("version:100 distributor:2 storage:2");
ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
f.broadcaster.handleNewClusterStates(stateBundle);
- f.broadcaster.setLastClusterStateVersionWrittenToZooKeeper(99);
- f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator);
+ f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator, 99);
cf.cluster().getNodeInfo().forEach(nodeInfo -> {
verify(f.mockCommunicator, times(0)).setSystemState(any(), eq(nodeInfo), any());
@@ -164,8 +163,7 @@ public class SystemStateBroadcasterTest {
ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("version:100 distributor:2 storage:2");
ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
f.broadcaster.handleNewClusterStates(stateBundle);
- f.broadcaster.setLastClusterStateVersionWrittenToZooKeeper(100);
- f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator);
+ f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator, 100);
cf.cluster().getNodeInfo().forEach(nodeInfo -> {
verify(f.mockCommunicator, times(1)).setSystemState(any(), eq(nodeInfo), any());
@@ -230,8 +228,7 @@ public class SystemStateBroadcasterTest {
.deriveAndBuild();
cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
broadcaster.handleNewClusterStates(stateBundle);
- broadcaster.setLastClusterStateVersionWrittenToZooKeeper(stateBundle.getVersion());
- broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), mockCommunicator);
+ broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), mockCommunicator, stateBundle.getVersion());
d0Waiter = ArgumentCaptor.forClass(Communicator.Waiter.class);
d1Waiter = ArgumentCaptor.forClass(Communicator.Waiter.class);
@@ -248,11 +245,11 @@ public class SystemStateBroadcasterTest {
@SuppressWarnings("unchecked") // Type erasure of Waiter in mocked argument capture
void ackStateBundleFromBothDistributors() {
expectSetSystemStateInvocationsToBothDistributors();
- simulateBroadcastTick(cf);
+ simulateBroadcastTick(cf, 123);
respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), stateBundle, d0Waiter.getValue());
respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), stateBundle, d1Waiter.getValue());
- simulateBroadcastTick(cf);
+ simulateBroadcastTick(cf, 123);
}
static StateActivationFixture withTwoPhaseEnabled() {
@@ -271,11 +268,11 @@ public class SystemStateBroadcasterTest {
var cf = f.cf;
f.expectSetSystemStateInvocationsToBothDistributors();
- f.simulateBroadcastTick(cf);
+ f.simulateBroadcastTick(cf, 123);
// Respond from distributor 0, but not yet from distributor 1
respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), f.stateBundle, f.d0Waiter.getValue());
- f.simulateBroadcastTick(cf);
+ f.simulateBroadcastTick(cf, 123);
// No activations should be sent yet
cf.cluster().getNodeInfo().forEach(nodeInfo -> {
@@ -284,7 +281,7 @@ public class SystemStateBroadcasterTest {
assertNull(f.broadcaster.getLastClusterStateBundleConverged());
respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, f.d1Waiter.getValue());
- f.simulateBroadcastTick(cf);
+ f.simulateBroadcastTick(cf, 123);
// Activation should now be sent to _all_ nodes (distributor and storage)
cf.cluster().getNodeInfo().forEach(nodeInfo -> {
@@ -312,13 +309,13 @@ public class SystemStateBroadcasterTest {
respondToActivateClusterStateVersion(cf.cluster.getNodeInfo(Node.ofDistributor(0)),
f.stateBundle, d0ActivateWaiter.getValue());
- f.simulateBroadcastTick(cf);
+ f.simulateBroadcastTick(cf, 123);
assertNull(f.broadcaster.getLastClusterStateBundleConverged()); // Not yet converged
respondToActivateClusterStateVersion(cf.cluster.getNodeInfo(Node.ofDistributor(1)),
f.stateBundle, d1ActivateWaiter.getValue());
- f.simulateBroadcastTick(cf);
+ f.simulateBroadcastTick(cf, 123);
// Finally, all distributors have ACKed the version! State is marked as converged.
assertEquals(f.stateBundle, f.broadcaster.getLastClusterStateBundleConverged());
@@ -363,7 +360,7 @@ public class SystemStateBroadcasterTest {
// considered converged since it's not an exact version match.
respondToActivateClusterStateVersion(cf.cluster.getNodeInfo(Node.ofDistributor(1)),
f.stateBundle, 124, d1ActivateWaiter.getValue());
- f.simulateBroadcastTick(cf);
+ f.simulateBroadcastTick(cf, 123);
assertNull(f.broadcaster.getLastClusterStateBundleConverged());
}