diff options
Diffstat (limited to 'clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java')
-rw-r--r-- | clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java | 145 |
1 files changed, 145 insertions, 0 deletions
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java new file mode 100644 index 00000000000..93aac5c83ed --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java @@ -0,0 +1,145 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vdslib.state.*; +import com.yahoo.vespa.clustercontroller.core.database.DatabaseHandler; +import com.yahoo.vespa.clustercontroller.core.listeners.NodeAddedOrRemovedListener; +import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler; +import org.junit.Test; + +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class SystemStateBroadcasterTest { + + private static class Fixture { + FakeTimer timer = new FakeTimer(); + final Object monitor = new Object(); + SystemStateBroadcaster broadcaster = new SystemStateBroadcaster(timer, monitor); + Communicator mockCommunicator = mock(Communicator.class); + + void simulateNodePartitionedAwaySilently(ClusterFixture cf) { + cf.cluster().getNodeInfo(Node.ofStorage(0)).setStartTimestamp(600); + cf.cluster().getNodeInfo(Node.ofStorage(1)).setStartTimestamp(700); + // Simulate a distributor being partitioned away from the controller without actually going down. It will + // need to observe all startup timestamps to infer if it should fetch bucket info from nodes. + cf.cluster().getNodeInfo(Node.ofDistributor(0)).setStartTimestamp(500); // FIXME multiple sources of timestamps are... rather confusing + cf.cluster().getNodeInfo(Node.ofDistributor(0)).setReportedState(new NodeState(NodeType.DISTRIBUTOR, State.UP).setStartTimestamp(500), 1000); + cf.cluster().getNodeInfo(Node.ofDistributor(0)).setReportedState(new NodeState(NodeType.DISTRIBUTOR, State.DOWN).setStartTimestamp(500), 2000); + cf.cluster().getNodeInfo(Node.ofDistributor(0)).setReportedState(new NodeState(NodeType.DISTRIBUTOR, State.UP).setStartTimestamp(500), 3000); + } + } + + private static DatabaseHandler.Context dbContextFrom(ContentCluster cluster) { + return new DatabaseHandler.Context() { + @Override + public ContentCluster getCluster() { + return cluster; + } + + @Override + public FleetController getFleetController() { + return null; // We assume the broadcaster doesn't use this for our test purposes + } + + @Override + public NodeAddedOrRemovedListener getNodeAddedOrRemovedListener() { + return null; + } + + @Override + public NodeStateOrHostInfoChangeHandler getNodeStateUpdateListener() { + return null; + } + }; + } + + private static Stream<NodeInfo> clusterNodeInfos(ContentCluster c, Node... nodes) { + return Stream.of(nodes).map(c::getNodeInfo); + } + + private static class StateMapping { + final String bucketSpace; + final ClusterState state; + + StateMapping(String bucketSpace, ClusterState state) { + this.bucketSpace = bucketSpace; + this.state = state; + } + } + + private static StateMapping mapping(String bucketSpace, String state) { + return new StateMapping(bucketSpace, ClusterState.stateFromString(state)); + } + + private static ClusterStateBundle makeBundle(String baselineState, StateMapping... bucketSpaceStates) { + return ClusterStateBundle.of(AnnotatedClusterState.withoutAnnotations(ClusterState.stateFromString(baselineState)), + Stream.of(bucketSpaceStates).collect(Collectors.toMap(sm -> sm.bucketSpace, sm -> sm.state))); + } + + @Test + public void always_publish_baseline_cluster_state() { + Fixture f = new Fixture(); + ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2"); + ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); + f.broadcaster.handleNewClusterStates(stateBundle); + f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator); + cf.cluster().getNodeInfo().forEach(nodeInfo -> verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any())); + } + + @Test + public void non_observed_startup_timestamps_are_published_per_node_for_baseline_state() { + Fixture f = new Fixture(); + ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2"); + ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); + f.simulateNodePartitionedAwaySilently(cf); + f.broadcaster.handleNewClusterStates(stateBundle); + f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator); + + clusterNodeInfos(cf.cluster(), Node.ofDistributor(1), Node.ofStorage(0), Node.ofStorage(1)).forEach(nodeInfo -> { + // Only distributor 0 should observe startup timestamps + verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any()); + }); + ClusterStateBundle expectedDistr0Bundle = makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700"); + verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any()); + } + + @Test + public void bucket_space_states_are_published_verbatim_when_no_additional_timestamps_needed() { + Fixture f = new Fixture(); + ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2", + mapping("default", "distributor:2 storage:2 .0.s:d"), + mapping("upsidedown", "distributor:2 .0.s:d storage:2")); + ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); + f.broadcaster.handleNewClusterStates(stateBundle); + f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator); + + cf.cluster().getNodeInfo().forEach(nodeInfo -> verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any())); + } + + @Test + public void non_observed_startup_timestamps_are_published_per_bucket_space_state() { + Fixture f = new Fixture(); + ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2", + mapping("default", "distributor:2 storage:2 .0.s:d"), + mapping("upsidedown", "distributor:2 .0.s:d storage:2")); + ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); + f.simulateNodePartitionedAwaySilently(cf); + f.broadcaster.handleNewClusterStates(stateBundle); + f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator); + + clusterNodeInfos(cf.cluster(), Node.ofDistributor(1), Node.ofStorage(0), Node.ofStorage(1)).forEach(nodeInfo -> { + // Only distributor 0 should observe startup timestamps + verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any()); + }); + ClusterStateBundle expectedDistr0Bundle = makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700", + mapping("default", "distributor:2 storage:2 .0.s:d .0.t:600 .1.t:700"), + mapping("upsidedown", "distributor:2 .0.s:d storage:2 .0.t:600 .1.t:700")); + verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any()); + } +} |