summaryrefslogtreecommitdiffstats
path: root/clustercontroller-core
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahoo-inc.com>2017-09-11 14:24:51 +0200
committerTor Brede Vekterli <vekterli@yahoo-inc.com>2017-09-11 14:25:18 +0200
commita4cb51b28f43420db61a2459737c7585a227ee54 (patch)
treea83d2df85feae5bb748553feee42206a49d69173 /clustercontroller-core
parent0c8f9bdd5a8d6ce35e1a8adba174c51dc8583a92 (diff)
Add support for version ACK-dependent tasks in cluster controller
Used to enable synchronous operation for set-node-state calls, which ensure that side-effects of the call are visible when the response returns. If controller leadership is lost before state is published, tasks will be failed back to the client.
Diffstat (limited to 'clustercontroller-core')
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java80
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java22
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java16
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/VersionDependentTaskCompletion.java43
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/Request.java11
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java8
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java29
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java179
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/restapiv2/SetNodeStateTest.java56
9 files changed, 426 insertions, 18 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index 47ac5259190..ab16d9a1d1d 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang.exception.ExceptionUtils;
import java.io.FileNotFoundException;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
@@ -79,6 +80,10 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
private long firstAllowedStateBroadcast = Long.MAX_VALUE;
private long tickStartTime = Long.MAX_VALUE;
+ private List<RemoteClusterControllerTask> tasksPendingStateRecompute = new ArrayList<>();
+ // Invariant: queued task versions are monotonically increasing with queue position
+ private Queue<VersionDependentTaskCompletion> taskCompletionQueue = new ArrayDeque<>();
+
private final RunDataExtractor dataExtractor = new RunDataExtractor() {
@Override
public com.yahoo.vdslib.state.ClusterState getLatestClusterState() { return stateVersionTracker.getVersionedClusterState(); }
@@ -393,11 +398,25 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
masterElectionHandler.lostDatabaseConnection();
}
+ private void failAllVersionDependentTasks() {
+ tasksPendingStateRecompute.forEach(task -> {
+ task.handleLeadershipLost();
+ task.notifyCompleted();
+ });
+ tasksPendingStateRecompute.clear();
+ taskCompletionQueue.forEach(task -> {
+ task.getTask().handleLeadershipLost();
+ task.getTask().notifyCompleted();
+ });
+ taskCompletionQueue.clear();
+ }
+
/** Called when all distributors have acked newest cluster state version. */
public void handleAllDistributorsInSync(DatabaseHandler database, DatabaseHandler.Context context) throws InterruptedException {
Set<ConfiguredNode> nodes = new HashSet<>(cluster.clusterInfo().getConfiguredNodes().values());
- stateChangeHandler.handleAllDistributorsInSync(
- stateVersionTracker.getVersionedClusterState(), nodes, database, context);
+ ClusterState currentState = stateVersionTracker.getVersionedClusterState();
+ log.fine(() -> String.format("All distributors have ACKed cluster state version %d", currentState.getVersion()));
+ stateChangeHandler.handleAllDistributorsInSync(currentState, nodes, database, context);
}
private boolean changesConfiguredNodeSet(Collection<ConfiguredNode> newNodes) {
@@ -542,6 +561,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
if ( ! isRunning()) { return; }
didWork |= processNextQueuedRemoteTask();
+ didWork |= completeSatisfiedVersionDependentTasks();
processingCycle = false;
++cycleCount;
@@ -642,10 +662,15 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
if ( ! remoteTasks.isEmpty()) {
final RemoteClusterControllerTask.Context context = createRemoteTaskProcessingContext();
final RemoteClusterControllerTask task = remoteTasks.poll();
- log.finest("Processing remote task " + task.getClass().getName());
+ log.finest(() -> String.format("Processing remote task of type '%s'", task.getClass().getName()));
task.doRemoteFleetControllerTask(context);
- task.notifyCompleted();
- log.finest("Done processing remote task " + task.getClass().getName());
+ if (!task.hasVersionAckDependency()) {
+ log.finest(() -> String.format("Done processing remote task of type '%s'", task.getClass().getName()));
+ task.notifyCompleted();
+ } else {
+ log.finest(() -> String.format("Remote task of type '%s' queued until state recomputation", task.getClass().getName()));
+ tasksPendingStateRecompute.add(task);
+ }
return true;
}
return false;
@@ -661,6 +686,23 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
return context;
}
+ private boolean completeSatisfiedVersionDependentTasks() {
+ int publishedVersion = systemStateBroadcaster.lastClusterStateVersionInSync();
+ long queueSizeBefore = taskCompletionQueue.size();
+ while (!taskCompletionQueue.isEmpty()) {
+ VersionDependentTaskCompletion taskCompletion = taskCompletionQueue.peek();
+ if (publishedVersion >= taskCompletion.getMinimumVersion()) {
+ log.info(() -> String.format("Deferred task of type '%s' has minimum version %d, published is %d; completing",
+ taskCompletion.getTask().getClass().getName(), taskCompletion.getMinimumVersion(), publishedVersion));
+ taskCompletion.getTask().notifyCompleted();
+ taskCompletionQueue.remove();
+ } else {
+ break;
+ }
+ }
+ return (taskCompletionQueue.size() != queueSizeBefore);
+ }
+
/**
* A "consolidated" cluster state is guaranteed to have up-to-date information on which nodes are
* up or down even when the whole cluster is down. The regular, published cluster state is not
@@ -713,6 +755,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
}
private boolean recomputeClusterStateIfRequired() {
+ boolean stateWasChanged = false;
if (mustRecomputeCandidateClusterState()) {
stateChangeHandler.unsetStateChangedFlag();
final AnnotatedClusterState candidate = computeCurrentAnnotatedState();
@@ -727,10 +770,31 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
stateVersionTracker.promoteCandidateToVersionedState(timeNowMs);
emitEventsForAlteredStateEdges(before, stateVersionTracker.getAnnotatedVersionedClusterState(), timeNowMs);
handleNewSystemState(stateVersionTracker.getVersionedClusterState());
- return true;
+ stateWasChanged = true;
}
}
- return false;
+ scheduleVersionDependentTasksForFutureCompletion();
+ return stateWasChanged;
+ }
+
+ /**
+ * Move tasks that are dependent on the most recently generated state being published into
+ * a completion queue with a dependency on the current version. Once that version
+ * has been ACKed by all distributors in the system, those tasks will be marked as completed.
+ *
+ * This works transparently for tasks that end up changing the current cluster state (i.e.
+ * requiring a new state to be published) and for those whose changes are idempotent. In the
+ * former case the tasks will depend on the version that was generated based upon them. In
+ * the latter case the tasks will depend on the version that is already published (or in the
+ * process of being published).
+ */
+ private void scheduleVersionDependentTasksForFutureCompletion() {
+ for (RemoteClusterControllerTask task : tasksPendingStateRecompute) {
+ log.finest(() -> String.format("Adding task of type '%s' to be acked at version %d",
+ task.getClass().getName(), stateVersionTracker.getCurrentVersion()));
+ taskCompletionQueue.add(new VersionDependentTaskCompletion(stateVersionTracker.getCurrentVersion(), task));
+ }
+ tasksPendingStateRecompute.clear();
}
private AnnotatedClusterState computeCurrentAnnotatedState() {
@@ -804,6 +868,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
eventLog.add(new ClusterEvent(ClusterEvent.Type.MASTER_ELECTION, "This node is no longer fleetcontroller master.", timer.getCurrentTimeInMillis()));
firstAllowedStateBroadcast = Long.MAX_VALUE;
metricUpdater.noLongerMaster();
+ failAllVersionDependentTasks(); // TODO test
}
wantedStateChanged = false;
isMaster = false;
@@ -827,6 +892,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
System.exit(1);
} finally {
running.set(false);
+ failAllVersionDependentTasks(); // TODO test....
synchronized (monitor) { monitor.notifyAll(); }
}
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java
index 4a0949cc997..0e20410ce77 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java
@@ -20,6 +20,28 @@ public abstract class RemoteClusterControllerTask {
public abstract void doRemoteFleetControllerTask(Context context);
+ /**
+ * If the task should _not_ be considered complete before a cluster state
+ * version representing the changes made by the task has been ACKed by
+ * all distributors.
+ *
+ * Note that if a task performs an idempotent state change (e.g. setting maintenance
+ * mode on a node already in maintenance mode), the task may be considered complete
+ * immediately if its effective changes have already been ACKed.
+ */
+ public boolean hasVersionAckDependency() { return false; }
+
+ /**
+ * If the task response has been deferred due to hasVersionAckDependency(),
+ * handleLeadershipLost() will be invoked on the task if the cluster controller
+ * discovers it has lost leadership in the time between task execution and
+ * deferred response send time.
+ *
+ * This method will also be invoked if the controller is signalled to shut down
+ * before the dependent cluster version has been published.
+ */
+ public void handleLeadershipLost() {}
+
public boolean isCompleted() {
synchronized (monitor) {
return completed;
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index 6fac9c9c780..ba53cdee4e8 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -91,25 +91,26 @@ public class SystemStateBroadcaster {
{
return false; // No point in sending system state to nodes that can't receive messages or don't want them
}
- if (node.getNewestSystemStateVersionSent() == systemState.getVersion()) {
- return false; // No point in sending if we already have done so
- }
return true;
}
- private List<NodeInfo> resolveStateVersionSendSet(DatabaseHandler.Context dbContext) {
+ private List<NodeInfo> resolveStateVersionCandidateSendSet(DatabaseHandler.Context dbContext) {
return dbContext.getCluster().getNodeInfo().stream()
.filter(this::nodeNeedsClusterState)
.collect(Collectors.toList());
}
+ private boolean newestStateAlreadySentToNode(NodeInfo node) {
+ return (node.getNewestSystemStateVersionSent() == systemState.getVersion());
+ }
+
public boolean broadcastNewState(DatabaseHandler database,
DatabaseHandler.Context dbContext,
Communicator communicator,
FleetController fleetController) throws InterruptedException {
if (systemState == null) return false;
- List<NodeInfo> recipients = resolveStateVersionSendSet(dbContext);
+ List<NodeInfo> recipients = resolveStateVersionCandidateSendSet(dbContext);
if (!systemState.isOfficial()) {
systemState.setOfficial(true);
}
@@ -119,6 +120,9 @@ public class SystemStateBroadcaster {
if (node.isDistributor()) {
anyOutdatedDistributorNodes = true;
}
+ if (newestStateAlreadySentToNode(node)) {
+ continue; // No need to send anything more, but still have to mark node as outdated.
+ }
if (nodeNeedsToObserveStartupTimestamps(node)) {
ClusterState newState = buildModifiedClusterState(dbContext);
log.log(LogLevel.DEBUG, "Sending modified system state version " + systemState.getVersion()
@@ -139,6 +143,8 @@ public class SystemStateBroadcaster {
return !recipients.isEmpty();
}
+ public int lastClusterStateVersionInSync() { return lastClusterStateInSync; }
+
private boolean nodeNeedsToObserveStartupTimestamps(NodeInfo node) {
return node.getStartTimestamp() != 0 && node.getWentDownWithStartTime() == node.getStartTimestamp();
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/VersionDependentTaskCompletion.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/VersionDependentTaskCompletion.java
new file mode 100644
index 00000000000..d7b8f96005a
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/VersionDependentTaskCompletion.java
@@ -0,0 +1,43 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import java.util.Objects;
+
+/**
+ * Wrapper for a remote (i.e. REST API) cluster controller task whose
+ * completion depends on side effects by the task becoming visible in
+ * the cluster before a response can be sent. Each such task is associated
+ * with a particular cluster state version number representing a lower bound
+ * on the published state containing the side effect.
+ */
+class VersionDependentTaskCompletion {
+ private final long minimumVersion;
+ private final RemoteClusterControllerTask task;
+
+ VersionDependentTaskCompletion(long minimumVersion, RemoteClusterControllerTask task) {
+ this.minimumVersion = minimumVersion;
+ this.task = task;
+ }
+
+ long getMinimumVersion() {
+ return minimumVersion;
+ }
+
+ RemoteClusterControllerTask getTask() {
+ return task;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ VersionDependentTaskCompletion that = (VersionDependentTaskCompletion) o;
+ return minimumVersion == that.minimumVersion &&
+ Objects.equals(task, that.task);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(minimumVersion, task);
+ }
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/Request.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/Request.java
index 84c7c655a11..cb260e7de2a 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/Request.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/Request.java
@@ -12,6 +12,8 @@ public abstract class Request<Result> extends RemoteClusterControllerTask {
NEED_NOT_BE_MASTER
}
+ // TODO a lot of this logic could be replaced with a CompleteableFuture
+
private Exception failure = null;
private boolean resultSet = false;
private Result result = null;
@@ -49,9 +51,7 @@ public abstract class Request<Result> extends RemoteClusterControllerTask {
}
result = calculateResult(context);
resultSet = true;
- } catch (OtherMasterIndexException e) {
- failure = e;
- } catch (StateRestApiException e) {
+ } catch (OtherMasterIndexException | StateRestApiException e) {
failure = e;
} catch (Exception e) {
failure = new InternalFailure("Caught unexpected exception");
@@ -59,5 +59,10 @@ public abstract class Request<Result> extends RemoteClusterControllerTask {
}
}
+ @Override
+ public void handleLeadershipLost() {
+ failure = new UnknownMasterException("Leadership lost before request could complete");
+ }
+
public abstract Result calculateResult(Context context) throws StateRestApiException, OtherMasterIndexException;
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java
index 47741a5f704..c56ce5da352 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java
@@ -10,7 +10,6 @@ import com.yahoo.vespa.clustercontroller.core.NodeInfo;
import com.yahoo.vespa.clustercontroller.core.NodeStateChangeChecker;
import com.yahoo.vespa.clustercontroller.core.RemoteClusterControllerTask;
import com.yahoo.vespa.clustercontroller.core.ContentCluster;
-import com.yahoo.vespa.clustercontroller.core.listeners.NodeAddedOrRemovedListener;
import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler;
import com.yahoo.vespa.clustercontroller.core.restapiv2.Id;
import com.yahoo.vespa.clustercontroller.core.restapiv2.MissingIdException;
@@ -29,6 +28,7 @@ public class SetNodeStateRequest extends Request<SetResponse> {
private final Id.Node id;
private final Map<String, UnitState> newStates;
private final SetUnitStateRequest.Condition condition;
+ private final SetUnitStateRequest.ResponseWait responseWait;
public SetNodeStateRequest(Id.Node id, SetUnitStateRequest setUnitStateRequest) {
@@ -36,6 +36,7 @@ public class SetNodeStateRequest extends Request<SetResponse> {
this.id = id;
this.newStates = setUnitStateRequest.getNewState();
this.condition = setUnitStateRequest.getCondition();
+ this.responseWait = setUnitStateRequest.getResponseWait();
}
@Override
@@ -64,6 +65,11 @@ public class SetNodeStateRequest extends Request<SetResponse> {
return new NodeState(n.getType(), state).setDescription(newState.getReason());
}
+ @Override
+ public boolean hasVersionAckDependency() {
+ return (this.responseWait == SetUnitStateRequest.ResponseWait.WAIT_UNTIL_CLUSTER_ACKED);
+ }
+
static SetResponse setWantedState(
ContentCluster cluster,
SetUnitStateRequest.Condition condition,
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
index 8148ad69488..c500b4c7390 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
@@ -1,12 +1,14 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;
+import com.yahoo.collections.Pair;
import com.yahoo.vdslib.state.ClusterState;
import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeState;
import com.yahoo.vdslib.state.State;
import com.yahoo.vespa.clustercontroller.core.listeners.NodeAddedOrRemovedListener;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -15,6 +17,12 @@ public class DummyCommunicator implements Communicator, NodeLookup {
List<Node> newNodes;
Timer timer;
+ private boolean shouldDeferDistributorClusterStateAcks = false;
+ private List<Pair<Waiter<SetClusterStateRequest>, DummySetClusterStateRequest>> deferredClusterStateAcks = new ArrayList<>();
+
+ public void setShouldDeferDistributorClusterStateAcks(boolean shouldDeferDistributorClusterStateAcks) {
+ this.shouldDeferDistributorClusterStateAcks = shouldDeferDistributorClusterStateAcks;
+ }
public class DummyGetNodeStateRequest extends GetNodeStateRequest {
Waiter<GetNodeStateRequest> waiter;
@@ -82,7 +90,26 @@ public class DummyCommunicator implements Communicator, NodeLookup {
DummySetClusterStateRequest req = new DummySetClusterStateRequest(node, state);
node.setSystemStateVersionSent(state);
req.setReply(new SetClusterStateRequest.Reply());
- waiter.done(req);
+ if (node.isStorage() || !shouldDeferDistributorClusterStateAcks) {
+ waiter.done(req);
+ } else {
+ deferredClusterStateAcks.add(new Pair<>(waiter, req));
+ }
+ }
+
+ public void sendAllDeferredDistributorClusterStateAcks() {
+ deferredClusterStateAcks.forEach(reqAndWaiter -> reqAndWaiter.getFirst().done(reqAndWaiter.getSecond()));
+ deferredClusterStateAcks.clear();
+ }
+
+ public void sendPartialDeferredDistributorClusterStateAcks() {
+ if (deferredClusterStateAcks.isEmpty()) {
+ throw new IllegalStateException("Tried to send partial ACKs with no ACKs deferred");
+ }
+ List<Pair<Waiter<SetClusterStateRequest>, DummySetClusterStateRequest>> toAck =
+ deferredClusterStateAcks.subList(0, deferredClusterStateAcks.size() - 1);
+ toAck.forEach(reqAndWaiter -> reqAndWaiter.getFirst().done(reqAndWaiter.getSecond()));
+ deferredClusterStateAcks.removeAll(toAck);
}
@Override
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
index ea0b409c2b4..28d1095f629 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
@@ -13,6 +13,7 @@ import org.junit.Test;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.*;
@@ -78,7 +79,7 @@ public class StateChangeTest extends FleetControllerTest {
}
- private List<ConfiguredNode> createNodes(int count) {
+ private static List<ConfiguredNode> createNodes(int count) {
List<ConfiguredNode> nodes = new ArrayList<>();
for (int i = 0; i < count; i++)
nodes.add(new ConfiguredNode(i, false));
@@ -1239,4 +1240,180 @@ public class StateChangeTest extends FleetControllerTest {
}
+ private static abstract class MockTask extends RemoteClusterControllerTask {
+ boolean invoked = false;
+
+ boolean isInvoked() { return invoked; }
+
+ @Override
+ public boolean hasVersionAckDependency() { return true; }
+ }
+
+ // We create an explicit mock task class instead of using mock() simply because of
+ // the utter pain that mocking void functions (doRemoteFleetControllerTask()) is
+ // when using Mockito.
+ private static class MockSynchronousTask extends MockTask {
+ @Override
+ public void doRemoteFleetControllerTask(Context ctx) {
+ // Trigger a state transition edge that requires a state to be published and ACKed
+ NodeState newNodeState = new NodeState(NodeType.STORAGE, State.MAINTENANCE);
+ NodeInfo nodeInfo = ctx.cluster.getNodeInfo(new Node(NodeType.STORAGE, 0));
+ nodeInfo.setWantedState(newNodeState);
+ ctx.nodeStateOrHostInfoChangeHandler.handleNewWantedNodeState(nodeInfo, newNodeState);
+ invoked = true;
+ }
+ }
+
+ private static class MockIdempotentSynchronousTask extends MockTask {
+ @Override
+ public void doRemoteFleetControllerTask(Context ctx) {
+ // Tests scheduling this task shall have ensured that node storage.0 already is DOWN,
+ // so this won't trigger a new state to be published
+ NodeState newNodeState = new NodeState(NodeType.STORAGE, State.DOWN);
+ NodeInfo nodeInfo = ctx.cluster.getNodeInfo(new Node(NodeType.STORAGE, 0));
+ nodeInfo.setWantedState(newNodeState);
+ ctx.nodeStateOrHostInfoChangeHandler.handleNewWantedNodeState(nodeInfo, newNodeState);
+ invoked = true;
+ }
+ }
+
+ // TODO ideally we'd break this out so it doesn't depend on fields in the parent test instance, but
+ // fleet controller tests have a _lot_ of state, so risk of duplicating a lot of that...
+ class RemoteTaskFixture {
+ RemoteTaskFixture(FleetControllerOptions options) throws Exception {
+ initialize(options);
+ ctrl.tick();
+ }
+
+ MockTask scheduleTask(MockTask task) throws Exception {
+ ctrl.schedule(task);
+ ctrl.tick(); // Task processing iteration
+ return task;
+ }
+
+ MockTask scheduleNonIdempotentVersionDependentTask() throws Exception {
+ return scheduleTask(new MockSynchronousTask());
+ }
+
+ MockTask scheduleIdempotentVersionDependentTask() throws Exception {
+ return scheduleTask(new MockIdempotentSynchronousTask());
+ }
+
+ void markStorageNodeDown(int index) throws Exception {
+ communicator.setNodeState(new Node(NodeType.STORAGE, index), State.DOWN, "foo"); // Auto-ACKed
+ ctrl.tick();
+ }
+
+ void sendPartialDeferredDistributorClusterStateAcks() throws Exception {
+ communicator.sendPartialDeferredDistributorClusterStateAcks();
+ ctrl.tick();
+ }
+
+ void sendAllDeferredDistributorClusterStateAcks() throws Exception {
+ communicator.sendAllDeferredDistributorClusterStateAcks();
+ ctrl.tick();
+ }
+ }
+
+ private static FleetControllerOptions defaultOptions() {
+ return new FleetControllerOptions("mycluster", createNodes(10));
+ }
+
+ private static FleetControllerOptions optionsWithZeroTransitionTime() {
+ FleetControllerOptions options = new FleetControllerOptions("mycluster", createNodes(10));
+ options.maxTransitionTime.put(NodeType.STORAGE, 0);
+ return options;
+ }
+
+ private static FleetControllerOptions optionsAllowingZeroNodesDown() {
+ FleetControllerOptions options = optionsWithZeroTransitionTime();
+ options.minStorageNodesUp = 10;
+ options.minDistributorNodesUp = 10;
+ return options;
+ }
+
+ private RemoteTaskFixture createFixtureWith(FleetControllerOptions options) throws Exception {
+ return new RemoteTaskFixture(options);
+ }
+
+ private RemoteTaskFixture createDefaultFixture() throws Exception {
+ return new RemoteTaskFixture(defaultOptions());
+ }
+
+ @Test
+ public void synchronous_remote_task_is_completed_when_state_is_acked_by_cluster() throws Exception {
+ RemoteTaskFixture fixture = createDefaultFixture();
+ MockTask task = fixture.scheduleNonIdempotentVersionDependentTask();
+
+ assertTrue(task.isInvoked());
+ assertFalse(task.isCompleted());
+ communicator.setShouldDeferDistributorClusterStateAcks(true);
+
+ ctrl.tick(); // Cluster state recompute iteration. New state generated, but not ACKed by nodes
+ ctrl.tick(); // Ensure that we're deferring ACKs. Otherwise, this tick would process ACKs and complete tasks.
+ assertFalse(task.isCompleted());
+
+ fixture.sendPartialDeferredDistributorClusterStateAcks();
+ assertFalse(task.isCompleted()); // Not yet acked by all nodes
+
+ fixture.sendAllDeferredDistributorClusterStateAcks();
+ assertTrue(task.isCompleted()); // Now finally acked by all nodes
+ }
+
+ @Test
+ public void idempotent_synchronous_remote_task_can_complete_immediately_if_current_state_already_acked() throws Exception {
+ RemoteTaskFixture fixture = createFixtureWith(optionsWithZeroTransitionTime());
+ fixture.markStorageNodeDown(0);
+ MockTask task = fixture.scheduleIdempotentVersionDependentTask(); // Tries to set node 0 into Down; already in that state
+
+ assertTrue(task.isInvoked());
+ assertFalse(task.isCompleted());
+
+ ctrl.tick(); // Cluster state recompute iteration. New state _not_ generated
+ ctrl.tick(); // Deferred tasks processing; should complete tasks
+ assertTrue(task.isCompleted());
+ }
+
+ @Test
+ public void idempotent_synchronous_remote_task_waits_until_current_state_is_acked() throws Exception {
+ RemoteTaskFixture fixture = createFixtureWith(optionsWithZeroTransitionTime());
+
+ communicator.setShouldDeferDistributorClusterStateAcks(true);
+ fixture.markStorageNodeDown(0);
+ MockTask task = fixture.scheduleIdempotentVersionDependentTask(); // Tries to set node 0 into Down; already in that state
+
+ assertTrue(task.isInvoked());
+ assertFalse(task.isCompleted());
+
+ ctrl.tick(); // Cluster state recompute iteration. New state _not_ generated
+ ctrl.tick(); // Deferred task processing; version not satisfied yet
+ assertFalse(task.isCompleted());
+
+ fixture.sendAllDeferredDistributorClusterStateAcks();
+ assertTrue(task.isCompleted()); // Now acked by all nodes
+
+ }
+
+ // When the cluster is down no intermediate states will be published to the nodes
+ // unless the state triggers a cluster Up edge. To avoid hanging task responses
+ // for an indeterminate amount of time in this scenario, we effectively treat
+ // tasks running in such a context as if they were idempotent. I.e. we only require
+ // the cluster down-state to have been published.
+ @Test
+ public void immediately_complete_sync_remote_task_when_cluster_is_down() throws Exception {
+ RemoteTaskFixture fixture = createFixtureWith(optionsAllowingZeroNodesDown());
+ // Controller options require 10/10 nodes up, so take one down to trigger a cluster Down edge.
+ fixture.markStorageNodeDown(1);
+ MockTask task = fixture.scheduleNonIdempotentVersionDependentTask();
+
+ assertTrue(task.isInvoked());
+ assertFalse(task.isCompleted());
+
+ ctrl.tick(); // Cluster state recompute iteration. New state _not_ generated
+ ctrl.tick(); // Deferred tasks processing; should complete tasks
+ assertTrue(task.isCompleted());
+
+
+ }
+
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/restapiv2/SetNodeStateTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/restapiv2/SetNodeStateTest.java
index 3fe489a0d5a..88483e47f4e 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/restapiv2/SetNodeStateTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/restapiv2/SetNodeStateTest.java
@@ -1,14 +1,19 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core.restapiv2;
+import com.yahoo.vdslib.state.NodeType;
+import com.yahoo.vespa.clustercontroller.core.restapiv2.requests.SetNodeStateRequest;
import com.yahoo.vespa.clustercontroller.utils.staterestapi.errors.InvalidContentException;
import com.yahoo.vespa.clustercontroller.utils.staterestapi.errors.MissingUnitException;
import com.yahoo.vespa.clustercontroller.utils.staterestapi.errors.OperationNotSupportedForUnitException;
+import com.yahoo.vespa.clustercontroller.utils.staterestapi.errors.UnknownMasterException;
import com.yahoo.vespa.clustercontroller.utils.staterestapi.requests.SetUnitStateRequest;
import com.yahoo.vespa.clustercontroller.utils.staterestapi.response.SetResponse;
import com.yahoo.vespa.clustercontroller.utils.staterestapi.response.UnitResponse;
import com.yahoo.vespa.clustercontroller.utils.staterestapi.response.UnitState;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -17,13 +22,18 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringContains.containsString;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class SetNodeStateTest extends StateRestApiTest {
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
public static class SetUnitStateRequestImpl extends StateRequest implements SetUnitStateRequest {
private Map<String, UnitState> newStates = new LinkedHashMap<>();
private Condition condition = Condition.FORCE;
+ private ResponseWait responseWait = ResponseWait.WAIT_UNTIL_CLUSTER_ACKED;
public SetUnitStateRequestImpl(String req) {
super(req, 0);
@@ -34,6 +44,11 @@ public class SetNodeStateTest extends StateRestApiTest {
return this;
}
+ public SetUnitStateRequestImpl setResponseWait(ResponseWait responseWait) {
+ this.responseWait = responseWait;
+ return this;
+ }
+
public SetUnitStateRequestImpl setNewState(
final String type,
final String state,
@@ -61,6 +76,11 @@ public class SetNodeStateTest extends StateRestApiTest {
public Condition getCondition() {
return condition;
}
+
+ @Override
+ public ResponseWait getResponseWait() {
+ return responseWait;
+ }
}
private void verifyStateSet(String state, String reason) throws Exception {
@@ -345,4 +365,40 @@ public class SetNodeStateTest extends StateRestApiTest {
assertEquals(expected, jsonWriter.createJson(response).toString(2));
}
+ private Id.Node createDummyId() {
+ return new Id.Node(new Id.Service(new Id.Cluster("foo"), NodeType.STORAGE), 0);
+ }
+
+ private SetNodeStateRequest createDummySetNodeStateRequest() {
+ return new SetNodeStateRequest(createDummyId(), new SetUnitStateRequestImpl("music/storage/1")
+ .setNewState("user", "maintenance", "whatever reason."));
+ }
+
+ @Test
+ public void set_node_state_requests_are_by_default_tagged_as_having_version_ack_dependency() {
+ SetNodeStateRequest request = createDummySetNodeStateRequest();
+ assertTrue(request.hasVersionAckDependency());
+ }
+
+ @Test
+ public void set_node_state_requests_may_override_version_ack_dependency() {
+ SetNodeStateRequest request = new SetNodeStateRequest(createDummyId(), new SetUnitStateRequestImpl("music/storage/1")
+ .setNewState("user", "maintenance", "whatever reason.")
+ .setResponseWait(SetUnitStateRequest.ResponseWait.NO_WAIT));
+ assertFalse(request.hasVersionAckDependency());
+ }
+
+ // Technically, this failure mode currently applies to all requests, but it's only really
+ // important to test (and expected to happen) for requests that have dependencies on cluster
+ // state version publishing.
+ @Test
+ public void leadership_loss_fails_set_node_state_request() throws Exception {
+ expectedException.expectMessage("Leadership lost before request could complete");
+ expectedException.expect(UnknownMasterException.class);
+
+ SetNodeStateRequest request = createDummySetNodeStateRequest();
+ request.handleLeadershipLost();
+ request.getResult();
+ }
+
}