diff options
Diffstat (limited to 'clustercontroller-core/src/main/java')
8 files changed, 61 insertions, 22 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java index b57004eedce..26d63f7ba60 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java @@ -1,6 +1,9 @@ // Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.core; +/** + * Wrapper for a cluster state activation request towards a single node. + */ public class ActivateClusterStateVersionRequest extends ClusterStateVersionSpecificRequest { public ActivateClusterStateVersionRequest(NodeInfo nodeInfo, int systemStateVersion) { diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java index fda6ec19752..9e8abc0608e 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java @@ -1,6 +1,10 @@ // Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.core; +/** + * Base class for distributor/content node RPC requests that are bound + * to a particular cluster state version. 
+ */ public abstract class ClusterStateVersionSpecificRequest { private final NodeInfo nodeInfo; @@ -24,6 +28,7 @@ public abstract class ClusterStateVersionSpecificRequest { final int returnCode; final String returnMessage; + final int actualVersion; public Reply() { this(0, null); @@ -32,6 +37,17 @@ public abstract class ClusterStateVersionSpecificRequest { public Reply(int returnCode, String returnMessage) { this.returnCode = returnCode; this.returnMessage = returnMessage; + this.actualVersion = -1; + } + + private Reply(int actualVersion) { + this.returnCode = 0; + this.returnMessage = null; + this.actualVersion = actualVersion; + } + + public static Reply withActualVersion(int version) { + return new Reply(version); } /** Returns whether this is an error response */ @@ -43,6 +59,9 @@ public abstract class ClusterStateVersionSpecificRequest { /** Returns the message returned, or null if none */ public String getReturnMessage() { return returnMessage; } + /** Returns actual cluster state version active on node, or -1 if reply does not contain this information */ + public int getActualVersion() { return actualVersion; } + } } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java index 791315bb3d1..ba35243c14d 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java @@ -688,7 +688,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd if ( ! 
newStates.isEmpty()) { synchronized (systemStateListeners) { for (ClusterStateBundle stateBundle : newStates) { - for(SystemStateListener listener : systemStateListeners) { + for (SystemStateListener listener : systemStateListeners) { listener.handleNewPublishedState(stateBundle); } } @@ -698,7 +698,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd if ( ! convergedStates.isEmpty()) { synchronized (systemStateListeners) { for (ClusterStateBundle stateBundle : convergedStates) { - for(SystemStateListener listener : systemStateListeners) { + for (SystemStateListener listener : systemStateListeners) { listener.handleStateConvergedInCluster(stateBundle); } } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java index 58e2fa14d4f..82d13e2d9ef 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java @@ -77,9 +77,9 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { */ private int version; - // Mapping of cluster state version -> cluster state instance - private TreeMap<Integer, ClusterState> clusterStateVersionBundleSent = new TreeMap<>(); - private ClusterState clusterStateVersionBundleAcknowledged; + // Mapping of cluster state version -> cluster state bundle instance + private TreeMap<Integer, ClusterStateBundle> clusterStateVersionBundleSent = new TreeMap<>(); + private ClusterStateBundle clusterStateVersionBundleAcknowledged; private int clusterStateVersionActivationSent = -1; private int clusterStateVersionActivationAcked = -1; @@ -420,7 +420,7 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { if (clusterStateVersionBundleSent.isEmpty()) { return null; } - return clusterStateVersionBundleSent.lastEntry().getValue(); + 
return clusterStateVersionBundleSent.lastEntry().getValue().getBaselineClusterState(); } public int getNewestSystemStateVersionSent() { ClusterState last = getNewestSystemStateSent(); @@ -430,14 +430,14 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { public int getClusterStateVersionBundleAcknowledged() { return (clusterStateVersionBundleAcknowledged == null ? -1 : clusterStateVersionBundleAcknowledged.getVersion()); } - public void setClusterStateVersionBundleSent(ClusterState state) { - if (state == null) { + public void setClusterStateVersionBundleSent(ClusterStateBundle stateBundle) { + if (stateBundle == null) { throw new Error("Should not clear info for last version sent"); } - if (clusterStateVersionBundleSent.containsKey(state.getVersion())) { - throw new IllegalStateException("We have already sent cluster state version " + state.getVersion() + " to " + node); + if (clusterStateVersionBundleSent.containsKey(stateBundle.getVersion())) { + throw new IllegalStateException("We have already sent cluster state version " + stateBundle.getVersion() + " to " + node); } - clusterStateVersionBundleSent.put(state.getVersion(), state); + clusterStateVersionBundleSent.put(stateBundle.getVersion(), stateBundle); } public void setClusterStateBundleVersionAcknowledged(Integer version, boolean success) { if (version == null) { @@ -446,14 +446,15 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { if (!clusterStateVersionBundleSent.containsKey(version)) { throw new IllegalStateException("Got response for cluster state " + version + " which is not tracked as pending for node " + node); } - ClusterState state = clusterStateVersionBundleSent.remove(version); - if (success && (clusterStateVersionBundleAcknowledged == null || clusterStateVersionBundleAcknowledged.getVersion() < state.getVersion())) { - clusterStateVersionBundleAcknowledged = state; + var stateBundle = clusterStateVersionBundleSent.remove(version); + if (success && 
(clusterStateVersionBundleAcknowledged == null || clusterStateVersionBundleAcknowledged.getVersion() < stateBundle.getVersion())) { + clusterStateVersionBundleAcknowledged = stateBundle; if (wentDownWithStartTime != 0 - && (wentDownAtClusterState == null || wentDownAtClusterState.getVersion() < state.getVersion()) - && !state.getNodeState(node).getState().oneOf("dsm")) + && (wentDownAtClusterState == null || wentDownAtClusterState.getVersion() < stateBundle.getVersion()) + && !stateBundle.getBaselineClusterState().getNodeState(node).getState().oneOf("dsm")) { - log.log(LogLevel.DEBUG, "Clearing going down timestamp of node " + node + " after receiving ack of cluster state " + state); + log.log(LogLevel.DEBUG, () -> String.format("Clearing going down timestamp of node %s after " + + "receiving ack of cluster state bundle %s", node, stateBundle)); wentDownWithStartTime = 0; } } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java index 84d8accd7f4..7898e90f77a 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java @@ -87,17 +87,25 @@ public class SystemStateBroadcaster { NodeInfo info = req.getNodeInfo(); int version = req.getClusterStateVersion(); boolean success = true; - if (req.getReply().isError()) { + var reply = req.getReply(); + if (reply.isError()) { // NO_SUCH_METHOD implies node is on a version that does not understand explicit activations // and it has already merrily started using the state version. Treat as if it had been ACKed. 
- if (req.getReply().getReturnCode() != ErrorCode.NO_SUCH_METHOD) { + if (reply.getReturnCode() != ErrorCode.NO_SUCH_METHOD) { log.log(LogLevel.DEBUG, () -> String.format("Activation NACK for node %s with version %d, message %s", - info, version, req.getReply().getReturnMessage())); + info, version, reply.getReturnMessage())); success = false; } else { log.log(LogLevel.DEBUG, () -> String.format("Node %s did not understand state activation RPC; " + "implicitly treating state %d as activated on node", info, version)); } + } else if (reply.getActualVersion() != version) { + log.log(LogLevel.DEBUG, () -> String.format("Activation of version %d did not take effect, node %s " + + "reports it has an actual pending version of %d", version, info, reply.getActualVersion())); + success = false; + } else { + log.log(LogLevel.DEBUG, () -> String.format("Node %s reports successful activation of state " + + "version %d", info, version)); } info.setSystemStateVersionActivationAcked(version, success); // TODO we currently don't invoke reportNodeError here.. We assume that node errors will be reported diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java index 32d4046c181..c2f48ccf589 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java @@ -5,6 +5,9 @@ import com.yahoo.jrt.Request; import com.yahoo.vespa.clustercontroller.core.ActivateClusterStateVersionRequest; import com.yahoo.vespa.clustercontroller.core.NodeInfo; +/** + * FRT RPC state implementation of a single cluster state activation request. 
+ */ public class RPCActivateClusterStateVersionRequest extends ActivateClusterStateVersionRequest { Request request; diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java index c5b1da0cb66..175a0b50cd6 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java @@ -9,6 +9,10 @@ import com.yahoo.vespa.clustercontroller.core.Communicator; import com.yahoo.vespa.clustercontroller.core.NodeInfo; import com.yahoo.vespa.clustercontroller.core.Timer; +/** + * Binds together the reply received for a particular cluster state activation RPC and + * the cluster controller-internal callback handler which expects to receive it. 
+ */ public class RPCActivateClusterStateVersionWaiter implements RequestWaiter { private final Communicator.Waiter<ActivateClusterStateVersionRequest> waiter; @@ -29,7 +33,8 @@ public class RPCActivateClusterStateVersionWaiter implements RequestWaiter { } else if (!req.checkReturnTypes("i")) { return new ActivateClusterStateVersionRequest.Reply(ErrorCode.BAD_REPLY, "Got RPC response with invalid return types from " + info); } - return new ActivateClusterStateVersionRequest.Reply(); + int actualVersion = req.returnValues().get(0).asInt32(); + return ActivateClusterStateVersionRequest.Reply.withActualVersion(actualVersion); } @Override diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java index f5629bda343..c3c5c9e3b98 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java @@ -162,7 +162,7 @@ public class RPCCommunicator implements Communicator { waiter.setRequest(stateRequest); connection.invokeAsync(req, 60, waiter); - node.setClusterStateVersionBundleSent(baselineState); + node.setClusterStateVersionBundleSent(stateBundle); } @Override |