diff options
Diffstat (limited to 'clustercontroller-core')
13 files changed, 113 insertions, 35 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java index b57004eedce..26d63f7ba60 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java @@ -1,6 +1,9 @@ // Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.core; +/** + * Wrapper for a cluster state activation request towards a single node. + */ public class ActivateClusterStateVersionRequest extends ClusterStateVersionSpecificRequest { public ActivateClusterStateVersionRequest(NodeInfo nodeInfo, int systemStateVersion) { diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java index fda6ec19752..9e8abc0608e 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java @@ -1,6 +1,10 @@ // Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.core; +/** + * Base class for distributor/content node node RPC requests that are bound + * to a particular cluster state version. + */ public abstract class ClusterStateVersionSpecificRequest { private final NodeInfo nodeInfo; @@ -24,6 +28,7 @@ public abstract class ClusterStateVersionSpecificRequest { final int returnCode; final String returnMessage; + final int actualVersion; public Reply() { this(0, null); @@ -32,6 +37,17 @@ public abstract class ClusterStateVersionSpecificRequest { public Reply(int returnCode, String returnMessage) { this.returnCode = returnCode; this.returnMessage = returnMessage; + this.actualVersion = -1; + } + + private Reply(int actualVersion) { + this.returnCode = 0; + this.returnMessage = null; + this.actualVersion = actualVersion; + } + + public static Reply withActualVersion(int version) { + return new Reply(version); } /** Returns whether this is an error response */ @@ -43,6 +59,9 @@ public abstract class ClusterStateVersionSpecificRequest { /** Returns the message returned, or null if none */ public String getReturnMessage() { return returnMessage; } + /** Returns actual cluster state version active on node, or -1 if reply does not contain this information */ + public int getActualVersion() { return actualVersion; } + } } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java index 791315bb3d1..ba35243c14d 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java @@ -688,7 +688,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd if ( ! newStates.isEmpty()) { synchronized (systemStateListeners) { for (ClusterStateBundle stateBundle : newStates) { - for(SystemStateListener listener : systemStateListeners) { + for (SystemStateListener listener : systemStateListeners) { listener.handleNewPublishedState(stateBundle); } } @@ -698,7 +698,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd if ( ! convergedStates.isEmpty()) { synchronized (systemStateListeners) { for (ClusterStateBundle stateBundle : convergedStates) { - for(SystemStateListener listener : systemStateListeners) { + for (SystemStateListener listener : systemStateListeners) { listener.handleStateConvergedInCluster(stateBundle); } } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java index 58e2fa14d4f..82d13e2d9ef 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java @@ -77,9 +77,9 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { */ private int version; - // Mapping of cluster state version -> cluster state instance - private TreeMap<Integer, ClusterState> clusterStateVersionBundleSent = new TreeMap<>(); - private ClusterState clusterStateVersionBundleAcknowledged; + // Mapping of cluster state version -> cluster state bundle instance + private TreeMap<Integer, ClusterStateBundle> clusterStateVersionBundleSent = new TreeMap<>(); + private ClusterStateBundle clusterStateVersionBundleAcknowledged; private int clusterStateVersionActivationSent = -1; private int clusterStateVersionActivationAcked = -1; @@ -420,7 +420,7 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { if (clusterStateVersionBundleSent.isEmpty()) { return null; } - return clusterStateVersionBundleSent.lastEntry().getValue(); + return clusterStateVersionBundleSent.lastEntry().getValue().getBaselineClusterState(); } public int getNewestSystemStateVersionSent() { ClusterState last = getNewestSystemStateSent(); @@ -430,14 +430,14 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { public int getClusterStateVersionBundleAcknowledged() { return (clusterStateVersionBundleAcknowledged == null ? -1 : clusterStateVersionBundleAcknowledged.getVersion()); } - public void setClusterStateVersionBundleSent(ClusterState state) { - if (state == null) { + public void setClusterStateVersionBundleSent(ClusterStateBundle stateBundle) { + if (stateBundle == null) { throw new Error("Should not clear info for last version sent"); } - if (clusterStateVersionBundleSent.containsKey(state.getVersion())) { - throw new IllegalStateException("We have already sent cluster state version " + state.getVersion() + " to " + node); + if (clusterStateVersionBundleSent.containsKey(stateBundle.getVersion())) { + throw new IllegalStateException("We have already sent cluster state version " + stateBundle.getVersion() + " to " + node); } - clusterStateVersionBundleSent.put(state.getVersion(), state); + clusterStateVersionBundleSent.put(stateBundle.getVersion(), stateBundle); } public void setClusterStateBundleVersionAcknowledged(Integer version, boolean success) { if (version == null) { @@ -446,14 +446,15 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { if (!clusterStateVersionBundleSent.containsKey(version)) { throw new IllegalStateException("Got response for cluster state " + version + " which is not tracked as pending for node " + node); } - ClusterState state = clusterStateVersionBundleSent.remove(version); - if (success && (clusterStateVersionBundleAcknowledged == null || clusterStateVersionBundleAcknowledged.getVersion() < state.getVersion())) { - clusterStateVersionBundleAcknowledged = state; + var stateBundle = clusterStateVersionBundleSent.remove(version); + if (success && (clusterStateVersionBundleAcknowledged == null || clusterStateVersionBundleAcknowledged.getVersion() < stateBundle.getVersion())) { + clusterStateVersionBundleAcknowledged = stateBundle; if (wentDownWithStartTime != 0 - && (wentDownAtClusterState == null || wentDownAtClusterState.getVersion() < state.getVersion()) - && !state.getNodeState(node).getState().oneOf("dsm")) + && (wentDownAtClusterState == null || wentDownAtClusterState.getVersion() < stateBundle.getVersion()) + && !stateBundle.getBaselineClusterState().getNodeState(node).getState().oneOf("dsm")) { - log.log(LogLevel.DEBUG, "Clearing going down timestamp of node " + node + " after receiving ack of cluster state " + state); + log.log(LogLevel.DEBUG, () -> String.format("Clearing going down timestamp of node %s after " + + "receiving ack of cluster state bundle %s", node, stateBundle)); wentDownWithStartTime = 0; } } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java index 84d8accd7f4..7898e90f77a 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java @@ -87,17 +87,25 @@ public class SystemStateBroadcaster { NodeInfo info = req.getNodeInfo(); int version = req.getClusterStateVersion(); boolean success = true; - if (req.getReply().isError()) { + var reply = req.getReply(); + if (reply.isError()) { // NO_SUCH_METHOD implies node is on a version that does not understand explicit activations // and it has already merrily started using the state version. Treat as if it had been ACKed. - if (req.getReply().getReturnCode() != ErrorCode.NO_SUCH_METHOD) { + if (reply.getReturnCode() != ErrorCode.NO_SUCH_METHOD) { log.log(LogLevel.DEBUG, () -> String.format("Activation NACK for node %s with version %d, message %s", - info, version, req.getReply().getReturnMessage())); + info, version, reply.getReturnMessage())); success = false; } else { log.log(LogLevel.DEBUG, () -> String.format("Node %s did not understand state activation RPC; " + "implicitly treating state %d as activated on node", info, version)); } + } else if (reply.getActualVersion() != version) { + log.log(LogLevel.DEBUG, () -> String.format("Activation of version %d did not take effect, node %s " + + "reports it has an actual pending version of %d", version, info, reply.getActualVersion())); + success = false; + } else { + log.log(LogLevel.DEBUG, () -> String.format("Node %s reports successful activation of state " + + "version %d", info, version)); } info.setSystemStateVersionActivationAcked(version, success); // TODO we currently don't invoke reportNodeError here.. We assume that node errors will be reported diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java index 32d4046c181..c2f48ccf589 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java @@ -5,6 +5,9 @@ import com.yahoo.jrt.Request; import com.yahoo.vespa.clustercontroller.core.ActivateClusterStateVersionRequest; import com.yahoo.vespa.clustercontroller.core.NodeInfo; +/** + * FRT RPC state implementation of a single cluster state activation request. + */ public class RPCActivateClusterStateVersionRequest extends ActivateClusterStateVersionRequest { Request request; diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java index c5b1da0cb66..175a0b50cd6 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java @@ -9,6 +9,10 @@ import com.yahoo.vespa.clustercontroller.core.Communicator; import com.yahoo.vespa.clustercontroller.core.NodeInfo; import com.yahoo.vespa.clustercontroller.core.Timer; +/** + * Binds together the reply received for a particular cluster state activation RPC and + * the cluster controller-internal callback handler which expects to receive it. + */ public class RPCActivateClusterStateVersionWaiter implements RequestWaiter { private final Communicator.Waiter<ActivateClusterStateVersionRequest> waiter; @@ -29,7 +33,8 @@ public class RPCActivateClusterStateVersionWaiter implements RequestWaiter { } else if (!req.checkReturnTypes("i")) { return new ActivateClusterStateVersionRequest.Reply(ErrorCode.BAD_REPLY, "Got RPC response with invalid return types from " + info); } - return new ActivateClusterStateVersionRequest.Reply(); + int actualVersion = req.returnValues().get(0).asInt32(); + return ActivateClusterStateVersionRequest.Reply.withActualVersion(actualVersion); } @Override diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java index f5629bda343..c3c5c9e3b98 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java @@ -162,7 +162,7 @@ public class RPCCommunicator implements Communicator { waiter.setRequest(stateRequest); connection.invokeAsync(req, 60, waiter); - node.setClusterStateVersionBundleSent(baselineState); + node.setClusterStateVersionBundleSent(stateBundle); } @Override diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java index 127211fc911..339d305e823 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleTest.java @@ -154,9 +154,9 @@ public class ClusterStateBundleTest { public void simple_bundle_without_derived_states_propagates_deferred_activation_flag() { var bundle = ClusterStateBundle .builder(annotatedStateOf("distributor:2 storage:2")) - .deferredActivation(false) // defaults to true + .deferredActivation(true) // defaults to false .deriveAndBuild(); - assertFalse(bundle.deferredActivation()); + assertTrue(bundle.deferredActivation()); } @Test diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java index 185fdc0c7ca..8314839336e 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java @@ -24,7 +24,7 @@ public class DummyCommunicator implements Communicator, NodeLookup { this.shouldDeferDistributorClusterStateAcks = shouldDeferDistributorClusterStateAcks; } - public class DummyGetNodeStateRequest extends GetNodeStateRequest { + class DummyGetNodeStateRequest extends GetNodeStateRequest { Waiter<GetNodeStateRequest> waiter; public DummyGetNodeStateRequest(NodeInfo nodeInfo, Waiter<GetNodeStateRequest> waiter) { @@ -97,7 +97,7 @@ public class DummyCommunicator implements Communicator, NodeLookup { public void setSystemState(ClusterStateBundle stateBundle, NodeInfo node, Waiter<SetClusterStateRequest> waiter) { ClusterState baselineState = stateBundle.getBaselineClusterState(); DummySetClusterStateRequest req = new DummySetClusterStateRequest(node, baselineState); - node.setClusterStateVersionBundleSent(baselineState); + node.setClusterStateVersionBundleSent(stateBundle); req.setReply(new SetClusterStateRequest.Reply()); if (node.isStorage() || !shouldDeferDistributorClusterStateAcks) { waiter.done(req); @@ -109,7 +109,7 @@ public class DummyCommunicator implements Communicator, NodeLookup { @Override public void activateClusterStateVersion(int clusterStateVersion, NodeInfo node, Waiter<ActivateClusterStateVersionRequest> waiter) { var req = new DummyActivateClusterStateVersionRequest(node, clusterStateVersion); - req.setReply(new ActivateClusterStateVersionRequest.Reply()); + req.setReply(ActivateClusterStateVersionRequest.Reply.withActualVersion(clusterStateVersion)); waiter.done(req); } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java index caa3fbcd6cd..d8994ff52df 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java @@ -580,11 +580,12 @@ public class DummyVdsNode { synchronized(timer) { int actualVersion = getLatestSystemStateVersion().orElse(0); req.returnValues().add(new Int32Value(actualVersion)); - if (activateVersion != actualVersion) { - req.setError(ErrorCode.METHOD_FAILED, "State version mismatch"); - } else { + if (activateVersion == actualVersion) { activatedClusterStateVersion = activateVersion; timer.notifyAll(); + } else { + log.log(LogLevel.DEBUG, () -> String.format("Dummy node %s: got a mismatching activation (request version %d, " + + "actual %d), not marking version as active", this, activateVersion, actualVersion)); } } log.log(LogLevel.DEBUG, "Dummy node " + this + ": Activating cluster state version " + activateVersion); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java index 4c040798eb0..ca246b3549f 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateChangeTest.java @@ -1534,7 +1534,6 @@ public class StateChangeTest extends FleetControllerTest { public void cluster_state_ack_is_not_dependent_on_state_send_grace_period() throws Exception { FleetControllerOptions options = defaultOptions(); options.minTimeBetweenNewSystemStates = 10_000; - //options.enableTwoPhaseClusterStateActivation = false; //// RemoteTaskFixture fixture = createFixtureWith(options); // Have to increment timer here to be able to send state generated by the scheduled task diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java index 98ad19b954a..a7c7845fea9 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java @@ -11,6 +11,7 @@ import org.mockito.ArgumentCaptor; import java.util.stream.Stream; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -157,7 +158,7 @@ public class SystemStateBroadcasterTest { ClusterStateBundle stateBundle, Communicator.Waiter<SetClusterStateRequest> waiter) { // Have to patch in that we've actually sent the bundle in the first place... - nodeInfo.setClusterStateVersionBundleSent(stateBundle.getBaselineClusterState()); + nodeInfo.setClusterStateVersionBundleSent(stateBundle); var req = new MockSetClusterStateRequest(nodeInfo, stateBundle.getVersion()); req.setReply(new ClusterStateVersionSpecificRequest.Reply()); @@ -166,15 +167,22 @@ public class SystemStateBroadcasterTest { private void respondToActivateClusterStateVersion(NodeInfo nodeInfo, ClusterStateBundle stateBundle, + int actualVersion, Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) { // Have to patch in that we've actually sent the bundle in the first place... nodeInfo.setClusterStateVersionActivationSent(stateBundle.getVersion()); var req = new MockActivateClusterStateVersionRequest(nodeInfo, stateBundle.getVersion()); - req.setReply(new ClusterStateVersionSpecificRequest.Reply()); + req.setReply(ClusterStateVersionSpecificRequest.Reply.withActualVersion(actualVersion)); waiter.done(req); } + private void respondToActivateClusterStateVersion(NodeInfo nodeInfo, + ClusterStateBundle stateBundle, + Communicator.Waiter<ActivateClusterStateVersionRequest> waiter) { + respondToActivateClusterStateVersion(nodeInfo, stateBundle, stateBundle.getVersion(), waiter); + } + private static class StateActivationFixture extends Fixture { ClusterStateBundle stateBundle; ClusterFixture cf; @@ -226,8 +234,7 @@ public class SystemStateBroadcasterTest { // No activations should be sent yet cf.cluster().getNodeInfo().forEach(nodeInfo -> { - verify(f.mockCommunicator, times(0)).activateClusterStateVersion( - eq(123), eq(nodeInfo), any()); + verify(f.mockCommunicator, times(0)).activateClusterStateVersion(eq(123), eq(nodeInfo), any()); }); assertNull(f.broadcaster.getLastClusterStateBundleConverged()); @@ -298,4 +305,36 @@ public class SystemStateBroadcasterTest { }); } + @Test + public void activation_convergence_considers_actual_version_returned_from_node() { + var f = StateActivationFixture.withTwoPhaseEnabled(); + var cf = f.cf; + + f.expectSetSystemStateInvocationsToBothDistributors(); + f.simulateBroadcastTick(cf); + // ACK state bundle from both distributors + respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(0)), f.stateBundle, f.d0Waiter.getValue()); + respondToSetClusterStateBundle(cf.cluster.getNodeInfo(Node.ofDistributor(1)), f.stateBundle, f.d1Waiter.getValue()); + + f.simulateBroadcastTick(cf); + + final var d0ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class); + final var d1ActivateWaiter = ArgumentCaptor.forClass(Communicator.Waiter.class); + + clusterNodeInfos(cf.cluster(), Node.ofDistributor(0), Node.ofDistributor(1)).forEach(nodeInfo -> { + verify(f.mockCommunicator).activateClusterStateVersion(eq(123), eq(nodeInfo), + (nodeInfo.getNodeIndex() == 0 ? d0ActivateWaiter : d1ActivateWaiter).capture()); + }); + + respondToActivateClusterStateVersion(cf.cluster.getNodeInfo(Node.ofDistributor(0)), + f.stateBundle, d0ActivateWaiter.getValue()); + // Distributor 1 reports higher actual version, should not cause this version to be + // considered converged since it's not an exact version match. + respondToActivateClusterStateVersion(cf.cluster.getNodeInfo(Node.ofDistributor(1)), + f.stateBundle, 124, d1ActivateWaiter.getValue()); + f.simulateBroadcastTick(cf); + + assertNull(f.broadcaster.getLastClusterStateBundleConverged()); + } + } |