diff options
Diffstat (limited to 'clustercontroller-core/src/main')
6 files changed, 164 insertions, 16 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java index 47ac5259190..ab16d9a1d1d 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java @@ -24,6 +24,7 @@ import org.apache.commons.lang.exception.ExceptionUtils; import java.io.FileNotFoundException; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -79,6 +80,10 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd private long firstAllowedStateBroadcast = Long.MAX_VALUE; private long tickStartTime = Long.MAX_VALUE; + private List<RemoteClusterControllerTask> tasksPendingStateRecompute = new ArrayList<>(); + // Invariant: queued task versions are monotonically increasing with queue position + private Queue<VersionDependentTaskCompletion> taskCompletionQueue = new ArrayDeque<>(); + private final RunDataExtractor dataExtractor = new RunDataExtractor() { @Override public com.yahoo.vdslib.state.ClusterState getLatestClusterState() { return stateVersionTracker.getVersionedClusterState(); } @@ -393,11 +398,25 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd masterElectionHandler.lostDatabaseConnection(); } + private void failAllVersionDependentTasks() { + tasksPendingStateRecompute.forEach(task -> { + task.handleLeadershipLost(); + task.notifyCompleted(); + }); + tasksPendingStateRecompute.clear(); + taskCompletionQueue.forEach(task -> { + task.getTask().handleLeadershipLost(); + task.getTask().notifyCompleted(); + }); + taskCompletionQueue.clear(); + } + /** Called when all distributors have acked newest cluster state version. 
*/ public void handleAllDistributorsInSync(DatabaseHandler database, DatabaseHandler.Context context) throws InterruptedException { Set<ConfiguredNode> nodes = new HashSet<>(cluster.clusterInfo().getConfiguredNodes().values()); - stateChangeHandler.handleAllDistributorsInSync( - stateVersionTracker.getVersionedClusterState(), nodes, database, context); + ClusterState currentState = stateVersionTracker.getVersionedClusterState(); + log.fine(() -> String.format("All distributors have ACKed cluster state version %d", currentState.getVersion())); + stateChangeHandler.handleAllDistributorsInSync(currentState, nodes, database, context); } private boolean changesConfiguredNodeSet(Collection<ConfiguredNode> newNodes) { @@ -542,6 +561,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd if ( ! isRunning()) { return; } didWork |= processNextQueuedRemoteTask(); + didWork |= completeSatisfiedVersionDependentTasks(); processingCycle = false; ++cycleCount; @@ -642,10 +662,15 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd if ( ! 
remoteTasks.isEmpty()) { final RemoteClusterControllerTask.Context context = createRemoteTaskProcessingContext(); final RemoteClusterControllerTask task = remoteTasks.poll(); - log.finest("Processing remote task " + task.getClass().getName()); + log.finest(() -> String.format("Processing remote task of type '%s'", task.getClass().getName())); task.doRemoteFleetControllerTask(context); - task.notifyCompleted(); - log.finest("Done processing remote task " + task.getClass().getName()); + if (!task.hasVersionAckDependency()) { + log.finest(() -> String.format("Done processing remote task of type '%s'", task.getClass().getName())); + task.notifyCompleted(); + } else { + log.finest(() -> String.format("Remote task of type '%s' queued until state recomputation", task.getClass().getName())); + tasksPendingStateRecompute.add(task); + } return true; } return false; @@ -661,6 +686,23 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd return context; } + private boolean completeSatisfiedVersionDependentTasks() { + int publishedVersion = systemStateBroadcaster.lastClusterStateVersionInSync(); + long queueSizeBefore = taskCompletionQueue.size(); + while (!taskCompletionQueue.isEmpty()) { + VersionDependentTaskCompletion taskCompletion = taskCompletionQueue.peek(); + if (publishedVersion >= taskCompletion.getMinimumVersion()) { + log.info(() -> String.format("Deferred task of type '%s' has minimum version %d, published is %d; completing", + taskCompletion.getTask().getClass().getName(), taskCompletion.getMinimumVersion(), publishedVersion)); + taskCompletion.getTask().notifyCompleted(); + taskCompletionQueue.remove(); + } else { + break; + } + } + return (taskCompletionQueue.size() != queueSizeBefore); + } + /** * A "consolidated" cluster state is guaranteed to have up-to-date information on which nodes are * up or down even when the whole cluster is down. 
The regular, published cluster state is not @@ -713,6 +755,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd } private boolean recomputeClusterStateIfRequired() { + boolean stateWasChanged = false; if (mustRecomputeCandidateClusterState()) { stateChangeHandler.unsetStateChangedFlag(); final AnnotatedClusterState candidate = computeCurrentAnnotatedState(); @@ -727,10 +770,31 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd stateVersionTracker.promoteCandidateToVersionedState(timeNowMs); emitEventsForAlteredStateEdges(before, stateVersionTracker.getAnnotatedVersionedClusterState(), timeNowMs); handleNewSystemState(stateVersionTracker.getVersionedClusterState()); - return true; + stateWasChanged = true; } } - return false; + scheduleVersionDependentTasksForFutureCompletion(); + return stateWasChanged; + } + + /** + * Move tasks that are dependent on the most recently generated state being published into + * a completion queue with a dependency on the current version. Once that version + * has been ACKed by all distributors in the system, those tasks will be marked as completed. + * + * This works transparently for tasks that end up changing the current cluster state (i.e. + * requiring a new state to be published) and for those whose changes are idempotent. In the + * former case the tasks will depend on the version that was generated based upon them. In + * the latter case the tasks will depend on the version that is already published (or in the + * process of being published). 
+ */ + private void scheduleVersionDependentTasksForFutureCompletion() { + for (RemoteClusterControllerTask task : tasksPendingStateRecompute) { + log.finest(() -> String.format("Adding task of type '%s' to be acked at version %d", + task.getClass().getName(), stateVersionTracker.getCurrentVersion())); + taskCompletionQueue.add(new VersionDependentTaskCompletion(stateVersionTracker.getCurrentVersion(), task)); + } + tasksPendingStateRecompute.clear(); } private AnnotatedClusterState computeCurrentAnnotatedState() { @@ -804,6 +868,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd eventLog.add(new ClusterEvent(ClusterEvent.Type.MASTER_ELECTION, "This node is no longer fleetcontroller master.", timer.getCurrentTimeInMillis())); firstAllowedStateBroadcast = Long.MAX_VALUE; metricUpdater.noLongerMaster(); + failAllVersionDependentTasks(); // TODO test } wantedStateChanged = false; isMaster = false; @@ -827,6 +892,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd System.exit(1); } finally { running.set(false); + failAllVersionDependentTasks(); // TODO test.... 
synchronized (monitor) { monitor.notifyAll(); } } } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java index 4a0949cc997..0e20410ce77 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java @@ -20,6 +20,28 @@ public abstract class RemoteClusterControllerTask { public abstract void doRemoteFleetControllerTask(Context context); + /** + * Returns true if the task should _not_ be considered complete before a cluster state + * version representing the changes made by the task has been ACKed by + * all distributors. + * + * Note that if a task performs an idempotent state change (e.g. setting maintenance + * mode on a node already in maintenance mode), the task may be considered complete + * immediately if its effective changes have already been ACKed. + */ + public boolean hasVersionAckDependency() { return false; } + + /** + * If the task response has been deferred due to hasVersionAckDependency(), + * handleLeadershipLost() will be invoked on the task if the cluster controller + * discovers it has lost leadership in the time between task execution and + * deferred response send time. + * + * This method will also be invoked if the controller is signalled to shut down + * before the dependent cluster version has been published. 
+ */ + public void handleLeadershipLost() {} + public boolean isCompleted() { synchronized (monitor) { return completed; diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java index 6fac9c9c780..ba53cdee4e8 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java @@ -91,25 +91,26 @@ public class SystemStateBroadcaster { { return false; // No point in sending system state to nodes that can't receive messages or don't want them } - if (node.getNewestSystemStateVersionSent() == systemState.getVersion()) { - return false; // No point in sending if we already have done so - } return true; } - private List<NodeInfo> resolveStateVersionSendSet(DatabaseHandler.Context dbContext) { + private List<NodeInfo> resolveStateVersionCandidateSendSet(DatabaseHandler.Context dbContext) { return dbContext.getCluster().getNodeInfo().stream() .filter(this::nodeNeedsClusterState) .collect(Collectors.toList()); } + private boolean newestStateAlreadySentToNode(NodeInfo node) { + return (node.getNewestSystemStateVersionSent() == systemState.getVersion()); + } + public boolean broadcastNewState(DatabaseHandler database, DatabaseHandler.Context dbContext, Communicator communicator, FleetController fleetController) throws InterruptedException { if (systemState == null) return false; - List<NodeInfo> recipients = resolveStateVersionSendSet(dbContext); + List<NodeInfo> recipients = resolveStateVersionCandidateSendSet(dbContext); if (!systemState.isOfficial()) { systemState.setOfficial(true); } @@ -119,6 +120,9 @@ public class SystemStateBroadcaster { if (node.isDistributor()) { anyOutdatedDistributorNodes = true; } + if (newestStateAlreadySentToNode(node)) { + 
continue; // No need to send anything more, but still have to mark node as outdated. + } if (nodeNeedsToObserveStartupTimestamps(node)) { ClusterState newState = buildModifiedClusterState(dbContext); log.log(LogLevel.DEBUG, "Sending modified system state version " + systemState.getVersion() @@ -139,6 +143,8 @@ public class SystemStateBroadcaster { return !recipients.isEmpty(); } + public int lastClusterStateVersionInSync() { return lastClusterStateInSync; } + private boolean nodeNeedsToObserveStartupTimestamps(NodeInfo node) { return node.getStartTimestamp() != 0 && node.getWentDownWithStartTime() == node.getStartTimestamp(); } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/VersionDependentTaskCompletion.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/VersionDependentTaskCompletion.java new file mode 100644 index 00000000000..d7b8f96005a --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/VersionDependentTaskCompletion.java @@ -0,0 +1,43 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import java.util.Objects; + +/** + * Wrapper for a remote (i.e. REST API) cluster controller task whose + * completion depends on side effects by the task becoming visible in + * the cluster before a response can be sent. Each such task is associated + * with a particular cluster state version number representing a lower bound + * on the published state containing the side effect. 
+ */ +class VersionDependentTaskCompletion { + private final long minimumVersion; + private final RemoteClusterControllerTask task; + + VersionDependentTaskCompletion(long minimumVersion, RemoteClusterControllerTask task) { + this.minimumVersion = minimumVersion; + this.task = task; + } + + long getMinimumVersion() { + return minimumVersion; + } + + RemoteClusterControllerTask getTask() { + return task; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + VersionDependentTaskCompletion that = (VersionDependentTaskCompletion) o; + return minimumVersion == that.minimumVersion && + Objects.equals(task, that.task); + } + + @Override + public int hashCode() { + return Objects.hash(minimumVersion, task); + } +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/Request.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/Request.java index 84c7c655a11..cb260e7de2a 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/Request.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/Request.java @@ -12,6 +12,8 @@ public abstract class Request<Result> extends RemoteClusterControllerTask { NEED_NOT_BE_MASTER } + // TODO a lot of this logic could be replaced with a CompletableFuture + private Exception failure = null; private boolean resultSet = false; private Result result = null; @@ -49,9 +51,7 @@ public abstract class Request<Result> extends RemoteClusterControllerTask { } result = calculateResult(context); resultSet = true; - } catch (OtherMasterIndexException e) { - failure = e; - } catch (StateRestApiException e) { + } catch (OtherMasterIndexException | StateRestApiException e) { failure = e; } catch (Exception e) { failure = new InternalFailure("Caught unexpected exception"); @@ -59,5 +59,10 @@ public 
abstract class Request<Result> extends RemoteClusterControllerTask { } } + @Override + public void handleLeadershipLost() { + failure = new UnknownMasterException("Leadership lost before request could complete"); + } + public abstract Result calculateResult(Context context) throws StateRestApiException, OtherMasterIndexException; } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java index 47741a5f704..c56ce5da352 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java @@ -10,7 +10,6 @@ import com.yahoo.vespa.clustercontroller.core.NodeInfo; import com.yahoo.vespa.clustercontroller.core.NodeStateChangeChecker; import com.yahoo.vespa.clustercontroller.core.RemoteClusterControllerTask; import com.yahoo.vespa.clustercontroller.core.ContentCluster; -import com.yahoo.vespa.clustercontroller.core.listeners.NodeAddedOrRemovedListener; import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler; import com.yahoo.vespa.clustercontroller.core.restapiv2.Id; import com.yahoo.vespa.clustercontroller.core.restapiv2.MissingIdException; @@ -29,6 +28,7 @@ public class SetNodeStateRequest extends Request<SetResponse> { private final Id.Node id; private final Map<String, UnitState> newStates; private final SetUnitStateRequest.Condition condition; + private final SetUnitStateRequest.ResponseWait responseWait; public SetNodeStateRequest(Id.Node id, SetUnitStateRequest setUnitStateRequest) { @@ -36,6 +36,7 @@ public class SetNodeStateRequest extends Request<SetResponse> { this.id = id; this.newStates = setUnitStateRequest.getNewState(); this.condition = 
setUnitStateRequest.getCondition(); + this.responseWait = setUnitStateRequest.getResponseWait(); } @Override @@ -64,6 +65,11 @@ public class SetNodeStateRequest extends Request<SetResponse> { return new NodeState(n.getType(), state).setDescription(newState.getReason()); } + @Override + public boolean hasVersionAckDependency() { + return (this.responseWait == SetUnitStateRequest.ResponseWait.WAIT_UNTIL_CLUSTER_ACKED); + } + static SetResponse setWantedState( ContentCluster cluster, SetUnitStateRequest.Condition condition, |