diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-06 14:23:53 +0100 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-14 14:43:02 +0000 |
commit | 05f27f6cfcda786232fa6da47154784dce2483e1 (patch) | |
tree | 8306e87d559411a3fc282697bba5ecc429df34b6 /clustercontroller-core/src/test/java | |
parent | a925ce4a159ff5baf1e6e4cac632794fa2ba6547 (diff) |
Initial groundwork for cluster state version activation RPC
Diffstat (limited to 'clustercontroller-core/src/test/java')
2 files changed, 36 insertions, 8 deletions
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java index 5d200d65516..b1e7158f61c 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java @@ -47,6 +47,14 @@ public class DummyCommunicator implements Communicator, NodeLookup { } + public class DummyActivateClusterStateVersionRequest extends ActivateClusterStateVersionRequest { + + public DummyActivateClusterStateVersionRequest(NodeInfo nodeInfo, int stateVersion) { + super(nodeInfo, stateVersion); + } + + } + private Map<Node, DummyGetNodeStateRequest> getNodeStateRequests = new TreeMap<>(); public DummyCommunicator(List<Node> nodeList, Timer timer) { @@ -98,6 +106,13 @@ public class DummyCommunicator implements Communicator, NodeLookup { } } + @Override + public void activateClusterStateVersion(int clusterStateVersion, NodeInfo node, Waiter<ActivateClusterStateVersionRequest> waiter) { + var req = new DummyActivateClusterStateVersionRequest(node, clusterStateVersion); + req.setReply(new ActivateClusterStateVersionRequest.Reply()); + waiter.done(req); + } + public void sendAllDeferredDistributorClusterStateAcks() { deferredClusterStateAcks.forEach(reqAndWaiter -> reqAndWaiter.getFirst().done(reqAndWaiter.getSecond())); deferredClusterStateAcks.clear(); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java index 7602f0c83a2..61e9d1a90de 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java @@ -99,7 +99,7 @@ public class RPCCommunicatorTest { (RequestWaiter)any()); } - private static class Fixture { + private static class Fixture<RequestType> { final Supervisor mockSupervisor = mock(Supervisor.class); final Target mockTarget = mock(Target.class); final Timer timer = new FakeTimer(); @@ -107,7 +107,7 @@ public class RPCCommunicatorTest { final AtomicReference<Request> receivedRequest = new AtomicReference<>(); final AtomicReference<RequestWaiter> receivedWaiter = new AtomicReference<>(); @SuppressWarnings("unchecked") // Cannot mock with "compiler-obvious" type safety for generics - final Communicator.Waiter<SetClusterStateRequest> mockWaiter = mock(Communicator.Waiter.class); + final Communicator.Waiter<RequestType> mockWaiter = mock(Communicator.Waiter.class); Fixture() { communicator = new RPCCommunicator( @@ -131,9 +131,9 @@ public class RPCCommunicatorTest { @Test public void setSystemState_v3_sends_distribution_states_rpc() { - Fixture f = new Fixture(); - ClusterFixture cf = ClusterFixture.forFlatCluster(3).bringEntireClusterUp().assignDummyRpcAddresses(); - ClusterStateBundle sentBundle = ClusterStateBundleUtil.makeBundle("distributor:3 storage:3"); + var f = new Fixture<SetClusterStateRequest>(); + var cf = ClusterFixture.forFlatCluster(3).bringEntireClusterUp().assignDummyRpcAddresses(); + var sentBundle = ClusterStateBundleUtil.makeBundle("distributor:3 storage:3"); f.communicator.setSystemState(sentBundle, cf.cluster().getNodeInfo(Node.ofStorage(1)), f.mockWaiter); Request req = f.receivedRequest.get(); @@ -147,9 +147,9 @@ public class RPCCommunicatorTest { @Test public void set_distribution_states_v3_rpc_auto_downgrades_to_v2_on_unknown_method_error() { - Fixture f = new Fixture(); - ClusterFixture cf = ClusterFixture.forFlatCluster(3).bringEntireClusterUp().assignDummyRpcAddresses(); - ClusterStateBundle sentBundle = ClusterStateBundleUtil.makeBundle("version:123 distributor:3 storage:3"); + var f = new Fixture<SetClusterStateRequest>(); + var cf = ClusterFixture.forFlatCluster(3).bringEntireClusterUp().assignDummyRpcAddresses(); + var sentBundle = ClusterStateBundleUtil.makeBundle("version:123 distributor:3 storage:3"); f.communicator.setSystemState(sentBundle, cf.cluster().getNodeInfo(Node.ofStorage(1)), f.mockWaiter); RequestWaiter waiter = f.receivedWaiter.get(); @@ -171,4 +171,17 @@ public class RPCCommunicatorTest { assertThat(req.methodName(), equalTo(RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME)); } + @Test + public void activateClusterStateVersion_sends_version_activation_rpc() { + var f = new Fixture<ActivateClusterStateVersionRequest>(); + var cf = ClusterFixture.forFlatCluster(3).bringEntireClusterUp().assignDummyRpcAddresses(); + f.communicator.activateClusterStateVersion(12345, cf.cluster().getNodeInfo(Node.ofDistributor(1)), f.mockWaiter); + + Request req = f.receivedRequest.get(); + assertThat(req, notNullValue()); + assertThat(req.methodName(), equalTo(RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_METHOD_NAME)); + assertTrue(req.parameters().satisfies("i")); // <cluster state version> + assertThat(req.parameters().get(0).asInt32(), equalTo(12345)); + } + } |