aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-core
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-03-08 12:58:31 +0100
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-03-14 14:43:03 +0000
commit57b4604fc462cdc18e00bfd425a2211fac429869 (patch)
tree9797e52a497b001d89cc197b320b1deecda6b236 /clustercontroller-core
parent05f27f6cfcda786232fa6da47154784dce2483e1 (diff)
Support configurable two-phase state transitions in cluster controller
Diffstat (limited to 'clustercontroller-core')
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java23
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java3
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java80
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java215
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java8
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java10
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java10
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/statuspage/VdsClusterHtmlRendrer.java6
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java2
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java65
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java4
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java11
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java19
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java19
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java2
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java13
16 files changed, 382 insertions, 108 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index 9d833f366e5..a653ace265b 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -76,6 +76,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
private boolean waitingForCycle = false;
private StatusPageServer.PatternRequestRouter statusRequestRouter = new StatusPageServer.PatternRequestRouter();
private final List<ClusterStateBundle> newStates = new ArrayList<>();
+ private final List<ClusterStateBundle> convergedStates = new ArrayList<>();
private long configGeneration = -1;
private long nextConfigGeneration = -1;
private Queue<RemoteClusterControllerTask> remoteTasks = new LinkedList<>();
@@ -253,6 +254,10 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
throw new NullPointerException("Cluster state should never be null at this point");
}
listener.handleNewPublishedState(ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(state)));
+ ClusterStateBundle convergedState = systemStateBroadcaster.getLastClusterStateBundleConverged();
+ if (convergedState != null) {
+ listener.handleStateConvergedInCluster(convergedState);
+ }
}
public FleetControllerOptions getOptions() {
@@ -438,6 +443,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
ClusterState currentState = stateVersionTracker.getVersionedClusterState();
log.fine(() -> String.format("All distributors have ACKed cluster state version %d", currentState.getVersion()));
stateChangeHandler.handleAllDistributorsInSync(currentState, nodes, database, context);
+ convergedStates.add(stateVersionTracker.getVersionedClusterStateBundle()); // FIXME ugh, going via version tracker?
}
private boolean changesConfiguredNodeSet(Collection<ConfiguredNode> newNodes) {
@@ -666,12 +672,15 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
// Reset timer to only see warning once.
firstAllowedStateBroadcast = currentTime;
}
- sentAny = systemStateBroadcaster.broadcastNewState(databaseContext, communicator);
+ // FIXME bad interaction with activation..!
+ sentAny = systemStateBroadcaster.broadcastNewStateBundleIfRequired(databaseContext, communicator);
if (sentAny) {
// FIXME won't this inhibit resending to unresponsive nodes?
nextStateSendTime = currentTime + options.minTimeBetweenNewSystemStates;
}
}
+ // Always allow activations if we've already broadcast a state
+ sentAny |= systemStateBroadcaster.broadcastStateActivationsIfRequired(databaseContext, communicator);
return sentAny;
}
@@ -686,6 +695,16 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
newStates.clear();
}
}
+ if ( ! convergedStates.isEmpty()) {
+ synchronized (systemStateListeners) {
+ for (ClusterStateBundle stateBundle : convergedStates) {
+ for(SystemStateListener listener : systemStateListeners) {
+ listener.handleStateConvergedInCluster(stateBundle);
+ }
+ }
+ convergedStates.clear();
+ }
+ }
}
private boolean processNextQueuedRemoteTask() {
@@ -1047,7 +1066,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
while (true) {
int ackedNodes = 0;
for (NodeInfo node : cluster.getNodeInfo()) {
- if (node.getSystemStateVersionAcknowledged() >= version) {
+ if (node.getClusterStateVersionBundleAcknowledged() >= version) {
++ackedNodes;
}
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java
index e069dde1901..d98cc6473e7 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java
@@ -121,6 +121,8 @@ public class FleetControllerOptions implements Cloneable {
public boolean clusterHasGlobalDocumentTypes = false;
+ public boolean enableTwoPhaseClusterStateActivation = true;
+
// TODO: Choose a default value
public double minMergeCompletionRatio = 1.0;
@@ -231,6 +233,7 @@ public class FleetControllerOptions implements Cloneable {
sb.append("<tr><td><nobr>Wanted distribution bits</nobr></td><td align=\"right\">").append(distributionBits).append("</td></tr>");
sb.append("<tr><td><nobr>Max deferred task version wait time</nobr></td><td align=\"right\">").append(maxDeferredTaskVersionWaitTime.toMillis()).append("ms</td></tr>");
sb.append("<tr><td><nobr>Cluster has global document types configured</nobr></td><td align=\"right\">").append(clusterHasGlobalDocumentTypes).append("</td></tr>");
+ sb.append("<tr><td><nobr>Enable 2-phase cluster state activation protocol</nobr></td><td align=\"right\">").append(enableTwoPhaseClusterStateActivation).append("</td></tr>");
sb.append("</table>");
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
index 54cf2dad00a..58e2fa14d4f 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
@@ -14,7 +14,6 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.TreeMap;
import java.util.logging.Logger;
@@ -74,11 +73,16 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
* Version 1 is for the getnodestate2 command (legacy, not supported).
* Version 2 is for the getnodestate3 command
* Version 3 adds support for setdistributionstates
+ * Version 4 adds support for explicit cluster state version bundle activation
*/
private int version;
- private Map<Integer, ClusterState> systemStateVersionSent = new TreeMap<>();
- private ClusterState systemStateVersionAcknowledged;
+ // Mapping of cluster state version -> cluster state instance
+ private TreeMap<Integer, ClusterState> clusterStateVersionBundleSent = new TreeMap<>();
+ private ClusterState clusterStateVersionBundleAcknowledged;
+
+ private int clusterStateVersionActivationSent = -1;
+ private int clusterStateVersionActivationAcked = -1;
/**
* When a node goes from an up state to a down state, update this flag with the start timestamp the node had before going down.
* The cluster state broadcaster will use this to identify whether distributors have restarted.
@@ -102,7 +106,9 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
// NOTE: See update(node) below
NodeInfo(ContentCluster cluster, Node n, boolean configuredRetired, String rpcAddress, Distribution distribution) {
- if (cluster == null) throw new IllegalArgumentException("Cluster not set");
+ if (cluster == null) {
+ throw new IllegalArgumentException("Cluster not set");
+ }
reportedState = new NodeState(n.getType(), State.DOWN);
wantedState = new NodeState(n.getType(), State.UP);
this.cluster = cluster;
@@ -238,7 +244,7 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
public ContentCluster getCluster() { return cluster; }
- /** Returns true if the node is currentl registered in slobrok */
+ /** Returns true if the node is currently registered in slobrok */
// FIXME why is this called "isRpcAddressOutdated" then???
public boolean isRpcAddressOutdated() { return lastSeenInSlobrok != null; }
@@ -353,12 +359,13 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
/** Sets the wanted state. The wanted state is taken as UP if a null argument is given */
public void setWantedState(NodeState state) {
- if (state == null)
+ if (state == null) {
state = new NodeState(node.getType(), State.UP);
+ }
NodeState newWanted = new NodeState(node.getType(), state.getState());
newWanted.setDescription(state.getDescription());
if (!newWanted.equals(state)) {
- try{
+ try {
throw new Exception();
} catch (Exception e) {
StringWriter sw = new StringWriter();
@@ -408,40 +415,40 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
}
public int getVersion() { return version; }
- public int getConnectionVersion() { return connectionVersion; }
- public void setConnectionVersion(int version) { connectionVersion = version; }
public ClusterState getNewestSystemStateSent() {
- ClusterState last = null;
- for (ClusterState s : systemStateVersionSent.values()) {
- if (last == null || last.getVersion() < s.getVersion()) {
- last = s;
- }
+ if (clusterStateVersionBundleSent.isEmpty()) {
+ return null;
}
- return last;
+ return clusterStateVersionBundleSent.lastEntry().getValue();
}
public int getNewestSystemStateVersionSent() {
ClusterState last = getNewestSystemStateSent();
return last == null ? -1 : last.getVersion();
}
- public int getSystemStateVersionAcknowledged() {
- return (systemStateVersionAcknowledged == null ? -1 : systemStateVersionAcknowledged.getVersion());
+
+ public int getClusterStateVersionBundleAcknowledged() {
+ return (clusterStateVersionBundleAcknowledged == null ? -1 : clusterStateVersionBundleAcknowledged.getVersion());
}
- public void setSystemStateVersionSent(ClusterState state) {
- if (state == null) throw new Error("Should not clear info for last version sent");
- if (systemStateVersionSent.containsKey(state.getVersion())) {
+ public void setClusterStateVersionBundleSent(ClusterState state) {
+ if (state == null) {
+ throw new Error("Should not clear info for last version sent");
+ }
+ if (clusterStateVersionBundleSent.containsKey(state.getVersion())) {
throw new IllegalStateException("We have already sent cluster state version " + state.getVersion() + " to " + node);
}
- systemStateVersionSent.put(state.getVersion(), state);
+ clusterStateVersionBundleSent.put(state.getVersion(), state);
}
- public void setSystemStateVersionAcknowledged(Integer version, boolean success) {
- if (version == null) throw new Error("Should not clear info for last version acked");
- if (!systemStateVersionSent.containsKey(version)) {
+ public void setClusterStateBundleVersionAcknowledged(Integer version, boolean success) {
+ if (version == null) {
+ throw new Error("Should not clear info for last version acked");
+ }
+ if (!clusterStateVersionBundleSent.containsKey(version)) {
throw new IllegalStateException("Got response for cluster state " + version + " which is not tracked as pending for node " + node);
}
- ClusterState state = systemStateVersionSent.remove(version);
- if (success && (systemStateVersionAcknowledged == null || systemStateVersionAcknowledged.getVersion() < state.getVersion())) {
- systemStateVersionAcknowledged = state;
+ ClusterState state = clusterStateVersionBundleSent.remove(version);
+ if (success && (clusterStateVersionBundleAcknowledged == null || clusterStateVersionBundleAcknowledged.getVersion() < state.getVersion())) {
+ clusterStateVersionBundleAcknowledged = state;
if (wentDownWithStartTime != 0
&& (wentDownAtClusterState == null || wentDownAtClusterState.getVersion() < state.getVersion())
&& !state.getNodeState(node).getState().oneOf("dsm"))
@@ -452,6 +459,25 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
}
}
+ public void setClusterStateVersionActivationSent(int version) {
+ clusterStateVersionActivationSent = version;
+ }
+ public int getClusterStateVersionActivationSent() {
+ return clusterStateVersionActivationSent;
+ }
+
+ public int getClusterStateVersionActivationAcked() {
+ return clusterStateVersionActivationAcked;
+ }
+ public void setSystemStateVersionActivationAcked(Integer version, boolean success) {
+ if (success && (version > clusterStateVersionActivationAcked)) {
+ clusterStateVersionActivationAcked = version;
+ } else if (!success) {
+ clusterStateVersionActivationSent = -1; // Trigger resend
+ }
+ }
+
+
public void setHostInfo(HostInfo hostInfo) {
// Note: This will blank out any hostInfo we already had, if the parsing fails.
// This is intentional, to make sure we're never left with stale data.
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index a40a45fd48a..e4a9f62b054 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -1,6 +1,7 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;
+import com.yahoo.jrt.ErrorCode;
import com.yahoo.log.LogLevel;
import com.yahoo.vdslib.state.*;
import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler;
@@ -20,12 +21,17 @@ public class SystemStateBroadcaster {
private final Object monitor;
private ClusterStateBundle clusterStateBundle;
private final List<SetClusterStateRequest> setClusterStateReplies = new LinkedList<>();
+ private final List<ActivateClusterStateVersionRequest> activateClusterStateVersionReplies = new LinkedList<>();
private final static long minTimeBetweenNodeErrorLogging = 10 * 60 * 1000;
private final Map<Node, Long> lastErrorReported = new TreeMap<>();
- private int lastClusterStateInSync = 0;
+
+ private int lastStateVersionBundleAcked = 0;
+ private int lastClusterStateVersionConverged = 0;
+ private ClusterStateBundle lastClusterStateBundleConverged;
private final SetClusterStateWaiter setClusterStateWaiter = new SetClusterStateWaiter();
+ private final ActivateClusterStateVersionWaiter activateClusterStateVersionWaiter = new ActivateClusterStateVersionWaiter();
public SystemStateBroadcaster(Timer timer, Object monitor) {
this.timer = timer;
@@ -52,6 +58,10 @@ public class SystemStateBroadcaster {
return clusterStateBundle;
}
+ public ClusterStateBundle getLastClusterStateBundleConverged() {
+ return lastClusterStateBundleConverged;
+ }
+
private void reportNodeError(boolean nodeOk, NodeInfo info, String message) {
long time = timer.getCurrentTimeInMillis();
Long lastReported = lastErrorReported.get(info.getNode());
@@ -65,37 +75,62 @@ public class SystemStateBroadcaster {
public boolean processResponses() {
boolean anyResponsesFound = false;
synchronized(monitor) {
- for (SetClusterStateRequest req : setClusterStateReplies) {
- anyResponsesFound = true;
-
- NodeInfo info = req.getNodeInfo();
- boolean nodeOk = info.getReportedState().getState().oneOf("uir");
- int version = req.getClusterStateVersion();
-
- if (req.getReply().isError()) {
- info.setSystemStateVersionAcknowledged(version, false);
- if (req.getReply().getReturnCode() != Communicator.TRANSIENT_ERROR) {
- if (info.getNewestSystemStateVersionSent() == version) {
- reportNodeError(nodeOk, info,
- "Got error response " + req.getReply().getReturnCode() + ": " + req.getReply().getReturnMessage()
- + " from " + info + " setsystemstate request.");
- }
- }
+ anyResponsesFound = !setClusterStateReplies.isEmpty() || !activateClusterStateVersionReplies.isEmpty();
+ processSetClusterStateResponses();
+ processActivateClusterStateVersionResponses();
+ }
+ return anyResponsesFound;
+ }
+
+ private void processActivateClusterStateVersionResponses() {
+ for (var req : activateClusterStateVersionReplies) {
+ NodeInfo info = req.getNodeInfo();
+ int version = req.getClusterStateVersion();
+ boolean success = true;
+ if (req.getReply().isError()) {
+ // NO_SUCH_METHOD implies node is on a version that does not understand explicit activations
+ // and it has already merrily started using the state version. Treat as if it had been ACKed.
+ if (req.getReply().getReturnCode() != ErrorCode.NO_SUCH_METHOD) {
+ log.log(LogLevel.INFO, () -> String.format("Activation NACK for node %s with version %d, message %s",
+ info, version, req.getReply().getReturnMessage())); // TODO log level
+ success = false;
} else {
- info.setSystemStateVersionAcknowledged(version, true);
- log.log(LogLevel.DEBUG, "Node " + info + " acked system state version " + version + ".");
- lastErrorReported.remove(info.getNode());
+ log.log(LogLevel.INFO, () -> String.format("Node %s did not understand state activation RPC; " +
+ "implicitly treating state %d as activated on node", info, version)); // TODO log level
}
}
- setClusterStateReplies.clear();
+ info.setSystemStateVersionActivationAcked(version, success);
+ // TODO we currently don't invoke reportNodeError here. We assume that node errors will be reported
+ // as part of processSetClusterStateResponses anyway, but can add it here as well if deemed necessary.
}
- return anyResponsesFound;
+ activateClusterStateVersionReplies.clear();
}
- private boolean nodeNeedsClusterState(NodeInfo node) {
- if (node.getSystemStateVersionAcknowledged() == clusterStateBundle.getVersion()) {
- return false; // No point in sending if node already has updated system state
+ private void processSetClusterStateResponses() {
+ for (SetClusterStateRequest req : setClusterStateReplies) {
+ NodeInfo info = req.getNodeInfo();
+ int version = req.getClusterStateVersion();
+
+ if (req.getReply().isError()) {
+ info.setClusterStateBundleVersionAcknowledged(version, false);
+ if (req.getReply().getReturnCode() != Communicator.TRANSIENT_ERROR) {
+ if (info.getNewestSystemStateVersionSent() == version) {
+ boolean nodeOk = info.getReportedState().getState().oneOf("uir");
+ reportNodeError(nodeOk, info,
+ String.format("Got error response %d: %s from %s setdistributionstates request.",
+ req.getReply().getReturnCode(), req.getReply().getReturnMessage(), info));
+ }
+ }
+ } else {
+ info.setClusterStateBundleVersionAcknowledged(version, true);
+ log.log(LogLevel.DEBUG, () -> String.format("Node %s ACKed system state version %d.", info, version));
+ lastErrorReported.remove(info.getNode());
+ }
}
+ setClusterStateReplies.clear();
+ }
+
+ private static boolean nodeIsReachable(NodeInfo node) {
if (node.getRpcAddress() == null || node.isRpcAddressOutdated()) {
return false; // Can't set state on nodes whose RPC address is unknown or outdated
}
@@ -108,41 +143,107 @@ public class SystemStateBroadcaster {
return true;
}
+ private boolean nodeNeedsClusterStateBundle(NodeInfo node) {
+ if (node.getClusterStateVersionBundleAcknowledged() == clusterStateBundle.getVersion()) {
+ return false; // No point in sending if node already has updated system state
+ }
+ return nodeIsReachable(node);
+ }
+
+ private boolean nodeNeedsClusterStateActivation(NodeInfo node) {
+ if (node.getClusterStateVersionActivationAcked() == clusterStateBundle.getVersion()) {
+ return false; // No point in sending if node already has activated cluster state version
+ }
+ return nodeIsReachable(node);
+ }
+
private List<NodeInfo> resolveStateVersionSendSet(DatabaseHandler.Context dbContext) {
return dbContext.getCluster().getNodeInfo().stream()
- .filter(this::nodeNeedsClusterState)
- .filter(node -> !newestStateAlreadySentToNode(node))
+ .filter(this::nodeNeedsClusterStateBundle)
+ .filter(node -> !newestStateBundleAlreadySentToNode(node))
.collect(Collectors.toList());
}
- private boolean newestStateAlreadySentToNode(NodeInfo node) {
+ // Precondition: no nodes in the cluster need to receive the current cluster state version bundle
+ private List<NodeInfo> resolveStateActivationSendSet(DatabaseHandler.Context dbContext) {
+ return dbContext.getCluster().getNodeInfo().stream()
+ .filter(this::nodeNeedsClusterStateActivation)
+ .filter(node -> !newestStateActivationAlreadySentToNode(node))
+ .collect(Collectors.toList());
+ }
+
+ private boolean newestStateBundleAlreadySentToNode(NodeInfo node) {
return (node.getNewestSystemStateVersionSent() == clusterStateBundle.getVersion());
}
+ private boolean newestStateActivationAlreadySentToNode(NodeInfo node) {
+ return (node.getClusterStateVersionActivationSent() == clusterStateBundle.getVersion());
+ }
+
+ private static boolean twoPhaseTransitionEnabled(FleetController controller) {
+ return controller.getOptions().enableTwoPhaseClusterStateActivation;
+ }
+
/**
- * Checks if all distributor nodes have ACKed the most recent cluster state. Iff this
- * is the case, triggers handleAllDistributorsInSync() on the provided FleetController
+ * Checks if all distributor nodes have ACKed (and activated) the most recent cluster state.
+ * Iff this is the case, triggers handleAllDistributorsInSync() on the provided FleetController
* object and updates the broadcaster's last known in-sync cluster state version.
*/
void checkIfClusterStateIsAckedByAllDistributors(DatabaseHandler database,
DatabaseHandler.Context dbContext,
FleetController fleetController) throws InterruptedException {
- if ((clusterStateBundle == null) || (lastClusterStateInSync == clusterStateBundle.getVersion())) {
+ if ((clusterStateBundle == null) || currentClusterStateIsConverged()) {
return; // Nothing to do for the current state
}
final int currentStateVersion = clusterStateBundle.getVersion();
- boolean anyOutdatedDistributorNodes = dbContext.getCluster().getNodeInfo().stream()
+ boolean anyDistributorsNeedStateBundle = dbContext.getCluster().getNodeInfo().stream()
.filter(NodeInfo::isDistributor)
- .anyMatch(this::nodeNeedsClusterState);
+ .anyMatch(this::nodeNeedsClusterStateBundle);
+
+ if (!anyDistributorsNeedStateBundle && (currentStateVersion > lastStateVersionBundleAcked)) {
+ markCurrentClusterStateBundleAsReceivedByAllDistributors();
+ if (twoPhaseTransitionEnabled(fleetController)) {
+ log.log(LogLevel.INFO, () -> String.format("All distributors have ACKed cluster state " + // TODO log level
+ "version %d, sending activation", currentStateVersion));
+ } else {
+ markCurrentClusterStateAsConverged(database, dbContext, fleetController);
+ }
+ return; // Either converged (no two-phase) or activations must be sent before we can continue.
+ }
- if (!anyOutdatedDistributorNodes && (currentStateVersion > lastClusterStateInSync)) {
- log.log(LogLevel.DEBUG, "All distributors have newest clusterstate, updating start timestamps in zookeeper and clearing them from cluster state");
- lastClusterStateInSync = currentStateVersion;
- fleetController.handleAllDistributorsInSync(database, dbContext);
+ if (anyDistributorsNeedStateBundle || !twoPhaseTransitionEnabled(fleetController)) {
+ return;
}
+
+ boolean anyDistributorsNeedActivation = dbContext.getCluster().getNodeInfo().stream()
+ .filter(NodeInfo::isDistributor)
+ .anyMatch(this::nodeNeedsClusterStateActivation);
+
+ if (!anyDistributorsNeedActivation && (currentStateVersion > lastClusterStateVersionConverged)) {
+ markCurrentClusterStateAsConverged(database, dbContext, fleetController);
+ } else {
+ log.log(LogLevel.INFO, () -> String.format("distributors still need activation in state %d (last converged: %d)", // TODO log level
+ currentStateVersion, lastClusterStateVersionConverged));
+ }
+ }
+
+ private void markCurrentClusterStateBundleAsReceivedByAllDistributors() {
+ lastStateVersionBundleAcked = clusterStateBundle.getVersion();
}
- public boolean broadcastNewState(DatabaseHandler.Context dbContext, Communicator communicator) {
+ private void markCurrentClusterStateAsConverged(DatabaseHandler database, DatabaseHandler.Context dbContext, FleetController fleetController) throws InterruptedException {
+ // TODO log level
+ log.log(LogLevel.INFO, "All distributors have newest clusterstate, updating start timestamps in zookeeper and clearing them from cluster state");
+ lastClusterStateVersionConverged = clusterStateBundle.getVersion();
+ lastClusterStateBundleConverged = clusterStateBundle;
+ fleetController.handleAllDistributorsInSync(database, dbContext);
+ }
+
+ private boolean currentClusterStateIsConverged() {
+ return lastClusterStateVersionConverged == clusterStateBundle.getVersion();
+ }
+
+ public boolean broadcastNewStateBundleIfRequired(DatabaseHandler.Context dbContext, Communicator communicator) {
if (clusterStateBundle == null) {
return false;
}
@@ -159,12 +260,15 @@ public class SystemStateBroadcaster {
if (nodeNeedsToObserveStartupTimestamps(node)) {
// TODO this is the same for all nodes, compute only once
ClusterStateBundle modifiedBundle = clusterStateBundle.cloneWithMapper(state -> buildModifiedClusterState(state, dbContext));
- log.log(LogLevel.DEBUG, "Sending modified cluster state version " + baselineState.getVersion()
- + " to node " + node + ": " + modifiedBundle);
+ // TODO log level
+ log.log(LogLevel.INFO, () -> String.format("Sending modified cluster state version %d" +
+ " to node %s: %s", baselineState.getVersion(), node, modifiedBundle));
communicator.setSystemState(modifiedBundle, node, setClusterStateWaiter);
} else {
- log.log(LogLevel.DEBUG, "Sending system state version " + baselineState.getVersion() + " to node " + node
- + ". (went down time " + node.getWentDownWithStartTime() + ", node start time " + node.getStartTimestamp() + ")");
+ // TODO log level
+ log.log(LogLevel.INFO, () -> String.format("Sending system state version %d to node %s. " +
+ "(went down time %d, node start time %d)", baselineState.getVersion(), node,
+ node.getWentDownWithStartTime(), node.getStartTimestamp()));
communicator.setSystemState(clusterStateBundle, node, setClusterStateWaiter);
}
}
@@ -172,7 +276,28 @@ public class SystemStateBroadcaster {
return !recipients.isEmpty();
}
- public int lastClusterStateVersionInSync() { return lastClusterStateInSync; }
+ public boolean broadcastStateActivationsIfRequired(DatabaseHandler.Context dbContext, Communicator communicator) {
+ if (clusterStateBundle == null || !clusterStateBundle.getBaselineClusterState().isOfficial()) {
+ return false;
+ }
+
+ if ((lastStateVersionBundleAcked != clusterStateBundle.getVersion())
+ || !dbContext.getFleetController().getOptions().enableTwoPhaseClusterStateActivation) {
+ return false; // Two-phase activation is disabled, or not all distributors have ACKed the bundle yet; wait.
+ }
+
+ var recipients = resolveStateActivationSendSet(dbContext);
+ for (NodeInfo node : recipients) {
+ // TODO log level
+ log.log(LogLevel.INFO, () -> String.format("Sending cluster state activation to node %s for version %d",
+ node, clusterStateBundle.getVersion()));
+ communicator.activateClusterStateVersion(clusterStateBundle.getVersion(), node, activateClusterStateVersionWaiter);
+ }
+
+ return !recipients.isEmpty();
+ }
+
+ public int lastClusterStateVersionInSync() { return lastClusterStateVersionConverged; }
private static boolean nodeNeedsToObserveStartupTimestamps(NodeInfo node) {
return node.getStartTimestamp() != 0 && node.getWentDownWithStartTime() == node.getStartTimestamp();
@@ -202,7 +327,9 @@ public class SystemStateBroadcaster {
private class ActivateClusterStateVersionWaiter implements Communicator.Waiter<ActivateClusterStateVersionRequest> {
@Override
public void done(ActivateClusterStateVersionRequest reply) {
- // TODO
+ synchronized (monitor) {
+ activateClusterStateVersionReplies.add(reply);
+ }
}
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java
index 764bb3a0d92..a0d53e8c93e 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java
@@ -8,6 +8,14 @@ public interface SystemStateListener {
// TODO consider rename to bundle
void handleNewPublishedState(ClusterStateBundle states);
+ /**
+ * Invoked at the edge when all pending cluster state bundles and version activations
+ * have been successfully ACKed by all distributors in the cluster.
+ *
+ * @param states bundle that has converged across all distributors
+ */
+ default void handleStateConvergedInCluster(ClusterStateBundle states) {}
+
default void handleNewCandidateState(ClusterStateBundle states) {}
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java
index f19bb5ad9b8..c5b1da0cb66 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java
@@ -11,12 +11,10 @@ import com.yahoo.vespa.clustercontroller.core.Timer;
public class RPCActivateClusterStateVersionWaiter implements RequestWaiter {
- ActivateClusterStateVersionRequest request;
- Timer timer;
- Communicator.Waiter<ActivateClusterStateVersionRequest> waiter;
+ private final Communicator.Waiter<ActivateClusterStateVersionRequest> waiter;
+ private ActivateClusterStateVersionRequest request;
- public RPCActivateClusterStateVersionWaiter(Communicator.Waiter<ActivateClusterStateVersionRequest> waiter, Timer timer) {
- this.timer = timer;
+ public RPCActivateClusterStateVersionWaiter(Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) {
this.waiter = waiter;
}
@@ -28,7 +26,7 @@ public class RPCActivateClusterStateVersionWaiter implements RequestWaiter {
NodeInfo info = request.getNodeInfo();
if (req.isError()) {
return new ActivateClusterStateVersionRequest.Reply(req.errorCode(), req.errorMessage());
- } else if (!req.checkReturnTypes("")) {
+ } else if (!req.checkReturnTypes("i")) {
return new ActivateClusterStateVersionRequest.Reply(ErrorCode.BAD_REPLY, "Got RPC response with invalid return types from " + info);
}
return new ActivateClusterStateVersionRequest.Reply();
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
index a28da4d02a1..f5629bda343 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
@@ -36,14 +36,15 @@ public class RPCCommunicator implements Communicator {
public static final Logger log = Logger.getLogger(RPCCommunicator.class.getName());
+ public static final int ACTIVATE_CLUSTER_STATE_VERSION_RPC_VERSION = 4;
+ public static final String ACTIVATE_CLUSTER_STATE_VERSION_RPC_METHOD_NAME = "activate_cluster_state_version";
+
public static final int SET_DISTRIBUTION_STATES_RPC_VERSION = 3;
public static final String SET_DISTRIBUTION_STATES_RPC_METHOD_NAME = "setdistributionstates";
public static final int LEGACY_SET_SYSTEM_STATE2_RPC_VERSION = 2;
public static final String LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME = "setsystemstate2";
- public static final String ACTIVATE_CLUSTER_STATE_VERSION_RPC_METHOD_NAME = "activate_cluster_state_version";
-
private final Timer timer;
private final Supervisor supervisor;
private double nodeStateRequestTimeoutIntervalMaxSeconds;
@@ -161,12 +162,12 @@ public class RPCCommunicator implements Communicator {
waiter.setRequest(stateRequest);
connection.invokeAsync(req, 60, waiter);
- node.setSystemStateVersionSent(baselineState);
+ node.setClusterStateVersionBundleSent(baselineState);
}
@Override
public void activateClusterStateVersion(int clusterStateVersion, NodeInfo node, Waiter<ActivateClusterStateVersionRequest> externalWaiter) {
- var waiter = new RPCActivateClusterStateVersionWaiter(externalWaiter, timer);
+ var waiter = new RPCActivateClusterStateVersionWaiter(externalWaiter);
Target connection = getConnection(node);
if ( ! connection.isValid()) {
@@ -183,6 +184,7 @@ public class RPCCommunicator implements Communicator {
waiter.setRequest(activationRequest);
connection.invokeAsync(req, 60, waiter);
+ node.setClusterStateVersionActivationSent(clusterStateVersion);
}
// protected for testing.
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/statuspage/VdsClusterHtmlRendrer.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/statuspage/VdsClusterHtmlRendrer.java
index 551eb34f8fa..c4238fa5c84 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/statuspage/VdsClusterHtmlRendrer.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/statuspage/VdsClusterHtmlRendrer.java
@@ -248,10 +248,10 @@ public class VdsClusterHtmlRendrer {
}
// System state version
- row.addCell(new HtmlTable.Cell("" + nodeInfo.getSystemStateVersionAcknowledged()));
- if (nodeInfo.getSystemStateVersionAcknowledged() < state.getVersion() - 2) {
+ row.addCell(new HtmlTable.Cell("" + nodeInfo.getClusterStateVersionBundleAcknowledged()));
+ if (nodeInfo.getClusterStateVersionBundleAcknowledged() < state.getVersion() - 2) {
row.getLastCell().addProperties(ERROR_PROPERTY);
- } else if (nodeInfo.getSystemStateVersionAcknowledged() < state.getVersion()) {
+ } else if (nodeInfo.getClusterStateVersionBundleAcknowledged() < state.getVersion()) {
row.getLastCell().addProperties(WARNING_PROPERTY);
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
index b1e7158f61c..185fdc0c7ca 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
@@ -97,7 +97,7 @@ public class DummyCommunicator implements Communicator, NodeLookup {
public void setSystemState(ClusterStateBundle stateBundle, NodeInfo node, Waiter<SetClusterStateRequest> waiter) {
ClusterState baselineState = stateBundle.getBaselineClusterState();
DummySetClusterStateRequest req = new DummySetClusterStateRequest(node, baselineState);
- node.setSystemStateVersionSent(baselineState);
+ node.setClusterStateVersionBundleSent(baselineState);
req.setReply(new SetClusterStateRequest.Reply());
if (node.isStorage() || !shouldDeferDistributorClusterStateAcks) {
waiter.done(req);
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
index 6d59a672e86..caa3fbcd6cd 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
@@ -68,6 +68,7 @@ public class DummyVdsNode {
* Any access to this list or to its members must be synchronized on the timer variable.
*/
private List<ClusterStateBundle> clusterStateBundles = new LinkedList<>();
+ private int activatedClusterStateVersion = 0;
private Thread messageResponder = new Thread() {
public void run() {
@@ -220,6 +221,12 @@ public class DummyVdsNode {
}
}
+ public int getActivatedClusterStateVersion() {
+ synchronized (timer) {
+ return activatedClusterStateVersion;
+ }
+ }
+
public boolean hasPendingGetNodeStateRequest() {
synchronized (timer) {
return !waitingRequests.isEmpty();
@@ -300,14 +307,17 @@ public class DummyVdsNode {
public ClusterStateBundle getClusterStateBundle() {
synchronized(timer) {
- return (clusterStateBundles.isEmpty() ? null : clusterStateBundles.get(0));
+ return clusterStateBundles.stream()
+ .filter(b -> b.getVersion() <= activatedClusterStateVersion)
+ .findFirst() // Most recent cluster state bundle first in list
+ .orElse(null);
}
}
public ClusterState getClusterState() {
- synchronized(timer) {
- return (clusterStateBundles.isEmpty() ? null : clusterStateBundles.get(0).getBaselineClusterState());
- }
+ return Optional.ofNullable(getClusterStateBundle())
+ .map(b -> b.getBaselineClusterState())
+ .orElse(null);
}
public String getSlobrokName() {
@@ -369,6 +379,13 @@ public class DummyVdsNode {
m.paramDesc(2, "payload", "Slime format payload");
supervisor.addMethod(m);
}
+ if (stateCommunicationVersion >= RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_VERSION) {
+ m = new Method(RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_METHOD_NAME, "i", "i", this, "rpc_activateClusterStateVersion");
+ m.methodDesc("Activate a given cluster state version");
+ m.paramDesc(0, "stateVersion", "Cluster state version to activate");
+ m.returnDesc(0, "actualVersion", "Actual cluster state version on node");
+ supervisor.addMethod(m);
+ }
}
public void rpc_storageConnect(Request req) {
@@ -439,7 +456,7 @@ public class DummyVdsNode {
}
}
} catch (Exception e) {
- log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering " + req.methodName() + " request: " + e.getMessage());
+ log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering " + req.methodName() + " request: " + e.getMessage());
e.printStackTrace(System.err);
req.setError(ErrorCode.METHOD_FAILED, e.getMessage());
}
@@ -499,7 +516,7 @@ public class DummyVdsNode {
req.returnValues().add(new StringValue("OK"));
log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new system state (through old setsystemstate call) " + newState);
} catch (Exception e) {
- log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setsystemstate request: " + e.getMessage());
+ log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering setsystemstate request: " + e.getMessage());
e.printStackTrace(System.err);
req.returnValues().add(new Int32Value(ErrorCode.METHOD_FAILED));
req.returnValues().add(new StringValue(e.getMessage()));
@@ -516,11 +533,14 @@ public class DummyVdsNode {
synchronized(timer) {
updateStartTimestamps(newState);
clusterStateBundles.add(0, ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(newState)));
+ if (stateCommunicationVersion < RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_VERSION) {
+ activatedClusterStateVersion = newState.getVersion(); // Simulate node that does not know of activation
+ }
timer.notifyAll();
}
log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new system state " + newState);
} catch (Exception e) {
- log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setsystemstate request: " + e.getMessage());
+ log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering setsystemstate request: " + e.getMessage());
e.printStackTrace(System.err);
req.setError(ErrorCode.METHOD_FAILED, e.getMessage());
}
@@ -536,11 +556,40 @@ public class DummyVdsNode {
synchronized(timer) {
updateStartTimestamps(stateBundle.getBaselineClusterState());
clusterStateBundles.add(0, stateBundle);
+ if (stateCommunicationVersion < RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_VERSION) {
+ activatedClusterStateVersion = stateBundle.getVersion(); // Simulate node that does not know of activation
+ }
timer.notifyAll();
}
log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new cluster state " + stateBundle);
} catch (Exception e) {
- log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setdistributionstates request: " + e.getMessage());
+ log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering setdistributionstates request: " + e.getMessage());
+ e.printStackTrace(System.err);
+ req.setError(ErrorCode.METHOD_FAILED, e.getMessage());
+ }
+ }
+
+ public void rpc_activateClusterStateVersion(Request req) {
+ try {
+ if (shouldFailSetSystemStateRequests()) {
+ // We assume that failing setDistributionStates also implies failing version activations
+ req.setError(ErrorCode.GENERAL_ERROR, "Dummy node configured to fail activateClusterStateVersion() calls");
+ return;
+ }
+ int activateVersion = req.parameters().get(0).asInt32();
+ synchronized(timer) {
+ int actualVersion = getLatestSystemStateVersion().orElse(0);
+ req.returnValues().add(new Int32Value(actualVersion));
+ if (activateVersion != actualVersion) {
+ req.setError(ErrorCode.METHOD_FAILED, "State version mismatch");
+ } else {
+ activatedClusterStateVersion = activateVersion;
+ timer.notifyAll();
+ }
+ }
+ log.log(LogLevel.DEBUG, "Dummy node " + this + ": Activating cluster state version " + activateVersion);
+ } catch (Exception e) {
+ log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering activate_cluster_state_version request: " + e.getMessage());
e.printStackTrace(System.err);
req.setError(ErrorCode.METHOD_FAILED, e.getMessage());
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java
index bda06248d9e..bf63aebe022 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java
@@ -8,6 +8,6 @@ import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator;
* over regular RPC.
*/
public class DummyVdsNodeOptions {
- // 0 - 4.1, 1 - 4.2-5.0.10, 2 - 5.0.11+, 3 - 6.220+
- public int stateCommunicationVersion = RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_VERSION;
+ // 0 - 4.1, 1 - 4.2-5.0.10, 2 - 5.0.11+, 3 - 6.220+, 4 - 7.24+
+ public int stateCommunicationVersion = RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_VERSION;
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java
index d59dbb4933a..e9eaf56085b 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java
@@ -4,6 +4,7 @@ package com.yahoo.vespa.clustercontroller.core;
import com.yahoo.vdslib.distribution.ConfiguredNode;
import com.yahoo.vdslib.state.NodeState;
import com.yahoo.vdslib.state.State;
+import com.yahoo.vespa.clustercontroller.core.testutils.StateWaiter;
import org.junit.Test;
import java.util.ArrayList;
@@ -29,8 +30,16 @@ public class RpcVersionAutoDowngradeTest extends FleetControllerTest {
@Test
public void cluster_state_rpc_version_is_auto_downgraded_and_retried_for_older_nodes() throws Exception {
- setUpFakeCluster(2); // HEAD is at v3
+ setUpFakeCluster(2); // HEAD is at v4
waitForState("version:\\d+ distributor:10 storage:10");
}
+ @Test
+ public void implicit_activation_for_nodes_that_return_not_found_for_version_activation_rpc() throws Exception {
+ setUpFakeCluster(3); // HEAD is at v4
+ waitForState("version:\\d+ distributor:10 storage:10");
+ }
+
+ // TODO partial version setup for simulating upgrades
+
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
index 32de3591f2d..85106ce7e3c 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java
@@ -171,6 +171,9 @@ public class StateChangeTest extends FleetControllerTest {
options.nodeStateRequestTimeoutMS = 60 * 60 * 1000;
options.minTimeBetweenNewSystemStates = 0;
options.maxInitProgressTime = 50000;
+ // This test makes very specific assumptions about the amount of work done in a single tick.
+ // Two-phase cluster state activation changes this quite a bit, so disable it. At least for now.
+ options.enableTwoPhaseClusterStateActivation = false;
initialize(options);
@@ -1035,7 +1038,7 @@ public class StateChangeTest extends FleetControllerTest {
// Assert that the failed node has not acknowledged the latest version.
// (The version may still be larger than versionBeforeChange if the fleet controller sends a
// "stable system" update without timestamps in the meantime.)
- assertTrue(fleetController.getCluster().getNodeInfo(nodes.get(1).getNode()).getSystemStateVersionAcknowledged() < versionAfterChange);
+ assertTrue(fleetController.getCluster().getNodeInfo(nodes.get(1).getNode()).getClusterStateVersionBundleAcknowledged() < versionAfterChange);
// Ensure non-concurrent access to getNewestSystemStateVersionSent
synchronized(timer) {
@@ -1343,11 +1346,19 @@ public class StateChangeTest extends FleetControllerTest {
void sendAllDeferredDistributorClusterStateAcks() throws Exception {
communicator.sendAllDeferredDistributorClusterStateAcks();
- ctrl.tick();
+ ctrl.tick(); // Process cluster state bundle ACKs
+ if (ctrl.getOptions().enableTwoPhaseClusterStateActivation) {
+ ctrl.tick(); // Send activations
+ ctrl.tick(); // Process activation ACKs
+ }
}
void processScheduledTask() throws Exception {
ctrl.tick(); // Cluster state recompute iteration and send
+ if (ctrl.getOptions().enableTwoPhaseClusterStateActivation) {
+ ctrl.tick(); // Send activations
+ ctrl.tick(); // Process activation ACKs
+ }
ctrl.tick(); // Iff ACKs were received, process version dependent task(s)
}
@@ -1440,7 +1451,7 @@ public class StateChangeTest extends FleetControllerTest {
@Test
public void no_op_synchronous_remote_task_waits_until_current_state_is_acked() throws Exception {
- RemoteTaskFixture fixture = createFixtureWith(optionsWithZeroTransitionTime());
+ RemoteTaskFixture fixture = createFixtureWith(optionsWithZeroTransitionTime());
communicator.setShouldDeferDistributorClusterStateAcks(true);
fixture.markStorageNodeDown(0);
@@ -1523,7 +1534,9 @@ public class StateChangeTest extends FleetControllerTest {
public void cluster_state_ack_is_not_dependent_on_state_send_grace_period() throws Exception {
FleetControllerOptions options = defaultOptions();
options.minTimeBetweenNewSystemStates = 10_000;
+ //options.enableTwoPhaseClusterStateActivation = false; ////
RemoteTaskFixture fixture = createFixtureWith(options);
+
// Have to increment timer here to be able to send state generated by the scheduled task
timer.advanceTime(10_000);
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
index aa5219147ce..75b65f5d85f 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
@@ -68,7 +68,7 @@ public class SystemStateBroadcasterTest {
ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2");
ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
f.broadcaster.handleNewClusterStates(stateBundle);
- f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator);
+ f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator);
cf.cluster().getNodeInfo().forEach(nodeInfo -> verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any()));
}
@@ -79,7 +79,7 @@ public class SystemStateBroadcasterTest {
ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
f.simulateNodePartitionedAwaySilently(cf);
f.broadcaster.handleNewClusterStates(stateBundle);
- f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator);
+ f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator);
clusterNodeInfos(cf.cluster(), Node.ofDistributor(1), Node.ofStorage(0), Node.ofStorage(1)).forEach(nodeInfo -> {
// Only distributor 0 should observe startup timestamps
@@ -97,7 +97,7 @@ public class SystemStateBroadcasterTest {
StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2"));
ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
f.broadcaster.handleNewClusterStates(stateBundle);
- f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator);
+ f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator);
cf.cluster().getNodeInfo().forEach(nodeInfo -> verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any()));
}
@@ -111,7 +111,7 @@ public class SystemStateBroadcasterTest {
ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
f.simulateNodePartitionedAwaySilently(cf);
f.broadcaster.handleNewClusterStates(stateBundle);
- f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator);
+ f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator);
clusterNodeInfos(cf.cluster(), Node.ofDistributor(1), Node.ofStorage(0), Node.ofStorage(1)).forEach(nodeInfo -> {
// Only distributor 0 should observe startup timestamps
@@ -122,4 +122,15 @@ public class SystemStateBroadcasterTest {
StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2 .0.t:600 .1.t:700"));
verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any());
}
+
+ @Test
+ public void activation_not_sent_before_all_distributors_have_acked_state_bundle() {
+ Fixture f = new Fixture();
+ ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2");
+ ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
+ f.broadcaster.handleNewClusterStates(stateBundle);
+ f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator);
+
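+ // TODO verify that activateClusterStateVersion() is not invoked until all distributors have ACKed the bundle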
+ }
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java
index 61e9d1a90de..9eb98f4f045 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java
@@ -161,7 +161,7 @@ public class RPCCommunicatorTest {
waiter.handleRequestDone(req);
// This would normally be done in processResponses(), but that code path is not invoked in this test.
- cf.cluster().getNodeInfo(Node.ofStorage(1)).setSystemStateVersionAcknowledged(123, false);
+ cf.cluster().getNodeInfo(Node.ofStorage(1)).setClusterStateBundleVersionAcknowledged(123, false);
f.receivedRequest.set(null);
// Now when we try again, we should have been downgraded to the legacy setsystemstate2 RPC
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java
index d140ef998b6..9734156b13f 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java
@@ -29,6 +29,7 @@ public interface WaitCondition {
abstract class StateWait implements WaitCondition {
private final Object monitor;
protected ClusterState currentState;
+ protected ClusterState convergedState;
private final SystemStateListener listener = new SystemStateListener() {
@Override
public void handleNewPublishedState(ClusterStateBundle state) {
@@ -37,6 +38,14 @@ public interface WaitCondition {
monitor.notifyAll();
}
}
+
+ @Override
+ public void handleStateConvergedInCluster(ClusterStateBundle states) {
+ synchronized (monitor) {
+ currentState = convergedState = states.getBaselineClusterState();
+ monitor.notifyAll();
+ }
+ }
};
public StateWait(FleetController fc, Object monitor) {
@@ -90,8 +99,8 @@ public interface WaitCondition {
@Override
public String isConditionMet() {
- if (currentState != null) {
- lastCheckedState = currentState;
+ if (convergedState != null) {
+ lastCheckedState = convergedState;
Matcher m = pattern.matcher(lastCheckedState.toString());
if (m.matches() || !checkSpaceSubset.isEmpty()) {
if (nodesToCheck != null) {