aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-core/src/test/java/com/yahoo/vespa
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-03-08 12:58:31 +0100
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-03-14 14:43:03 +0000
commit57b4604fc462cdc18e00bfd425a2211fac429869 (patch)
tree9797e52a497b001d89cc197b320b1deecda6b236 /clustercontroller-core/src/test/java/com/yahoo/vespa
parent05f27f6cfcda786232fa6da47154784dce2483e1 (diff)
Support configurable two-phase state transitions in cluster controller
Diffstat (limited to 'clustercontroller-core/src/test/java/com/yahoo/vespa')
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java2
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java65
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java4
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java11
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java19
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java19
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java2
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java13
8 files changed, 113 insertions, 22 deletions
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
index b1e7158f61c..185fdc0c7ca 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
@@ -97,7 +97,7 @@ public class DummyCommunicator implements Communicator, NodeLookup {
public void setSystemState(ClusterStateBundle stateBundle, NodeInfo node, Waiter<SetClusterStateRequest> waiter) {
ClusterState baselineState = stateBundle.getBaselineClusterState();
DummySetClusterStateRequest req = new DummySetClusterStateRequest(node, baselineState);
- node.setSystemStateVersionSent(baselineState);
+ node.setClusterStateVersionBundleSent(baselineState);
req.setReply(new SetClusterStateRequest.Reply());
if (node.isStorage() || !shouldDeferDistributorClusterStateAcks) {
waiter.done(req);
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
index 6d59a672e86..caa3fbcd6cd 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
@@ -68,6 +68,7 @@ public class DummyVdsNode {
* Any access to this list or to its members must be synchronized on the timer variable.
*/
private List<ClusterStateBundle> clusterStateBundles = new LinkedList<>();
+ private int activatedClusterStateVersion = 0;
private Thread messageResponder = new Thread() {
public void run() {
@@ -220,6 +221,12 @@ public class DummyVdsNode {
}
}
+ public int getActivatedClusterStateVersion() {
+ synchronized (timer) {
+ return activatedClusterStateVersion;
+ }
+ }
+
public boolean hasPendingGetNodeStateRequest() {
synchronized (timer) {
return !waitingRequests.isEmpty();
@@ -300,14 +307,17 @@ public class DummyVdsNode {
public ClusterStateBundle getClusterStateBundle() {
synchronized(timer) {
- return (clusterStateBundles.isEmpty() ? null : clusterStateBundles.get(0));
+ return clusterStateBundles.stream()
+ .filter(b -> b.getVersion() <= activatedClusterStateVersion)
+ .findFirst() // Most recent cluster state bundle first in list
+ .orElse(null);
}
}
public ClusterState getClusterState() {
- synchronized(timer) {
- return (clusterStateBundles.isEmpty() ? null : clusterStateBundles.get(0).getBaselineClusterState());
- }
+ return Optional.ofNullable(getClusterStateBundle())
+ .map(b -> b.getBaselineClusterState())
+ .orElse(null);
}
public String getSlobrokName() {
@@ -369,6 +379,13 @@ public class DummyVdsNode {
m.paramDesc(2, "payload", "Slime format payload");
supervisor.addMethod(m);
}
+ if (stateCommunicationVersion >= RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_VERSION) {
+ m = new Method(RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_METHOD_NAME, "i", "i", this, "rpc_activateClusterStateVersion");
+ m.methodDesc("Activate a given cluster state version");
+ m.paramDesc(0, "stateVersion", "Cluster state version to activate");
+ m.returnDesc(0, "actualVersion", "Actual cluster state version on node");
+ supervisor.addMethod(m);
+ }
}
public void rpc_storageConnect(Request req) {
@@ -439,7 +456,7 @@ public class DummyVdsNode {
}
}
} catch (Exception e) {
- log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering " + req.methodName() + " request: " + e.getMessage());
+ log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering " + req.methodName() + " request: " + e.getMessage());
e.printStackTrace(System.err);
req.setError(ErrorCode.METHOD_FAILED, e.getMessage());
}
@@ -499,7 +516,7 @@ public class DummyVdsNode {
req.returnValues().add(new StringValue("OK"));
log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new system state (through old setsystemstate call) " + newState);
} catch (Exception e) {
- log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setsystemstate request: " + e.getMessage());
+ log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering setsystemstate request: " + e.getMessage());
e.printStackTrace(System.err);
req.returnValues().add(new Int32Value(ErrorCode.METHOD_FAILED));
req.returnValues().add(new StringValue(e.getMessage()));
@@ -516,11 +533,14 @@ public class DummyVdsNode {
synchronized(timer) {
updateStartTimestamps(newState);
clusterStateBundles.add(0, ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(newState)));
+ if (stateCommunicationVersion < RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_VERSION) {
+ activatedClusterStateVersion = newState.getVersion(); // Simulate node that does not know of activation
+ }
timer.notifyAll();
}
log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new system state " + newState);
} catch (Exception e) {
- log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setsystemstate request: " + e.getMessage());
+ log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering setsystemstate request: " + e.getMessage());
e.printStackTrace(System.err);
req.setError(ErrorCode.METHOD_FAILED, e.getMessage());
}
@@ -536,11 +556,40 @@ public class DummyVdsNode {
synchronized(timer) {
updateStartTimestamps(stateBundle.getBaselineClusterState());
clusterStateBundles.add(0, stateBundle);
+ if (stateCommunicationVersion < RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_VERSION) {
+ activatedClusterStateVersion = stateBundle.getVersion(); // Simulate node that does not know of activation
+ }
timer.notifyAll();
}
log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new cluster state " + stateBundle);
} catch (Exception e) {
- log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setdistributionstates request: " + e.getMessage());
+ log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering setdistributionstates request: " + e.getMessage());
+ e.printStackTrace(System.err);
+ req.setError(ErrorCode.METHOD_FAILED, e.getMessage());
+ }
+ }
+
+ public void rpc_activateClusterStateVersion(Request req) {
+ try {
+ if (shouldFailSetSystemStateRequests()) {
+ // We assume that failing setDistributionStates also implies failing version activations
+ req.setError(ErrorCode.GENERAL_ERROR, "Dummy node configured to fail activateClusterStateVersion() calls");
+ return;
+ }
+ int activateVersion = req.parameters().get(0).asInt32();
+ synchronized(timer) {
+ int actualVersion = getLatestSystemStateVersion().orElse(0);
+ req.returnValues().add(new Int32Value(actualVersion));
+ if (activateVersion != actualVersion) {
+ req.setError(ErrorCode.METHOD_FAILED, "State version mismatch");
+ } else {
+ activatedClusterStateVersion = activateVersion;
+ timer.notifyAll();
+ }
+ }
+ log.log(LogLevel.DEBUG, "Dummy node " + this + ": Activating cluster state version " + activateVersion);
+ } catch (Exception e) {
+ log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering activate_cluster_state_version request: " + e.getMessage());
e.printStackTrace(System.err);
req.setError(ErrorCode.METHOD_FAILED, e.getMessage());
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java
index bda06248d9e..bf63aebe022 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java
@@ -8,6 +8,6 @@ import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator;
* over regular RPC.
*/
public class DummyVdsNodeOptions {
- // 0 - 4.1, 1 - 4.2-5.0.10, 2 - 5.0.11+, 3 - 6.220+
- public int stateCommunicationVersion = RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_VERSION;
+ // 0 - 4.1, 1 - 4.2-5.0.10, 2 - 5.0.11+, 3 - 6.220+, 4 - 7.24+
+ public int stateCommunicationVersion = RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_VERSION;
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java
index d59dbb4933a..e9eaf56085b 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java
@@ -4,6 +4,7 @@ package com.yahoo.vespa.clustercontroller.core;
import com.yahoo.vdslib.distribution.ConfiguredNode;
import com.yahoo.vdslib.state.NodeState;
import com.yahoo.vdslib.state.State;
+import com.yahoo.vespa.clustercontroller.core.testutils.StateWaiter;
import org.junit.Test;
import java.util.ArrayList;
@@ -29,8 +30,16 @@ public class RpcVersionAutoDowngradeTest extends FleetControllerTest {
@Test
public void cluster_state_rpc_version_is_auto_downgraded_and_retried_for_older_nodes() throws Exception {
- setUpFakeCluster(2); // HEAD is at v3
+ setUpFakeCluster(2); // HEAD is at v4
waitForState("version:\\d+ distributor:10 storage:10");
}
+ @Test
+ public void implicit_activation_for_nodes_that_return_not_found_for_version_activation_rpc() throws Exception {
+ setUpFakeCluster(3); // HEAD is at v4
+ waitForState("version:\\d+ distributor:10 storage:10");
+ }
+
+ // TODO partial version setup for simulating upgrades
+
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
index 32de3591f2d..85106ce7e3c 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
@@ -171,6 +171,9 @@ public class StateChangeTest extends FleetControllerTest {
options.nodeStateRequestTimeoutMS = 60 * 60 * 1000;
options.minTimeBetweenNewSystemStates = 0;
options.maxInitProgressTime = 50000;
+ // This test makes very specific assumptions about the amount of work done in a single tick.
+ // Two-phase cluster state activation changes this quite a bit, so disable it. At least for now.
+ options.enableTwoPhaseClusterStateActivation = false;
initialize(options);
@@ -1035,7 +1038,7 @@ public class StateChangeTest extends FleetControllerTest {
// Assert that the failed node has not acknowledged the latest version.
// (The version may still be larger than versionBeforeChange if the fleet controller sends a
// "stable system" update without timestamps in the meantime
- assertTrue(fleetController.getCluster().getNodeInfo(nodes.get(1).getNode()).getSystemStateVersionAcknowledged() < versionAfterChange);
+ assertTrue(fleetController.getCluster().getNodeInfo(nodes.get(1).getNode()).getClusterStateVersionBundleAcknowledged() < versionAfterChange);
// Ensure non-concurrent access to getNewestSystemStateVersionSent
synchronized(timer) {
@@ -1343,11 +1346,19 @@ public class StateChangeTest extends FleetControllerTest {
void sendAllDeferredDistributorClusterStateAcks() throws Exception {
communicator.sendAllDeferredDistributorClusterStateAcks();
- ctrl.tick();
+ ctrl.tick(); // Process cluster state bundle ACKs
+ if (ctrl.getOptions().enableTwoPhaseClusterStateActivation) {
+ ctrl.tick(); // Send activations
+ ctrl.tick(); // Process activation ACKs
+ }
}
void processScheduledTask() throws Exception {
ctrl.tick(); // Cluster state recompute iteration and send
+ if (ctrl.getOptions().enableTwoPhaseClusterStateActivation) {
+ ctrl.tick(); // Send activations
+ ctrl.tick(); // Process activation ACKs
+ }
ctrl.tick(); // Iff ACKs were received, process version dependent task(s)
}
@@ -1440,7 +1451,7 @@ public class StateChangeTest extends FleetControllerTest {
@Test
public void no_op_synchronous_remote_task_waits_until_current_state_is_acked() throws Exception {
- RemoteTaskFixture fixture = createFixtureWith(optionsWithZeroTransitionTime());
+ RemoteTaskFixture fixture = createFixtureWith(optionsWithZeroTransitionTime());
communicator.setShouldDeferDistributorClusterStateAcks(true);
fixture.markStorageNodeDown(0);
@@ -1523,7 +1534,9 @@ public class StateChangeTest extends FleetControllerTest {
public void cluster_state_ack_is_not_dependent_on_state_send_grace_period() throws Exception {
FleetControllerOptions options = defaultOptions();
options.minTimeBetweenNewSystemStates = 10_000;
+ //options.enableTwoPhaseClusterStateActivation = false; ////
RemoteTaskFixture fixture = createFixtureWith(options);
+
// Have to increment timer here to be able to send state generated by the scheduled task
timer.advanceTime(10_000);
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
index aa5219147ce..75b65f5d85f 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
@@ -68,7 +68,7 @@ public class SystemStateBroadcasterTest {
ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2");
ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
f.broadcaster.handleNewClusterStates(stateBundle);
- f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator);
+ f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator);
cf.cluster().getNodeInfo().forEach(nodeInfo -> verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any()));
}
@@ -79,7 +79,7 @@ public class SystemStateBroadcasterTest {
ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
f.simulateNodePartitionedAwaySilently(cf);
f.broadcaster.handleNewClusterStates(stateBundle);
- f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator);
+ f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator);
clusterNodeInfos(cf.cluster(), Node.ofDistributor(1), Node.ofStorage(0), Node.ofStorage(1)).forEach(nodeInfo -> {
// Only distributor 0 should observe startup timestamps
@@ -97,7 +97,7 @@ public class SystemStateBroadcasterTest {
StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2"));
ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
f.broadcaster.handleNewClusterStates(stateBundle);
- f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator);
+ f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator);
cf.cluster().getNodeInfo().forEach(nodeInfo -> verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any()));
}
@@ -111,7 +111,7 @@ public class SystemStateBroadcasterTest {
ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
f.simulateNodePartitionedAwaySilently(cf);
f.broadcaster.handleNewClusterStates(stateBundle);
- f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator);
+ f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator);
clusterNodeInfos(cf.cluster(), Node.ofDistributor(1), Node.ofStorage(0), Node.ofStorage(1)).forEach(nodeInfo -> {
// Only distributor 0 should observe startup timestamps
@@ -122,4 +122,15 @@ public class SystemStateBroadcasterTest {
StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2 .0.t:600 .1.t:700"));
verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any());
}
+
+ @Test
+ public void activation_not_sent_before_all_distributors_have_acked_state_bundle() {
+ Fixture f = new Fixture();
+ ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2");
+ ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
+ f.broadcaster.handleNewClusterStates(stateBundle);
+ f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator);
+
+ // TODO
+ }
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java
index 61e9d1a90de..9eb98f4f045 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java
@@ -161,7 +161,7 @@ public class RPCCommunicatorTest {
waiter.handleRequestDone(req);
// This would normally be done in processResponses(), but that code path is not invoked in this test.
- cf.cluster().getNodeInfo(Node.ofStorage(1)).setSystemStateVersionAcknowledged(123, false);
+ cf.cluster().getNodeInfo(Node.ofStorage(1)).setClusterStateBundleVersionAcknowledged(123, false);
f.receivedRequest.set(null);
// Now when we try again, we should have been downgraded to the legacy setsystemstate2 RPC
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java
index d140ef998b6..9734156b13f 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java
@@ -29,6 +29,7 @@ public interface WaitCondition {
abstract class StateWait implements WaitCondition {
private final Object monitor;
protected ClusterState currentState;
+ protected ClusterState convergedState;
private final SystemStateListener listener = new SystemStateListener() {
@Override
public void handleNewPublishedState(ClusterStateBundle state) {
@@ -37,6 +38,14 @@ public interface WaitCondition {
monitor.notifyAll();
}
}
+
+ @Override
+ public void handleStateConvergedInCluster(ClusterStateBundle states) {
+ synchronized (monitor) {
+ currentState = convergedState = states.getBaselineClusterState();
+ monitor.notifyAll();
+ }
+ }
};
public StateWait(FleetController fc, Object monitor) {
@@ -90,8 +99,8 @@ public interface WaitCondition {
@Override
public String isConditionMet() {
- if (currentState != null) {
- lastCheckedState = currentState;
+ if (convergedState != null) {
+ lastCheckedState = convergedState;
Matcher m = pattern.matcher(lastCheckedState.toString());
if (m.matches() || !checkSpaceSubset.isEmpty()) {
if (nodesToCheck != null) {