diff options
Diffstat (limited to 'clustercontroller-core/src')
9 files changed, 426 insertions, 18 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java index 47ac5259190..ab16d9a1d1d 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java @@ -24,6 +24,7 @@ import org.apache.commons.lang.exception.ExceptionUtils; import java.io.FileNotFoundException; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.HashSet; @@ -79,6 +80,10 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd private long firstAllowedStateBroadcast = Long.MAX_VALUE; private long tickStartTime = Long.MAX_VALUE; + private List<RemoteClusterControllerTask> tasksPendingStateRecompute = new ArrayList<>(); + // Invariant: queued task versions are monotonically increasing with queue position + private Queue<VersionDependentTaskCompletion> taskCompletionQueue = new ArrayDeque<>(); + private final RunDataExtractor dataExtractor = new RunDataExtractor() { @Override public com.yahoo.vdslib.state.ClusterState getLatestClusterState() { return stateVersionTracker.getVersionedClusterState(); } @@ -393,11 +398,25 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd masterElectionHandler.lostDatabaseConnection(); } + private void failAllVersionDependentTasks() { + tasksPendingStateRecompute.forEach(task -> { + task.handleLeadershipLost(); + task.notifyCompleted(); + }); + tasksPendingStateRecompute.clear(); + taskCompletionQueue.forEach(task -> { + task.getTask().handleLeadershipLost(); + task.getTask().notifyCompleted(); + }); + taskCompletionQueue.clear(); + } + /** Called when all distributors have acked newest cluster state version. 
*/ public void handleAllDistributorsInSync(DatabaseHandler database, DatabaseHandler.Context context) throws InterruptedException { Set<ConfiguredNode> nodes = new HashSet<>(cluster.clusterInfo().getConfiguredNodes().values()); - stateChangeHandler.handleAllDistributorsInSync( - stateVersionTracker.getVersionedClusterState(), nodes, database, context); + ClusterState currentState = stateVersionTracker.getVersionedClusterState(); + log.fine(() -> String.format("All distributors have ACKed cluster state version %d", currentState.getVersion())); + stateChangeHandler.handleAllDistributorsInSync(currentState, nodes, database, context); } private boolean changesConfiguredNodeSet(Collection<ConfiguredNode> newNodes) { @@ -542,6 +561,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd if ( ! isRunning()) { return; } didWork |= processNextQueuedRemoteTask(); + didWork |= completeSatisfiedVersionDependentTasks(); processingCycle = false; ++cycleCount; @@ -642,10 +662,15 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd if ( ! 
remoteTasks.isEmpty()) { final RemoteClusterControllerTask.Context context = createRemoteTaskProcessingContext(); final RemoteClusterControllerTask task = remoteTasks.poll(); - log.finest("Processing remote task " + task.getClass().getName()); + log.finest(() -> String.format("Processing remote task of type '%s'", task.getClass().getName())); task.doRemoteFleetControllerTask(context); - task.notifyCompleted(); - log.finest("Done processing remote task " + task.getClass().getName()); + if (!task.hasVersionAckDependency()) { + log.finest(() -> String.format("Done processing remote task of type '%s'", task.getClass().getName())); + task.notifyCompleted(); + } else { + log.finest(() -> String.format("Remote task of type '%s' queued until state recomputation", task.getClass().getName())); + tasksPendingStateRecompute.add(task); + } return true; } return false; @@ -661,6 +686,23 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd return context; } + private boolean completeSatisfiedVersionDependentTasks() { + int publishedVersion = systemStateBroadcaster.lastClusterStateVersionInSync(); + long queueSizeBefore = taskCompletionQueue.size(); + while (!taskCompletionQueue.isEmpty()) { + VersionDependentTaskCompletion taskCompletion = taskCompletionQueue.peek(); + if (publishedVersion >= taskCompletion.getMinimumVersion()) { + log.info(() -> String.format("Deferred task of type '%s' has minimum version %d, published is %d; completing", + taskCompletion.getTask().getClass().getName(), taskCompletion.getMinimumVersion(), publishedVersion)); + taskCompletion.getTask().notifyCompleted(); + taskCompletionQueue.remove(); + } else { + break; + } + } + return (taskCompletionQueue.size() != queueSizeBefore); + } + /** * A "consolidated" cluster state is guaranteed to have up-to-date information on which nodes are * up or down even when the whole cluster is down. 
The regular, published cluster state is not @@ -713,6 +755,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd } private boolean recomputeClusterStateIfRequired() { + boolean stateWasChanged = false; if (mustRecomputeCandidateClusterState()) { stateChangeHandler.unsetStateChangedFlag(); final AnnotatedClusterState candidate = computeCurrentAnnotatedState(); @@ -727,10 +770,31 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd stateVersionTracker.promoteCandidateToVersionedState(timeNowMs); emitEventsForAlteredStateEdges(before, stateVersionTracker.getAnnotatedVersionedClusterState(), timeNowMs); handleNewSystemState(stateVersionTracker.getVersionedClusterState()); - return true; + stateWasChanged = true; } } - return false; + scheduleVersionDependentTasksForFutureCompletion(); + return stateWasChanged; + } + + /** + * Move tasks that are dependent on the most recently generated state being published into + * a completion queue with a dependency on the current version. Once that version + * has been ACKed by all distributors in the system, those tasks will be marked as completed. + * + * This works transparently for tasks that end up changing the current cluster state (i.e. + * requiring a new state to be published) and for those whose changes are idempotent. In the + * former case the tasks will depend on the version that was generated based upon them. In + * the latter case the tasks will depend on the version that is already published (or in the + * process of being published). 
+ */ + private void scheduleVersionDependentTasksForFutureCompletion() { + for (RemoteClusterControllerTask task : tasksPendingStateRecompute) { + log.finest(() -> String.format("Adding task of type '%s' to be acked at version %d", + task.getClass().getName(), stateVersionTracker.getCurrentVersion())); + taskCompletionQueue.add(new VersionDependentTaskCompletion(stateVersionTracker.getCurrentVersion(), task)); + } + tasksPendingStateRecompute.clear(); } private AnnotatedClusterState computeCurrentAnnotatedState() { @@ -804,6 +868,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd eventLog.add(new ClusterEvent(ClusterEvent.Type.MASTER_ELECTION, "This node is no longer fleetcontroller master.", timer.getCurrentTimeInMillis())); firstAllowedStateBroadcast = Long.MAX_VALUE; metricUpdater.noLongerMaster(); + failAllVersionDependentTasks(); // TODO test } wantedStateChanged = false; isMaster = false; @@ -827,6 +892,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd System.exit(1); } finally { running.set(false); + failAllVersionDependentTasks(); // TODO test.... 
synchronized (monitor) { monitor.notifyAll(); } } } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java index 4a0949cc997..0e20410ce77 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/RemoteClusterControllerTask.java @@ -20,6 +20,28 @@ public abstract class RemoteClusterControllerTask { public abstract void doRemoteFleetControllerTask(Context context); + /** + * If the task should _not_ be considered complete before a cluster state + * version representing the changes made by the task has been ACKed by + * all distributors. + * + * Note that if a task performs an idempotent state change (e.g. setting maintenance + * mode on a node already in maintenance mode), the task may be considered complete + * immediately if its effective changes have already been ACKed. + */ + public boolean hasVersionAckDependency() { return false; } + + /** + * If the task response has been deferred due to hasVersionAckDependency(), + * handleLeadershipLost() will be invoked on the task if the cluster controller + * discovers it has lost leadership in the time between task execution and + * deferred response send time. + * + * This method will also be invoked if the controller is signalled to shut down + * before the dependent cluster version has been published. 
+ */ + public void handleLeadershipLost() {} + public boolean isCompleted() { synchronized (monitor) { return completed; diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java index 6fac9c9c780..ba53cdee4e8 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java @@ -91,25 +91,26 @@ public class SystemStateBroadcaster { { return false; // No point in sending system state to nodes that can't receive messages or don't want them } - if (node.getNewestSystemStateVersionSent() == systemState.getVersion()) { - return false; // No point in sending if we already have done so - } return true; } - private List<NodeInfo> resolveStateVersionSendSet(DatabaseHandler.Context dbContext) { + private List<NodeInfo> resolveStateVersionCandidateSendSet(DatabaseHandler.Context dbContext) { return dbContext.getCluster().getNodeInfo().stream() .filter(this::nodeNeedsClusterState) .collect(Collectors.toList()); } + private boolean newestStateAlreadySentToNode(NodeInfo node) { + return (node.getNewestSystemStateVersionSent() == systemState.getVersion()); + } + public boolean broadcastNewState(DatabaseHandler database, DatabaseHandler.Context dbContext, Communicator communicator, FleetController fleetController) throws InterruptedException { if (systemState == null) return false; - List<NodeInfo> recipients = resolveStateVersionSendSet(dbContext); + List<NodeInfo> recipients = resolveStateVersionCandidateSendSet(dbContext); if (!systemState.isOfficial()) { systemState.setOfficial(true); } @@ -119,6 +120,9 @@ public class SystemStateBroadcaster { if (node.isDistributor()) { anyOutdatedDistributorNodes = true; } + if (newestStateAlreadySentToNode(node)) { + 
continue; // No need to send anything more, but still have to mark node as outdated. + } if (nodeNeedsToObserveStartupTimestamps(node)) { ClusterState newState = buildModifiedClusterState(dbContext); log.log(LogLevel.DEBUG, "Sending modified system state version " + systemState.getVersion() @@ -139,6 +143,8 @@ public class SystemStateBroadcaster { return !recipients.isEmpty(); } + public int lastClusterStateVersionInSync() { return lastClusterStateInSync; } + private boolean nodeNeedsToObserveStartupTimestamps(NodeInfo node) { return node.getStartTimestamp() != 0 && node.getWentDownWithStartTime() == node.getStartTimestamp(); } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/VersionDependentTaskCompletion.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/VersionDependentTaskCompletion.java new file mode 100644 index 00000000000..d7b8f96005a --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/VersionDependentTaskCompletion.java @@ -0,0 +1,43 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import java.util.Objects; + +/** + * Wrapper for a remote (i.e. REST API) cluster controller task whose + * completion depends on side effects by the task becoming visible in + * the cluster before a response can be sent. Each such task is associated + * with a particular cluster state version number representing a lower bound + * on the published state containing the side effect. 
+ */ +class VersionDependentTaskCompletion { + private final long minimumVersion; + private final RemoteClusterControllerTask task; + + VersionDependentTaskCompletion(long minimumVersion, RemoteClusterControllerTask task) { + this.minimumVersion = minimumVersion; + this.task = task; + } + + long getMinimumVersion() { + return minimumVersion; + } + + RemoteClusterControllerTask getTask() { + return task; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + VersionDependentTaskCompletion that = (VersionDependentTaskCompletion) o; + return minimumVersion == that.minimumVersion && + Objects.equals(task, that.task); + } + + @Override + public int hashCode() { + return Objects.hash(minimumVersion, task); + } +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/Request.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/Request.java index 84c7c655a11..cb260e7de2a 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/Request.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/Request.java @@ -12,6 +12,8 @@ public abstract class Request<Result> extends RemoteClusterControllerTask { NEED_NOT_BE_MASTER } + // TODO a lot of this logic could be replaced with a CompletableFuture + private Exception failure = null; private boolean resultSet = false; private Result result = null; @@ -49,9 +51,7 @@ public abstract class Request<Result> extends RemoteClusterControllerTask { } result = calculateResult(context); resultSet = true; - } catch (OtherMasterIndexException e) { - failure = e; - } catch (StateRestApiException e) { + } catch (OtherMasterIndexException | StateRestApiException e) { failure = e; } catch (Exception e) { failure = new InternalFailure("Caught unexpected exception"); @@ -59,5 +59,10 @@ public
abstract class Request<Result> extends RemoteClusterControllerTask { } } + @Override + public void handleLeadershipLost() { + failure = new UnknownMasterException("Leadership lost before request could complete"); + } + public abstract Result calculateResult(Context context) throws StateRestApiException, OtherMasterIndexException; } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java index 47741a5f704..c56ce5da352 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/restapiv2/requests/SetNodeStateRequest.java @@ -10,7 +10,6 @@ import com.yahoo.vespa.clustercontroller.core.NodeInfo; import com.yahoo.vespa.clustercontroller.core.NodeStateChangeChecker; import com.yahoo.vespa.clustercontroller.core.RemoteClusterControllerTask; import com.yahoo.vespa.clustercontroller.core.ContentCluster; -import com.yahoo.vespa.clustercontroller.core.listeners.NodeAddedOrRemovedListener; import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler; import com.yahoo.vespa.clustercontroller.core.restapiv2.Id; import com.yahoo.vespa.clustercontroller.core.restapiv2.MissingIdException; @@ -29,6 +28,7 @@ public class SetNodeStateRequest extends Request<SetResponse> { private final Id.Node id; private final Map<String, UnitState> newStates; private final SetUnitStateRequest.Condition condition; + private final SetUnitStateRequest.ResponseWait responseWait; public SetNodeStateRequest(Id.Node id, SetUnitStateRequest setUnitStateRequest) { @@ -36,6 +36,7 @@ public class SetNodeStateRequest extends Request<SetResponse> { this.id = id; this.newStates = setUnitStateRequest.getNewState(); this.condition = 
setUnitStateRequest.getCondition(); + this.responseWait = setUnitStateRequest.getResponseWait(); } @Override @@ -64,6 +65,11 @@ public class SetNodeStateRequest extends Request<SetResponse> { return new NodeState(n.getType(), state).setDescription(newState.getReason()); } + @Override + public boolean hasVersionAckDependency() { + return (this.responseWait == SetUnitStateRequest.ResponseWait.WAIT_UNTIL_CLUSTER_ACKED); + } + static SetResponse setWantedState( ContentCluster cluster, SetUnitStateRequest.Condition condition, diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java index 8148ad69488..c500b4c7390 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java @@ -1,12 +1,14 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
package com.yahoo.vespa.clustercontroller.core; +import com.yahoo.collections.Pair; import com.yahoo.vdslib.state.ClusterState; import com.yahoo.vdslib.state.Node; import com.yahoo.vdslib.state.NodeState; import com.yahoo.vdslib.state.State; import com.yahoo.vespa.clustercontroller.core.listeners.NodeAddedOrRemovedListener; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -15,6 +17,12 @@ public class DummyCommunicator implements Communicator, NodeLookup { List<Node> newNodes; Timer timer; + private boolean shouldDeferDistributorClusterStateAcks = false; + private List<Pair<Waiter<SetClusterStateRequest>, DummySetClusterStateRequest>> deferredClusterStateAcks = new ArrayList<>(); + + public void setShouldDeferDistributorClusterStateAcks(boolean shouldDeferDistributorClusterStateAcks) { + this.shouldDeferDistributorClusterStateAcks = shouldDeferDistributorClusterStateAcks; + } public class DummyGetNodeStateRequest extends GetNodeStateRequest { Waiter<GetNodeStateRequest> waiter; @@ -82,7 +90,26 @@ public class DummyCommunicator implements Communicator, NodeLookup { DummySetClusterStateRequest req = new DummySetClusterStateRequest(node, state); node.setSystemStateVersionSent(state); req.setReply(new SetClusterStateRequest.Reply()); - waiter.done(req); + if (node.isStorage() || !shouldDeferDistributorClusterStateAcks) { + waiter.done(req); + } else { + deferredClusterStateAcks.add(new Pair<>(waiter, req)); + } + } + + public void sendAllDeferredDistributorClusterStateAcks() { + deferredClusterStateAcks.forEach(reqAndWaiter -> reqAndWaiter.getFirst().done(reqAndWaiter.getSecond())); + deferredClusterStateAcks.clear(); + } + + public void sendPartialDeferredDistributorClusterStateAcks() { + if (deferredClusterStateAcks.isEmpty()) { + throw new IllegalStateException("Tried to send partial ACKs with no ACKs deferred"); + } + List<Pair<Waiter<SetClusterStateRequest>, DummySetClusterStateRequest>> toAck = + 
deferredClusterStateAcks.subList(0, deferredClusterStateAcks.size() - 1); + toAck.forEach(reqAndWaiter -> reqAndWaiter.getFirst().done(reqAndWaiter.getSecond())); + deferredClusterStateAcks.removeAll(toAck); } @Override diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java index ea0b409c2b4..28d1095f629 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java @@ -13,6 +13,7 @@ import org.junit.Test; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.util.*; @@ -78,7 +79,7 @@ public class StateChangeTest extends FleetControllerTest { } - private List<ConfiguredNode> createNodes(int count) { + private static List<ConfiguredNode> createNodes(int count) { List<ConfiguredNode> nodes = new ArrayList<>(); for (int i = 0; i < count; i++) nodes.add(new ConfiguredNode(i, false)); @@ -1239,4 +1240,180 @@ public class StateChangeTest extends FleetControllerTest { } + private static abstract class MockTask extends RemoteClusterControllerTask { + boolean invoked = false; + + boolean isInvoked() { return invoked; } + + @Override + public boolean hasVersionAckDependency() { return true; } + } + + // We create an explicit mock task class instead of using mock() simply because of + // the utter pain that mocking void functions (doRemoteFleetControllerTask()) is + // when using Mockito. 
+ private static class MockSynchronousTask extends MockTask { + @Override + public void doRemoteFleetControllerTask(Context ctx) { + // Trigger a state transition edge that requires a state to be published and ACKed + NodeState newNodeState = new NodeState(NodeType.STORAGE, State.MAINTENANCE); + NodeInfo nodeInfo = ctx.cluster.getNodeInfo(new Node(NodeType.STORAGE, 0)); + nodeInfo.setWantedState(newNodeState); + ctx.nodeStateOrHostInfoChangeHandler.handleNewWantedNodeState(nodeInfo, newNodeState); + invoked = true; + } + } + + private static class MockIdempotentSynchronousTask extends MockTask { + @Override + public void doRemoteFleetControllerTask(Context ctx) { + // Tests scheduling this task shall have ensured that node storage.0 already is DOWN, + // so this won't trigger a new state to be published + NodeState newNodeState = new NodeState(NodeType.STORAGE, State.DOWN); + NodeInfo nodeInfo = ctx.cluster.getNodeInfo(new Node(NodeType.STORAGE, 0)); + nodeInfo.setWantedState(newNodeState); + ctx.nodeStateOrHostInfoChangeHandler.handleNewWantedNodeState(nodeInfo, newNodeState); + invoked = true; + } + } + + // TODO ideally we'd break this out so it doesn't depend on fields in the parent test instance, but + // fleet controller tests have a _lot_ of state, so risk of duplicating a lot of that... 
+ class RemoteTaskFixture { + RemoteTaskFixture(FleetControllerOptions options) throws Exception { + initialize(options); + ctrl.tick(); + } + + MockTask scheduleTask(MockTask task) throws Exception { + ctrl.schedule(task); + ctrl.tick(); // Task processing iteration + return task; + } + + MockTask scheduleNonIdempotentVersionDependentTask() throws Exception { + return scheduleTask(new MockSynchronousTask()); + } + + MockTask scheduleIdempotentVersionDependentTask() throws Exception { + return scheduleTask(new MockIdempotentSynchronousTask()); + } + + void markStorageNodeDown(int index) throws Exception { + communicator.setNodeState(new Node(NodeType.STORAGE, index), State.DOWN, "foo"); // Auto-ACKed + ctrl.tick(); + } + + void sendPartialDeferredDistributorClusterStateAcks() throws Exception { + communicator.sendPartialDeferredDistributorClusterStateAcks(); + ctrl.tick(); + } + + void sendAllDeferredDistributorClusterStateAcks() throws Exception { + communicator.sendAllDeferredDistributorClusterStateAcks(); + ctrl.tick(); + } + } + + private static FleetControllerOptions defaultOptions() { + return new FleetControllerOptions("mycluster", createNodes(10)); + } + + private static FleetControllerOptions optionsWithZeroTransitionTime() { + FleetControllerOptions options = new FleetControllerOptions("mycluster", createNodes(10)); + options.maxTransitionTime.put(NodeType.STORAGE, 0); + return options; + } + + private static FleetControllerOptions optionsAllowingZeroNodesDown() { + FleetControllerOptions options = optionsWithZeroTransitionTime(); + options.minStorageNodesUp = 10; + options.minDistributorNodesUp = 10; + return options; + } + + private RemoteTaskFixture createFixtureWith(FleetControllerOptions options) throws Exception { + return new RemoteTaskFixture(options); + } + + private RemoteTaskFixture createDefaultFixture() throws Exception { + return new RemoteTaskFixture(defaultOptions()); + } + + @Test + public void 
synchronous_remote_task_is_completed_when_state_is_acked_by_cluster() throws Exception { + RemoteTaskFixture fixture = createDefaultFixture(); + MockTask task = fixture.scheduleNonIdempotentVersionDependentTask(); + + assertTrue(task.isInvoked()); + assertFalse(task.isCompleted()); + communicator.setShouldDeferDistributorClusterStateAcks(true); + + ctrl.tick(); // Cluster state recompute iteration. New state generated, but not ACKed by nodes + ctrl.tick(); // Ensure that we're deferring ACKs. Otherwise, this tick would process ACKs and complete tasks. + assertFalse(task.isCompleted()); + + fixture.sendPartialDeferredDistributorClusterStateAcks(); + assertFalse(task.isCompleted()); // Not yet acked by all nodes + + fixture.sendAllDeferredDistributorClusterStateAcks(); + assertTrue(task.isCompleted()); // Now finally acked by all nodes + } + + @Test + public void idempotent_synchronous_remote_task_can_complete_immediately_if_current_state_already_acked() throws Exception { + RemoteTaskFixture fixture = createFixtureWith(optionsWithZeroTransitionTime()); + fixture.markStorageNodeDown(0); + MockTask task = fixture.scheduleIdempotentVersionDependentTask(); // Tries to set node 0 into Down; already in that state + + assertTrue(task.isInvoked()); + assertFalse(task.isCompleted()); + + ctrl.tick(); // Cluster state recompute iteration. 
New state _not_ generated + ctrl.tick(); // Deferred tasks processing; should complete tasks + assertTrue(task.isCompleted()); + } + + @Test + public void idempotent_synchronous_remote_task_waits_until_current_state_is_acked() throws Exception { + RemoteTaskFixture fixture = createFixtureWith(optionsWithZeroTransitionTime()); + + communicator.setShouldDeferDistributorClusterStateAcks(true); + fixture.markStorageNodeDown(0); + MockTask task = fixture.scheduleIdempotentVersionDependentTask(); // Tries to set node 0 into Down; already in that state + + assertTrue(task.isInvoked()); + assertFalse(task.isCompleted()); + + ctrl.tick(); // Cluster state recompute iteration. New state _not_ generated + ctrl.tick(); // Deferred task processing; version not satisfied yet + assertFalse(task.isCompleted()); + + fixture.sendAllDeferredDistributorClusterStateAcks(); + assertTrue(task.isCompleted()); // Now acked by all nodes + + } + + // When the cluster is down no intermediate states will be published to the nodes + // unless the state triggers a cluster Up edge. To avoid hanging task responses + // for an indeterminate amount of time in this scenario, we effectively treat + // tasks running in such a context as if they were idempotent. I.e. we only require + // the cluster down-state to have been published. + @Test + public void immediately_complete_sync_remote_task_when_cluster_is_down() throws Exception { + RemoteTaskFixture fixture = createFixtureWith(optionsAllowingZeroNodesDown()); + // Controller options require 10/10 nodes up, so take one down to trigger a cluster Down edge. + fixture.markStorageNodeDown(1); + MockTask task = fixture.scheduleNonIdempotentVersionDependentTask(); + + assertTrue(task.isInvoked()); + assertFalse(task.isCompleted()); + + ctrl.tick(); // Cluster state recompute iteration. 
New state _not_ generated + ctrl.tick(); // Deferred tasks processing; should complete tasks + assertTrue(task.isCompleted()); + + + } + } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/restapiv2/SetNodeStateTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/restapiv2/SetNodeStateTest.java index 3fe489a0d5a..88483e47f4e 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/restapiv2/SetNodeStateTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/restapiv2/SetNodeStateTest.java @@ -1,14 +1,19 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.core.restapiv2; +import com.yahoo.vdslib.state.NodeType; +import com.yahoo.vespa.clustercontroller.core.restapiv2.requests.SetNodeStateRequest; import com.yahoo.vespa.clustercontroller.utils.staterestapi.errors.InvalidContentException; import com.yahoo.vespa.clustercontroller.utils.staterestapi.errors.MissingUnitException; import com.yahoo.vespa.clustercontroller.utils.staterestapi.errors.OperationNotSupportedForUnitException; +import com.yahoo.vespa.clustercontroller.utils.staterestapi.errors.UnknownMasterException; import com.yahoo.vespa.clustercontroller.utils.staterestapi.requests.SetUnitStateRequest; import com.yahoo.vespa.clustercontroller.utils.staterestapi.response.SetResponse; import com.yahoo.vespa.clustercontroller.utils.staterestapi.response.UnitResponse; import com.yahoo.vespa.clustercontroller.utils.staterestapi.response.UnitState; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.util.LinkedHashMap; import java.util.Map; @@ -17,13 +22,18 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; import static org.hamcrest.core.StringContains.containsString; import 
static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class SetNodeStateTest extends StateRestApiTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + public static class SetUnitStateRequestImpl extends StateRequest implements SetUnitStateRequest { private Map<String, UnitState> newStates = new LinkedHashMap<>(); private Condition condition = Condition.FORCE; + private ResponseWait responseWait = ResponseWait.WAIT_UNTIL_CLUSTER_ACKED; public SetUnitStateRequestImpl(String req) { super(req, 0); @@ -34,6 +44,11 @@ public class SetNodeStateTest extends StateRestApiTest { return this; } + public SetUnitStateRequestImpl setResponseWait(ResponseWait responseWait) { + this.responseWait = responseWait; + return this; + } + public SetUnitStateRequestImpl setNewState( final String type, final String state, @@ -61,6 +76,11 @@ public class SetNodeStateTest extends StateRestApiTest { public Condition getCondition() { return condition; } + + @Override + public ResponseWait getResponseWait() { + return responseWait; + } } private void verifyStateSet(String state, String reason) throws Exception { @@ -345,4 +365,40 @@ public class SetNodeStateTest extends StateRestApiTest { assertEquals(expected, jsonWriter.createJson(response).toString(2)); } + private Id.Node createDummyId() { + return new Id.Node(new Id.Service(new Id.Cluster("foo"), NodeType.STORAGE), 0); + } + + private SetNodeStateRequest createDummySetNodeStateRequest() { + return new SetNodeStateRequest(createDummyId(), new SetUnitStateRequestImpl("music/storage/1") + .setNewState("user", "maintenance", "whatever reason.")); + } + + @Test + public void set_node_state_requests_are_by_default_tagged_as_having_version_ack_dependency() { + SetNodeStateRequest request = createDummySetNodeStateRequest(); + assertTrue(request.hasVersionAckDependency()); + } + + @Test + public void 
set_node_state_requests_may_override_version_ack_dependency() { + SetNodeStateRequest request = new SetNodeStateRequest(createDummyId(), new SetUnitStateRequestImpl("music/storage/1") + .setNewState("user", "maintenance", "whatever reason.") + .setResponseWait(SetUnitStateRequest.ResponseWait.NO_WAIT)); + assertFalse(request.hasVersionAckDependency()); + } + + // Technically, this failure mode currently applies to all requests, but it's only really + // important to test (and expected to happen) for requests that have dependencies on cluster + // state version publishing. + @Test + public void leadership_loss_fails_set_node_state_request() throws Exception { + expectedException.expectMessage("Leadership lost before request could complete"); + expectedException.expect(UnknownMasterException.class); + + SetNodeStateRequest request = createDummySetNodeStateRequest(); + request.handleLeadershipLost(); + request.getResult(); + } + } |