aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-core/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'clustercontroller-core/src/main/java')
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java3
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java19
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java4
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java31
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java14
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java3
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java7
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java2
8 files changed, 61 insertions, 22 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java
index b57004eedce..26d63f7ba60 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java
@@ -1,6 +1,9 @@
// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;
+/**
+ * Wrapper for a cluster state activation request towards a single node.
+ */
public class ActivateClusterStateVersionRequest extends ClusterStateVersionSpecificRequest {
public ActivateClusterStateVersionRequest(NodeInfo nodeInfo, int systemStateVersion) {
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java
index fda6ec19752..9e8abc0608e 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java
@@ -1,6 +1,10 @@
// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;
+/**
+ * Base class for distributor/content node node RPC requests that are bound
+ * to a particular cluster state version.
+ */
public abstract class ClusterStateVersionSpecificRequest {
private final NodeInfo nodeInfo;
@@ -24,6 +28,7 @@ public abstract class ClusterStateVersionSpecificRequest {
final int returnCode;
final String returnMessage;
+ final int actualVersion;
public Reply() {
this(0, null);
@@ -32,6 +37,17 @@ public abstract class ClusterStateVersionSpecificRequest {
public Reply(int returnCode, String returnMessage) {
this.returnCode = returnCode;
this.returnMessage = returnMessage;
+ this.actualVersion = -1;
+ }
+
+ private Reply(int actualVersion) {
+ this.returnCode = 0;
+ this.returnMessage = null;
+ this.actualVersion = actualVersion;
+ }
+
+ public static Reply withActualVersion(int version) {
+ return new Reply(version);
}
/** Returns whether this is an error response */
@@ -43,6 +59,9 @@ public abstract class ClusterStateVersionSpecificRequest {
/** Returns the message returned, or null if none */
public String getReturnMessage() { return returnMessage; }
+ /** Returns actual cluster state version active on node, or -1 if reply does not contain this information */
+ public int getActualVersion() { return actualVersion; }
+
}
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index 791315bb3d1..ba35243c14d 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -688,7 +688,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
if ( ! newStates.isEmpty()) {
synchronized (systemStateListeners) {
for (ClusterStateBundle stateBundle : newStates) {
- for(SystemStateListener listener : systemStateListeners) {
+ for (SystemStateListener listener : systemStateListeners) {
listener.handleNewPublishedState(stateBundle);
}
}
@@ -698,7 +698,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
if ( ! convergedStates.isEmpty()) {
synchronized (systemStateListeners) {
for (ClusterStateBundle stateBundle : convergedStates) {
- for(SystemStateListener listener : systemStateListeners) {
+ for (SystemStateListener listener : systemStateListeners) {
listener.handleStateConvergedInCluster(stateBundle);
}
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
index 58e2fa14d4f..82d13e2d9ef 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
@@ -77,9 +77,9 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
*/
private int version;
- // Mapping of cluster state version -> cluster state instance
- private TreeMap<Integer, ClusterState> clusterStateVersionBundleSent = new TreeMap<>();
- private ClusterState clusterStateVersionBundleAcknowledged;
+ // Mapping of cluster state version -> cluster state bundle instance
+ private TreeMap<Integer, ClusterStateBundle> clusterStateVersionBundleSent = new TreeMap<>();
+ private ClusterStateBundle clusterStateVersionBundleAcknowledged;
private int clusterStateVersionActivationSent = -1;
private int clusterStateVersionActivationAcked = -1;
@@ -420,7 +420,7 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
if (clusterStateVersionBundleSent.isEmpty()) {
return null;
}
- return clusterStateVersionBundleSent.lastEntry().getValue();
+ return clusterStateVersionBundleSent.lastEntry().getValue().getBaselineClusterState();
}
public int getNewestSystemStateVersionSent() {
ClusterState last = getNewestSystemStateSent();
@@ -430,14 +430,14 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
public int getClusterStateVersionBundleAcknowledged() {
return (clusterStateVersionBundleAcknowledged == null ? -1 : clusterStateVersionBundleAcknowledged.getVersion());
}
- public void setClusterStateVersionBundleSent(ClusterState state) {
- if (state == null) {
+ public void setClusterStateVersionBundleSent(ClusterStateBundle stateBundle) {
+ if (stateBundle == null) {
throw new Error("Should not clear info for last version sent");
}
- if (clusterStateVersionBundleSent.containsKey(state.getVersion())) {
- throw new IllegalStateException("We have already sent cluster state version " + state.getVersion() + " to " + node);
+ if (clusterStateVersionBundleSent.containsKey(stateBundle.getVersion())) {
+ throw new IllegalStateException("We have already sent cluster state version " + stateBundle.getVersion() + " to " + node);
}
- clusterStateVersionBundleSent.put(state.getVersion(), state);
+ clusterStateVersionBundleSent.put(stateBundle.getVersion(), stateBundle);
}
public void setClusterStateBundleVersionAcknowledged(Integer version, boolean success) {
if (version == null) {
@@ -446,14 +446,15 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
if (!clusterStateVersionBundleSent.containsKey(version)) {
throw new IllegalStateException("Got response for cluster state " + version + " which is not tracked as pending for node " + node);
}
- ClusterState state = clusterStateVersionBundleSent.remove(version);
- if (success && (clusterStateVersionBundleAcknowledged == null || clusterStateVersionBundleAcknowledged.getVersion() < state.getVersion())) {
- clusterStateVersionBundleAcknowledged = state;
+ var stateBundle = clusterStateVersionBundleSent.remove(version);
+ if (success && (clusterStateVersionBundleAcknowledged == null || clusterStateVersionBundleAcknowledged.getVersion() < stateBundle.getVersion())) {
+ clusterStateVersionBundleAcknowledged = stateBundle;
if (wentDownWithStartTime != 0
- && (wentDownAtClusterState == null || wentDownAtClusterState.getVersion() < state.getVersion())
- && !state.getNodeState(node).getState().oneOf("dsm"))
+ && (wentDownAtClusterState == null || wentDownAtClusterState.getVersion() < stateBundle.getVersion())
+ && !stateBundle.getBaselineClusterState().getNodeState(node).getState().oneOf("dsm"))
{
- log.log(LogLevel.DEBUG, "Clearing going down timestamp of node " + node + " after receiving ack of cluster state " + state);
+ log.log(LogLevel.DEBUG, () -> String.format("Clearing going down timestamp of node %s after " +
+ "receiving ack of cluster state bundle %s", node, stateBundle));
wentDownWithStartTime = 0;
}
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index 84d8accd7f4..7898e90f77a 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -87,17 +87,25 @@ public class SystemStateBroadcaster {
NodeInfo info = req.getNodeInfo();
int version = req.getClusterStateVersion();
boolean success = true;
- if (req.getReply().isError()) {
+ var reply = req.getReply();
+ if (reply.isError()) {
// NO_SUCH_METHOD implies node is on a version that does not understand explicit activations
// and it has already merrily started using the state version. Treat as if it had been ACKed.
- if (req.getReply().getReturnCode() != ErrorCode.NO_SUCH_METHOD) {
+ if (reply.getReturnCode() != ErrorCode.NO_SUCH_METHOD) {
log.log(LogLevel.DEBUG, () -> String.format("Activation NACK for node %s with version %d, message %s",
- info, version, req.getReply().getReturnMessage()));
+ info, version, reply.getReturnMessage()));
success = false;
} else {
log.log(LogLevel.DEBUG, () -> String.format("Node %s did not understand state activation RPC; " +
"implicitly treating state %d as activated on node", info, version));
}
+ } else if (reply.getActualVersion() != version) {
+ log.log(LogLevel.DEBUG, () -> String.format("Activation of version %d did not take effect, node %s " +
+ "reports it has an actual pending version of %d", version, info, reply.getActualVersion()));
+ success = false;
+ } else {
+ log.log(LogLevel.DEBUG, () -> String.format("Node %s reports successful activation of state " +
+ "version %d", info, version));
}
info.setSystemStateVersionActivationAcked(version, success);
// TODO we currently don't invoke reportNodeError here.. We assume that node errors will be reported
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java
index 32d4046c181..c2f48ccf589 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java
@@ -5,6 +5,9 @@ import com.yahoo.jrt.Request;
import com.yahoo.vespa.clustercontroller.core.ActivateClusterStateVersionRequest;
import com.yahoo.vespa.clustercontroller.core.NodeInfo;
+/**
+ * FRT RPC state implementation of a single cluster state activation request.
+ */
public class RPCActivateClusterStateVersionRequest extends ActivateClusterStateVersionRequest {
Request request;
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java
index c5b1da0cb66..175a0b50cd6 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java
@@ -9,6 +9,10 @@ import com.yahoo.vespa.clustercontroller.core.Communicator;
import com.yahoo.vespa.clustercontroller.core.NodeInfo;
import com.yahoo.vespa.clustercontroller.core.Timer;
+/**
+ * Binds together the reply received for a particular cluster state activation RPC and
+ * the cluster controller-internal callback handler which expects to receive it.
+ */
public class RPCActivateClusterStateVersionWaiter implements RequestWaiter {
private final Communicator.Waiter<ActivateClusterStateVersionRequest> waiter;
@@ -29,7 +33,8 @@ public class RPCActivateClusterStateVersionWaiter implements RequestWaiter {
} else if (!req.checkReturnTypes("i")) {
return new ActivateClusterStateVersionRequest.Reply(ErrorCode.BAD_REPLY, "Got RPC response with invalid return types from " + info);
}
- return new ActivateClusterStateVersionRequest.Reply();
+ int actualVersion = req.returnValues().get(0).asInt32();
+ return ActivateClusterStateVersionRequest.Reply.withActualVersion(actualVersion);
}
@Override
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
index f5629bda343..c3c5c9e3b98 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
@@ -162,7 +162,7 @@ public class RPCCommunicator implements Communicator {
waiter.setRequest(stateRequest);
connection.invokeAsync(req, 60, waiter);
- node.setClusterStateVersionBundleSent(baselineState);
+ node.setClusterStateVersionBundleSent(stateBundle);
}
@Override