From fcf30d54d977cf37c12b1ef984d2729fb5dfd843 Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Fri, 26 Feb 2021 15:23:16 +0100 Subject: Pass current ZK-persisted version directly to broadcast method instead of indirectly --- .../clustercontroller/core/FleetController.java | 10 ++---- .../core/SystemStateBroadcaster.java | 14 ++------ .../core/SystemStateBroadcasterTest.java | 37 ++++++++++------------ 3 files changed, 22 insertions(+), 39 deletions(-) diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java index 52a17fe88ab..b47846bd321 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java @@ -407,10 +407,6 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd try { database.saveLatestSystemStateVersion(databaseContext, stateBundle.getVersion()); database.saveLatestClusterStateBundle(databaseContext, stateBundle); - // It's possible that due to transient ZK quorum errors, we were not able to actually - // fully store the state. Gate state broadcasting on the actually synchronously ACKed - // stored state instead of what's _hopefully_ stored. - systemStateBroadcaster.setLastClusterStateVersionWrittenToZooKeeper(database.getLastKnownStateBundleVersionWrittenBySelf()); } catch (InterruptedException e) { // Rethrow as RuntimeException to propagate exception up to main thread method. // Don't want to hide failures to write cluster state version. @@ -583,9 +579,6 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd synchronized (monitor) { boolean didWork; didWork = database.doNextZooKeeperTask(databaseContext); - // In case of delayed ZK writes caused by intermittent failures, make sure to tag any newly written versions. - systemStateBroadcaster.setLastClusterStateVersionWrittenToZooKeeper( - database.getLastKnownStateBundleVersionWrittenBySelf()); didWork |= updateMasterElectionState(); didWork |= handleLeadershipEdgeTransitions(); stateChangeHandler.setMaster(isMaster); @@ -702,7 +695,8 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd // Reset timer to only see warning once. firstAllowedStateBroadcast = currentTime; } - sentAny = systemStateBroadcaster.broadcastNewStateBundleIfRequired(databaseContext, communicator); + sentAny = systemStateBroadcaster.broadcastNewStateBundleIfRequired( + databaseContext, communicator, database.getLastKnownStateBundleVersionWrittenBySelf()); if (sentAny) { // FIXME won't this inhibit resending to unresponsive nodes? nextStateSendTime = currentTime + options.minTimeBetweenNewSystemStates; diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java index 9fefd0e6a12..5a1399c8f29 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java @@ -28,7 +28,6 @@ public class SystemStateBroadcaster { private int lastStateVersionBundleAcked = 0; private int lastClusterStateVersionConverged = 0; - private int lastClusterStateVersionWrittenToZooKeeper = 0; private ClusterStateBundle lastClusterStateBundleConverged; private final SetClusterStateWaiter setClusterStateWaiter = new SetClusterStateWaiter(); @@ -256,19 +255,12 @@ public class SystemStateBroadcaster { return lastClusterStateVersionConverged == clusterStateBundle.getVersion(); } - public void setLastClusterStateVersionWrittenToZooKeeper(int version) { - lastClusterStateVersionWrittenToZooKeeper = version; - } - - private boolean currentStateVersionHasBeenWrittenToZooKeeper() { + public boolean broadcastNewStateBundleIfRequired(DatabaseHandler.Context dbContext, Communicator communicator, + int lastClusterStateVersionWrittenToZooKeeper) { if (clusterStateBundle == null) { return false; } - return clusterStateBundle.getVersion() == lastClusterStateVersionWrittenToZooKeeper; - } - - public boolean broadcastNewStateBundleIfRequired(DatabaseHandler.Context dbContext, Communicator communicator) { - if (!currentStateVersionHasBeenWrittenToZooKeeper()) { + if (clusterStateBundle.getVersion() != lastClusterStateVersionWrittenToZooKeeper) { return false; } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java index c49cc77aabb..bb1c32638a2 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java @@ -42,9 +42,9 @@ public class SystemStateBroadcasterTest { cf.cluster().getNodeInfo(Node.ofDistributor(0)).setReportedState(new NodeState(NodeType.DISTRIBUTOR, State.UP).setStartTimestamp(500), 3000); } - void simulateBroadcastTick(ClusterFixture cf) { + void simulateBroadcastTick(ClusterFixture cf, int stateVersion) { broadcaster.processResponses(); - broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), mockCommunicator); + broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), mockCommunicator, stateVersion); try { broadcaster.checkIfClusterStateIsAckedByAllDistributors( mockDatabaseHandler, dbContextFrom(cf.cluster()), mockFleetController); @@ -89,7 +89,7 @@ public class SystemStateBroadcasterTest { ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2"); ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); f.broadcaster.handleNewClusterStates(stateBundle); - f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator); + f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator, 0); cf.cluster().getNodeInfo().forEach(nodeInfo -> verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any())); } @@ -100,7 +100,7 @@ public class SystemStateBroadcasterTest { ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); f.simulateNodePartitionedAwaySilently(cf); f.broadcaster.handleNewClusterStates(stateBundle); - f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator); + f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator, 0); clusterNodeInfos(cf.cluster(), Node.ofDistributor(1), Node.ofStorage(0), Node.ofStorage(1)).forEach(nodeInfo -> { // Only distributor 0 should observe startup timestamps @@ -118,7 +118,7 @@ public class SystemStateBroadcasterTest { StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2")); ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); f.broadcaster.handleNewClusterStates(stateBundle); - f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator); + f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator, 0); cf.cluster().getNodeInfo().forEach(nodeInfo -> verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any())); } @@ -132,7 +132,7 @@ public class SystemStateBroadcasterTest { ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); f.simulateNodePartitionedAwaySilently(cf); f.broadcaster.handleNewClusterStates(stateBundle); - f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator); + f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator, 0); clusterNodeInfos(cf.cluster(), Node.ofDistributor(1), Node.ofStorage(0), Node.ofStorage(1)).forEach(nodeInfo -> { // Only distributor 0 should observe startup timestamps @@ -150,8 +150,7 @@ public class SystemStateBroadcasterTest { ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("version:100 distributor:2 storage:2"); ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); f.broadcaster.handleNewClusterStates(stateBundle); - f.broadcaster.setLastClusterStateVersionWrittenToZooKeeper(99); - f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator); + f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator, 99); cf.cluster().getNodeInfo().forEach(nodeInfo -> { verify(f.mockCommunicator, times(0)).setSystemState(any(), eq(nodeInfo), any()); @@ -164,8 +163,7 @@ public class SystemStateBroadcasterTest { ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("version:100 distributor:2 storage:2"); ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); f.broadcaster.handleNewClusterStates(stateBundle); - f.broadcaster.setLastClusterStateVersionWrittenToZooKeeper(100); - f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator); + f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator, 100); cf.cluster().getNodeInfo().forEach(nodeInfo -> { verify(f.mockCommunicator, times(1)).setSystemState(any(), eq(nodeInfo), any()); @@ -230,8 +228,7 @@ public class SystemStateBroadcasterTest { .deriveAndBuild(); cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); broadcaster.handleNewClusterStates(stateBundle); - broadcaster.setLastClusterStateVersionWrittenToZooKeeper(stateBundle.getVersion()); - broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), mockCommunicator); + broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), mockCommunicator, stateBundle.getVersion()); d0Waiter = ArgumentCaptor.forClass(Communicator.Waiter.class); d1Waiter = ArgumentCaptor.forClass(Communicator.Waiter.class); @@ -248,11 +245,11 @@ public class SystemStateBroadcasterTest { @SuppressWarnings("unchecked") // Type erasure of Waiter in mocked argument capture void ackStateBundleFromBothDistributors() { expectSetSystemStateInvocationsToBothDistributors(); - simulateBroadcastTick(cf); + simulateBroadcastTick(cf, 123); respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), stateBundle, d0Waiter.getValue()); respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), stateBundle, d1Waiter.getValue()); - simulateBroadcastTick(cf); + simulateBroadcastTick(cf, 123); } static StateActivationFixture withTwoPhaseEnabled() { @@ -271,11 +268,11 @@ public class SystemStateBroadcasterTest { var cf = f.cf; f.expectSetSystemStateInvocationsToBothDistributors(); - f.simulateBroadcastTick(cf); + f.simulateBroadcastTick(cf, 123); // Respond from distributor 0, but not yet from distributor 1 respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), f.stateBundle, f.d0Waiter.getValue()); - f.simulateBroadcastTick(cf); + f.simulateBroadcastTick(cf, 123); // No activations should be sent yet cf.cluster().getNodeInfo().forEach(nodeInfo -> { @@ -284,7 +281,7 @@ public class SystemStateBroadcasterTest { assertNull(f.broadcaster.getLastClusterStateBundleConverged()); respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, f.d1Waiter.getValue()); - f.simulateBroadcastTick(cf); + f.simulateBroadcastTick(cf, 123); // Activation should now be sent to _all_ nodes (distributor and storage) cf.cluster().getNodeInfo().forEach(nodeInfo -> { @@ -312,13 +309,13 @@ public class SystemStateBroadcasterTest { respondToActivateClusterStateVersion(cf.cluster.getNodeInfo(Node.ofDistributor(0)), f.stateBundle, d0ActivateWaiter.getValue()); - f.simulateBroadcastTick(cf); + f.simulateBroadcastTick(cf, 123); assertNull(f.broadcaster.getLastClusterStateBundleConverged()); // Not yet converged respondToActivateClusterStateVersion(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, d1ActivateWaiter.getValue()); - f.simulateBroadcastTick(cf); + f.simulateBroadcastTick(cf, 123); // Finally, all distributors have ACKed the version! State is marked as converged. assertEquals(f.stateBundle, f.broadcaster.getLastClusterStateBundleConverged()); @@ -363,7 +360,7 @@ public class SystemStateBroadcasterTest { // considered converged since it's not an exact version match. respondToActivateClusterStateVersion(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, 124, d1ActivateWaiter.getValue()); - f.simulateBroadcastTick(cf); + f.simulateBroadcastTick(cf, 123); assertNull(f.broadcaster.getLastClusterStateBundleConverged()); } -- cgit v1.2.3