diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-08 12:58:31 +0100 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-14 14:43:03 +0000 |
commit | 57b4604fc462cdc18e00bfd425a2211fac429869 (patch) | |
tree | 9797e52a497b001d89cc197b320b1deecda6b236 /clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java | |
parent | 05f27f6cfcda786232fa6da47154784dce2483e1 (diff) |
Support configurable two-phase state transitions in cluster controller
Diffstat (limited to 'clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java')
-rw-r--r-- | clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java | 65 |
1 files changed, 57 insertions, 8 deletions
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java index 6d59a672e86..caa3fbcd6cd 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java @@ -68,6 +68,7 @@ public class DummyVdsNode { * Any access to this list or to its members must be synchronized on the timer variable. */ private List<ClusterStateBundle> clusterStateBundles = new LinkedList<>(); + private int activatedClusterStateVersion = 0; private Thread messageResponder = new Thread() { public void run() { @@ -220,6 +221,12 @@ public class DummyVdsNode { } } + public int getActivatedClusterStateVersion() { + synchronized (timer) { + return activatedClusterStateVersion; + } + } + public boolean hasPendingGetNodeStateRequest() { synchronized (timer) { return !waitingRequests.isEmpty(); @@ -300,14 +307,17 @@ public class DummyVdsNode { public ClusterStateBundle getClusterStateBundle() { synchronized(timer) { - return (clusterStateBundles.isEmpty() ? null : clusterStateBundles.get(0)); + return clusterStateBundles.stream() + .filter(b -> b.getVersion() <= activatedClusterStateVersion) + .findFirst() // Most recent cluster state bundle first in list + .orElse(null); } } public ClusterState getClusterState() { - synchronized(timer) { - return (clusterStateBundles.isEmpty() ? null : clusterStateBundles.get(0).getBaselineClusterState()); - } + return Optional.ofNullable(getClusterStateBundle()) + .map(b -> b.getBaselineClusterState()) + .orElse(null); } public String getSlobrokName() { @@ -369,6 +379,13 @@ public class DummyVdsNode { m.paramDesc(2, "payload", "Slime format payload"); supervisor.addMethod(m); } + if (stateCommunicationVersion >= RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_VERSION) { + m = new Method(RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_METHOD_NAME, "i", "i", this, "rpc_activateClusterStateVersion"); + m.methodDesc("Activate a given cluster state version"); + m.paramDesc(0, "stateVersion", "Cluster state version to activate"); + m.returnDesc(0, "actualVersion", "Actual cluster state version on node"); + supervisor.addMethod(m); + } } public void rpc_storageConnect(Request req) { @@ -439,7 +456,7 @@ public class DummyVdsNode { } } } catch (Exception e) { - log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering " + req.methodName() + " request: " + e.getMessage()); + log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering " + req.methodName() + " request: " + e.getMessage()); e.printStackTrace(System.err); req.setError(ErrorCode.METHOD_FAILED, e.getMessage()); } @@ -499,7 +516,7 @@ public class DummyVdsNode { req.returnValues().add(new StringValue("OK")); log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new system state (through old setsystemstate call) " + newState); } catch (Exception e) { - log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setsystemstate request: " + e.getMessage()); + log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering setsystemstate request: " + e.getMessage()); e.printStackTrace(System.err); req.returnValues().add(new Int32Value(ErrorCode.METHOD_FAILED)); req.returnValues().add(new StringValue(e.getMessage())); @@ -516,11 +533,14 @@ public class DummyVdsNode { synchronized(timer) { updateStartTimestamps(newState); clusterStateBundles.add(0, ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(newState))); + if (stateCommunicationVersion < RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_VERSION) { + activatedClusterStateVersion = newState.getVersion(); // Simulate node that does not know of activation + } timer.notifyAll(); } log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new system state " + newState); } catch (Exception e) { - log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setsystemstate request: " + e.getMessage()); + log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering setsystemstate request: " + e.getMessage()); e.printStackTrace(System.err); req.setError(ErrorCode.METHOD_FAILED, e.getMessage()); } @@ -536,11 +556,40 @@ public class DummyVdsNode { synchronized(timer) { updateStartTimestamps(stateBundle.getBaselineClusterState()); clusterStateBundles.add(0, stateBundle); + if (stateCommunicationVersion < RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_VERSION) { + activatedClusterStateVersion = stateBundle.getVersion(); // Simulate node that does not know of activation + } timer.notifyAll(); } log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new cluster state " + stateBundle); } catch (Exception e) { - log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setdistributionstates request: " + e.getMessage()); + log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering setdistributionstates request: " + e.getMessage()); + e.printStackTrace(System.err); + req.setError(ErrorCode.METHOD_FAILED, e.getMessage()); + } + } + + public void rpc_activateClusterStateVersion(Request req) { + try { + if (shouldFailSetSystemStateRequests()) { + // We assume that failing setDistributionStates also implies failing version activations + req.setError(ErrorCode.GENERAL_ERROR, "Dummy node configured to fail activateClusterStateVersion() calls"); + return; + } + int activateVersion = req.parameters().get(0).asInt32(); + synchronized(timer) { + int actualVersion = getLatestSystemStateVersion().orElse(0); + req.returnValues().add(new Int32Value(actualVersion)); + if (activateVersion != actualVersion) { + req.setError(ErrorCode.METHOD_FAILED, "State version mismatch"); + } else { + activatedClusterStateVersion = activateVersion; + timer.notifyAll(); + } + } + log.log(LogLevel.DEBUG, "Dummy node " + this + ": Activating cluster state version " + activateVersion); + } catch (Exception e) { + log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering activate_cluster_state_version request: " + e.getMessage()); e.printStackTrace(System.err); req.setError(ErrorCode.METHOD_FAILED, e.getMessage()); } |