summaryrefslogtreecommitdiffstats
path: root/clustercontroller-core
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2019-03-06 14:23:53 +0100
committerTor Brede Vekterli <vekterli@verizonmedia.com>2019-03-14 14:43:02 +0000
commit05f27f6cfcda786232fa6da47154784dce2483e1 (patch)
tree8306e87d559411a3fc282697bba5ecc429df34b6 /clustercontroller-core
parenta925ce4a159ff5baf1e6e4cac632794fa2ba6547 (diff)
Initial groundwork for cluster state version activation RPC
Diffstat (limited to 'clustercontroller-core')
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java10
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java48
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Communicator.java2
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SetClusterStateRequest.java44
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java31
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java17
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java44
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java28
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java15
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java29
10 files changed, 206 insertions, 62 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java
new file mode 100644
index 00000000000..b57004eedce
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java
@@ -0,0 +1,10 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+public class ActivateClusterStateVersionRequest extends ClusterStateVersionSpecificRequest {
+
+ public ActivateClusterStateVersionRequest(NodeInfo nodeInfo, int systemStateVersion) {
+ super(nodeInfo, systemStateVersion);
+ }
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java
new file mode 100644
index 00000000000..fda6ec19752
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java
@@ -0,0 +1,48 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+public abstract class ClusterStateVersionSpecificRequest {
+
+ private final NodeInfo nodeInfo;
+ private final int clusterStateVersion;
+ private Reply reply;
+
+ public ClusterStateVersionSpecificRequest(NodeInfo nodeInfo, int clusterStateVersion) {
+ this.nodeInfo = nodeInfo;
+ this.clusterStateVersion = clusterStateVersion;
+ }
+
+ public NodeInfo getNodeInfo() { return nodeInfo; }
+
+ public int getClusterStateVersion() { return clusterStateVersion; }
+
+ public void setReply(Reply reply) { this.reply = reply; }
+
+ public Reply getReply() { return reply; }
+
+ public static class Reply {
+
+ final int returnCode;
+ final String returnMessage;
+
+ public Reply() {
+ this(0, null);
+ }
+
+ public Reply(int returnCode, String returnMessage) {
+ this.returnCode = returnCode;
+ this.returnMessage = returnMessage;
+ }
+
+ /** Returns whether this is an error response */
+ public boolean isError() { return returnCode != 0; }
+
+ /** Returns the return code, which is 0 if this request was successful */
+ public int getReturnCode() { return returnCode; }
+
+ /** Returns the message returned, or null if none */
+ public String getReturnMessage() { return returnMessage; }
+
+ }
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Communicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Communicator.java
index 450513343b0..900eee54cd3 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Communicator.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Communicator.java
@@ -20,6 +20,8 @@ public interface Communicator {
void setSystemState(ClusterStateBundle states, NodeInfo node, Waiter<SetClusterStateRequest> waiter);
+ void activateClusterStateVersion(int clusterStateVersion, NodeInfo node, Waiter<ActivateClusterStateVersionRequest> waiter);
+
void shutdown();
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SetClusterStateRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SetClusterStateRequest.java
index 836876b5642..d4e79a4f2b2 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SetClusterStateRequest.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SetClusterStateRequest.java
@@ -1,48 +1,10 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;
-public abstract class SetClusterStateRequest {
-
- private final NodeInfo nodeInfo;
- private final int systemStateVersion;
- private Reply reply;
-
- public SetClusterStateRequest(NodeInfo nodeInfo, int systemStateVersion) {
- this.nodeInfo = nodeInfo;
- this.systemStateVersion = systemStateVersion;
- }
-
- public NodeInfo getNodeInfo() { return nodeInfo; }
-
- public int getSystemStateVersion() { return systemStateVersion; }
-
- public void setReply(Reply reply) { this.reply = reply; }
-
- public Reply getReply() { return reply; }
-
- public static class Reply {
-
- final int returnCode;
- final String returnMessage;
-
- public Reply() {
- this(0, null);
- }
-
- public Reply(int returnCode, String returnMessage) {
- this.returnCode = returnCode;
- this.returnMessage = returnMessage;
- }
-
- /** Returns whether this is an error response */
- public boolean isError() { return returnCode != 0; }
-
- /** Returns the return code, which is 0 if this request was successful */
- public int getReturnCode() { return returnCode; }
-
- /** Returns the message returned, or null if none */
- public String getReturnMessage() { return returnMessage; }
+public abstract class SetClusterStateRequest extends ClusterStateVersionSpecificRequest {
+ public SetClusterStateRequest(NodeInfo nodeInfo, int clusterStateVersion) {
+ super(nodeInfo, clusterStateVersion);
}
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index 3683fe342bc..a40a45fd48a 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -19,13 +19,13 @@ public class SystemStateBroadcaster {
private final Timer timer;
private final Object monitor;
private ClusterStateBundle clusterStateBundle;
- private final List<SetClusterStateRequest> replies = new LinkedList<>();
+ private final List<SetClusterStateRequest> setClusterStateReplies = new LinkedList<>();
private final static long minTimeBetweenNodeErrorLogging = 10 * 60 * 1000;
private final Map<Node, Long> lastErrorReported = new TreeMap<>();
private int lastClusterStateInSync = 0;
- private final ClusterStateWaiter waiter = new ClusterStateWaiter();
+ private final SetClusterStateWaiter setClusterStateWaiter = new SetClusterStateWaiter();
public SystemStateBroadcaster(Timer timer, Object monitor) {
this.timer = timer;
@@ -56,19 +56,21 @@ public class SystemStateBroadcaster {
long time = timer.getCurrentTimeInMillis();
Long lastReported = lastErrorReported.get(info.getNode());
boolean alreadySeen = (lastReported != null && time - lastReported < minTimeBetweenNodeErrorLogging);
- log.log(nodeOk && !alreadySeen ? LogLevel.WARNING : LogLevel.DEBUG, message);
- if (!alreadySeen) lastErrorReported.put(info.getNode(), time);
+ log.log((nodeOk && !alreadySeen) ? LogLevel.WARNING : LogLevel.DEBUG, message);
+ if (!alreadySeen) {
+ lastErrorReported.put(info.getNode(), time);
+ }
}
public boolean processResponses() {
boolean anyResponsesFound = false;
synchronized(monitor) {
- for(SetClusterStateRequest req : replies) {
+ for (SetClusterStateRequest req : setClusterStateReplies) {
anyResponsesFound = true;
NodeInfo info = req.getNodeInfo();
boolean nodeOk = info.getReportedState().getState().oneOf("uir");
- int version = req.getSystemStateVersion();
+ int version = req.getClusterStateVersion();
if (req.getReply().isError()) {
info.setSystemStateVersionAcknowledged(version, false);
@@ -85,7 +87,7 @@ public class SystemStateBroadcaster {
lastErrorReported.remove(info.getNode());
}
}
- replies.clear();
+ setClusterStateReplies.clear();
}
return anyResponsesFound;
}
@@ -159,11 +161,11 @@ public class SystemStateBroadcaster {
ClusterStateBundle modifiedBundle = clusterStateBundle.cloneWithMapper(state -> buildModifiedClusterState(state, dbContext));
log.log(LogLevel.DEBUG, "Sending modified cluster state version " + baselineState.getVersion()
+ " to node " + node + ": " + modifiedBundle);
- communicator.setSystemState(modifiedBundle, node, waiter);
+ communicator.setSystemState(modifiedBundle, node, setClusterStateWaiter);
} else {
log.log(LogLevel.DEBUG, "Sending system state version " + baselineState.getVersion() + " to node " + node
+ ". (went down time " + node.getWentDownWithStartTime() + ", node start time " + node.getStartTimestamp() + ")");
- communicator.setSystemState(clusterStateBundle, node, waiter);
+ communicator.setSystemState(clusterStateBundle, node, setClusterStateWaiter);
}
}
@@ -188,13 +190,20 @@ public class SystemStateBroadcaster {
return newState;
}
- private class ClusterStateWaiter implements Communicator.Waiter<SetClusterStateRequest> {
+ private class SetClusterStateWaiter implements Communicator.Waiter<SetClusterStateRequest> {
@Override
public void done(SetClusterStateRequest reply) {
synchronized (monitor) {
- replies.add(reply);
+ setClusterStateReplies.add(reply);
}
}
}
+ private class ActivateClusterStateVersionWaiter implements Communicator.Waiter<ActivateClusterStateVersionRequest> {
+ @Override
+ public void done(ActivateClusterStateVersionRequest reply) {
+ // TODO
+ }
+ }
+
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java
new file mode 100644
index 00000000000..32d4046c181
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java
@@ -0,0 +1,17 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.rpc;
+
+import com.yahoo.jrt.Request;
+import com.yahoo.vespa.clustercontroller.core.ActivateClusterStateVersionRequest;
+import com.yahoo.vespa.clustercontroller.core.NodeInfo;
+
+public class RPCActivateClusterStateVersionRequest extends ActivateClusterStateVersionRequest {
+
+ Request request;
+
+ public RPCActivateClusterStateVersionRequest(NodeInfo nodeInfo, Request request, int clusterStateVersion) {
+ super(nodeInfo, clusterStateVersion);
+ this.request = request;
+ }
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java
new file mode 100644
index 00000000000..f19bb5ad9b8
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java
@@ -0,0 +1,44 @@
+// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.rpc;
+
+import com.yahoo.jrt.ErrorCode;
+import com.yahoo.jrt.Request;
+import com.yahoo.jrt.RequestWaiter;
+import com.yahoo.vespa.clustercontroller.core.ActivateClusterStateVersionRequest;
+import com.yahoo.vespa.clustercontroller.core.Communicator;
+import com.yahoo.vespa.clustercontroller.core.NodeInfo;
+import com.yahoo.vespa.clustercontroller.core.Timer;
+
+public class RPCActivateClusterStateVersionWaiter implements RequestWaiter {
+
+ ActivateClusterStateVersionRequest request;
+ Timer timer;
+ Communicator.Waiter<ActivateClusterStateVersionRequest> waiter;
+
+ public RPCActivateClusterStateVersionWaiter(Communicator.Waiter<ActivateClusterStateVersionRequest> waiter, Timer timer) {
+ this.timer = timer;
+ this.waiter = waiter;
+ }
+
+ public void setRequest(RPCActivateClusterStateVersionRequest request) {
+ this.request = request;
+ }
+
+ public ActivateClusterStateVersionRequest.Reply getReply(Request req) {
+ NodeInfo info = request.getNodeInfo();
+ if (req.isError()) {
+ return new ActivateClusterStateVersionRequest.Reply(req.errorCode(), req.errorMessage());
+ } else if (!req.checkReturnTypes("")) {
+ return new ActivateClusterStateVersionRequest.Reply(ErrorCode.BAD_REPLY, "Got RPC response with invalid return types from " + info);
+ }
+ return new ActivateClusterStateVersionRequest.Reply();
+ }
+
+ @Override
+ public void handleRequestDone(Request request) {
+ ActivateClusterStateVersionRequest.Reply reply = getReply(request);
+ this.request.setReply(reply);
+ waiter.done(this.request);
+ }
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
index 9089da68e10..a28da4d02a1 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
@@ -15,6 +15,7 @@ import com.yahoo.vdslib.state.NodeState;
import com.yahoo.vdslib.state.ClusterState;
import com.yahoo.vdslib.state.State;
import com.yahoo.log.LogLevel;
+import com.yahoo.vespa.clustercontroller.core.ActivateClusterStateVersionRequest;
import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
import com.yahoo.vespa.clustercontroller.core.Communicator;
import com.yahoo.vespa.clustercontroller.core.FleetControllerOptions;
@@ -41,6 +42,8 @@ public class RPCCommunicator implements Communicator {
public static final int LEGACY_SET_SYSTEM_STATE2_RPC_VERSION = 2;
public static final String LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME = "setsystemstate2";
+ public static final String ACTIVATE_CLUSTER_STATE_VERSION_RPC_METHOD_NAME = "activate_cluster_state_version";
+
private final Timer timer;
private final Supervisor supervisor;
private double nodeStateRequestTimeoutIntervalMaxSeconds;
@@ -106,7 +109,7 @@ public class RPCCommunicator implements Communicator {
public void getNodeState(NodeInfo node, Waiter<GetNodeStateRequest> externalWaiter) {
Target connection = getConnection(node);
if ( ! connection.isValid()) {
- log.log(LogLevel.DEBUG, "Connection to " + node.getRpcAddress() + " could not be created.");
+ log.log(LogLevel.DEBUG, () -> String.format("Connection to '%s' could not be created.", node.getRpcAddress()));
}
NodeState currentState = node.getReportedState();
Request req = new Request("getnodestate3");
@@ -134,7 +137,7 @@ public class RPCCommunicator implements Communicator {
Target connection = getConnection(node);
if ( ! connection.isValid()) {
- log.log(LogLevel.DEBUG, "Connection to " + node.getRpcAddress() + " could not be created.");
+ log.log(LogLevel.DEBUG, () -> String.format("Connection to '%s' could not be created.", node.getRpcAddress()));
return;
}
int nodeVersion = node.getVersion();
@@ -161,6 +164,27 @@ public class RPCCommunicator implements Communicator {
node.setSystemStateVersionSent(baselineState);
}
+ @Override
+ public void activateClusterStateVersion(int clusterStateVersion, NodeInfo node, Waiter<ActivateClusterStateVersionRequest> externalWaiter) {
+ var waiter = new RPCActivateClusterStateVersionWaiter(externalWaiter, timer);
+
+ Target connection = getConnection(node);
+ if ( ! connection.isValid()) {
+ log.log(LogLevel.DEBUG, () -> String.format("Connection to '%s' could not be created.", node.getRpcAddress()));
+ return;
+ }
+
+ var req = new Request(ACTIVATE_CLUSTER_STATE_VERSION_RPC_METHOD_NAME);
+ req.parameters().add(new Int32Value(clusterStateVersion));
+
+ log.log(LogLevel.DEBUG, () -> String.format("Sending '%s' RPC to %s for state version %d",
+ req.methodName(), node.getRpcAddress(), clusterStateVersion));
+ var activationRequest = new RPCActivateClusterStateVersionRequest(node, req, clusterStateVersion);
+ waiter.setRequest(activationRequest);
+
+ connection.invokeAsync(req, 60, waiter);
+ }
+
// protected for testing.
protected int generateNodeStateRequestTimeoutMs() {
double intervalFraction = Math.random();
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
index 5d200d65516..b1e7158f61c 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java
@@ -47,6 +47,14 @@ public class DummyCommunicator implements Communicator, NodeLookup {
}
+ public class DummyActivateClusterStateVersionRequest extends ActivateClusterStateVersionRequest {
+
+ public DummyActivateClusterStateVersionRequest(NodeInfo nodeInfo, int stateVersion) {
+ super(nodeInfo, stateVersion);
+ }
+
+ }
+
private Map<Node, DummyGetNodeStateRequest> getNodeStateRequests = new TreeMap<>();
public DummyCommunicator(List<Node> nodeList, Timer timer) {
@@ -98,6 +106,13 @@ public class DummyCommunicator implements Communicator, NodeLookup {
}
}
+ @Override
+ public void activateClusterStateVersion(int clusterStateVersion, NodeInfo node, Waiter<ActivateClusterStateVersionRequest> waiter) {
+ var req = new DummyActivateClusterStateVersionRequest(node, clusterStateVersion);
+ req.setReply(new ActivateClusterStateVersionRequest.Reply());
+ waiter.done(req);
+ }
+
public void sendAllDeferredDistributorClusterStateAcks() {
deferredClusterStateAcks.forEach(reqAndWaiter -> reqAndWaiter.getFirst().done(reqAndWaiter.getSecond()));
deferredClusterStateAcks.clear();
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java
index 7602f0c83a2..61e9d1a90de 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java
@@ -99,7 +99,7 @@ public class RPCCommunicatorTest {
(RequestWaiter)any());
}
- private static class Fixture {
+ private static class Fixture<RequestType> {
final Supervisor mockSupervisor = mock(Supervisor.class);
final Target mockTarget = mock(Target.class);
final Timer timer = new FakeTimer();
@@ -107,7 +107,7 @@ public class RPCCommunicatorTest {
final AtomicReference<Request> receivedRequest = new AtomicReference<>();
final AtomicReference<RequestWaiter> receivedWaiter = new AtomicReference<>();
@SuppressWarnings("unchecked") // Cannot mock with "compiler-obvious" type safety for generics
- final Communicator.Waiter<SetClusterStateRequest> mockWaiter = mock(Communicator.Waiter.class);
+ final Communicator.Waiter<RequestType> mockWaiter = mock(Communicator.Waiter.class);
Fixture() {
communicator = new RPCCommunicator(
@@ -131,9 +131,9 @@ public class RPCCommunicatorTest {
@Test
public void setSystemState_v3_sends_distribution_states_rpc() {
- Fixture f = new Fixture();
- ClusterFixture cf = ClusterFixture.forFlatCluster(3).bringEntireClusterUp().assignDummyRpcAddresses();
- ClusterStateBundle sentBundle = ClusterStateBundleUtil.makeBundle("distributor:3 storage:3");
+ var f = new Fixture<SetClusterStateRequest>();
+ var cf = ClusterFixture.forFlatCluster(3).bringEntireClusterUp().assignDummyRpcAddresses();
+ var sentBundle = ClusterStateBundleUtil.makeBundle("distributor:3 storage:3");
f.communicator.setSystemState(sentBundle, cf.cluster().getNodeInfo(Node.ofStorage(1)), f.mockWaiter);
Request req = f.receivedRequest.get();
@@ -147,9 +147,9 @@ public class RPCCommunicatorTest {
@Test
public void set_distribution_states_v3_rpc_auto_downgrades_to_v2_on_unknown_method_error() {
- Fixture f = new Fixture();
- ClusterFixture cf = ClusterFixture.forFlatCluster(3).bringEntireClusterUp().assignDummyRpcAddresses();
- ClusterStateBundle sentBundle = ClusterStateBundleUtil.makeBundle("version:123 distributor:3 storage:3");
+ var f = new Fixture<SetClusterStateRequest>();
+ var cf = ClusterFixture.forFlatCluster(3).bringEntireClusterUp().assignDummyRpcAddresses();
+ var sentBundle = ClusterStateBundleUtil.makeBundle("version:123 distributor:3 storage:3");
f.communicator.setSystemState(sentBundle, cf.cluster().getNodeInfo(Node.ofStorage(1)), f.mockWaiter);
RequestWaiter waiter = f.receivedWaiter.get();
@@ -171,4 +171,17 @@ public class RPCCommunicatorTest {
assertThat(req.methodName(), equalTo(RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME));
}
+ @Test
+ public void activateClusterStateVersion_sends_version_activation_rpc() {
+ var f = new Fixture<ActivateClusterStateVersionRequest>();
+ var cf = ClusterFixture.forFlatCluster(3).bringEntireClusterUp().assignDummyRpcAddresses();
+ f.communicator.activateClusterStateVersion(12345, cf.cluster().getNodeInfo(Node.ofDistributor(1)), f.mockWaiter);
+
+ Request req = f.receivedRequest.get();
+ assertThat(req, notNullValue());
+ assertThat(req.methodName(), equalTo(RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_METHOD_NAME));
+ assertTrue(req.parameters().satisfies("i")); // <cluster state version>
+ assertThat(req.parameters().get(0).asInt32(), equalTo(12345));
+ }
+
}