summaryrefslogtreecommitdiffstats
path: root/clustercontroller-core/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'clustercontroller-core/src/main')
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java80
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java22
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java16
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/VersionDependentTaskCompletion.java43
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/Request.java11
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java8
6 files changed, 164 insertions, 16 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index 47ac5259190..ab16d9a1d1d 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang.exception.ExceptionUtils;
import java.io.FileNotFoundException;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
@@ -79,6 +80,10 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
private long firstAllowedStateBroadcast = Long.MAX_VALUE;
private long tickStartTime = Long.MAX_VALUE;
+ private List<RemoteClusterControllerTask> tasksPendingStateRecompute = new ArrayList<>();
+ // Invariant: queued task versions are monotonically increasing with queue position
+ private Queue<VersionDependentTaskCompletion> taskCompletionQueue = new ArrayDeque<>();
+
private final RunDataExtractor dataExtractor = new RunDataExtractor() {
@Override
public com.yahoo.vdslib.state.ClusterState getLatestClusterState() { return stateVersionTracker.getVersionedClusterState(); }
@@ -393,11 +398,25 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
masterElectionHandler.lostDatabaseConnection();
}
+ private void failAllVersionDependentTasks() {
+ tasksPendingStateRecompute.forEach(task -> {
+ task.handleLeadershipLost();
+ task.notifyCompleted();
+ });
+ tasksPendingStateRecompute.clear();
+ taskCompletionQueue.forEach(task -> {
+ task.getTask().handleLeadershipLost();
+ task.getTask().notifyCompleted();
+ });
+ taskCompletionQueue.clear();
+ }
+
/** Called when all distributors have acked newest cluster state version. */
public void handleAllDistributorsInSync(DatabaseHandler database, DatabaseHandler.Context context) throws InterruptedException {
Set<ConfiguredNode> nodes = new HashSet<>(cluster.clusterInfo().getConfiguredNodes().values());
- stateChangeHandler.handleAllDistributorsInSync(
- stateVersionTracker.getVersionedClusterState(), nodes, database, context);
+ ClusterState currentState = stateVersionTracker.getVersionedClusterState();
+ log.fine(() -> String.format("All distributors have ACKed cluster state version %d", currentState.getVersion()));
+ stateChangeHandler.handleAllDistributorsInSync(currentState, nodes, database, context);
}
private boolean changesConfiguredNodeSet(Collection<ConfiguredNode> newNodes) {
@@ -542,6 +561,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
if ( ! isRunning()) { return; }
didWork |= processNextQueuedRemoteTask();
+ didWork |= completeSatisfiedVersionDependentTasks();
processingCycle = false;
++cycleCount;
@@ -642,10 +662,15 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
if ( ! remoteTasks.isEmpty()) {
final RemoteClusterControllerTask.Context context = createRemoteTaskProcessingContext();
final RemoteClusterControllerTask task = remoteTasks.poll();
- log.finest("Processing remote task " + task.getClass().getName());
+ log.finest(() -> String.format("Processing remote task of type '%s'", task.getClass().getName()));
task.doRemoteFleetControllerTask(context);
- task.notifyCompleted();
- log.finest("Done processing remote task " + task.getClass().getName());
+ if (!task.hasVersionAckDependency()) {
+ log.finest(() -> String.format("Done processing remote task of type '%s'", task.getClass().getName()));
+ task.notifyCompleted();
+ } else {
+ log.finest(() -> String.format("Remote task of type '%s' queued until state recomputation", task.getClass().getName()));
+ tasksPendingStateRecompute.add(task);
+ }
return true;
}
return false;
@@ -661,6 +686,23 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
return context;
}
+ private boolean completeSatisfiedVersionDependentTasks() {
+ int publishedVersion = systemStateBroadcaster.lastClusterStateVersionInSync();
+ long queueSizeBefore = taskCompletionQueue.size();
+ while (!taskCompletionQueue.isEmpty()) {
+ VersionDependentTaskCompletion taskCompletion = taskCompletionQueue.peek();
+ if (publishedVersion >= taskCompletion.getMinimumVersion()) {
+ log.info(() -> String.format("Deferred task of type '%s' has minimum version %d, published is %d; completing",
+ taskCompletion.getTask().getClass().getName(), taskCompletion.getMinimumVersion(), publishedVersion));
+ taskCompletion.getTask().notifyCompleted();
+ taskCompletionQueue.remove();
+ } else {
+ break;
+ }
+ }
+ return (taskCompletionQueue.size() != queueSizeBefore);
+ }
+
/**
* A "consolidated" cluster state is guaranteed to have up-to-date information on which nodes are
* up or down even when the whole cluster is down. The regular, published cluster state is not
@@ -713,6 +755,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
}
private boolean recomputeClusterStateIfRequired() {
+ boolean stateWasChanged = false;
if (mustRecomputeCandidateClusterState()) {
stateChangeHandler.unsetStateChangedFlag();
final AnnotatedClusterState candidate = computeCurrentAnnotatedState();
@@ -727,10 +770,31 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
stateVersionTracker.promoteCandidateToVersionedState(timeNowMs);
emitEventsForAlteredStateEdges(before, stateVersionTracker.getAnnotatedVersionedClusterState(), timeNowMs);
handleNewSystemState(stateVersionTracker.getVersionedClusterState());
- return true;
+ stateWasChanged = true;
}
}
- return false;
+ scheduleVersionDependentTasksForFutureCompletion();
+ return stateWasChanged;
+ }
+
+ /**
+ * Move tasks that are dependent on the most recently generated state being published into
+ * a completion queue with a dependency on the current version. Once that version
+ * has been ACKed by all distributors in the system, those tasks will be marked as completed.
+ *
+ * This works transparently for tasks that end up changing the current cluster state (i.e.
+ * requiring a new state to be published) and for those whose changes are idempotent. In the
+ * former case the tasks will depend on the version that was generated based upon them. In
+ * the latter case the tasks will depend on the version that is already published (or in the
+ * process of being published).
+ */
+ private void scheduleVersionDependentTasksForFutureCompletion() {
+ for (RemoteClusterControllerTask task : tasksPendingStateRecompute) {
+ log.finest(() -> String.format("Adding task of type '%s' to be acked at version %d",
+ task.getClass().getName(), stateVersionTracker.getCurrentVersion()));
+ taskCompletionQueue.add(new VersionDependentTaskCompletion(stateVersionTracker.getCurrentVersion(), task));
+ }
+ tasksPendingStateRecompute.clear();
}
private AnnotatedClusterState computeCurrentAnnotatedState() {
@@ -804,6 +868,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
eventLog.add(new ClusterEvent(ClusterEvent.Type.MASTER_ELECTION, "This node is no longer fleetcontroller master.", timer.getCurrentTimeInMillis()));
firstAllowedStateBroadcast = Long.MAX_VALUE;
metricUpdater.noLongerMaster();
+ failAllVersionDependentTasks(); // TODO test
}
wantedStateChanged = false;
isMaster = false;
@@ -827,6 +892,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
System.exit(1);
} finally {
running.set(false);
+ failAllVersionDependentTasks(); // TODO test....
synchronized (monitor) { monitor.notifyAll(); }
}
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java
index 4a0949cc997..0e20410ce77 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java
@@ -20,6 +20,28 @@ public abstract class RemoteClusterControllerTask {
public abstract void doRemoteFleetControllerTask(Context context);
+ /**
+ * If the task should _not_ be considered complete before a cluster state
+ * version representing the changes made by the task has been ACKed by
+ * all distributors.
+ *
+ * Note that if a task performs an idempotent state change (e.g. setting maintenance
+ * mode on a node already in maintenance mode), the task may be considered complete
+ * immediately if its effective changes have already been ACKed.
+ */
+ public boolean hasVersionAckDependency() { return false; }
+
+ /**
+ * If the task response has been deferred due to hasVersionAckDependency(),
+ * handleLeadershipLost() will be invoked on the task if the cluster controller
+ * discovers it has lost leadership in the time between task execution and
+ * deferred response send time.
+ *
+ * This method will also be invoked if the controller is signalled to shut down
+ * before the dependent cluster version has been published.
+ */
+ public void handleLeadershipLost() {}
+
public boolean isCompleted() {
synchronized (monitor) {
return completed;
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index 6fac9c9c780..ba53cdee4e8 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -91,25 +91,26 @@ public class SystemStateBroadcaster {
{
return false; // No point in sending system state to nodes that can't receive messages or don't want them
}
- if (node.getNewestSystemStateVersionSent() == systemState.getVersion()) {
- return false; // No point in sending if we already have done so
- }
return true;
}
- private List<NodeInfo> resolveStateVersionSendSet(DatabaseHandler.Context dbContext) {
+ private List<NodeInfo> resolveStateVersionCandidateSendSet(DatabaseHandler.Context dbContext) {
return dbContext.getCluster().getNodeInfo().stream()
.filter(this::nodeNeedsClusterState)
.collect(Collectors.toList());
}
+ private boolean newestStateAlreadySentToNode(NodeInfo node) {
+ return (node.getNewestSystemStateVersionSent() == systemState.getVersion());
+ }
+
public boolean broadcastNewState(DatabaseHandler database,
DatabaseHandler.Context dbContext,
Communicator communicator,
FleetController fleetController) throws InterruptedException {
if (systemState == null) return false;
- List<NodeInfo> recipients = resolveStateVersionSendSet(dbContext);
+ List<NodeInfo> recipients = resolveStateVersionCandidateSendSet(dbContext);
if (!systemState.isOfficial()) {
systemState.setOfficial(true);
}
@@ -119,6 +120,9 @@ public class SystemStateBroadcaster {
if (node.isDistributor()) {
anyOutdatedDistributorNodes = true;
}
+ if (newestStateAlreadySentToNode(node)) {
+ continue; // No need to send anything more, but still have to mark node as outdated.
+ }
if (nodeNeedsToObserveStartupTimestamps(node)) {
ClusterState newState = buildModifiedClusterState(dbContext);
log.log(LogLevel.DEBUG, "Sending modified system state version " + systemState.getVersion()
@@ -139,6 +143,8 @@ public class SystemStateBroadcaster {
return !recipients.isEmpty();
}
+ public int lastClusterStateVersionInSync() { return lastClusterStateInSync; }
+
private boolean nodeNeedsToObserveStartupTimestamps(NodeInfo node) {
return node.getStartTimestamp() != 0 && node.getWentDownWithStartTime() == node.getStartTimestamp();
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/VersionDependentTaskCompletion.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/VersionDependentTaskCompletion.java
new file mode 100644
index 00000000000..d7b8f96005a
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/VersionDependentTaskCompletion.java
@@ -0,0 +1,43 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import java.util.Objects;
+
+/**
+ * Wrapper for a remote (i.e. REST API) cluster controller task whose
+ * completion depends on side effects by the task becoming visible in
+ * the cluster before a response can be sent. Each such task is associated
+ * with a particular cluster state version number representing a lower bound
+ * on the published state containing the side effect.
+ */
+class VersionDependentTaskCompletion {
+ private final long minimumVersion;
+ private final RemoteClusterControllerTask task;
+
+ VersionDependentTaskCompletion(long minimumVersion, RemoteClusterControllerTask task) {
+ this.minimumVersion = minimumVersion;
+ this.task = task;
+ }
+
+ long getMinimumVersion() {
+ return minimumVersion;
+ }
+
+ RemoteClusterControllerTask getTask() {
+ return task;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ VersionDependentTaskCompletion that = (VersionDependentTaskCompletion) o;
+ return minimumVersion == that.minimumVersion &&
+ Objects.equals(task, that.task);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(minimumVersion, task);
+ }
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/Request.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/Request.java
index 84c7c655a11..cb260e7de2a 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/Request.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/Request.java
@@ -12,6 +12,8 @@ public abstract class Request<Result> extends RemoteClusterControllerTask {
NEED_NOT_BE_MASTER
}
+ // TODO a lot of this logic could be replaced with a CompleteableFuture
+
private Exception failure = null;
private boolean resultSet = false;
private Result result = null;
@@ -49,9 +51,7 @@ public abstract class Request<Result> extends RemoteClusterControllerTask {
}
result = calculateResult(context);
resultSet = true;
- } catch (OtherMasterIndexException e) {
- failure = e;
- } catch (StateRestApiException e) {
+ } catch (OtherMasterIndexException | StateRestApiException e) {
failure = e;
} catch (Exception e) {
failure = new InternalFailure("Caught unexpected exception");
@@ -59,5 +59,10 @@ public abstract class Request<Result> extends RemoteClusterControllerTask {
}
}
+ @Override
+ public void handleLeadershipLost() {
+ failure = new UnknownMasterException("Leadership lost before request could complete");
+ }
+
public abstract Result calculateResult(Context context) throws StateRestApiException, OtherMasterIndexException;
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java
index 47741a5f704..c56ce5da352 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java
@@ -10,7 +10,6 @@ import com.yahoo.vespa.clustercontroller.core.NodeInfo;
import com.yahoo.vespa.clustercontroller.core.NodeStateChangeChecker;
import com.yahoo.vespa.clustercontroller.core.RemoteClusterControllerTask;
import com.yahoo.vespa.clustercontroller.core.ContentCluster;
-import com.yahoo.vespa.clustercontroller.core.listeners.NodeAddedOrRemovedListener;
import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler;
import com.yahoo.vespa.clustercontroller.core.restapiv2.Id;
import com.yahoo.vespa.clustercontroller.core.restapiv2.MissingIdException;
@@ -29,6 +28,7 @@ public class SetNodeStateRequest extends Request<SetResponse> {
private final Id.Node id;
private final Map<String, UnitState> newStates;
private final SetUnitStateRequest.Condition condition;
+ private final SetUnitStateRequest.ResponseWait responseWait;
public SetNodeStateRequest(Id.Node id, SetUnitStateRequest setUnitStateRequest) {
@@ -36,6 +36,7 @@ public class SetNodeStateRequest extends Request<SetResponse> {
this.id = id;
this.newStates = setUnitStateRequest.getNewState();
this.condition = setUnitStateRequest.getCondition();
+ this.responseWait = setUnitStateRequest.getResponseWait();
}
@Override
@@ -64,6 +65,11 @@ public class SetNodeStateRequest extends Request<SetResponse> {
return new NodeState(n.getType(), state).setDescription(newState.getReason());
}
+ @Override
+ public boolean hasVersionAckDependency() {
+ return (this.responseWait == SetUnitStateRequest.ResponseWait.WAIT_UNTIL_CLUSTER_ACKED);
+ }
+
static SetResponse setWantedState(
ContentCluster cluster,
SetUnitStateRequest.Condition condition,