diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-08 12:58:31 +0100 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-14 14:43:03 +0000 |
commit | 57b4604fc462cdc18e00bfd425a2211fac429869 (patch) | |
tree | 9797e52a497b001d89cc197b320b1deecda6b236 /clustercontroller-core/src/test/java/com/yahoo/vespa | |
parent | 05f27f6cfcda786232fa6da47154784dce2483e1 (diff) |
Support configurable two-phase state transitions in cluster controller
Diffstat (limited to 'clustercontroller-core/src/test/java/com/yahoo/vespa')
8 files changed, 113 insertions, 22 deletions
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java index b1e7158f61c..185fdc0c7ca 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java @@ -97,7 +97,7 @@ public class DummyCommunicator implements Communicator, NodeLookup { public void setSystemState(ClusterStateBundle stateBundle, NodeInfo node, Waiter<SetClusterStateRequest> waiter) { ClusterState baselineState = stateBundle.getBaselineClusterState(); DummySetClusterStateRequest req = new DummySetClusterStateRequest(node, baselineState); - node.setSystemStateVersionSent(baselineState); + node.setClusterStateVersionBundleSent(baselineState); req.setReply(new SetClusterStateRequest.Reply()); if (node.isStorage() || !shouldDeferDistributorClusterStateAcks) { waiter.done(req); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java index 6d59a672e86..caa3fbcd6cd 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java @@ -68,6 +68,7 @@ public class DummyVdsNode { * Any access to this list or to its members must be synchronized on the timer variable. */ private List<ClusterStateBundle> clusterStateBundles = new LinkedList<>(); + private int activatedClusterStateVersion = 0; private Thread messageResponder = new Thread() { public void run() { @@ -220,6 +221,12 @@ public class DummyVdsNode { } } + public int getActivatedClusterStateVersion() { + synchronized (timer) { + return activatedClusterStateVersion; + } + } + public boolean hasPendingGetNodeStateRequest() { synchronized (timer) { return !waitingRequests.isEmpty(); @@ -300,14 +307,17 @@ public class DummyVdsNode { public ClusterStateBundle getClusterStateBundle() { synchronized(timer) { - return (clusterStateBundles.isEmpty() ? null : clusterStateBundles.get(0)); + return clusterStateBundles.stream() + .filter(b -> b.getVersion() <= activatedClusterStateVersion) + .findFirst() // Most recent cluster state bundle first in list + .orElse(null); } } public ClusterState getClusterState() { - synchronized(timer) { - return (clusterStateBundles.isEmpty() ? null : clusterStateBundles.get(0).getBaselineClusterState()); - } + return Optional.ofNullable(getClusterStateBundle()) + .map(b -> b.getBaselineClusterState()) + .orElse(null); } public String getSlobrokName() { @@ -369,6 +379,13 @@ public class DummyVdsNode { m.paramDesc(2, "payload", "Slime format payload"); supervisor.addMethod(m); } + if (stateCommunicationVersion >= RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_VERSION) { + m = new Method(RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_METHOD_NAME, "i", "i", this, "rpc_activateClusterStateVersion"); + m.methodDesc("Activate a given cluster state version"); + m.paramDesc(0, "stateVersion", "Cluster state version to activate"); + m.returnDesc(0, "actualVersion", "Actual cluster state version on node"); + supervisor.addMethod(m); + } } public void rpc_storageConnect(Request req) { @@ -439,7 +456,7 @@ public class DummyVdsNode { } } } catch (Exception e) { - log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering " + req.methodName() + " request: " + e.getMessage()); + log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering " + req.methodName() + " request: " + e.getMessage()); e.printStackTrace(System.err); req.setError(ErrorCode.METHOD_FAILED, e.getMessage()); } @@ -499,7 +516,7 @@ public class DummyVdsNode { req.returnValues().add(new StringValue("OK")); log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new system state (through old setsystemstate call) " + newState); } catch (Exception e) { - log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setsystemstate request: " + e.getMessage()); + log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering setsystemstate request: " + e.getMessage()); e.printStackTrace(System.err); req.returnValues().add(new Int32Value(ErrorCode.METHOD_FAILED)); req.returnValues().add(new StringValue(e.getMessage())); @@ -516,11 +533,14 @@ public class DummyVdsNode { synchronized(timer) { updateStartTimestamps(newState); clusterStateBundles.add(0, ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(newState))); + if (stateCommunicationVersion < RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_VERSION) { + activatedClusterStateVersion = newState.getVersion(); // Simulate node that does not know of activation + } timer.notifyAll(); } log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new system state " + newState); } catch (Exception e) { - log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setsystemstate request: " + e.getMessage()); + log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering setsystemstate request: " + e.getMessage()); e.printStackTrace(System.err); req.setError(ErrorCode.METHOD_FAILED, e.getMessage()); } @@ -536,11 +556,40 @@ public class DummyVdsNode { synchronized(timer) { updateStartTimestamps(stateBundle.getBaselineClusterState()); clusterStateBundles.add(0, stateBundle); + if (stateCommunicationVersion < RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_VERSION) { + activatedClusterStateVersion = stateBundle.getVersion(); // Simulate node that does not know of activation + } timer.notifyAll(); } log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new cluster state " + stateBundle); } catch (Exception e) { - log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setdistributionstates request: " + e.getMessage()); + log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering setdistributionstates request: " + e.getMessage()); + e.printStackTrace(System.err); + req.setError(ErrorCode.METHOD_FAILED, e.getMessage()); + } + } + + public void rpc_activateClusterStateVersion(Request req) { + try { + if (shouldFailSetSystemStateRequests()) { + // We assume that failing setDistributionStates also implies failing version activations + req.setError(ErrorCode.GENERAL_ERROR, "Dummy node configured to fail activateClusterStateVersion() calls"); + return; + } + int activateVersion = req.parameters().get(0).asInt32(); + synchronized(timer) { + int actualVersion = getLatestSystemStateVersion().orElse(0); + req.returnValues().add(new Int32Value(actualVersion)); + if (activateVersion != actualVersion) { + req.setError(ErrorCode.METHOD_FAILED, "State version mismatch"); + } else { + activatedClusterStateVersion = activateVersion; + timer.notifyAll(); + } + } + log.log(LogLevel.DEBUG, "Dummy node " + this + ": Activating cluster state version " + activateVersion); + } catch (Exception e) { + log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering activate_cluster_state_version request: " + e.getMessage()); e.printStackTrace(System.err); req.setError(ErrorCode.METHOD_FAILED, e.getMessage()); } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java index bda06248d9e..bf63aebe022 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java @@ -8,6 +8,6 @@ import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator; * over regular RPC. */ public class DummyVdsNodeOptions { - // 0 - 4.1, 1 - 4.2-5.0.10, 2 - 5.0.11+, 3 - 6.220+ - public int stateCommunicationVersion = RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_VERSION; + // 0 - 4.1, 1 - 4.2-5.0.10, 2 - 5.0.11+, 3 - 6.220+, 4 - 7.24+ + public int stateCommunicationVersion = RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_VERSION; } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java index d59dbb4933a..e9eaf56085b 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java @@ -4,6 +4,7 @@ package com.yahoo.vespa.clustercontroller.core; import com.yahoo.vdslib.distribution.ConfiguredNode; import com.yahoo.vdslib.state.NodeState; import com.yahoo.vdslib.state.State; +import com.yahoo.vespa.clustercontroller.core.testutils.StateWaiter; import org.junit.Test; import java.util.ArrayList; @@ -29,8 +30,16 @@ public class RpcVersionAutoDowngradeTest extends FleetControllerTest { @Test public void cluster_state_rpc_version_is_auto_downgraded_and_retried_for_older_nodes() throws Exception { - setUpFakeCluster(2); // HEAD is at v3 + setUpFakeCluster(2); // HEAD is at v4 waitForState("version:\\d+ distributor:10 storage:10"); } + @Test + public void implicit_activation_for_nodes_that_return_not_found_for_version_activation_rpc() throws Exception { + setUpFakeCluster(3); // HEAD is at v4 + waitForState("version:\\d+ distributor:10 storage:10"); + } + + // TODO partial version setup for simulating upgrades + } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java index 32de3591f2d..85106ce7e3c 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java @@ -171,6 +171,9 @@ public class StateChangeTest extends FleetControllerTest { options.nodeStateRequestTimeoutMS = 60 * 60 * 1000; options.minTimeBetweenNewSystemStates = 0; options.maxInitProgressTime = 50000; + // This test makes very specific assumptions about the amount of work done in a single tick. + // Two-phase cluster state activation changes this quite a bit, so disable it. At least for now. + options.enableTwoPhaseClusterStateActivation = false; initialize(options); @@ -1035,7 +1038,7 @@ public class StateChangeTest extends FleetControllerTest { // Assert that the failed node has not acknowledged the latest version. // (The version may still be larger than versionBeforeChange if the fleet controller sends a // "stable system" update without timestamps in the meantime - assertTrue(fleetController.getCluster().getNodeInfo(nodes.get(1).getNode()).getSystemStateVersionAcknowledged() < versionAfterChange); + assertTrue(fleetController.getCluster().getNodeInfo(nodes.get(1).getNode()).getClusterStateVersionBundleAcknowledged() < versionAfterChange); // Ensure non-concurrent access to getNewestSystemStateVersionSent synchronized(timer) { @@ -1343,11 +1346,19 @@ public class StateChangeTest extends FleetControllerTest { void sendAllDeferredDistributorClusterStateAcks() throws Exception { communicator.sendAllDeferredDistributorClusterStateAcks(); - ctrl.tick(); + ctrl.tick(); // Process cluster state bundle ACKs + if (ctrl.getOptions().enableTwoPhaseClusterStateActivation) { + ctrl.tick(); // Send activations + ctrl.tick(); // Process activation ACKs + } } void processScheduledTask() throws Exception { ctrl.tick(); // Cluster state recompute iteration and send + if (ctrl.getOptions().enableTwoPhaseClusterStateActivation) { + ctrl.tick(); // Send activations + ctrl.tick(); // Process activation ACKs + } ctrl.tick(); // Iff ACKs were received, process version dependent task(s) } @@ -1440,7 +1451,7 @@ public class StateChangeTest extends FleetControllerTest { @Test public void no_op_synchronous_remote_task_waits_until_current_state_is_acked() throws Exception { - RemoteTaskFixture fixture = createFixtureWith(optionsWithZeroTransitionTime()); + RemoteTaskFixture fixture = createFixtureWith(optionsWithZeroTransitionTime()); communicator.setShouldDeferDistributorClusterStateAcks(true); fixture.markStorageNodeDown(0); @@ -1523,7 +1534,9 @@ public class StateChangeTest extends FleetControllerTest { public void cluster_state_ack_is_not_dependent_on_state_send_grace_period() throws Exception { FleetControllerOptions options = defaultOptions(); options.minTimeBetweenNewSystemStates = 10_000; + //options.enableTwoPhaseClusterStateActivation = false; //// RemoteTaskFixture fixture = createFixtureWith(options); + // Have to increment timer here to be able to send state generated by the scheduled task timer.advanceTime(10_000); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java index aa5219147ce..75b65f5d85f 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java @@ -68,7 +68,7 @@ public class SystemStateBroadcasterTest { ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2"); ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); f.broadcaster.handleNewClusterStates(stateBundle); - f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator); + f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator); cf.cluster().getNodeInfo().forEach(nodeInfo -> verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any())); } @@ -79,7 +79,7 @@ public class SystemStateBroadcasterTest { ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); f.simulateNodePartitionedAwaySilently(cf); f.broadcaster.handleNewClusterStates(stateBundle); - f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator); + f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator); clusterNodeInfos(cf.cluster(), Node.ofDistributor(1), Node.ofStorage(0), Node.ofStorage(1)).forEach(nodeInfo -> { // Only distributor 0 should observe startup timestamps @@ -97,7 +97,7 @@ public class SystemStateBroadcasterTest { StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2")); ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); f.broadcaster.handleNewClusterStates(stateBundle); - f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator); + f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator); cf.cluster().getNodeInfo().forEach(nodeInfo -> verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any())); } @@ -111,7 +111,7 @@ public class SystemStateBroadcasterTest { ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); f.simulateNodePartitionedAwaySilently(cf); f.broadcaster.handleNewClusterStates(stateBundle); - f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator); + f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator); clusterNodeInfos(cf.cluster(), Node.ofDistributor(1), Node.ofStorage(0), Node.ofStorage(1)).forEach(nodeInfo -> { // Only distributor 0 should observe startup timestamps @@ -122,4 +122,15 @@ public class SystemStateBroadcasterTest { StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2 .0.t:600 .1.t:700")); verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any()); } + + @Test + public void activation_not_sent_before_all_distributors_have_acked_state_bundle() { + Fixture f = new Fixture(); + ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2"); + ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); + f.broadcaster.handleNewClusterStates(stateBundle); + f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator); + + // TODO + } } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java index 61e9d1a90de..9eb98f4f045 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java @@ -161,7 +161,7 @@ public class RPCCommunicatorTest { waiter.handleRequestDone(req); // This would normally be done in processResponses(), but that code path is not invoked in this test. - cf.cluster().getNodeInfo(Node.ofStorage(1)).setSystemStateVersionAcknowledged(123, false); + cf.cluster().getNodeInfo(Node.ofStorage(1)).setClusterStateBundleVersionAcknowledged(123, false); f.receivedRequest.set(null); // Now when we try again, we should have been downgraded to the legacy setsystemstate2 RPC diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java index d140ef998b6..9734156b13f 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java @@ -29,6 +29,7 @@ public interface WaitCondition { abstract class StateWait implements WaitCondition { private final Object monitor; protected ClusterState currentState; + protected ClusterState convergedState; private final SystemStateListener listener = new SystemStateListener() { @Override public void handleNewPublishedState(ClusterStateBundle state) { @@ -37,6 +38,14 @@ public interface WaitCondition { monitor.notifyAll(); } } + + @Override + public void handleStateConvergedInCluster(ClusterStateBundle states) { + synchronized (monitor) { + currentState = convergedState = states.getBaselineClusterState(); + monitor.notifyAll(); + } + } }; public StateWait(FleetController fc, Object monitor) { @@ -90,8 +99,8 @@ public interface WaitCondition { @Override public String isConditionMet() { - if (currentState != null) { - lastCheckedState = currentState; + if (convergedState != null) { + lastCheckedState = convergedState; Matcher m = pattern.matcher(lastCheckedState.toString()); if (m.matches() || !checkSpaceSubset.isEmpty()) { if (nodesToCheck != null) { |