diff options
author | Tor Brede Vekterli <vekterli@oath.com> | 2018-04-20 17:04:34 +0200 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@oath.com> | 2018-04-24 14:12:54 +0200 |
commit | 2e736fdd071fc3ad72912afe96c767c7902870d1 (patch) | |
tree | 452c8e9d9f63c8007d77e536123791ff35e5c4f3 /clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java | |
parent | 6a7351dc77ad191540b61555360d20caea94c156 (diff) |
ZooKeeper-persist and load published cluster state bundles
Store synchronously upon each new versioned state, load whenever controller
is elected master. Effectively carries over visible node states from one
controller's lifetime to the next. This removes the edge case where default
bucket space content nodes are marked as in Maintainence until their global
merge status is known.
To avoid controller tripping over its own feet, state bundles are now _not_
versioned at all until the initial send time period has passed. This prevents
overwriting the state persisted from a previous controller with a transient
state where all nodes are down due to not having Slobrok contact yet.
A new cluster state recompute+send edge has been added when the master passes
its initial state send time period.
Diffstat (limited to 'clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java')
-rw-r--r-- | clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java | 51 |
1 files changed, 30 insertions, 21 deletions
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java index a2da6bd1c6c..6d59a672e86 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java @@ -1,8 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.core; -import com.yahoo.compress.CompressionType; -import com.yahoo.compress.Compressor; import com.yahoo.jrt.*; import com.yahoo.jrt.StringValue; import com.yahoo.jrt.slobrok.api.BackOffPolicy; @@ -10,15 +8,13 @@ import com.yahoo.jrt.slobrok.api.Register; import com.yahoo.jrt.slobrok.api.SlobrokList; import com.yahoo.log.LogLevel; import com.yahoo.vdslib.state.*; -import com.yahoo.vespa.clustercontroller.core.rpc.EncodedClusterStateBundle; import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator; import com.yahoo.vespa.clustercontroller.core.rpc.RPCUtil; -import com.yahoo.vespa.clustercontroller.core.rpc.SlimeClusterStateBundleCodec; -import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; import java.util.logging.Logger; +import java.util.stream.Collectors; /** * @@ -34,6 +30,7 @@ public class DummyVdsNode { private NodeType type; private int index; private NodeState nodeState; + private String hostInfo = "{}"; private Supervisor supervisor; private Acceptor acceptor; private Register register; @@ -67,10 +64,10 @@ public class DummyVdsNode { private final List<Req> waitingRequests = new LinkedList<>(); /** - * History of received system states. + * History of received cluster states. * Any access to this list or to its members must be synchronized on the timer variable. */ - private List<ClusterState> systemState = new LinkedList<>(); + private List<ClusterStateBundle> clusterStateBundles = new LinkedList<>(); private Thread messageResponder = new Thread() { public void run() { @@ -216,8 +213,10 @@ public class DummyVdsNode { /** Returns the latest system state version received, or empty if none are received yet. */ private Optional<Integer> getLatestSystemStateVersion() { synchronized(timer) { - if (systemState.isEmpty()) return Optional.empty(); - return Optional.of(systemState.get(0).getVersion()); + if (clusterStateBundles.isEmpty()) { + return Optional.empty(); + } + return Optional.of(clusterStateBundles.get(0).getVersion()); } } @@ -260,7 +259,7 @@ public class DummyVdsNode { log.log(LogLevel.DEBUG, "Dummy node " + this + " answering pending node state request."); req.request.returnValues().add(new StringValue(nodeState.serialize())); if (req.request.methodName().equals("getnodestate3")) { - req.request.returnValues().add(new StringValue("Dummy node host info")); + req.request.returnValues().add(new StringValue(hostInfo)); } req.request.returnRequest(); ++setNodeStateReplies; @@ -268,14 +267,19 @@ public class DummyVdsNode { waitingRequests.clear(); } - public void setNodeState(NodeState state) { + public void setNodeState(NodeState state, String hostInfo) { log.log(LogLevel.DEBUG, "Dummy node " + this + " got new state: " + state); synchronized(timer) { this.nodeState = state; + this.hostInfo = hostInfo; replyToPendingNodeStateRequests(); } } + public void setNodeState(NodeState state) { + setNodeState(state, "{}"); + } + public void setNodeState(State state) { setNodeState(new NodeState(type, state)); } @@ -287,16 +291,22 @@ public class DummyVdsNode { } public List<ClusterState> getSystemStatesReceived() { - List<ClusterState> states = new ArrayList<>(); synchronized(timer) { - states.addAll(systemState); + return clusterStateBundles.stream() + .map(ClusterStateBundle::getBaselineClusterState) + .collect(Collectors.toList()); + } + } + + public ClusterStateBundle getClusterStateBundle() { + synchronized(timer) { + return (clusterStateBundles.isEmpty() ? null : clusterStateBundles.get(0)); } - return states; } public ClusterState getClusterState() { synchronized(timer) { - return (systemState.isEmpty() ? null : systemState.get(0)); + return (clusterStateBundles.isEmpty() ? null : clusterStateBundles.get(0).getBaselineClusterState()); } } @@ -482,7 +492,7 @@ public class DummyVdsNode { ClusterState newState = new ClusterState(req.parameters().get(0).asString()); synchronized(timer) { updateStartTimestamps(newState); - systemState.add(0, newState); + clusterStateBundles.add(0, ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(newState))); timer.notifyAll(); } req.returnValues().add(new Int32Value(1)); @@ -505,7 +515,7 @@ public class DummyVdsNode { ClusterState newState = new ClusterState(req.parameters().get(0).asString()); synchronized(timer) { updateStartTimestamps(newState); - systemState.add(0, newState); + clusterStateBundles.add(0, ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(newState))); timer.notifyAll(); } log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new system state " + newState); @@ -523,13 +533,12 @@ public class DummyVdsNode { return; } ClusterStateBundle stateBundle = RPCUtil.decodeStateBundleFromSetDistributionStatesRequest(req); - ClusterState newState = stateBundle.getBaselineClusterState(); synchronized(timer) { - updateStartTimestamps(newState); - systemState.add(0, newState); + updateStartTimestamps(stateBundle.getBaselineClusterState()); + clusterStateBundles.add(0, stateBundle); timer.notifyAll(); } - log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new cluster state " + newState); + log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new cluster state " + stateBundle); } catch (Exception e) { log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setdistributionstates request: " + e.getMessage()); e.printStackTrace(System.err); |