From 0293a4fed6d1d6293e907e6a2c5bf38ecb48775c Mon Sep 17 00:00:00 2001 From: Harald Musum Date: Fri, 26 May 2023 14:16:55 +0200 Subject: Cosmetic changes --- .../clustercontroller/core/FleetController.java | 4 +-- .../core/SystemStateBroadcaster.java | 30 ++++++++++++---------- 2 files changed, 19 insertions(+), 15 deletions(-) (limited to 'clustercontroller-core/src/main/java') diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java index 6876ac8cf56..966f6db8deb 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java @@ -1003,7 +1003,7 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta private boolean atFirstClusterStateSendTimeEdge() { // We only care about triggering a state recomputation for the master, which is the only // one allowed to actually broadcast any states. - if (!isMaster || systemStateBroadcaster.hasBroadcastedClusterStateBundle()) { + if (!isMaster || systemStateBroadcaster.hasBroadcastClusterStateBundle()) { return false; } return hasPassedFirstStateBroadcastTimePoint(timer.getCurrentTimeInMillis()); @@ -1021,7 +1021,7 @@ public class FleetController implements NodeListener, SlobrokListener, SystemSta if ( ! isMaster) { // If we just became master, restore state from ZooKeeper stateChangeHandler.setStateChangedFlag(); - systemStateBroadcaster.resetBroadcastedClusterStateBundle(); + systemStateBroadcaster.resetBroadcastClusterStateBundle(); stateVersionTracker.setVersionRetrievedFromZooKeeper(database.getLatestSystemStateVersion()); ClusterStateBundle previousBundle = database.getLatestClusterStateBundle(); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java index cc121a8b120..9e8e7a69c6e 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java @@ -15,6 +15,8 @@ import java.util.Map; import java.util.TreeMap; import java.util.logging.Logger; +import static com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler.DatabaseContext; + public class SystemStateBroadcaster { private static final Logger log = Logger.getLogger(SystemStateBroadcaster.class.getName()); @@ -51,11 +53,11 @@ public class SystemStateBroadcaster { return clusterStateBundle.getBaselineClusterState(); } - public boolean hasBroadcastedClusterStateBundle() { + public boolean hasBroadcastClusterStateBundle() { return clusterStateBundle != null; } - public void resetBroadcastedClusterStateBundle() { + public void resetBroadcastClusterStateBundle() { clusterStateBundle = null; } @@ -186,7 +188,7 @@ public class SystemStateBroadcaster { return nodeIsReachable(node); } - private List resolveStateVersionSendSet(DatabaseHandler.DatabaseContext dbContext) { + private List resolveStateVersionSendSet(DatabaseContext dbContext) { return dbContext.getCluster().getNodeInfos().stream() .filter(this::nodeNeedsClusterStateBundle) .filter(node -> !newestStateBundleAlreadySentToNode(node)) @@ -194,7 +196,7 @@ public class SystemStateBroadcaster { } // Precondition: no nodes in the cluster need to receive the current cluster state version bundle - private List resolveStateActivationSendSet(DatabaseHandler.DatabaseContext dbContext) { + private List resolveStateActivationSendSet(DatabaseContext dbContext) { return dbContext.getCluster().getNodeInfos().stream() .filter(this::nodeNeedsClusterStateActivation) .filter(node -> !newestStateActivationAlreadySentToNode(node)) @@ -215,7 +217,7 @@ public class SystemStateBroadcaster { * object and updates the broadcaster's last known in-sync cluster state version. */ void checkIfClusterStateIsAckedByAllDistributors(DatabaseHandler database, - DatabaseHandler.DatabaseContext dbContext, + DatabaseContext dbContext, FleetController fleetController) throws InterruptedException { if ((clusterStateBundle == null) || currentClusterStateIsConverged()) { return; // Nothing to do for the current state @@ -260,7 +262,7 @@ public class SystemStateBroadcaster { lastStateVersionBundleAcked = clusterStateBundle.getVersion(); } - private void markCurrentClusterStateAsConverged(DatabaseHandler database, DatabaseHandler.DatabaseContext dbContext, FleetController fleetController) { + private void markCurrentClusterStateAsConverged(DatabaseHandler database, DatabaseContext dbContext, FleetController fleetController) { context.log(log, Level.FINE, "All distributors have newest clusterstate, updating start timestamps in zookeeper and clearing them from cluster state"); lastClusterStateVersionConverged = clusterStateBundle.getVersion(); lastClusterStateBundleConverged = clusterStateBundle; @@ -279,7 +281,8 @@ public class SystemStateBroadcaster { lastOfficialStateVersion = clusterStateBundle.getVersion(); } - public boolean broadcastNewStateBundleIfRequired(DatabaseHandler.DatabaseContext dbContext, Communicator communicator, + public boolean broadcastNewStateBundleIfRequired(DatabaseContext dbContext, + Communicator communicator, int lastClusterStateVersionWrittenToZooKeeper) { if (clusterStateBundle == null || clusterStateBundle.getVersion() == 0) { return false; @@ -296,7 +299,7 @@ public class SystemStateBroadcaster { } List recipients = resolveStateVersionSendSet(dbContext); - ClusterStateBundle modifiedBundle = clusterStateBundle.cloneWithMapper(state -> buildModifiedClusterState(state, dbContext)); + var modifiedBundle = clusterStateBundle.cloneWithMapper(state -> buildModifiedClusterState(state, dbContext)); for (NodeInfo node : recipients) { if (nodeNeedsToObserveStartupTimestamps(node)) { context.log(log, @@ -317,8 +320,9 @@ public class SystemStateBroadcaster { return !recipients.isEmpty(); } - public boolean broadcastStateActivationsIfRequired(DatabaseHandler.DatabaseContext dbContext, Communicator communicator) { - if (clusterStateBundle == null || clusterStateBundle.getVersion() == 0 || !currentBundleVersionIsTaggedOfficial()) { + public boolean broadcastStateActivationsIfRequired(DatabaseContext dbContext, Communicator communicator) { + int clusterStateBundleVersion = clusterStateBundle.getVersion(); + if (clusterStateBundle == null || clusterStateBundleVersion == 0 || !currentBundleVersionIsTaggedOfficial()) { return false; } @@ -331,8 +335,8 @@ public class SystemStateBroadcaster { context.log(log, Level.FINE, () -> "Sending cluster state activation to node " + node + " for version " + - clusterStateBundle.getVersion()); - communicator.activateClusterStateVersion(clusterStateBundle.getVersion(), node, activateClusterStateVersionWaiter); + clusterStateBundleVersion); + communicator.activateClusterStateVersion(clusterStateBundleVersion, node, activateClusterStateVersionWaiter); } return !recipients.isEmpty(); @@ -348,7 +352,7 @@ public class SystemStateBroadcaster { return node.getStartTimestamp() != 0 && node.getWentDownWithStartTime() == node.getStartTimestamp(); } - private static ClusterState buildModifiedClusterState(ClusterState sourceState, DatabaseHandler.DatabaseContext dbContext) { + private static ClusterState buildModifiedClusterState(ClusterState sourceState, DatabaseContext dbContext) { ClusterState newState = sourceState.clone(); for (NodeInfo n : dbContext.getCluster().getNodeInfos()) { NodeState ns = newState.getNodeState(n.getNode()); -- cgit v1.2.3