summaryrefslogtreecommitdiffstats
path: root/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
diff options
context:
space:
mode:
Diffstat (limited to 'clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java')
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java80
1 files changed, 73 insertions, 7 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index 47ac5259190..ab16d9a1d1d 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang.exception.ExceptionUtils;
import java.io.FileNotFoundException;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
@@ -79,6 +80,10 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
private long firstAllowedStateBroadcast = Long.MAX_VALUE;
private long tickStartTime = Long.MAX_VALUE;
+ private List<RemoteClusterControllerTask> tasksPendingStateRecompute = new ArrayList<>();
+ // Invariant: queued task versions are monotonically increasing with queue position
+ private Queue<VersionDependentTaskCompletion> taskCompletionQueue = new ArrayDeque<>();
+
private final RunDataExtractor dataExtractor = new RunDataExtractor() {
@Override
public com.yahoo.vdslib.state.ClusterState getLatestClusterState() { return stateVersionTracker.getVersionedClusterState(); }
@@ -393,11 +398,25 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
masterElectionHandler.lostDatabaseConnection();
}
+ private void failAllVersionDependentTasks() {
+ tasksPendingStateRecompute.forEach(task -> {
+ task.handleLeadershipLost();
+ task.notifyCompleted();
+ });
+ tasksPendingStateRecompute.clear();
+ taskCompletionQueue.forEach(task -> {
+ task.getTask().handleLeadershipLost();
+ task.getTask().notifyCompleted();
+ });
+ taskCompletionQueue.clear();
+ }
+
/** Called when all distributors have acked newest cluster state version. */
public void handleAllDistributorsInSync(DatabaseHandler database, DatabaseHandler.Context context) throws InterruptedException {
Set<ConfiguredNode> nodes = new HashSet<>(cluster.clusterInfo().getConfiguredNodes().values());
- stateChangeHandler.handleAllDistributorsInSync(
- stateVersionTracker.getVersionedClusterState(), nodes, database, context);
+ ClusterState currentState = stateVersionTracker.getVersionedClusterState();
+ log.fine(() -> String.format("All distributors have ACKed cluster state version %d", currentState.getVersion()));
+ stateChangeHandler.handleAllDistributorsInSync(currentState, nodes, database, context);
}
private boolean changesConfiguredNodeSet(Collection<ConfiguredNode> newNodes) {
@@ -542,6 +561,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
if ( ! isRunning()) { return; }
didWork |= processNextQueuedRemoteTask();
+ didWork |= completeSatisfiedVersionDependentTasks();
processingCycle = false;
++cycleCount;
@@ -642,10 +662,15 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
if ( ! remoteTasks.isEmpty()) {
final RemoteClusterControllerTask.Context context = createRemoteTaskProcessingContext();
final RemoteClusterControllerTask task = remoteTasks.poll();
- log.finest("Processing remote task " + task.getClass().getName());
+ log.finest(() -> String.format("Processing remote task of type '%s'", task.getClass().getName()));
task.doRemoteFleetControllerTask(context);
- task.notifyCompleted();
- log.finest("Done processing remote task " + task.getClass().getName());
+ if (!task.hasVersionAckDependency()) {
+ log.finest(() -> String.format("Done processing remote task of type '%s'", task.getClass().getName()));
+ task.notifyCompleted();
+ } else {
+ log.finest(() -> String.format("Remote task of type '%s' queued until state recomputation", task.getClass().getName()));
+ tasksPendingStateRecompute.add(task);
+ }
return true;
}
return false;
@@ -661,6 +686,23 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
return context;
}
+ private boolean completeSatisfiedVersionDependentTasks() {
+ int publishedVersion = systemStateBroadcaster.lastClusterStateVersionInSync();
+ long queueSizeBefore = taskCompletionQueue.size();
+ while (!taskCompletionQueue.isEmpty()) {
+ VersionDependentTaskCompletion taskCompletion = taskCompletionQueue.peek();
+ if (publishedVersion >= taskCompletion.getMinimumVersion()) {
+ log.info(() -> String.format("Deferred task of type '%s' has minimum version %d, published is %d; completing",
+ taskCompletion.getTask().getClass().getName(), taskCompletion.getMinimumVersion(), publishedVersion));
+ taskCompletion.getTask().notifyCompleted();
+ taskCompletionQueue.remove();
+ } else {
+ break;
+ }
+ }
+ return (taskCompletionQueue.size() != queueSizeBefore);
+ }
+
/**
* A "consolidated" cluster state is guaranteed to have up-to-date information on which nodes are
* up or down even when the whole cluster is down. The regular, published cluster state is not
@@ -713,6 +755,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
}
private boolean recomputeClusterStateIfRequired() {
+ boolean stateWasChanged = false;
if (mustRecomputeCandidateClusterState()) {
stateChangeHandler.unsetStateChangedFlag();
final AnnotatedClusterState candidate = computeCurrentAnnotatedState();
@@ -727,10 +770,31 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
stateVersionTracker.promoteCandidateToVersionedState(timeNowMs);
emitEventsForAlteredStateEdges(before, stateVersionTracker.getAnnotatedVersionedClusterState(), timeNowMs);
handleNewSystemState(stateVersionTracker.getVersionedClusterState());
- return true;
+ stateWasChanged = true;
}
}
- return false;
+ scheduleVersionDependentTasksForFutureCompletion();
+ return stateWasChanged;
+ }
+
+ /**
+ * Move tasks that are dependent on the most recently generated state being published into
+ * a completion queue with a dependency on the current version. Once that version
+ * has been ACKed by all distributors in the system, those tasks will be marked as completed.
+ *
+ * This works transparently for tasks that end up changing the current cluster state (i.e.
+ * requiring a new state to be published) and for those whose changes are idempotent. In the
+ * former case the tasks will depend on the version that was generated based upon them. In
+ * the latter case the tasks will depend on the version that is already published (or in the
+ * process of being published).
+ */
+ private void scheduleVersionDependentTasksForFutureCompletion() {
+ for (RemoteClusterControllerTask task : tasksPendingStateRecompute) {
+ log.finest(() -> String.format("Adding task of type '%s' to be acked at version %d",
+ task.getClass().getName(), stateVersionTracker.getCurrentVersion()));
+ taskCompletionQueue.add(new VersionDependentTaskCompletion(stateVersionTracker.getCurrentVersion(), task));
+ }
+ tasksPendingStateRecompute.clear();
}
private AnnotatedClusterState computeCurrentAnnotatedState() {
@@ -804,6 +868,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
eventLog.add(new ClusterEvent(ClusterEvent.Type.MASTER_ELECTION, "This node is no longer fleetcontroller master.", timer.getCurrentTimeInMillis()));
firstAllowedStateBroadcast = Long.MAX_VALUE;
metricUpdater.noLongerMaster();
+ failAllVersionDependentTasks(); // TODO test
}
wantedStateChanged = false;
isMaster = false;
@@ -827,6 +892,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
System.exit(1);
} finally {
running.set(false);
+ failAllVersionDependentTasks(); // TODO test....
synchronized (monitor) { monitor.notifyAll(); }
}
}