summaryrefslogtreecommitdiffstats
path: root/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
diff options
context:
space:
mode:
Diffstat (limited to 'clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java')
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java43
1 files changed, 29 insertions, 14 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index ba53cdee4e8..8860bef2fae 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -94,9 +94,10 @@ public class SystemStateBroadcaster {
return true;
}
- private List<NodeInfo> resolveStateVersionCandidateSendSet(DatabaseHandler.Context dbContext) {
+ private List<NodeInfo> resolveStateVersionSendSet(DatabaseHandler.Context dbContext) {
return dbContext.getCluster().getNodeInfo().stream()
.filter(this::nodeNeedsClusterState)
+ .filter(node -> !newestStateAlreadySentToNode(node))
.collect(Collectors.toList());
}
@@ -104,25 +105,44 @@ public class SystemStateBroadcaster {
return (node.getNewestSystemStateVersionSent() == systemState.getVersion());
}
+ /**
+ * Checks if all distributor nodes have ACKed the most recent cluster state. Iff this
+ * is the case, triggers handleAllDistributorsInSync() on the provided FleetController
+ * object and updates the broadcaster's last known in-sync cluster state version.
+ *
+ * Returns true if distributor nodes were checked, false if cluster is already in sync
+ * or no state has been published yet.
+ */
+ boolean checkIfClusterStateIsAckedByAllDistributors(DatabaseHandler database,
+ DatabaseHandler.Context dbContext,
+ FleetController fleetController) throws InterruptedException {
+ if ((systemState == null) || (lastClusterStateInSync == systemState.getVersion())) {
+ return false; // Nothing to do for the current state
+ }
+ boolean anyOutdatedDistributorNodes = dbContext.getCluster().getNodeInfo().stream()
+ .filter(NodeInfo::isDistributor)
+ .anyMatch(this::nodeNeedsClusterState);
+
+ if (!anyOutdatedDistributorNodes && (systemState.getVersion() > lastClusterStateInSync)) {
+ log.log(LogLevel.DEBUG, "All distributors have newest clusterstate, updating start timestamps in zookeeper and clearing them from cluster state");
+ lastClusterStateInSync = systemState.getVersion();
+ fleetController.handleAllDistributorsInSync(database, dbContext);
+ }
+ return true;
+ }
+
public boolean broadcastNewState(DatabaseHandler database,
DatabaseHandler.Context dbContext,
Communicator communicator,
FleetController fleetController) throws InterruptedException {
if (systemState == null) return false;
- List<NodeInfo> recipients = resolveStateVersionCandidateSendSet(dbContext);
if (!systemState.isOfficial()) {
systemState.setOfficial(true);
}
- boolean anyOutdatedDistributorNodes = false;
+ List<NodeInfo> recipients = resolveStateVersionSendSet(dbContext);
for (NodeInfo node : recipients) {
- if (node.isDistributor()) {
- anyOutdatedDistributorNodes = true;
- }
- if (newestStateAlreadySentToNode(node)) {
- continue; // No need to send anything more, but still have to mark node as outdated.
- }
if (nodeNeedsToObserveStartupTimestamps(node)) {
ClusterState newState = buildModifiedClusterState(dbContext);
log.log(LogLevel.DEBUG, "Sending modified system state version " + systemState.getVersion()
@@ -135,11 +155,6 @@ public class SystemStateBroadcaster {
}
}
- if (!anyOutdatedDistributorNodes && systemState.getVersion() > lastClusterStateInSync) {
- log.log(LogLevel.DEBUG, "All distributors have newest clusterstate, updating start timestamps in zookeeper and clearing them from cluster state");
- lastClusterStateInSync = systemState.getVersion();
- fleetController.handleAllDistributorsInSync(database, dbContext);
- }
return !recipients.isEmpty();
}