about | summary | refs | log | tree | commit | diff | stats
path: root/clustercontroller-core
diff options
context:
space:
mode:
author Tor Brede Vekterli <vekterli@yahoo-inc.com> 2017-09-12 15:09:43 +0200
committer Tor Brede Vekterli <vekterli@yahoo-inc.com> 2017-09-12 15:09:43 +0200
commit f9fa1cc6d50aedb47cd80cb8ded9c34fe338532f (patch)
tree b0cf74a84c41e17106072ba30777bbe394ede5db /clustercontroller-core
parent 96d3f5543a9ec7926a30411e8ab0dcb34cc56eb8 (diff)
Break node version ACK check out into separately called logic
Removes the dependency on having to invoke broadcastNewState before being able to observe that all distributors are in sync. Invocations of broadcastNewState are gated by a grace period between consecutive invocations, so without this change we get artificial delays before a synchronous task can be considered complete.
Diffstat (limited to 'clustercontroller-core')
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java 1
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java 43
-rw-r--r-- clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java 19
3 files changed, 49 insertions, 14 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index 31fa5635c42..69e3a9f2886 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -550,6 +550,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
if ( ! isRunning()) { return; }
if (masterElectionHandler.isMaster()) {
didWork |= broadcastClusterStateToEligibleNodes();
+ didWork |= systemStateBroadcaster.checkIfClusterStateIsAckedByAllDistributors(database, databaseContext, this);
}
if ( ! isRunning()) { return; }
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index ba53cdee4e8..8860bef2fae 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -94,9 +94,10 @@ public class SystemStateBroadcaster {
return true;
}
- private List<NodeInfo> resolveStateVersionCandidateSendSet(DatabaseHandler.Context dbContext) {
+ private List<NodeInfo> resolveStateVersionSendSet(DatabaseHandler.Context dbContext) {
return dbContext.getCluster().getNodeInfo().stream()
.filter(this::nodeNeedsClusterState)
+ .filter(node -> !newestStateAlreadySentToNode(node))
.collect(Collectors.toList());
}
@@ -104,25 +105,44 @@ public class SystemStateBroadcaster {
return (node.getNewestSystemStateVersionSent() == systemState.getVersion());
}
+ /**
+ * Checks if all distributor nodes have ACKed the most recent cluster state. Iff this
+ * is the case, triggers handleAllDistributorsInSync() on the provided FleetController
+ * object and updates the broadcaster's last known in-sync cluster state version.
+ *
+ * Returns true if distributor nodes were checked, false if cluster is already in sync
+ * or no state has been published yet.
+ */
+ boolean checkIfClusterStateIsAckedByAllDistributors(DatabaseHandler database,
+ DatabaseHandler.Context dbContext,
+ FleetController fleetController) throws InterruptedException {
+ if ((systemState == null) || (lastClusterStateInSync == systemState.getVersion())) {
+ return false; // Nothing to do for the current state
+ }
+ boolean anyOutdatedDistributorNodes = dbContext.getCluster().getNodeInfo().stream()
+ .filter(NodeInfo::isDistributor)
+ .anyMatch(this::nodeNeedsClusterState);
+
+ if (!anyOutdatedDistributorNodes && (systemState.getVersion() > lastClusterStateInSync)) {
+ log.log(LogLevel.DEBUG, "All distributors have newest clusterstate, updating start timestamps in zookeeper and clearing them from cluster state");
+ lastClusterStateInSync = systemState.getVersion();
+ fleetController.handleAllDistributorsInSync(database, dbContext);
+ }
+ return true;
+ }
+
public boolean broadcastNewState(DatabaseHandler database,
DatabaseHandler.Context dbContext,
Communicator communicator,
FleetController fleetController) throws InterruptedException {
if (systemState == null) return false;
- List<NodeInfo> recipients = resolveStateVersionCandidateSendSet(dbContext);
if (!systemState.isOfficial()) {
systemState.setOfficial(true);
}
- boolean anyOutdatedDistributorNodes = false;
+ List<NodeInfo> recipients = resolveStateVersionSendSet(dbContext);
for (NodeInfo node : recipients) {
- if (node.isDistributor()) {
- anyOutdatedDistributorNodes = true;
- }
- if (newestStateAlreadySentToNode(node)) {
- continue; // No need to send anything more, but still have to mark node as outdated.
- }
if (nodeNeedsToObserveStartupTimestamps(node)) {
ClusterState newState = buildModifiedClusterState(dbContext);
log.log(LogLevel.DEBUG, "Sending modified system state version " + systemState.getVersion()
@@ -135,11 +155,6 @@ public class SystemStateBroadcaster {
}
}
- if (!anyOutdatedDistributorNodes && systemState.getVersion() > lastClusterStateInSync) {
- log.log(LogLevel.DEBUG, "All distributors have newest clusterstate, updating start timestamps in zookeeper and clearing them from cluster state");
- lastClusterStateInSync = systemState.getVersion();
- fleetController.handleAllDistributorsInSync(database, dbContext);
- }
return !recipients.isEmpty();
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
index a452a85c870..e4898d82314 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
@@ -1489,4 +1489,23 @@ public class StateChangeTest extends FleetControllerTest {
assertTrue(task.isLeadershipLost());
}
+ @Test
+ public void cluster_state_ack_is_not_dependent_on_state_send_grace_period() throws Exception {
+ FleetControllerOptions options = defaultOptions();
+ options.minTimeBetweenNewSystemStates = 10_000;
+ RemoteTaskFixture fixture = createFixtureWith(options);
+ // Have to increment timer here to be able to send state generated by the scheduled task
+ timer.advanceTime(10_000);
+
+ MockTask task = fixture.scheduleNonIdempotentVersionDependentTask();
+ communicator.setShouldDeferDistributorClusterStateAcks(true);
+ fixture.processScheduledTask();
+ assertFalse(task.isCompleted()); // Not yet acked by all nodes
+ // If tracking whether ACKs are received from the cluster is dependent on the system state
+ // send grace period, we won't observe that the task may be completed even though all nodes
+ // have ACKed. Would then have to increment timer by 10s and do another tick.
+ fixture.sendAllDeferredDistributorClusterStateAcks();
+ assertTrue(task.isCompleted());
+ }
+
}