diff options
Diffstat (limited to 'clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java')
-rw-r--r-- | clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java | 43 |
1 files changed, 29 insertions, 14 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java index ba53cdee4e8..8860bef2fae 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java @@ -94,9 +94,10 @@ public class SystemStateBroadcaster { return true; } - private List<NodeInfo> resolveStateVersionCandidateSendSet(DatabaseHandler.Context dbContext) { + private List<NodeInfo> resolveStateVersionSendSet(DatabaseHandler.Context dbContext) { return dbContext.getCluster().getNodeInfo().stream() .filter(this::nodeNeedsClusterState) + .filter(node -> !newestStateAlreadySentToNode(node)) .collect(Collectors.toList()); } @@ -104,25 +105,44 @@ public class SystemStateBroadcaster { return (node.getNewestSystemStateVersionSent() == systemState.getVersion()); } + /** + * Checks if all distributor nodes have ACKed the most recent cluster state. Iff this + * is the case, triggers handleAllDistributorsInSync() on the provided FleetController + * object and updates the broadcaster's last known in-sync cluster state version. + * + * Returns true if distributor nodes were checked, false if cluster is already in sync + * or no state has been published yet. + */ + boolean checkIfClusterStateIsAckedByAllDistributors(DatabaseHandler database, + DatabaseHandler.Context dbContext, + FleetController fleetController) throws InterruptedException { + if ((systemState == null) || (lastClusterStateInSync == systemState.getVersion())) { + return false; // Nothing to do for the current state + } + boolean anyOutdatedDistributorNodes = dbContext.getCluster().getNodeInfo().stream() + .filter(NodeInfo::isDistributor) + .anyMatch(this::nodeNeedsClusterState); + + if (!anyOutdatedDistributorNodes && (systemState.getVersion() > lastClusterStateInSync)) { + log.log(LogLevel.DEBUG, "All distributors have newest clusterstate, updating start timestamps in zookeeper and clearing them from cluster state"); + lastClusterStateInSync = systemState.getVersion(); + fleetController.handleAllDistributorsInSync(database, dbContext); + } + return true; + } + public boolean broadcastNewState(DatabaseHandler database, DatabaseHandler.Context dbContext, Communicator communicator, FleetController fleetController) throws InterruptedException { if (systemState == null) return false; - List<NodeInfo> recipients = resolveStateVersionCandidateSendSet(dbContext); if (!systemState.isOfficial()) { systemState.setOfficial(true); } - boolean anyOutdatedDistributorNodes = false; + List<NodeInfo> recipients = resolveStateVersionSendSet(dbContext); for (NodeInfo node : recipients) { - if (node.isDistributor()) { - anyOutdatedDistributorNodes = true; - } - if (newestStateAlreadySentToNode(node)) { - continue; // No need to send anything more, but still have to mark node as outdated. - } if (nodeNeedsToObserveStartupTimestamps(node)) { ClusterState newState = buildModifiedClusterState(dbContext); log.log(LogLevel.DEBUG, "Sending modified system state version " + systemState.getVersion() @@ -135,11 +155,6 @@ public class SystemStateBroadcaster { } } - if (!anyOutdatedDistributorNodes && systemState.getVersion() > lastClusterStateInSync) { - log.log(LogLevel.DEBUG, "All distributors have newest clusterstate, updating start timestamps in zookeeper and clearing them from cluster state"); - lastClusterStateInSync = systemState.getVersion(); - fleetController.handleAllDistributorsInSync(database, dbContext); - } return !recipients.isEmpty(); } |