summary | refs | log | tree | commit | diff | stats
path: root/clustercontroller-core/src
diff options
context:
space:
mode:
Diffstat (limited to 'clustercontroller-core/src')
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java | 80
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java | 22
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java | 16
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/VersionDependentTaskCompletion.java | 43
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/Request.java | 11
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java | 8
-rw-r--r-- clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java | 29
-rw-r--r-- clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java | 179
-rw-r--r-- clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/restapiv2/SetNodeStateTest.java | 56
9 files changed, 426 insertions, 18 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index 47ac5259190..ab16d9a1d1d 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -24,6 +24,7 @@ import org.apache.commons.lang.exception.ExceptionUtils;
import java.io.FileNotFoundException;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
@@ -79,6 +80,10 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
private long firstAllowedStateBroadcast = Long.MAX_VALUE;
private long tickStartTime = Long.MAX_VALUE;
+ private List<RemoteClusterControllerTask> tasksPendingStateRecompute = new ArrayList<>();
+ // Invariant: queued task versions are monotonically increasing with queue position
+ private Queue<VersionDependentTaskCompletion> taskCompletionQueue = new ArrayDeque<>();
+
private final RunDataExtractor dataExtractor = new RunDataExtractor() {
@Override
public com.yahoo.vdslib.state.ClusterState getLatestClusterState() { return stateVersionTracker.getVersionedClusterState(); }
@@ -393,11 +398,25 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
masterElectionHandler.lostDatabaseConnection();
}
+ /**
+  * Fails every task whose completion is still deferred. Each task first gets
+  * handleLeadershipLost() and is then marked as completed; afterwards both the
+  * pending-recompute list and the completion queue are emptied.
+  */
+ private void failAllVersionDependentTasks() {
+     // Tasks that have been executed but not yet tied to a cluster state version.
+     for (RemoteClusterControllerTask pending : tasksPendingStateRecompute) {
+         pending.handleLeadershipLost();
+         pending.notifyCompleted();
+     }
+     tasksPendingStateRecompute.clear();
+
+     // Tasks already queued with a minimum version attached.
+     for (VersionDependentTaskCompletion deferred : taskCompletionQueue) {
+         RemoteClusterControllerTask waiting = deferred.getTask();
+         waiting.handleLeadershipLost();
+         waiting.notifyCompleted();
+     }
+     taskCompletionQueue.clear();
+ }
+
/** Called when all distributors have acked newest cluster state version. */
public void handleAllDistributorsInSync(DatabaseHandler database, DatabaseHandler.Context context) throws InterruptedException {
Set<ConfiguredNode> nodes = new HashSet<>(cluster.clusterInfo().getConfiguredNodes().values());
- stateChangeHandler.handleAllDistributorsInSync(
- stateVersionTracker.getVersionedClusterState(), nodes, database, context);
+ ClusterState currentState = stateVersionTracker.getVersionedClusterState();
+ log.fine(() -> String.format("All distributors have ACKed cluster state version %d", currentState.getVersion()));
+ stateChangeHandler.handleAllDistributorsInSync(currentState, nodes, database, context);
}
private boolean changesConfiguredNodeSet(Collection<ConfiguredNode> newNodes) {
@@ -542,6 +561,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
if ( ! isRunning()) { return; }
didWork |= processNextQueuedRemoteTask();
+ didWork |= completeSatisfiedVersionDependentTasks();
processingCycle = false;
++cycleCount;
@@ -642,10 +662,15 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
if ( ! remoteTasks.isEmpty()) {
final RemoteClusterControllerTask.Context context = createRemoteTaskProcessingContext();
final RemoteClusterControllerTask task = remoteTasks.poll();
- log.finest("Processing remote task " + task.getClass().getName());
+ log.finest(() -> String.format("Processing remote task of type '%s'", task.getClass().getName()));
task.doRemoteFleetControllerTask(context);
- task.notifyCompleted();
- log.finest("Done processing remote task " + task.getClass().getName());
+ if (!task.hasVersionAckDependency()) {
+ log.finest(() -> String.format("Done processing remote task of type '%s'", task.getClass().getName()));
+ task.notifyCompleted();
+ } else {
+ log.finest(() -> String.format("Remote task of type '%s' queued until state recomputation", task.getClass().getName()));
+ tasksPendingStateRecompute.add(task);
+ }
return true;
}
return false;
@@ -661,6 +686,23 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
return context;
}
+ private boolean completeSatisfiedVersionDependentTasks() {
+ int publishedVersion = systemStateBroadcaster.lastClusterStateVersionInSync();
+ long queueSizeBefore = taskCompletionQueue.size();
+ while (!taskCompletionQueue.isEmpty()) {
+ VersionDependentTaskCompletion taskCompletion = taskCompletionQueue.peek();
+ if (publishedVersion >= taskCompletion.getMinimumVersion()) {
+ log.info(() -> String.format("Deferred task of type '%s' has minimum version %d, published is %d; completing",
+ taskCompletion.getTask().getClass().getName(), taskCompletion.getMinimumVersion(), publishedVersion));
+ taskCompletion.getTask().notifyCompleted();
+ taskCompletionQueue.remove();
+ } else {
+ break;
+ }
+ }
+ return (taskCompletionQueue.size() != queueSizeBefore);
+ }
+
/**
* A "consolidated" cluster state is guaranteed to have up-to-date information on which nodes are
* up or down even when the whole cluster is down. The regular, published cluster state is not
@@ -713,6 +755,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
}
private boolean recomputeClusterStateIfRequired() {
+ boolean stateWasChanged = false;
if (mustRecomputeCandidateClusterState()) {
stateChangeHandler.unsetStateChangedFlag();
final AnnotatedClusterState candidate = computeCurrentAnnotatedState();
@@ -727,10 +770,31 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
stateVersionTracker.promoteCandidateToVersionedState(timeNowMs);
emitEventsForAlteredStateEdges(before, stateVersionTracker.getAnnotatedVersionedClusterState(), timeNowMs);
handleNewSystemState(stateVersionTracker.getVersionedClusterState());
- return true;
+ stateWasChanged = true;
}
}
- return false;
+ scheduleVersionDependentTasksForFutureCompletion();
+ return stateWasChanged;
+ }
+
+ /**
+ * Move tasks that are dependent on the most recently generated state being published into
+ * a completion queue with a dependency on the current version. Once that version
+ * has been ACKed by all distributors in the system, those tasks will be marked as completed.
+ *
+ * This works transparently for tasks that end up changing the current cluster state (i.e.
+ * requiring a new state to be published) and for those whose changes are idempotent. In the
+ * former case the tasks will depend on the version that was generated based upon them. In
+ * the latter case the tasks will depend on the version that is already published (or in the
+ * process of being published).
+ */
+ private void scheduleVersionDependentTasksForFutureCompletion() {
+ for (RemoteClusterControllerTask task : tasksPendingStateRecompute) {
+ log.finest(() -> String.format("Adding task of type '%s' to be acked at version %d",
+ task.getClass().getName(), stateVersionTracker.getCurrentVersion()));
+ taskCompletionQueue.add(new VersionDependentTaskCompletion(stateVersionTracker.getCurrentVersion(), task));
+ }
+ tasksPendingStateRecompute.clear();
}
private AnnotatedClusterState computeCurrentAnnotatedState() {
@@ -804,6 +868,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
eventLog.add(new ClusterEvent(ClusterEvent.Type.MASTER_ELECTION, "This node is no longer fleetcontroller master.", timer.getCurrentTimeInMillis()));
firstAllowedStateBroadcast = Long.MAX_VALUE;
metricUpdater.noLongerMaster();
+ failAllVersionDependentTasks(); // TODO test
}
wantedStateChanged = false;
isMaster = false;
@@ -827,6 +892,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
System.exit(1);
} finally {
running.set(false);
+ failAllVersionDependentTasks(); // TODO test....
synchronized (monitor) { monitor.notifyAll(); }
}
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java
index 4a0949cc997..0e20410ce77 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java
@@ -20,6 +20,28 @@ public abstract class RemoteClusterControllerTask {
public abstract void doRemoteFleetControllerTask(Context context);
+ /**
+ * If the task should _not_ be considered complete before a cluster state
+ * version representing the changes made by the task has been ACKed by
+ * all distributors.
+ *
+ * Note that if a task performs an idempotent state change (e.g. setting maintenance
+ * mode on a node already in maintenance mode), the task may be considered complete
+ * immediately if its effective changes have already been ACKed.
+ */
+ public boolean hasVersionAckDependency() { return false; }
+
+ /**
+ * If the task response has been deferred due to hasVersionAckDependency(),
+ * handleLeadershipLost() will be invoked on the task if the cluster controller
+ * discovers it has lost leadership in the time between task execution and
+ * deferred response send time.
+ *
+ * This method will also be invoked if the controller is signalled to shut down
+ * before the dependent cluster version has been published.
+ */
+ public void handleLeadershipLost() {}
+
public boolean isCompleted() {
synchronized (monitor) {
return completed;
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index 6fac9c9c780..ba53cdee4e8 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -91,25 +91,26 @@ public class SystemStateBroadcaster {
{
return false; // No point in sending system state to nodes that can't receive messages or don't want them
}
- if (node.getNewestSystemStateVersionSent() == systemState.getVersion()) {
- return false; // No point in sending if we already have done so
- }
return true;
}
- private List<NodeInfo> resolveStateVersionSendSet(DatabaseHandler.Context dbContext) {
+ private List<NodeInfo> resolveStateVersionCandidateSendSet(DatabaseHandler.Context dbContext) {
return dbContext.getCluster().getNodeInfo().stream()
.filter(this::nodeNeedsClusterState)
.collect(Collectors.toList());
}
+ /** Returns true if the version last sent to the node equals the version of the current system state. */
+ private boolean newestStateAlreadySentToNode(NodeInfo node) {
+     if (node.getNewestSystemStateVersionSent() != systemState.getVersion()) {
+         return false;
+     }
+     return true;
+ }
+
public boolean broadcastNewState(DatabaseHandler database,
DatabaseHandler.Context dbContext,
Communicator communicator,
FleetController fleetController) throws InterruptedException {
if (systemState == null) return false;
- List<NodeInfo> recipients = resolveStateVersionSendSet(dbContext);
+ List<NodeInfo> recipients = resolveStateVersionCandidateSendSet(dbContext);
if (!systemState.isOfficial()) {
systemState.setOfficial(true);
}
@@ -119,6 +120,9 @@ public class SystemStateBroadcaster {
if (node.isDistributor()) {
anyOutdatedDistributorNodes = true;
}
+ if (newestStateAlreadySentToNode(node)) {
+ continue; // No need to send anything more, but still have to mark node as outdated.
+ }
if (nodeNeedsToObserveStartupTimestamps(node)) {
ClusterState newState = buildModifiedClusterState(dbContext);
log.log(LogLevel.DEBUG, "Sending modified system state version " + systemState.getVersion()
@@ -139,6 +143,8 @@ public class SystemStateBroadcaster {
return !recipients.isEmpty();
}
+ public int lastClusterStateVersionInSync() { return lastClusterStateInSync; }
+
private boolean nodeNeedsToObserveStartupTimestamps(NodeInfo node) {
return node.getStartTimestamp() != 0 && node.getWentDownWithStartTime() == node.getStartTimestamp();
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/VersionDependentTaskCompletion.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/VersionDependentTaskCompletion.java
new file mode 100644
index 00000000000..d7b8f96005a
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/VersionDependentTaskCompletion.java
@@ -0,0 +1,43 @@
+// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import java.util.Objects;
+
+/**
+ * Wrapper for a remote (i.e. REST API) cluster controller task whose
+ * completion depends on side effects by the task becoming visible in
+ * the cluster before a response can be sent. Each such task is associated
+ * with a particular cluster state version number representing a lower bound
+ * on the published state containing the side effect.
+ */
+class VersionDependentTaskCompletion {
+ private final long minimumVersion;
+ private final RemoteClusterControllerTask task;
+
+ VersionDependentTaskCompletion(long minimumVersion, RemoteClusterControllerTask task) {
+ this.minimumVersion = minimumVersion;
+ this.task = task;
+ }
+
+ long getMinimumVersion() {
+ return minimumVersion;
+ }
+
+ RemoteClusterControllerTask getTask() {
+ return task;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ VersionDependentTaskCompletion that = (VersionDependentTaskCompletion) o;
+ return minimumVersion == that.minimumVersion &&
+ Objects.equals(task, that.task);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(minimumVersion, task);
+ }
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/Request.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/Request.java
index 84c7c655a11..cb260e7de2a 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/Request.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/Request.java
@@ -12,6 +12,8 @@ public abstract class Request<Result> extends RemoteClusterControllerTask {
NEED_NOT_BE_MASTER
}
+ // TODO a lot of this logic could be replaced with a CompletableFuture
+
private Exception failure = null;
private boolean resultSet = false;
private Result result = null;
@@ -49,9 +51,7 @@ public abstract class Request<Result> extends RemoteClusterControllerTask {
}
result = calculateResult(context);
resultSet = true;
- } catch (OtherMasterIndexException e) {
- failure = e;
- } catch (StateRestApiException e) {
+ } catch (OtherMasterIndexException | StateRestApiException e) {
failure = e;
} catch (Exception e) {
failure = new InternalFailure("Caught unexpected exception");
@@ -59,5 +59,10 @@ public abstract class Request<Result> extends RemoteClusterControllerTask {
}
}
+ /** Records an UnknownMasterException as this request's failure. */
+ @Override
+ public void handleLeadershipLost() {
+     this.failure = new UnknownMasterException("Leadership lost before request could complete");
+ }
+
public abstract Result calculateResult(Context context) throws StateRestApiException, OtherMasterIndexException;
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java
index 47741a5f704..c56ce5da352 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java
@@ -10,7 +10,6 @@ import com.yahoo.vespa.clustercontroller.core.NodeInfo;
import com.yahoo.vespa.clustercontroller.core.NodeStateChangeChecker;
import com.yahoo.vespa.clustercontroller.core.RemoteClusterControllerTask;
import com.yahoo.vespa.clustercontroller.core.ContentCluster;
-import com.yahoo.vespa.clustercontroller.core.listeners.NodeAddedOrRemovedListener;
import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler;
import com.yahoo.vespa.clustercontroller.core.restapiv2.Id;
import com.yahoo.vespa.clustercontroller.core.restapiv2.MissingIdException;
@@ -29,6 +28,7 @@ public class SetNodeStateRequest extends Request<SetResponse> {
private final Id.Node id;
private final Map<String, UnitState> newStates;
private final SetUnitStateRequest.Condition condition;
+ private final SetUnitStateRequest.ResponseWait responseWait;
public SetNodeStateRequest(Id.Node id, SetUnitStateRequest setUnitStateRequest) {
@@ -36,6 +36,7 @@ public class SetNodeStateRequest extends Request<SetResponse> {
this.id = id;
this.newStates = setUnitStateRequest.getNewState();
this.condition = setUnitStateRequest.getCondition();
+ this.responseWait = setUnitStateRequest.getResponseWait();
}
@Override
@@ -64,6 +65,11 @@ public class SetNodeStateRequest extends Request<SetResponse> {
return new NodeState(n.getType(), state).setDescription(newState.getReason());
}
+ /** A version ACK dependency exists only when the request asked to wait until the cluster has ACKed. */
+ @Override
+ public boolean hasVersionAckDependency() {
+     return SetUnitStateRequest.ResponseWait.WAIT_UNTIL_CLUSTER_ACKED == responseWait;
+ }
+
static SetResponse setWantedState(
ContentCluster cluster,
SetUnitStateRequest.Condition condition,
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
index 8148ad69488..c500b4c7390 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
@@ -1,12 +1,14 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;
+import com.yahoo.collections.Pair;
import com.yahoo.vdslib.state.ClusterState;
import com.yahoo.vdslib.state.Node;
import com.yahoo.vdslib.state.NodeState;
import com.yahoo.vdslib.state.State;
import com.yahoo.vespa.clustercontroller.core.listeners.NodeAddedOrRemovedListener;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -15,6 +17,12 @@ public class DummyCommunicator implements Communicator, NodeLookup {
List<Node> newNodes;
Timer timer;
+ private boolean shouldDeferDistributorClusterStateAcks = false;
+ private List<Pair<Waiter<SetClusterStateRequest>, DummySetClusterStateRequest>> deferredClusterStateAcks = new ArrayList<>();
+
+ /**
+  * Sets the flag deciding whether cluster state replies from distributors are
+  * collected for later delivery rather than being passed to their waiter at once.
+  */
+ public void setShouldDeferDistributorClusterStateAcks(boolean shouldDeferDistributorClusterStateAcks) {
+     this.shouldDeferDistributorClusterStateAcks = shouldDeferDistributorClusterStateAcks;
+ }
public class DummyGetNodeStateRequest extends GetNodeStateRequest {
Waiter<GetNodeStateRequest> waiter;
@@ -82,7 +90,26 @@ public class DummyCommunicator implements Communicator, NodeLookup {
DummySetClusterStateRequest req = new DummySetClusterStateRequest(node, state);
node.setSystemStateVersionSent(state);
req.setReply(new SetClusterStateRequest.Reply());
- waiter.done(req);
+ if (node.isStorage() || !shouldDeferDistributorClusterStateAcks) {
+ waiter.done(req);
+ } else {
+ deferredClusterStateAcks.add(new Pair<>(waiter, req));
+ }
+ }
+
+ /** Delivers every deferred reply to its waiter, in the order they were deferred, then forgets them. */
+ public void sendAllDeferredDistributorClusterStateAcks() {
+     for (Pair<Waiter<SetClusterStateRequest>, DummySetClusterStateRequest> deferred : deferredClusterStateAcks) {
+         deferred.getFirst().done(deferred.getSecond());
+     }
+     deferredClusterStateAcks.clear();
+ }
+
+ /**
+  * Delivers all deferred replies except the most recently deferred one, which stays deferred.
+  *
+  * @throws IllegalStateException if no replies are currently deferred
+  */
+ public void sendPartialDeferredDistributorClusterStateAcks() {
+     if (deferredClusterStateAcks.isEmpty()) {
+         throw new IllegalStateException("Tried to send partial ACKs with no ACKs deferred");
+     }
+     // View over everything but the last element; changes to the view write through to the list.
+     List<Pair<Waiter<SetClusterStateRequest>, DummySetClusterStateRequest>> toAck =
+             deferredClusterStateAcks.subList(0, deferredClusterStateAcks.size() - 1);
+     toAck.forEach(reqAndWaiter -> reqAndWaiter.getFirst().done(reqAndWaiter.getSecond()));
+     // Remove the acked range by position. removeAll(toAck) would compare by equality against
+     // a view of the very list being modified.
+     toAck.clear();
+ }
@Override
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
index ea0b409c2b4..28d1095f629 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
@@ -13,6 +13,7 @@ import org.junit.Test;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.*;
@@ -78,7 +79,7 @@ public class StateChangeTest extends FleetControllerTest {
}
- private List<ConfiguredNode> createNodes(int count) {
+ private static List<ConfiguredNode> createNodes(int count) {
List<ConfiguredNode> nodes = new ArrayList<>();
for (int i = 0; i < count; i++)
nodes.add(new ConfiguredNode(i, false));
@@ -1239,4 +1240,180 @@ public class StateChangeTest extends FleetControllerTest {
}
+ private static abstract class MockTask extends RemoteClusterControllerTask {
+ boolean invoked = false;
+
+ boolean isInvoked() { return invoked; }
+
+ @Override
+ public boolean hasVersionAckDependency() { return true; }
+ }
+
+ // An explicit mock task class is used rather than mock() because mocking void
+ // functions (doRemoteFleetControllerTask()) is utterly painful with Mockito.
+ private static class MockSynchronousTask extends MockTask {
+     @Override
+     public void doRemoteFleetControllerTask(Context ctx) {
+         // Request maintenance for storage.0: a state transition edge that requires
+         // a state to be published and ACKed.
+         NodeState wanted = new NodeState(NodeType.STORAGE, State.MAINTENANCE);
+         NodeInfo target = ctx.cluster.getNodeInfo(new Node(NodeType.STORAGE, 0));
+         target.setWantedState(wanted);
+         ctx.nodeStateOrHostInfoChangeHandler.handleNewWantedNodeState(target, wanted);
+         this.invoked = true;
+     }
+ }
+
+ private static class MockIdempotentSynchronousTask extends MockTask {
+     @Override
+     public void doRemoteFleetControllerTask(Context ctx) {
+         // Tests scheduling this task are expected to have taken storage.0 DOWN already,
+         // so asking for DOWN again does not cause a new state to be published.
+         NodeState wanted = new NodeState(NodeType.STORAGE, State.DOWN);
+         NodeInfo target = ctx.cluster.getNodeInfo(new Node(NodeType.STORAGE, 0));
+         target.setWantedState(wanted);
+         ctx.nodeStateOrHostInfoChangeHandler.handleNewWantedNodeState(target, wanted);
+         this.invoked = true;
+     }
+ }
+
+ // TODO ideally we'd break this out so it doesn't depend on fields in the parent test instance, but
+ // fleet controller tests have a _lot_ of state, so risk of duplicating a lot of that...
+ class RemoteTaskFixture {
+ RemoteTaskFixture(FleetControllerOptions options) throws Exception {
+ initialize(options);
+ ctrl.tick();
+ }
+
+ MockTask scheduleTask(MockTask task) throws Exception {
+ ctrl.schedule(task);
+ ctrl.tick(); // Task processing iteration
+ return task;
+ }
+
+ MockTask scheduleNonIdempotentVersionDependentTask() throws Exception {
+ return scheduleTask(new MockSynchronousTask());
+ }
+
+ MockTask scheduleIdempotentVersionDependentTask() throws Exception {
+ return scheduleTask(new MockIdempotentSynchronousTask());
+ }
+
+ void markStorageNodeDown(int index) throws Exception {
+ communicator.setNodeState(new Node(NodeType.STORAGE, index), State.DOWN, "foo"); // Auto-ACKed
+ ctrl.tick();
+ }
+
+ void sendPartialDeferredDistributorClusterStateAcks() throws Exception {
+ communicator.sendPartialDeferredDistributorClusterStateAcks();
+ ctrl.tick();
+ }
+
+ void sendAllDeferredDistributorClusterStateAcks() throws Exception {
+ communicator.sendAllDeferredDistributorClusterStateAcks();
+ ctrl.tick();
+ }
+ }
+
+ /** Options for cluster "mycluster" with 10 configured nodes. */
+ private static FleetControllerOptions defaultOptions() {
+     List<ConfiguredNode> tenNodes = createNodes(10);
+     return new FleetControllerOptions("mycluster", tenNodes);
+ }
+
+ /** Same cluster name and node count as the default options, with storage max transition time set to 0. */
+ private static FleetControllerOptions optionsWithZeroTransitionTime() {
+     FleetControllerOptions zeroTransition = new FleetControllerOptions("mycluster", createNodes(10));
+     zeroTransition.maxTransitionTime.put(NodeType.STORAGE, 0);
+     return zeroTransition;
+ }
+
+ /** Zero-transition-time options that additionally require 10 storage and 10 distributor nodes up. */
+ private static FleetControllerOptions optionsAllowingZeroNodesDown() {
+     FleetControllerOptions strict = optionsWithZeroTransitionTime();
+     strict.minStorageNodesUp = 10;
+     strict.minDistributorNodesUp = 10;
+     return strict;
+ }
+
+ /** Builds a fixture from the given controller options. */
+ private RemoteTaskFixture createFixtureWith(FleetControllerOptions options) throws Exception {
+     RemoteTaskFixture fixture = new RemoteTaskFixture(options);
+     return fixture;
+ }
+
+ /** Builds a fixture from defaultOptions(). */
+ private RemoteTaskFixture createDefaultFixture() throws Exception {
+     FleetControllerOptions options = defaultOptions();
+     return new RemoteTaskFixture(options);
+ }
+
+ @Test
+ public void synchronous_remote_task_is_completed_when_state_is_acked_by_cluster() throws Exception {
+     RemoteTaskFixture remoteTasks = createDefaultFixture();
+     MockTask deferredTask = remoteTasks.scheduleNonIdempotentVersionDependentTask();
+
+     // The task has run, but its completion is held back.
+     assertTrue(deferredTask.isInvoked());
+     assertFalse(deferredTask.isCompleted());
+     communicator.setShouldDeferDistributorClusterStateAcks(true);
+
+     ctrl.tick(); // State recompute iteration: a new state is generated, but no node has ACKed it
+     ctrl.tick(); // Verifies ACKs really are deferred; otherwise this tick would process them and complete the task
+     assertFalse(deferredTask.isCompleted());
+
+     remoteTasks.sendPartialDeferredDistributorClusterStateAcks();
+     assertFalse(deferredTask.isCompleted()); // Some nodes still have not ACKed
+
+     remoteTasks.sendAllDeferredDistributorClusterStateAcks();
+     assertTrue(deferredTask.isCompleted()); // Every node has now ACKed
+ }
+
+ @Test
+ public void idempotent_synchronous_remote_task_can_complete_immediately_if_current_state_already_acked() throws Exception {
+     RemoteTaskFixture remoteTasks = createFixtureWith(optionsWithZeroTransitionTime());
+     remoteTasks.markStorageNodeDown(0);
+     // The task asks for node 0 to be Down, which it already is.
+     MockTask idempotentTask = remoteTasks.scheduleIdempotentVersionDependentTask();
+
+     assertTrue(idempotentTask.isInvoked());
+     assertFalse(idempotentTask.isCompleted());
+
+     ctrl.tick(); // State recompute iteration: _no_ new state is generated
+     ctrl.tick(); // Deferred task processing: the task should be completed here
+     assertTrue(idempotentTask.isCompleted());
+ }
+
+ @Test
+ public void idempotent_synchronous_remote_task_waits_until_current_state_is_acked() throws Exception {
+     RemoteTaskFixture remoteTasks = createFixtureWith(optionsWithZeroTransitionTime());
+
+     communicator.setShouldDeferDistributorClusterStateAcks(true);
+     remoteTasks.markStorageNodeDown(0);
+     // The task asks for node 0 to be Down, which it already is.
+     MockTask idempotentTask = remoteTasks.scheduleIdempotentVersionDependentTask();
+
+     assertTrue(idempotentTask.isInvoked());
+     assertFalse(idempotentTask.isCompleted());
+
+     ctrl.tick(); // State recompute iteration: _no_ new state is generated
+     ctrl.tick(); // Deferred task processing: the required version is not satisfied yet
+     assertFalse(idempotentTask.isCompleted());
+
+     remoteTasks.sendAllDeferredDistributorClusterStateAcks();
+     assertTrue(idempotentTask.isCompleted()); // Every node has now ACKed
+ }
+
+ // While the cluster is down, no intermediate states are published to the nodes
+ // unless the state triggers a cluster Up edge. So that task responses do not hang
+ // for an indeterminate amount of time in that situation, tasks running in such a
+ // context are effectively treated as idempotent: only the cluster down-state is
+ // required to have been published.
+ @Test
+ public void immediately_complete_sync_remote_task_when_cluster_is_down() throws Exception {
+     RemoteTaskFixture remoteTasks = createFixtureWith(optionsAllowingZeroNodesDown());
+     // The options require 10/10 nodes up, so taking one node down triggers a cluster Down edge.
+     remoteTasks.markStorageNodeDown(1);
+     MockTask deferredTask = remoteTasks.scheduleNonIdempotentVersionDependentTask();
+
+     assertTrue(deferredTask.isInvoked());
+     assertFalse(deferredTask.isCompleted());
+
+     ctrl.tick(); // State recompute iteration: _no_ new state is generated
+     ctrl.tick(); // Deferred task processing: the task should be completed here
+     assertTrue(deferredTask.isCompleted());
+ }
+
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/restapiv2/SetNodeStateTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/restapiv2/SetNodeStateTest.java
index 3fe489a0d5a..88483e47f4e 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/restapiv2/SetNodeStateTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/restapiv2/SetNodeStateTest.java
@@ -1,14 +1,19 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core.restapiv2;
+import com.yahoo.vdslib.state.NodeType;
+import com.yahoo.vespa.clustercontroller.core.restapiv2.requests.SetNodeStateRequest;
import com.yahoo.vespa.clustercontroller.utils.staterestapi.errors.InvalidContentException;
import com.yahoo.vespa.clustercontroller.utils.staterestapi.errors.MissingUnitException;
import com.yahoo.vespa.clustercontroller.utils.staterestapi.errors.OperationNotSupportedForUnitException;
+import com.yahoo.vespa.clustercontroller.utils.staterestapi.errors.UnknownMasterException;
import com.yahoo.vespa.clustercontroller.utils.staterestapi.requests.SetUnitStateRequest;
import com.yahoo.vespa.clustercontroller.utils.staterestapi.response.SetResponse;
import com.yahoo.vespa.clustercontroller.utils.staterestapi.response.UnitResponse;
import com.yahoo.vespa.clustercontroller.utils.staterestapi.response.UnitState;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -17,13 +22,18 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.StringContains.containsString;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class SetNodeStateTest extends StateRestApiTest {
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
public static class SetUnitStateRequestImpl extends StateRequest implements SetUnitStateRequest {
private Map<String, UnitState> newStates = new LinkedHashMap<>();
private Condition condition = Condition.FORCE;
+ private ResponseWait responseWait = ResponseWait.WAIT_UNTIL_CLUSTER_ACKED;
public SetUnitStateRequestImpl(String req) {
super(req, 0);
@@ -34,6 +44,11 @@ public class SetNodeStateTest extends StateRestApiTest {
return this;
}
+ /** Stores the response wait mode and returns this request so calls can be chained. */
+ public SetUnitStateRequestImpl setResponseWait(ResponseWait responseWait) {
+     this.responseWait = responseWait;
+     return this;
+ }
+
public SetUnitStateRequestImpl setNewState(
final String type,
final String state,
@@ -61,6 +76,11 @@ public class SetNodeStateTest extends StateRestApiTest {
public Condition getCondition() {
return condition;
}
+
+ /** Returns the configured response wait mode. */
+ @Override
+ public ResponseWait getResponseWait() {
+     return this.responseWait;
+ }
}
private void verifyStateSet(String state, String reason) throws Exception {
@@ -345,4 +365,40 @@ public class SetNodeStateTest extends StateRestApiTest {
assertEquals(expected, jsonWriter.createJson(response).toString(2));
}
+ /** Id of storage node 0 in a cluster named "foo". */
+ private Id.Node createDummyId() {
+     Id.Cluster cluster = new Id.Cluster("foo");
+     Id.Service storageService = new Id.Service(cluster, NodeType.STORAGE);
+     return new Id.Node(storageService, 0);
+ }
+
+ /** A set-node-state request for the dummy id, asking for user state "maintenance". */
+ private SetNodeStateRequest createDummySetNodeStateRequest() {
+     Id.Node nodeId = createDummyId();
+     SetUnitStateRequestImpl unitRequest = new SetUnitStateRequestImpl("music/storage/1")
+             .setNewState("user", "maintenance", "whatever reason.");
+     return new SetNodeStateRequest(nodeId, unitRequest);
+ }
+
+ @Test
+ public void set_node_state_requests_are_by_default_tagged_as_having_version_ack_dependency() {
+     // The dummy request leaves the response wait mode of SetUnitStateRequestImpl untouched.
+     SetNodeStateRequest defaultRequest = createDummySetNodeStateRequest();
+     assertTrue(defaultRequest.hasVersionAckDependency());
+ }
+
+ @Test
+ public void set_node_state_requests_may_override_version_ack_dependency() {
+     Id.Node nodeId = createDummyId();
+     // Explicitly ask not to wait for the cluster.
+     SetUnitStateRequestImpl noWaitRequest = new SetUnitStateRequestImpl("music/storage/1")
+             .setNewState("user", "maintenance", "whatever reason.")
+             .setResponseWait(SetUnitStateRequest.ResponseWait.NO_WAIT);
+     SetNodeStateRequest request = new SetNodeStateRequest(nodeId, noWaitRequest);
+     assertFalse(request.hasVersionAckDependency());
+ }
+
+ // This failure mode technically applies to all requests at present, but it only really
+ // matters (and is only expected to happen) for requests that depend on cluster state
+ // version publishing, so that is what gets tested.
+ @Test
+ public void leadership_loss_fails_set_node_state_request() throws Exception {
+     expectedException.expectMessage("Leadership lost before request could complete");
+     expectedException.expect(UnknownMasterException.class);
+
+     SetNodeStateRequest failedRequest = createDummySetNodeStateRequest();
+     failedRequest.handleLeadershipLost();
+     failedRequest.getResult(); // Expected to throw the exception configured above
+ }
+
}