diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-08 12:58:31 +0100 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-14 14:43:03 +0000 |
commit | 57b4604fc462cdc18e00bfd425a2211fac429869 (patch) | |
tree | 9797e52a497b001d89cc197b320b1deecda6b236 /clustercontroller-core | |
parent | 05f27f6cfcda786232fa6da47154784dce2483e1 (diff) |
Support configurable two-phase state transitions in cluster controller
Diffstat (limited to 'clustercontroller-core')
16 files changed, 382 insertions, 108 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java index 9d833f366e5..a653ace265b 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java @@ -76,6 +76,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd private boolean waitingForCycle = false; private StatusPageServer.PatternRequestRouter statusRequestRouter = new StatusPageServer.PatternRequestRouter(); private final List<ClusterStateBundle> newStates = new ArrayList<>(); + private final List<ClusterStateBundle> convergedStates = new ArrayList<>(); private long configGeneration = -1; private long nextConfigGeneration = -1; private Queue<RemoteClusterControllerTask> remoteTasks = new LinkedList<>(); @@ -253,6 +254,10 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd throw new NullPointerException("Cluster state should never be null at this point"); } listener.handleNewPublishedState(ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(state))); + ClusterStateBundle convergedState = systemStateBroadcaster.getLastClusterStateBundleConverged(); + if (convergedState != null) { + listener.handleStateConvergedInCluster(convergedState); + } } public FleetControllerOptions getOptions() { @@ -438,6 +443,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd ClusterState currentState = stateVersionTracker.getVersionedClusterState(); log.fine(() -> String.format("All distributors have ACKed cluster state version %d", currentState.getVersion())); stateChangeHandler.handleAllDistributorsInSync(currentState, nodes, database, context); + convergedStates.add(stateVersionTracker.getVersionedClusterStateBundle()); // FIXME ugh, going via version tracker? } private boolean changesConfiguredNodeSet(Collection<ConfiguredNode> newNodes) { @@ -666,12 +672,15 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd // Reset timer to only see warning once. firstAllowedStateBroadcast = currentTime; } - sentAny = systemStateBroadcaster.broadcastNewState(databaseContext, communicator); + // FIXME bad interaction with activation..! + sentAny = systemStateBroadcaster.broadcastNewStateBundleIfRequired(databaseContext, communicator); if (sentAny) { // FIXME won't this inhibit resending to unresponsive nodes? nextStateSendTime = currentTime + options.minTimeBetweenNewSystemStates; } } + // Always allow activations if we've already broadcasted a state + sentAny |= systemStateBroadcaster.broadcastStateActivationsIfRequired(databaseContext, communicator); return sentAny; } @@ -686,6 +695,16 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd newStates.clear(); } } + if ( ! convergedStates.isEmpty()) { + synchronized (systemStateListeners) { + for (ClusterStateBundle stateBundle : convergedStates) { + for(SystemStateListener listener : systemStateListeners) { + listener.handleStateConvergedInCluster(stateBundle); + } + } + convergedStates.clear(); + } + } } private boolean processNextQueuedRemoteTask() { @@ -1047,7 +1066,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd while (true) { int ackedNodes = 0; for (NodeInfo node : cluster.getNodeInfo()) { - if (node.getSystemStateVersionAcknowledged() >= version) { + if (node.getClusterStateVersionBundleAcknowledged() >= version) { ++ackedNodes; } } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java index e069dde1901..d98cc6473e7 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetControllerOptions.java @@ -121,6 +121,8 @@ public class FleetControllerOptions implements Cloneable { public boolean clusterHasGlobalDocumentTypes = false; + public boolean enableTwoPhaseClusterStateActivation = true; + // TODO: Choose a default value public double minMergeCompletionRatio = 1.0; @@ -231,6 +233,7 @@ public class FleetControllerOptions implements Cloneable { sb.append("<tr><td><nobr>Wanted distribution bits</nobr></td><td align=\"right\">").append(distributionBits).append("</td></tr>"); sb.append("<tr><td><nobr>Max deferred task version wait time</nobr></td><td align=\"right\">").append(maxDeferredTaskVersionWaitTime.toMillis()).append("ms</td></tr>"); sb.append("<tr><td><nobr>Cluster has global document types configured</nobr></td><td align=\"right\">").append(clusterHasGlobalDocumentTypes).append("</td></tr>"); + sb.append("<tr><td><nobr>Enable 2-phase cluster state activation protocol</nobr></td><td align=\"right\">").append(enableTwoPhaseClusterStateActivation).append("</td></tr>"); sb.append("</table>"); } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java index 54cf2dad00a..58e2fa14d4f 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java @@ -14,7 +14,6 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.TreeMap; import java.util.logging.Logger; @@ -74,11 +73,16 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { * Version 1 is for the getnodestate2 command ((legacy, not supported). * Version 2 is for the getnodestate3 command * Version 3 adds support for setdistributionstates + * Version 4 adds support for explicit cluster state version bundle activation */ private int version; - private Map<Integer, ClusterState> systemStateVersionSent = new TreeMap<>(); - private ClusterState systemStateVersionAcknowledged; + // Mapping of cluster state version -> cluster state instance + private TreeMap<Integer, ClusterState> clusterStateVersionBundleSent = new TreeMap<>(); + private ClusterState clusterStateVersionBundleAcknowledged; + + private int clusterStateVersionActivationSent = -1; + private int clusterStateVersionActivationAcked = -1; /** * When a node goes from an up state to a down state, update this flag with the start timestamp the node had before going down. * The cluster state broadcaster will use this to identify whether distributors have restarted. @@ -102,7 +106,9 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { // NOTE: See update(node) below NodeInfo(ContentCluster cluster, Node n, boolean configuredRetired, String rpcAddress, Distribution distribution) { - if (cluster == null) throw new IllegalArgumentException("Cluster not set"); + if (cluster == null) { + throw new IllegalArgumentException("Cluster not set"); + } reportedState = new NodeState(n.getType(), State.DOWN); wantedState = new NodeState(n.getType(), State.UP); this.cluster = cluster; @@ -238,7 +244,7 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { public ContentCluster getCluster() { return cluster; } - /** Returns true if the node is currentl registered in slobrok */ + /** Returns true if the node is currently registered in slobrok */ // FIXME why is this called "isRpcAddressOutdated" then??? public boolean isRpcAddressOutdated() { return lastSeenInSlobrok != null; } @@ -353,12 +359,13 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { /** Sets the wanted state. The wanted state is taken as UP if a null argument is given */ public void setWantedState(NodeState state) { - if (state == null) + if (state == null) { state = new NodeState(node.getType(), State.UP); + } NodeState newWanted = new NodeState(node.getType(), state.getState()); newWanted.setDescription(state.getDescription()); if (!newWanted.equals(state)) { - try{ + try { throw new Exception(); } catch (Exception e) { StringWriter sw = new StringWriter(); @@ -408,40 +415,40 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { } public int getVersion() { return version; } - public int getConnectionVersion() { return connectionVersion; } - public void setConnectionVersion(int version) { connectionVersion = version; } public ClusterState getNewestSystemStateSent() { - ClusterState last = null; - for (ClusterState s : systemStateVersionSent.values()) { - if (last == null || last.getVersion() < s.getVersion()) { - last = s; - } + if (clusterStateVersionBundleSent.isEmpty()) { + return null; } - return last; + return clusterStateVersionBundleSent.lastEntry().getValue(); } public int getNewestSystemStateVersionSent() { ClusterState last = getNewestSystemStateSent(); return last == null ? -1 : last.getVersion(); } - public int getSystemStateVersionAcknowledged() { - return (systemStateVersionAcknowledged == null ? -1 : systemStateVersionAcknowledged.getVersion()); + + public int getClusterStateVersionBundleAcknowledged() { + return (clusterStateVersionBundleAcknowledged == null ? -1 : clusterStateVersionBundleAcknowledged.getVersion()); } - public void setSystemStateVersionSent(ClusterState state) { - if (state == null) throw new Error("Should not clear info for last version sent"); - if (systemStateVersionSent.containsKey(state.getVersion())) { + public void setClusterStateVersionBundleSent(ClusterState state) { + if (state == null) { + throw new Error("Should not clear info for last version sent"); + } + if (clusterStateVersionBundleSent.containsKey(state.getVersion())) { throw new IllegalStateException("We have already sent cluster state version " + state.getVersion() + " to " + node); } - systemStateVersionSent.put(state.getVersion(), state); + clusterStateVersionBundleSent.put(state.getVersion(), state); } - public void setSystemStateVersionAcknowledged(Integer version, boolean success) { - if (version == null) throw new Error("Should not clear info for last version acked"); - if (!systemStateVersionSent.containsKey(version)) { + public void setClusterStateBundleVersionAcknowledged(Integer version, boolean success) { + if (version == null) { + throw new Error("Should not clear info for last version acked"); + } + if (!clusterStateVersionBundleSent.containsKey(version)) { throw new IllegalStateException("Got response for cluster state " + version + " which is not tracked as pending for node " + node); } - ClusterState state = systemStateVersionSent.remove(version); - if (success && (systemStateVersionAcknowledged == null || systemStateVersionAcknowledged.getVersion() < state.getVersion())) { - systemStateVersionAcknowledged = state; + ClusterState state = clusterStateVersionBundleSent.remove(version); + if (success && (clusterStateVersionBundleAcknowledged == null || clusterStateVersionBundleAcknowledged.getVersion() < state.getVersion())) { + clusterStateVersionBundleAcknowledged = state; if (wentDownWithStartTime != 0 && (wentDownAtClusterState == null || wentDownAtClusterState.getVersion() < state.getVersion()) && !state.getNodeState(node).getState().oneOf("dsm")) @@ -452,6 +459,25 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { } } + public void setClusterStateVersionActivationSent(int version) { + clusterStateVersionActivationSent = version; + } + public int getClusterStateVersionActivationSent() { + return clusterStateVersionActivationSent; + } + + public int getClusterStateVersionActivationAcked() { + return clusterStateVersionActivationAcked; + } + public void setSystemStateVersionActivationAcked(Integer version, boolean success) { + if (success && (version > clusterStateVersionActivationAcked)) { + clusterStateVersionActivationAcked = version; + } else if (!success) { + clusterStateVersionActivationSent = -1; // Trigger resend + } + } + + public void setHostInfo(HostInfo hostInfo) { // Note: This will blank out any hostInfo we already had, if the parsing fails. // This is intentional, to make sure we're never left with stale data. diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java index a40a45fd48a..e4a9f62b054 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java @@ -1,6 +1,7 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.core; +import com.yahoo.jrt.ErrorCode; import com.yahoo.log.LogLevel; import com.yahoo.vdslib.state.*; import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler; @@ -20,12 +21,17 @@ public class SystemStateBroadcaster { private final Object monitor; private ClusterStateBundle clusterStateBundle; private final List<SetClusterStateRequest> setClusterStateReplies = new LinkedList<>(); + private final List<ActivateClusterStateVersionRequest> activateClusterStateVersionReplies = new LinkedList<>(); private final static long minTimeBetweenNodeErrorLogging = 10 * 60 * 1000; private final Map<Node, Long> lastErrorReported = new TreeMap<>(); - private int lastClusterStateInSync = 0; + + private int lastStateVersionBundleAcked = 0; + private int lastClusterStateVersionConverged = 0; + private ClusterStateBundle lastClusterStateBundleConverged; private final SetClusterStateWaiter setClusterStateWaiter = new SetClusterStateWaiter(); + private final ActivateClusterStateVersionWaiter activateClusterStateVersionWaiter = new ActivateClusterStateVersionWaiter(); public SystemStateBroadcaster(Timer timer, Object monitor) { this.timer = timer; @@ -52,6 +58,10 @@ public class SystemStateBroadcaster { return clusterStateBundle; } + public ClusterStateBundle getLastClusterStateBundleConverged() { + return lastClusterStateBundleConverged; + } + private void reportNodeError(boolean nodeOk, NodeInfo info, String message) { long time = timer.getCurrentTimeInMillis(); Long lastReported = lastErrorReported.get(info.getNode()); @@ -65,37 +75,62 @@ public class SystemStateBroadcaster { public boolean processResponses() { boolean anyResponsesFound = false; synchronized(monitor) { - for (SetClusterStateRequest req : setClusterStateReplies) { - anyResponsesFound = true; - - NodeInfo info = req.getNodeInfo(); - boolean nodeOk = info.getReportedState().getState().oneOf("uir"); - int version = req.getClusterStateVersion(); - - if (req.getReply().isError()) { - info.setSystemStateVersionAcknowledged(version, false); - if (req.getReply().getReturnCode() != Communicator.TRANSIENT_ERROR) { - if (info.getNewestSystemStateVersionSent() == version) { - reportNodeError(nodeOk, info, - "Got error response " + req.getReply().getReturnCode() + ": " + req.getReply().getReturnMessage() - + " from " + info + " setsystemstate request."); - } - } + anyResponsesFound = !setClusterStateReplies.isEmpty() || !activateClusterStateVersionReplies.isEmpty(); + processSetClusterStateResponses(); + processActivateClusterStateVersionResponses(); + } + return anyResponsesFound; + } + + private void processActivateClusterStateVersionResponses() { + for (var req : activateClusterStateVersionReplies) { + NodeInfo info = req.getNodeInfo(); + int version = req.getClusterStateVersion(); + boolean success = true; + if (req.getReply().isError()) { + // NO_SUCH_METHOD implies node is on a version that does not understand explicit activations + // and it has already merrily started using the state version. Treat as if it had been ACKed. + if (req.getReply().getReturnCode() != ErrorCode.NO_SUCH_METHOD) { + log.log(LogLevel.INFO, () -> String.format("Activation NACK for node %s with version %d, message %s", + info, version, req.getReply().getReturnMessage())); // TODO log level + success = false; } else { - info.setSystemStateVersionAcknowledged(version, true); - log.log(LogLevel.DEBUG, "Node " + info + " acked system state version " + version + "."); - lastErrorReported.remove(info.getNode()); + log.log(LogLevel.INFO, () -> String.format("Node %s did not understand state activation RPC; " + + "implicitly treating state %d as activated on node", info, version)); // TODO log level } } - setClusterStateReplies.clear(); + info.setSystemStateVersionActivationAcked(version, success); + // TODO we currently don't invoke reportNodeError here.. We assume that node errors will be reported + // as part of processSetClusterStateResponses anyway, but can add it here as well if deemed necessary. } - return anyResponsesFound; + activateClusterStateVersionReplies.clear(); } - private boolean nodeNeedsClusterState(NodeInfo node) { - if (node.getSystemStateVersionAcknowledged() == clusterStateBundle.getVersion()) { - return false; // No point in sending if node already has updated system state + private void processSetClusterStateResponses() { + for (SetClusterStateRequest req : setClusterStateReplies) { + NodeInfo info = req.getNodeInfo(); + int version = req.getClusterStateVersion(); + + if (req.getReply().isError()) { + info.setClusterStateBundleVersionAcknowledged(version, false); + if (req.getReply().getReturnCode() != Communicator.TRANSIENT_ERROR) { + if (info.getNewestSystemStateVersionSent() == version) { + boolean nodeOk = info.getReportedState().getState().oneOf("uir"); + reportNodeError(nodeOk, info, + String.format("Got error response %d: %s from %s setdistributionstates request.", + req.getReply().getReturnCode(), req.getReply().getReturnMessage(), info)); + } + } + } else { + info.setClusterStateBundleVersionAcknowledged(version, true); + log.log(LogLevel.DEBUG, () -> String.format("Node %s ACKed system state version %d.", info, version)); + lastErrorReported.remove(info.getNode()); + } } + setClusterStateReplies.clear(); + } + + private static boolean nodeIsReachable(NodeInfo node) { if (node.getRpcAddress() == null || node.isRpcAddressOutdated()) { return false; // Can't set state on nodes we don't know where are } @@ -108,41 +143,107 @@ public class SystemStateBroadcaster { return true; } + private boolean nodeNeedsClusterStateBundle(NodeInfo node) { + if (node.getClusterStateVersionBundleAcknowledged() == clusterStateBundle.getVersion()) { + return false; // No point in sending if node already has updated system state + } + return nodeIsReachable(node); + } + + private boolean nodeNeedsClusterStateActivation(NodeInfo node) { + if (node.getClusterStateVersionActivationAcked() == clusterStateBundle.getVersion()) { + return false; // No point in sending if node already has activated cluster state version + } + return nodeIsReachable(node); + } + private List<NodeInfo> resolveStateVersionSendSet(DatabaseHandler.Context dbContext) { return dbContext.getCluster().getNodeInfo().stream() - .filter(this::nodeNeedsClusterState) - .filter(node -> !newestStateAlreadySentToNode(node)) + .filter(this::nodeNeedsClusterStateBundle) + .filter(node -> !newestStateBundleAlreadySentToNode(node)) .collect(Collectors.toList()); } - private boolean newestStateAlreadySentToNode(NodeInfo node) { + // Precondition: no nodes in the cluster need to receive the current cluster state version bundle + private List<NodeInfo> resolveStateActivationSendSet(DatabaseHandler.Context dbContext) { + return dbContext.getCluster().getNodeInfo().stream() + .filter(this::nodeNeedsClusterStateActivation) + .filter(node -> !newestStateActivationAlreadySentToNode(node)) + .collect(Collectors.toList()); + } + + private boolean newestStateBundleAlreadySentToNode(NodeInfo node) { return (node.getNewestSystemStateVersionSent() == clusterStateBundle.getVersion()); } + private boolean newestStateActivationAlreadySentToNode(NodeInfo node) { + return (node.getClusterStateVersionActivationSent() == clusterStateBundle.getVersion()); + } + + private static boolean twoPhaseTransitionEnabled(FleetController controller) { + return controller.getOptions().enableTwoPhaseClusterStateActivation; + } + /** - * Checks if all distributor nodes have ACKed the most recent cluster state. Iff this - * is the case, triggers handleAllDistributorsInSync() on the provided FleetController + * Checks if all distributor nodes have ACKed (and activated) the most recent cluster state. + * Iff this is the case, triggers handleAllDistributorsInSync() on the provided FleetController * object and updates the broadcaster's last known in-sync cluster state version. */ void checkIfClusterStateIsAckedByAllDistributors(DatabaseHandler database, DatabaseHandler.Context dbContext, FleetController fleetController) throws InterruptedException { - if ((clusterStateBundle == null) || (lastClusterStateInSync == clusterStateBundle.getVersion())) { + if ((clusterStateBundle == null) || currentClusterStateIsConverged()) { return; // Nothing to do for the current state } final int currentStateVersion = clusterStateBundle.getVersion(); - boolean anyOutdatedDistributorNodes = dbContext.getCluster().getNodeInfo().stream() + boolean anyDistributorsNeedStateBundle = dbContext.getCluster().getNodeInfo().stream() .filter(NodeInfo::isDistributor) - .anyMatch(this::nodeNeedsClusterState); + .anyMatch(this::nodeNeedsClusterStateBundle); + + if (!anyDistributorsNeedStateBundle && (currentStateVersion > lastStateVersionBundleAcked)) { + markCurrentClusterStateBundleAsReceivedByAllDistributors(); + if (twoPhaseTransitionEnabled(fleetController)) { + log.log(LogLevel.INFO, () -> String.format("All distributors have ACKed cluster state " + // TODO log level + "version %d, sending activation", currentStateVersion)); + } else { + markCurrentClusterStateAsConverged(database, dbContext, fleetController); + } + return; // Either converged (no two-phase) or activations must be sent before we can continue. + } - if (!anyOutdatedDistributorNodes && (currentStateVersion > lastClusterStateInSync)) { - log.log(LogLevel.DEBUG, "All distributors have newest clusterstate, updating start timestamps in zookeeper and clearing them from cluster state"); - lastClusterStateInSync = currentStateVersion; - fleetController.handleAllDistributorsInSync(database, dbContext); + if (anyDistributorsNeedStateBundle || !twoPhaseTransitionEnabled(fleetController)) { + return; } + + boolean anyDistributorsNeedActivation = dbContext.getCluster().getNodeInfo().stream() + .filter(NodeInfo::isDistributor) + .anyMatch(this::nodeNeedsClusterStateActivation); + + if (!anyDistributorsNeedActivation && (currentStateVersion > lastClusterStateVersionConverged)) { + markCurrentClusterStateAsConverged(database, dbContext, fleetController); + } else { + log.log(LogLevel.INFO, () -> String.format("distributors still need activation in state %d (last converged: %d)", // TODO log level + currentStateVersion, lastClusterStateVersionConverged)); + } + } + + private void markCurrentClusterStateBundleAsReceivedByAllDistributors() { + lastStateVersionBundleAcked = clusterStateBundle.getVersion(); } - public boolean broadcastNewState(DatabaseHandler.Context dbContext, Communicator communicator) { + private void markCurrentClusterStateAsConverged(DatabaseHandler database, DatabaseHandler.Context dbContext, FleetController fleetController) throws InterruptedException { + // TODO log level + log.log(LogLevel.INFO, "All distributors have newest clusterstate, updating start timestamps in zookeeper and clearing them from cluster state"); + lastClusterStateVersionConverged = clusterStateBundle.getVersion(); + lastClusterStateBundleConverged = clusterStateBundle; + fleetController.handleAllDistributorsInSync(database, dbContext); + } + + private boolean currentClusterStateIsConverged() { + return lastClusterStateVersionConverged == clusterStateBundle.getVersion(); + } + + public boolean broadcastNewStateBundleIfRequired(DatabaseHandler.Context dbContext, Communicator communicator) { if (clusterStateBundle == null) { return false; } @@ -159,12 +260,15 @@ public class SystemStateBroadcaster { if (nodeNeedsToObserveStartupTimestamps(node)) { // TODO this is the same for all nodes, compute only once ClusterStateBundle modifiedBundle = clusterStateBundle.cloneWithMapper(state -> buildModifiedClusterState(state, dbContext)); - log.log(LogLevel.DEBUG, "Sending modified cluster state version " + baselineState.getVersion() - + " to node " + node + ": " + modifiedBundle); + // TODO log level + log.log(LogLevel.INFO, () -> String.format("Sending modified cluster state version %d" + + " to node %s: %s", baselineState.getVersion(), node, modifiedBundle)); communicator.setSystemState(modifiedBundle, node, setClusterStateWaiter); } else { - log.log(LogLevel.DEBUG, "Sending system state version " + baselineState.getVersion() + " to node " + node - + ". (went down time " + node.getWentDownWithStartTime() + ", node start time " + node.getStartTimestamp() + ")"); + // TODO log level + log.log(LogLevel.INFO, () -> String.format("Sending system state version %d to node %s. " + + "(went down time %d, node start time %d)", baselineState.getVersion(), node, + node.getWentDownWithStartTime(), node.getStartTimestamp())); communicator.setSystemState(clusterStateBundle, node, setClusterStateWaiter); } } @@ -172,7 +276,28 @@ public class SystemStateBroadcaster { return !recipients.isEmpty(); } - public int lastClusterStateVersionInSync() { return lastClusterStateInSync; } + public boolean broadcastStateActivationsIfRequired(DatabaseHandler.Context dbContext, Communicator communicator) { + if (clusterStateBundle == null || !clusterStateBundle.getBaselineClusterState().isOfficial()) { + return false; + } + + if ((lastStateVersionBundleAcked != clusterStateBundle.getVersion()) + || !dbContext.getFleetController().getOptions().enableTwoPhaseClusterStateActivation) { + return false; // Not yet received bundle ACK from all nodes; wait. + } + + var recipients = resolveStateActivationSendSet(dbContext); + for (NodeInfo node : recipients) { + // TODO log level + log.log(LogLevel.INFO, () -> String.format("Sending cluster state activation to node %s for version %d", + node, clusterStateBundle.getVersion())); + communicator.activateClusterStateVersion(clusterStateBundle.getVersion(), node, activateClusterStateVersionWaiter); + } + + return !recipients.isEmpty(); + } + + public int lastClusterStateVersionInSync() { return lastClusterStateVersionConverged; } private static boolean nodeNeedsToObserveStartupTimestamps(NodeInfo node) { return node.getStartTimestamp() != 0 && node.getWentDownWithStartTime() == node.getStartTimestamp(); @@ -202,7 +327,9 @@ public class SystemStateBroadcaster { private class ActivateClusterStateVersionWaiter implements Communicator.Waiter<ActivateClusterStateVersionRequest> { @Override public void done(ActivateClusterStateVersionRequest reply) { - // TODO + synchronized (monitor) { + activateClusterStateVersionReplies.add(reply); + } } } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java index 764bb3a0d92..a0d53e8c93e 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/listeners/SystemStateListener.java @@ -8,6 +8,14 @@ public interface SystemStateListener { // TODO consider rename to bundle void handleNewPublishedState(ClusterStateBundle states); + /** + * Invoked at the edge when all pending cluster state bundles and version activations + * have been successfully ACKed by all distributors in the cluster. + * + * @param states bundle that has converged across all distributors + */ + default void handleStateConvergedInCluster(ClusterStateBundle states) {} + default void handleNewCandidateState(ClusterStateBundle states) {} } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java index f19bb5ad9b8..c5b1da0cb66 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java @@ -11,12 +11,10 @@ import com.yahoo.vespa.clustercontroller.core.Timer; public class RPCActivateClusterStateVersionWaiter implements RequestWaiter { - ActivateClusterStateVersionRequest request; - Timer timer; - Communicator.Waiter<ActivateClusterStateVersionRequest> waiter; + private final Communicator.Waiter<ActivateClusterStateVersionRequest> waiter; + private ActivateClusterStateVersionRequest request; - public RPCActivateClusterStateVersionWaiter(Communicator.Waiter<ActivateClusterStateVersionRequest> waiter, Timer timer) { - this.timer = timer; + public RPCActivateClusterStateVersionWaiter(Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) { this.waiter = waiter; } @@ -28,7 +26,7 @@ public class RPCActivateClusterStateVersionWaiter implements RequestWaiter { NodeInfo info = request.getNodeInfo(); if (req.isError()) { return new ActivateClusterStateVersionRequest.Reply(req.errorCode(), req.errorMessage()); - } else if (!req.checkReturnTypes("")) { + } else if (!req.checkReturnTypes("i")) { return new ActivateClusterStateVersionRequest.Reply(ErrorCode.BAD_REPLY, "Got RPC response with invalid return types from " + info); } return new ActivateClusterStateVersionRequest.Reply(); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java index a28da4d02a1..f5629bda343 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java @@ -36,14 +36,15 @@ public class RPCCommunicator implements Communicator { public static final Logger log = Logger.getLogger(RPCCommunicator.class.getName()); + public static final int ACTIVATE_CLUSTER_STATE_VERSION_RPC_VERSION = 4; + public static final String ACTIVATE_CLUSTER_STATE_VERSION_RPC_METHOD_NAME = "activate_cluster_state_version"; + public static final int SET_DISTRIBUTION_STATES_RPC_VERSION = 3; public static final String SET_DISTRIBUTION_STATES_RPC_METHOD_NAME = "setdistributionstates"; public static final int LEGACY_SET_SYSTEM_STATE2_RPC_VERSION = 2; public static final String LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME = "setsystemstate2"; - public static final String ACTIVATE_CLUSTER_STATE_VERSION_RPC_METHOD_NAME = "activate_cluster_state_version"; - private final Timer timer; private final Supervisor supervisor; private double nodeStateRequestTimeoutIntervalMaxSeconds; @@ -161,12 +162,12 @@ public class RPCCommunicator implements Communicator { waiter.setRequest(stateRequest); connection.invokeAsync(req, 60, waiter); - node.setSystemStateVersionSent(baselineState); + node.setClusterStateVersionBundleSent(baselineState); } @Override public void activateClusterStateVersion(int clusterStateVersion, NodeInfo node, Waiter<ActivateClusterStateVersionRequest> externalWaiter) { - var waiter = new RPCActivateClusterStateVersionWaiter(externalWaiter, timer); + var waiter = new RPCActivateClusterStateVersionWaiter(externalWaiter); Target connection = getConnection(node); if ( ! connection.isValid()) { @@ -183,6 +184,7 @@ public class RPCCommunicator implements Communicator { waiter.setRequest(activationRequest); connection.invokeAsync(req, 60, waiter); + node.setClusterStateVersionActivationSent(clusterStateVersion); } // protected for testing. diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/statuspage/VdsClusterHtmlRendrer.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/statuspage/VdsClusterHtmlRendrer.java index 551eb34f8fa..c4238fa5c84 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/statuspage/VdsClusterHtmlRendrer.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/status/statuspage/VdsClusterHtmlRendrer.java @@ -248,10 +248,10 @@ public class VdsClusterHtmlRendrer { } // System state version - row.addCell(new HtmlTable.Cell("" + nodeInfo.getSystemStateVersionAcknowledged())); - if (nodeInfo.getSystemStateVersionAcknowledged() < state.getVersion() - 2) { + row.addCell(new HtmlTable.Cell("" + nodeInfo.getClusterStateVersionBundleAcknowledged())); + if (nodeInfo.getClusterStateVersionBundleAcknowledged() < state.getVersion() - 2) { row.getLastCell().addProperties(ERROR_PROPERTY); - } else if (nodeInfo.getSystemStateVersionAcknowledged() < state.getVersion()) { + } else if (nodeInfo.getClusterStateVersionBundleAcknowledged() < state.getVersion()) { row.getLastCell().addProperties(WARNING_PROPERTY); } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java index b1e7158f61c..185fdc0c7ca 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java @@ -97,7 +97,7 @@ public class DummyCommunicator implements Communicator, NodeLookup { public void setSystemState(ClusterStateBundle stateBundle, NodeInfo node, Waiter<SetClusterStateRequest> waiter) { ClusterState baselineState = stateBundle.getBaselineClusterState(); DummySetClusterStateRequest req = new DummySetClusterStateRequest(node, baselineState); - node.setSystemStateVersionSent(baselineState); + node.setClusterStateVersionBundleSent(baselineState); req.setReply(new SetClusterStateRequest.Reply()); if (node.isStorage() || !shouldDeferDistributorClusterStateAcks) { waiter.done(req); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java index 6d59a672e86..caa3fbcd6cd 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java @@ -68,6 +68,7 @@ public class DummyVdsNode { * Any access to this list or to its members must be synchronized on the timer variable. */ private List<ClusterStateBundle> clusterStateBundles = new LinkedList<>(); + private int activatedClusterStateVersion = 0; private Thread messageResponder = new Thread() { public void run() { @@ -220,6 +221,12 @@ public class DummyVdsNode { } } + public int getActivatedClusterStateVersion() { + synchronized (timer) { + return activatedClusterStateVersion; + } + } + public boolean hasPendingGetNodeStateRequest() { synchronized (timer) { return !waitingRequests.isEmpty(); @@ -300,14 +307,17 @@ public class DummyVdsNode { public ClusterStateBundle getClusterStateBundle() { synchronized(timer) { - return (clusterStateBundles.isEmpty() ? null : clusterStateBundles.get(0)); + return clusterStateBundles.stream() + .filter(b -> b.getVersion() <= activatedClusterStateVersion) + .findFirst() // Most recent cluster state bundle first in list + .orElse(null); } } public ClusterState getClusterState() { - synchronized(timer) { - return (clusterStateBundles.isEmpty() ? null : clusterStateBundles.get(0).getBaselineClusterState()); - } + return Optional.ofNullable(getClusterStateBundle()) + .map(b -> b.getBaselineClusterState()) + .orElse(null); } public String getSlobrokName() { @@ -369,6 +379,13 @@ public class DummyVdsNode { m.paramDesc(2, "payload", "Slime format payload"); supervisor.addMethod(m); } + if (stateCommunicationVersion >= RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_VERSION) { + m = new Method(RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_METHOD_NAME, "i", "i", this, "rpc_activateClusterStateVersion"); + m.methodDesc("Activate a given cluster state version"); + m.paramDesc(0, "stateVersion", "Cluster state version to activate"); + m.returnDesc(0, "actualVersion", "Actual cluster state version on node"); + supervisor.addMethod(m); + } } public void rpc_storageConnect(Request req) { @@ -439,7 +456,7 @@ public class DummyVdsNode { } } } catch (Exception e) { - log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering " + req.methodName() + " request: " + e.getMessage()); + log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering " + req.methodName() + " request: " + e.getMessage()); e.printStackTrace(System.err); req.setError(ErrorCode.METHOD_FAILED, e.getMessage()); } @@ -499,7 +516,7 @@ public class DummyVdsNode { req.returnValues().add(new StringValue("OK")); log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new system state (through old setsystemstate call) " + newState); } catch (Exception e) { - log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setsystemstate request: " + e.getMessage()); + log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering setsystemstate request: " + e.getMessage()); e.printStackTrace(System.err); req.returnValues().add(new Int32Value(ErrorCode.METHOD_FAILED)); req.returnValues().add(new StringValue(e.getMessage())); @@ -516,11 +533,14 @@ public class DummyVdsNode { synchronized(timer) { updateStartTimestamps(newState); clusterStateBundles.add(0, ClusterStateBundle.ofBaselineOnly(AnnotatedClusterState.withoutAnnotations(newState))); + if (stateCommunicationVersion < RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_VERSION) { + activatedClusterStateVersion = newState.getVersion(); // Simulate node that does not know of activation + } timer.notifyAll(); } log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new system state " + newState); } catch (Exception e) { - log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setsystemstate request: " + e.getMessage()); + log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering setsystemstate request: " + e.getMessage()); e.printStackTrace(System.err); req.setError(ErrorCode.METHOD_FAILED, e.getMessage()); } @@ -536,11 +556,40 @@ public class DummyVdsNode { synchronized(timer) { updateStartTimestamps(stateBundle.getBaselineClusterState()); clusterStateBundles.add(0, stateBundle); + if (stateCommunicationVersion < RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_VERSION) { + activatedClusterStateVersion = stateBundle.getVersion(); // Simulate node that does not know of activation + } timer.notifyAll(); } log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new cluster state " + stateBundle); } catch (Exception e) { - log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setdistributionstates request: " + e.getMessage()); + log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering setdistributionstates request: " + e.getMessage()); + e.printStackTrace(System.err); + req.setError(ErrorCode.METHOD_FAILED, e.getMessage()); + } + } + + public void rpc_activateClusterStateVersion(Request req) { + try { + if (shouldFailSetSystemStateRequests()) { + // We assume that failing setDistributionStates also implies failing version activations + req.setError(ErrorCode.GENERAL_ERROR, "Dummy node configured to fail activateClusterStateVersion() calls"); + return; + } + int activateVersion = req.parameters().get(0).asInt32(); + synchronized(timer) { + int actualVersion = getLatestSystemStateVersion().orElse(0); + req.returnValues().add(new Int32Value(actualVersion)); + if (activateVersion != actualVersion) { + req.setError(ErrorCode.METHOD_FAILED, "State version mismatch"); + } else { + activatedClusterStateVersion = activateVersion; + timer.notifyAll(); + } + } + log.log(LogLevel.DEBUG, "Dummy node " + this + ": Activating cluster state version " + activateVersion); + } catch (Exception e) { + log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occurred when answering activate_cluster_state_version request: " + e.getMessage()); e.printStackTrace(System.err); req.setError(ErrorCode.METHOD_FAILED, e.getMessage()); } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java index bda06248d9e..bf63aebe022 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java @@ -8,6 +8,6 @@ import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator; * over regular RPC. */ public class DummyVdsNodeOptions { - // 0 - 4.1, 1 - 4.2-5.0.10, 2 - 5.0.11+, 3 - 6.220+ - public int stateCommunicationVersion = RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_VERSION; + // 0 - 4.1, 1 - 4.2-5.0.10, 2 - 5.0.11+, 3 - 6.220+, 4 - 7.24+ + public int stateCommunicationVersion = RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_VERSION; } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java index d59dbb4933a..e9eaf56085b 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java @@ -4,6 +4,7 @@ package com.yahoo.vespa.clustercontroller.core; import com.yahoo.vdslib.distribution.ConfiguredNode; import com.yahoo.vdslib.state.NodeState; import com.yahoo.vdslib.state.State; +import com.yahoo.vespa.clustercontroller.core.testutils.StateWaiter; import org.junit.Test; import java.util.ArrayList; @@ -29,8 +30,16 @@ public class RpcVersionAutoDowngradeTest extends FleetControllerTest { @Test public void cluster_state_rpc_version_is_auto_downgraded_and_retried_for_older_nodes() throws Exception { - setUpFakeCluster(2); // HEAD is at v3 + setUpFakeCluster(2); // HEAD is at v4 waitForState("version:\\d+ distributor:10 storage:10"); } + @Test + public void implicit_activation_for_nodes_that_return_not_found_for_version_activation_rpc() throws Exception { + setUpFakeCluster(3); // HEAD is at v4 + waitForState("version:\\d+ distributor:10 storage:10"); + } + + // TODO partial version setup for simulating upgrades + } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java index 32de3591f2d..85106ce7e3c 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java @@ -171,6 +171,9 @@ public class StateChangeTest extends FleetControllerTest { options.nodeStateRequestTimeoutMS = 60 * 60 * 1000; options.minTimeBetweenNewSystemStates = 0; options.maxInitProgressTime = 50000; + // This test makes very specific assumptions about the amount of work done in a single tick. + // Two-phase cluster state activation changes this quite a bit, so disable it. At least for now. + options.enableTwoPhaseClusterStateActivation = false; initialize(options); @@ -1035,7 +1038,7 @@ public class StateChangeTest extends FleetControllerTest { // Assert that the failed node has not acknowledged the latest version. // (The version may still be larger than versionBeforeChange if the fleet controller sends a // "stable system" update without timestamps in the meantime - assertTrue(fleetController.getCluster().getNodeInfo(nodes.get(1).getNode()).getSystemStateVersionAcknowledged() < versionAfterChange); + assertTrue(fleetController.getCluster().getNodeInfo(nodes.get(1).getNode()).getClusterStateVersionBundleAcknowledged() < versionAfterChange); // Ensure non-concurrent access to getNewestSystemStateVersionSent synchronized(timer) { @@ -1343,11 +1346,19 @@ public class StateChangeTest extends FleetControllerTest { void sendAllDeferredDistributorClusterStateAcks() throws Exception { communicator.sendAllDeferredDistributorClusterStateAcks(); - ctrl.tick(); + ctrl.tick(); // Process cluster state bundle ACKs + if (ctrl.getOptions().enableTwoPhaseClusterStateActivation) { + ctrl.tick(); // Send activations + ctrl.tick(); // Process activation ACKs + } } void processScheduledTask() throws Exception { ctrl.tick(); // Cluster state recompute iteration and send + if (ctrl.getOptions().enableTwoPhaseClusterStateActivation) { + ctrl.tick(); // Send activations + ctrl.tick(); // Process activation ACKs + } ctrl.tick(); // Iff ACKs were received, process version dependent task(s) } @@ -1440,7 +1451,7 @@ public class StateChangeTest extends FleetControllerTest { @Test public void no_op_synchronous_remote_task_waits_until_current_state_is_acked() throws Exception { - RemoteTaskFixture fixture = createFixtureWith(optionsWithZeroTransitionTime()); + RemoteTaskFixture fixture = createFixtureWith(optionsWithZeroTransitionTime()); communicator.setShouldDeferDistributorClusterStateAcks(true); fixture.markStorageNodeDown(0); @@ -1523,7 +1534,9 @@ public class StateChangeTest extends FleetControllerTest { public void cluster_state_ack_is_not_dependent_on_state_send_grace_period() throws Exception { FleetControllerOptions options = defaultOptions(); options.minTimeBetweenNewSystemStates = 10_000; + //options.enableTwoPhaseClusterStateActivation = false; //// RemoteTaskFixture fixture = createFixtureWith(options); + // Have to increment timer here to be able to send state generated by the scheduled task timer.advanceTime(10_000); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java index aa5219147ce..75b65f5d85f 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java @@ -68,7 +68,7 @@ public class SystemStateBroadcasterTest { ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2"); ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); f.broadcaster.handleNewClusterStates(stateBundle); - f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator); + f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator); cf.cluster().getNodeInfo().forEach(nodeInfo -> verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any())); } @@ -79,7 +79,7 @@ public class SystemStateBroadcasterTest { ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); f.simulateNodePartitionedAwaySilently(cf); f.broadcaster.handleNewClusterStates(stateBundle); - f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator); + f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator); clusterNodeInfos(cf.cluster(), Node.ofDistributor(1), Node.ofStorage(0), Node.ofStorage(1)).forEach(nodeInfo -> { // Only distributor 0 should observe startup timestamps @@ -97,7 +97,7 @@ public class SystemStateBroadcasterTest { StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2")); ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); f.broadcaster.handleNewClusterStates(stateBundle); - f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator); + f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator); cf.cluster().getNodeInfo().forEach(nodeInfo -> verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any())); } @@ -111,7 +111,7 @@ public class SystemStateBroadcasterTest { ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); f.simulateNodePartitionedAwaySilently(cf); f.broadcaster.handleNewClusterStates(stateBundle); - f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator); + f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator); clusterNodeInfos(cf.cluster(), Node.ofDistributor(1), Node.ofStorage(0), Node.ofStorage(1)).forEach(nodeInfo -> { // Only distributor 0 should observe startup timestamps @@ -122,4 +122,15 @@ public class SystemStateBroadcasterTest { StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2 .0.t:600 .1.t:700")); verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any()); } + + @Test + public void activation_not_sent_before_all_distributors_have_acked_state_bundle() { + Fixture f = new Fixture(); + ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2"); + ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); + f.broadcaster.handleNewClusterStates(stateBundle); + f.broadcaster.broadcastNewStateBundleIfRequired(dbContextFrom(cf.cluster()), f.mockCommunicator); + + // TODO + } } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java index 61e9d1a90de..9eb98f4f045 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java @@ -161,7 +161,7 @@ public class RPCCommunicatorTest { waiter.handleRequestDone(req); // This would normally be done in processResponses(), but that code path is not invoked in this test. - cf.cluster().getNodeInfo(Node.ofStorage(1)).setSystemStateVersionAcknowledged(123, false); + cf.cluster().getNodeInfo(Node.ofStorage(1)).setClusterStateBundleVersionAcknowledged(123, false); f.receivedRequest.set(null); // Now when we try again, we should have been downgraded to the legacy setsystemstate2 RPC diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java index d140ef998b6..9734156b13f 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/testutils/WaitCondition.java @@ -29,6 +29,7 @@ public interface WaitCondition { abstract class StateWait implements WaitCondition { private final Object monitor; protected ClusterState currentState; + protected ClusterState convergedState; private final SystemStateListener listener = new SystemStateListener() { @Override public void handleNewPublishedState(ClusterStateBundle state) { @@ -37,6 +38,14 @@ public interface WaitCondition { monitor.notifyAll(); } } + + @Override + public void handleStateConvergedInCluster(ClusterStateBundle states) { + synchronized (monitor) { + currentState = convergedState = states.getBaselineClusterState(); + monitor.notifyAll(); + } + } }; public StateWait(FleetController fc, Object monitor) { @@ -90,8 +99,8 @@ public interface WaitCondition { @Override public String isConditionMet() { - if (currentState != null) { - lastCheckedState = currentState; + if (convergedState != null) { + lastCheckedState = convergedState; Matcher m = pattern.matcher(lastCheckedState.toString()); if (m.matches() || !checkSpaceSubset.isEmpty()) { if (nodesToCheck != null) { |