aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahoo-inc.com>2017-09-11 14:24:51 +0200
committerTor Brede Vekterli <vekterli@yahoo-inc.com>2017-09-11 14:25:18 +0200
commita4cb51b28f43420db61a2459737c7585a227ee54 (patch)
treea83d2df85feae5bb748553feee42206a49d69173 /clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
parent0c8f9bdd5a8d6ce35e1a8adba174c51dc8583a92 (diff)
Add support for version ACK-dependent tasks in cluster controller
Used to enable synchronous operation for set-node-state calls, which ensures that side effects of the call are visible when the response returns. If controller leadership is lost before state is published, tasks will be failed back to the client.
Diffstat (limited to 'clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java')
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java179
1 files changed, 178 insertions, 1 deletions
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
index ea0b409c2b4..28d1095f629 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
@@ -13,6 +13,7 @@ import org.junit.Test;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.*;
@@ -78,7 +79,7 @@ public class StateChangeTest extends FleetControllerTest {
}
- private List<ConfiguredNode> createNodes(int count) {
+ private static List<ConfiguredNode> createNodes(int count) {
List<ConfiguredNode> nodes = new ArrayList<>();
for (int i = 0; i < count; i++)
nodes.add(new ConfiguredNode(i, false));
@@ -1239,4 +1240,180 @@ public class StateChangeTest extends FleetControllerTest {
}
+ private static abstract class MockTask extends RemoteClusterControllerTask {
+ boolean invoked = false;
+
+ boolean isInvoked() { return invoked; }
+
+ @Override
+ public boolean hasVersionAckDependency() { return true; }
+ }
+
+ // This is an explicit, hand-written mock task class rather than a Mockito mock(),
+ // simply because mocking void functions (doRemoteFleetControllerTask()) with
+ // Mockito is an utter pain.
+ private static class MockSynchronousTask extends MockTask {
+     /**
+      * Sets the wanted state of storage node 0 to Maintenance, passes the change to the
+      * node state change handler and finally marks this task as invoked.
+      */
+     @Override
+     public void doRemoteFleetControllerTask(Context ctx) {
+         // Maintenance is a state transition edge that requires a state to be published and ACKed
+         NodeState maintenance = new NodeState(NodeType.STORAGE, State.MAINTENANCE);
+         Node storageNode = new Node(NodeType.STORAGE, 0);
+         NodeInfo storageNodeInfo = ctx.cluster.getNodeInfo(storageNode);
+         storageNodeInfo.setWantedState(maintenance);
+         ctx.nodeStateOrHostInfoChangeHandler.handleNewWantedNodeState(storageNodeInfo, maintenance);
+         invoked = true;
+     }
+ }
+
+ /**
+  * Mock task that requests the Down state for storage node 0. Intended for tests
+  * that have already taken that node down before scheduling the task.
+  */
+ private static class MockIdempotentSynchronousTask extends MockTask {
+     @Override
+     public void doRemoteFleetControllerTask(Context ctx) {
+         // The scheduling test is expected to have put node storage.0 in DOWN beforehand,
+         // so requesting DOWN again won't trigger a new state to be published
+         NodeState down = new NodeState(NodeType.STORAGE, State.DOWN);
+         Node storageNode = new Node(NodeType.STORAGE, 0);
+         NodeInfo storageNodeInfo = ctx.cluster.getNodeInfo(storageNode);
+         storageNodeInfo.setWantedState(down);
+         ctx.nodeStateOrHostInfoChangeHandler.handleNewWantedNodeState(storageNodeInfo, down);
+         invoked = true;
+     }
+ }
+
+ // TODO ideally we'd break this out so it doesn't depend on fields in the parent test instance, but
+ // fleet controller tests have a _lot_ of state, so risk of duplicating a lot of that...
+ class RemoteTaskFixture {
+ RemoteTaskFixture(FleetControllerOptions options) throws Exception {
+ initialize(options);
+ ctrl.tick();
+ }
+
+ MockTask scheduleTask(MockTask task) throws Exception {
+ ctrl.schedule(task);
+ ctrl.tick(); // Task processing iteration
+ return task;
+ }
+
+ MockTask scheduleNonIdempotentVersionDependentTask() throws Exception {
+ return scheduleTask(new MockSynchronousTask());
+ }
+
+ MockTask scheduleIdempotentVersionDependentTask() throws Exception {
+ return scheduleTask(new MockIdempotentSynchronousTask());
+ }
+
+ void markStorageNodeDown(int index) throws Exception {
+ communicator.setNodeState(new Node(NodeType.STORAGE, index), State.DOWN, "foo"); // Auto-ACKed
+ ctrl.tick();
+ }
+
+ void sendPartialDeferredDistributorClusterStateAcks() throws Exception {
+ communicator.sendPartialDeferredDistributorClusterStateAcks();
+ ctrl.tick();
+ }
+
+ void sendAllDeferredDistributorClusterStateAcks() throws Exception {
+ communicator.sendAllDeferredDistributorClusterStateAcks();
+ ctrl.tick();
+ }
+ }
+
+ /** Builds options for cluster "mycluster" with 10 configured nodes and no other overrides. */
+ private static FleetControllerOptions defaultOptions() {
+     FleetControllerOptions options = new FleetControllerOptions("mycluster", createNodes(10));
+     return options;
+ }
+
+ /**
+  * Builds options for cluster "mycluster" with 10 configured nodes where the
+  * max transition time for storage nodes is set to 0.
+  */
+ private static FleetControllerOptions optionsWithZeroTransitionTime() {
+     FleetControllerOptions zeroTransitionOptions = new FleetControllerOptions("mycluster", createNodes(10));
+     zeroTransitionOptions.maxTransitionTime.put(NodeType.STORAGE, 0);
+     return zeroTransitionOptions;
+ }
+
+ /**
+  * Starts from the zero-transition-time options and additionally requires at least
+  * 10 distributor nodes and 10 storage nodes to be up.
+  */
+ private static FleetControllerOptions optionsAllowingZeroNodesDown() {
+     FleetControllerOptions strictOptions = optionsWithZeroTransitionTime();
+     strictOptions.minDistributorNodesUp = 10;
+     strictOptions.minStorageNodesUp = 10;
+     return strictOptions;
+ }
+
+ /** Creates a remote task fixture that is set up with the supplied options. */
+ private RemoteTaskFixture createFixtureWith(FleetControllerOptions options) throws Exception {
+     RemoteTaskFixture fixture = new RemoteTaskFixture(options);
+     return fixture;
+ }
+
+ /** Creates a remote task fixture that is set up with {@code defaultOptions()}. */
+ private RemoteTaskFixture createDefaultFixture() throws Exception {
+     FleetControllerOptions options = defaultOptions();
+     return new RemoteTaskFixture(options);
+ }
+
+ @Test
+ public void synchronous_remote_task_is_completed_when_state_is_acked_by_cluster() throws Exception {
+     RemoteTaskFixture remoteTasks = createDefaultFixture();
+     MockTask pendingTask = remoteTasks.scheduleNonIdempotentVersionDependentTask();
+
+     // The task body has run, but the task itself must not be completed yet
+     assertTrue(pendingTask.isInvoked());
+     assertFalse(pendingTask.isCompleted());
+     communicator.setShouldDeferDistributorClusterStateAcks(true);
+
+     ctrl.tick(); // Cluster state recompute iteration. A new state is generated, but the nodes have not ACKed it
+     ctrl.tick(); // Checks that ACKs really are deferred. If they were not, this tick would process ACKs and complete tasks.
+     assertFalse(pendingTask.isCompleted());
+
+     remoteTasks.sendPartialDeferredDistributorClusterStateAcks();
+     assertFalse(pendingTask.isCompleted()); // Still missing ACKs from some nodes
+
+     remoteTasks.sendAllDeferredDistributorClusterStateAcks();
+     assertTrue(pendingTask.isCompleted()); // All nodes have finally ACKed
+ }
+
+ @Test
+ public void idempotent_synchronous_remote_task_can_complete_immediately_if_current_state_already_acked() throws Exception {
+     RemoteTaskFixture remoteTasks = createFixtureWith(optionsWithZeroTransitionTime());
+     remoteTasks.markStorageNodeDown(0);
+     // The task tries to set node 0 into Down, which is the state it is already in
+     MockTask pendingTask = remoteTasks.scheduleIdempotentVersionDependentTask();
+
+     assertTrue(pendingTask.isInvoked());
+     assertFalse(pendingTask.isCompleted());
+
+     ctrl.tick(); // Cluster state recompute iteration. A new state is _not_ generated
+     ctrl.tick(); // Deferred tasks processing iteration; this should complete the task
+     assertTrue(pendingTask.isCompleted());
+ }
+
+ @Test
+ public void idempotent_synchronous_remote_task_waits_until_current_state_is_acked() throws Exception {
+     RemoteTaskFixture remoteTasks = createFixtureWith(optionsWithZeroTransitionTime());
+
+     communicator.setShouldDeferDistributorClusterStateAcks(true);
+     remoteTasks.markStorageNodeDown(0);
+     // The task tries to set node 0 into Down, which is the state it is already in
+     MockTask pendingTask = remoteTasks.scheduleIdempotentVersionDependentTask();
+
+     assertTrue(pendingTask.isInvoked());
+     assertFalse(pendingTask.isCompleted());
+
+     ctrl.tick(); // Cluster state recompute iteration. A new state is _not_ generated
+     ctrl.tick(); // Deferred task processing iteration; the version is not satisfied yet
+     assertFalse(pendingTask.isCompleted());
+
+     remoteTasks.sendAllDeferredDistributorClusterStateAcks();
+     assertTrue(pendingTask.isCompleted()); // All nodes have now ACKed
+ }
+
+ // While the cluster is down, no intermediate states will be published to the nodes
+ // unless the state triggers a cluster Up edge. To avoid leaving task responses hanging
+ // for an indeterminate amount of time in this scenario, tasks running in such a context
+ // are effectively treated as if they were idempotent. I.e. the only requirement is that
+ // the cluster down-state has been published.
+ @Test
+ public void immediately_complete_sync_remote_task_when_cluster_is_down() throws Exception {
+     RemoteTaskFixture remoteTasks = createFixtureWith(optionsAllowingZeroNodesDown());
+     // The controller options require 10/10 nodes up, so taking one down triggers a cluster Down edge.
+     remoteTasks.markStorageNodeDown(1);
+     MockTask pendingTask = remoteTasks.scheduleNonIdempotentVersionDependentTask();
+
+     assertTrue(pendingTask.isInvoked());
+     assertFalse(pendingTask.isCompleted());
+
+     ctrl.tick(); // Cluster state recompute iteration. A new state is _not_ generated
+     ctrl.tick(); // Deferred tasks processing iteration; this should complete the task
+     assertTrue(pendingTask.isCompleted());
+ }
+
}