diff options
Diffstat (limited to 'clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java')
-rw-r--r-- | clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java | 80 |
1 files changed, 73 insertions, 7 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java index 47ac5259190..ab16d9a1d1d 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java @@ -24,6 +24,7 @@ import org.apache.commons.lang.exception.ExceptionUtils; import java.io.FileNotFoundException; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -79,6 +80,10 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd private long firstAllowedStateBroadcast = Long.MAX_VALUE; private long tickStartTime = Long.MAX_VALUE; + private List<RemoteClusterControllerTask> tasksPendingStateRecompute = new ArrayList<>(); + // Invariant: queued task versions are monotonically increasing with queue position + private Queue<VersionDependentTaskCompletion> taskCompletionQueue = new ArrayDeque<>(); + private final RunDataExtractor dataExtractor = new RunDataExtractor() { @Override public com.yahoo.vdslib.state.ClusterState getLatestClusterState() { return stateVersionTracker.getVersionedClusterState(); } @@ -393,11 +398,25 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd masterElectionHandler.lostDatabaseConnection(); } + private void failAllVersionDependentTasks() { + tasksPendingStateRecompute.forEach(task -> { + task.handleLeadershipLost(); + task.notifyCompleted(); + }); + tasksPendingStateRecompute.clear(); + taskCompletionQueue.forEach(task -> { + task.getTask().handleLeadershipLost(); + task.getTask().notifyCompleted(); + }); + taskCompletionQueue.clear(); + } + /** Called when all distributors have acked newest cluster state version. */ public void handleAllDistributorsInSync(DatabaseHandler database, DatabaseHandler.Context context) throws InterruptedException { Set<ConfiguredNode> nodes = new HashSet<>(cluster.clusterInfo().getConfiguredNodes().values()); - stateChangeHandler.handleAllDistributorsInSync( - stateVersionTracker.getVersionedClusterState(), nodes, database, context); + ClusterState currentState = stateVersionTracker.getVersionedClusterState(); + log.fine(() -> String.format("All distributors have ACKed cluster state version %d", currentState.getVersion())); + stateChangeHandler.handleAllDistributorsInSync(currentState, nodes, database, context); } private boolean changesConfiguredNodeSet(Collection<ConfiguredNode> newNodes) { @@ -542,6 +561,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd if ( ! isRunning()) { return; } didWork |= processNextQueuedRemoteTask(); + didWork |= completeSatisfiedVersionDependentTasks(); processingCycle = false; ++cycleCount; @@ -642,10 +662,15 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd if ( ! remoteTasks.isEmpty()) { final RemoteClusterControllerTask.Context context = createRemoteTaskProcessingContext(); final RemoteClusterControllerTask task = remoteTasks.poll(); - log.finest("Processing remote task " + task.getClass().getName()); + log.finest(() -> String.format("Processing remote task of type '%s'", task.getClass().getName())); task.doRemoteFleetControllerTask(context); - task.notifyCompleted(); - log.finest("Done processing remote task " + task.getClass().getName()); + if (!task.hasVersionAckDependency()) { + log.finest(() -> String.format("Done processing remote task of type '%s'", task.getClass().getName())); + task.notifyCompleted(); + } else { + log.finest(() -> String.format("Remote task of type '%s' queued until state recomputation", task.getClass().getName())); + tasksPendingStateRecompute.add(task); + } return true; } return false; @@ -661,6 +686,23 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd return context; } + private boolean completeSatisfiedVersionDependentTasks() { + int publishedVersion = systemStateBroadcaster.lastClusterStateVersionInSync(); + long queueSizeBefore = taskCompletionQueue.size(); + while (!taskCompletionQueue.isEmpty()) { + VersionDependentTaskCompletion taskCompletion = taskCompletionQueue.peek(); + if (publishedVersion >= taskCompletion.getMinimumVersion()) { + log.info(() -> String.format("Deferred task of type '%s' has minimum version %d, published is %d; completing", + taskCompletion.getTask().getClass().getName(), taskCompletion.getMinimumVersion(), publishedVersion)); + taskCompletion.getTask().notifyCompleted(); + taskCompletionQueue.remove(); + } else { + break; + } + } + return (taskCompletionQueue.size() != queueSizeBefore); + } + /** * A "consolidated" cluster state is guaranteed to have up-to-date information on which nodes are * up or down even when the whole cluster is down. The regular, published cluster state is not @@ -713,6 +755,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd } private boolean recomputeClusterStateIfRequired() { + boolean stateWasChanged = false; if (mustRecomputeCandidateClusterState()) { stateChangeHandler.unsetStateChangedFlag(); final AnnotatedClusterState candidate = computeCurrentAnnotatedState(); @@ -727,10 +770,31 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd stateVersionTracker.promoteCandidateToVersionedState(timeNowMs); emitEventsForAlteredStateEdges(before, stateVersionTracker.getAnnotatedVersionedClusterState(), timeNowMs); handleNewSystemState(stateVersionTracker.getVersionedClusterState()); - return true; + stateWasChanged = true; } } - return false; + scheduleVersionDependentTasksForFutureCompletion(); + return stateWasChanged; + } + + /** + * Move tasks that are dependent on the most recently generated state being published into + * a completion queue with a dependency on the current version. Once that version + * has been ACKed by all distributors in the system, those tasks will be marked as completed. + * + * This works transparently for tasks that end up changing the current cluster state (i.e. + * requiring a new state to be published) and for those whose changes are idempotent. In the + * former case the tasks will depend on the version that was generated based upon them. In + * the latter case the tasks will depend on the version that is already published (or in the + * process of being published). + */ + private void scheduleVersionDependentTasksForFutureCompletion() { + for (RemoteClusterControllerTask task : tasksPendingStateRecompute) { + log.finest(() -> String.format("Adding task of type '%s' to be acked at version %d", + task.getClass().getName(), stateVersionTracker.getCurrentVersion())); + taskCompletionQueue.add(new VersionDependentTaskCompletion(stateVersionTracker.getCurrentVersion(), task)); + } + tasksPendingStateRecompute.clear(); } private AnnotatedClusterState computeCurrentAnnotatedState() { @@ -804,6 +868,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd eventLog.add(new ClusterEvent(ClusterEvent.Type.MASTER_ELECTION, "This node is no longer fleetcontroller master.", timer.getCurrentTimeInMillis())); firstAllowedStateBroadcast = Long.MAX_VALUE; metricUpdater.noLongerMaster(); + failAllVersionDependentTasks(); // TODO test } wantedStateChanged = false; isMaster = false; @@ -827,6 +892,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd System.exit(1); } finally { running.set(false); + failAllVersionDependentTasks(); // TODO test.... synchronized (monitor) { monitor.notifyAll(); } } } |