aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-03-08 12:58:31 +0100
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-03-14 14:43:03 +0000
commit57b4604fc462cdc18e00bfd425a2211fac429869 (patch)
tree9797e52a497b001d89cc197b320b1deecda6b236 /clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
parent05f27f6cfcda786232fa6da47154784dce2483e1 (diff)
Support configurable two-phase state transitions in cluster controller
Diffstat (limited to 'clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java')
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java65
1 files changed, 57 insertions, 8 deletions
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
index 6d59a672e86..caa3fbcd6cd 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
@@ -68,6 +68,7 @@ public class DummyVdsNode {
* Any access to this list or to its members must be synchronized on the timer variable.
*/
private List<ClusterStateBundle> clusterStateBundles = new LinkedList<>();
+ private int activatedClusterStateVersion = 0;
private Thread messageResponder = new Thread() {
public void run() {
@@ -220,6 +221,12 @@ public class DummyVdsNode {
}
}
+ public int getActivatedClusterStateVersion() {
+ synchronized (timer) {
+ return activatedClusterStateVersion;
+ }
+ }
+
public boolean hasPendingGetNodeStateRequest() {
synchronized (timer) {
return !waitingRequests.isEmpty();
@@ -300,14 +307,17 @@ public class DummyVdsNode {
public ClusterStateBundle getClusterStateBundle() {
synchronized(timer) {
- return (clusterStateBundles.isEmpty() ? null : clusterStateBundles.get(0));
+ return clusterStateBundles.stream()
+ .filter(b -> b.getVersion() <= activatedClusterStateVersion)
+ .findFirst() // Most recent cluster state bundle first in list
+ .orElse(null);
}
}
public ClusterState getClusterState() {
- synchronized(timer) {
- return (clusterStateBundles.isEmpty() ? null : clusterStateBundles.get(0).getBaselineClusterState());
- }
+ return Optional.ofNullable(getClusterStateBundle())
+ .map(b -> b.getBaselineClusterState())
+ .orElse(null);
}
public String getSlobrokName() {
@@ -369,6 +379,13 @@ public class DummyVdsNode {
m.paramDesc(2, "payload", "Slime format payload");
supervisor.addMethod(m);
}
+ if (stateCommunicationVersion >= RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_VERSION) {
+ m = new Method(RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_METHOD_NAME, "i", "i", this, "rpc_activateClusterStateVersion");
+ m.methodDesc("Activate a given cluster state version");
+ m.paramDesc(0, "stateVersion", "Cluster state version to activate");
+ m.returnDesc(0, "actualVersion", "Actual cluster state version on node");
+ supervisor.addMethod(m);
+ }
}
public void rpc_storageConnect(Request req) {
@@ -439,7 +456,7 @@ public class DummyVdsNode {
}
}
} catch (Exception e) {
- log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering " + req.methodName() + " request: " + e.getMessage());
+ log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering " + req.methodName() + " request: " + e.getMessage());
e.printStackTrace(System.err);
req.setError(ErrorCode.METHOD_FAILED, e.getMessage());
}
@@ -499,7 +516,7 @@ public class DummyVdsNode {
req.returnValues().add(new StringValue("OK"));
log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new system state (through old setsystemstate call) " + newState);
} catch (Exception e) {
- log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setsystemstate request: " + e.getMessage());
+ log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering setsystemstate request: " + e.getMessage());
e.printStackTrace(System.err);
req.returnValues().add(new Int32Value(ErrorCode.METHOD_FAILED));
req.returnValues().add(new StringValue(e.getMessage()));
@@ -516,11 +533,14 @@ public class DummyVdsNode {
synchronized(timer) {
updateStartTimestamps(newState);
clusterStateBundles.add(0, ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(newState)));
+ if (stateCommunicationVersion < RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_VERSION) {
+ activatedClusterStateVersion = newState.getVersion(); // Simulate node that does not know of activation
+ }
timer.notifyAll();
}
log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new system state " + newState);
} catch (Exception e) {
- log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setsystemstate request: " + e.getMessage());
+ log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering setsystemstate request: " + e.getMessage());
e.printStackTrace(System.err);
req.setError(ErrorCode.METHOD_FAILED, e.getMessage());
}
@@ -536,11 +556,40 @@ public class DummyVdsNode {
synchronized(timer) {
updateStartTimestamps(stateBundle.getBaselineClusterState());
clusterStateBundles.add(0, stateBundle);
+ if (stateCommunicationVersion < RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_VERSION) {
+ activatedClusterStateVersion = stateBundle.getVersion(); // Simulate node that does not know of activation
+ }
timer.notifyAll();
}
log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new cluster state " + stateBundle);
} catch (Exception e) {
- log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setdistributionstates request: " + e.getMessage());
+ log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering setdistributionstates request: " + e.getMessage());
+ e.printStackTrace(System.err);
+ req.setError(ErrorCode.METHOD_FAILED, e.getMessage());
+ }
+ }
+
+ public void rpc_activateClusterStateVersion(Request req) {
+ try {
+ if (shouldFailSetSystemStateRequests()) {
+ // We assume that failing setDistributionStates also implies failing version activations
+ req.setError(ErrorCode.GENERAL_ERROR, "Dummy node configured to fail activateClusterStateVersion() calls");
+ return;
+ }
+ int activateVersion = req.parameters().get(0).asInt32();
+ synchronized(timer) {
+ int actualVersion = getLatestSystemStateVersion().orElse(0);
+ req.returnValues().add(new Int32Value(actualVersion));
+ if (activateVersion != actualVersion) {
+ req.setError(ErrorCode.METHOD_FAILED, "State version mismatch");
+ } else {
+ activatedClusterStateVersion = activateVersion;
+ timer.notifyAll();
+ }
+ }
+ log.log(LogLevel.DEBUG, "Dummy node " + this + ": Activating cluster state version " + activateVersion);
+ } catch (Exception e) {
+ log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering activate_cluster_state_version request: " + e.getMessage());
e.printStackTrace(System.err);
req.setError(ErrorCode.METHOD_FAILED, e.getMessage());
}