diff options
Diffstat (limited to 'clustercontroller-core/src/test')
13 files changed, 319 insertions, 74 deletions
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java index 4862c83f2a6..5692e961cbe 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java @@ -20,7 +20,7 @@ import java.util.TreeMap; import static org.mockito.Mockito.mock; -class ClusterFixture { +public class ClusterFixture { public final ContentCluster cluster; public final Distribution distribution; public final FakeTimer timer; @@ -28,7 +28,7 @@ class ClusterFixture { public final StateChangeHandler nodeStateChangeHandler; public final ClusterStateGenerator.Params params = new ClusterStateGenerator.Params(); - ClusterFixture(ContentCluster cluster, Distribution distribution) { + public ClusterFixture(ContentCluster cluster, Distribution distribution) { this.cluster = cluster; this.distribution = distribution; this.timer = new FakeTimer(); @@ -37,13 +37,13 @@ class ClusterFixture { this.params.cluster(this.cluster); } - StateChangeHandler createNodeStateChangeHandlerForCluster() { + public StateChangeHandler createNodeStateChangeHandlerForCluster() { final int controllerIndex = 0; MetricUpdater metricUpdater = new MetricUpdater(new NoMetricReporter(), controllerIndex); return new StateChangeHandler(timer, eventLog, metricUpdater); } - ClusterFixture bringEntireClusterUp() { + public ClusterFixture bringEntireClusterUp() { cluster.clusterInfo().getConfiguredNodes().forEach((idx, node) -> { reportStorageNodeState(idx, State.UP); reportDistributorNodeState(idx, State.UP); @@ -51,7 +51,7 @@ class ClusterFixture { return this; } - ClusterFixture markEntireClusterDown() { + public ClusterFixture markEntireClusterDown() { cluster.clusterInfo().getConfiguredNodes().forEach((idx, node) -> { reportStorageNodeState(idx, State.DOWN); reportDistributorNodeState(idx, State.DOWN); @@ -69,7 +69,7 @@ class ClusterFixture { nodeInfo.setReportedState(nodeState, timer.getCurrentTimeInMillis()); } - ClusterFixture reportStorageNodeState(final int index, State state, String description) { + public ClusterFixture reportStorageNodeState(final int index, State state, String description) { final Node node = new Node(NodeType.STORAGE, index); final NodeState nodeState = new NodeState(NodeType.STORAGE, state); nodeState.setDescription(description); @@ -77,23 +77,23 @@ class ClusterFixture { return this; } - ClusterFixture reportStorageNodeState(final int index, State state) { + public ClusterFixture reportStorageNodeState(final int index, State state) { return reportStorageNodeState(index, state, "mockdesc"); } - ClusterFixture reportStorageNodeState(final int index, NodeState nodeState) { + public ClusterFixture reportStorageNodeState(final int index, NodeState nodeState) { doReportNodeState(new Node(NodeType.STORAGE, index), nodeState); return this; } - ClusterFixture reportDistributorNodeState(final int index, State state) { + public ClusterFixture reportDistributorNodeState(final int index, State state) { final Node node = new Node(NodeType.DISTRIBUTOR, index); final NodeState nodeState = new NodeState(NodeType.DISTRIBUTOR, state); doReportNodeState(node, nodeState); return this; } - ClusterFixture reportDistributorNodeState(final int index, NodeState nodeState) { + public ClusterFixture reportDistributorNodeState(final int index, NodeState nodeState) { doReportNodeState(new Node(NodeType.DISTRIBUTOR, index), nodeState); return this; } @@ -108,18 +108,18 @@ class ClusterFixture { nodeStateChangeHandler.proposeNewNodeState(stateBefore, nodeInfo, nodeState); } - ClusterFixture proposeStorageNodeWantedState(final int index, State state, String description) { + public ClusterFixture proposeStorageNodeWantedState(final int index, State state, String description) { final Node node = new Node(NodeType.STORAGE, index); final NodeState nodeState = new NodeState(NodeType.STORAGE, state); doProposeWantedState(node, nodeState, description); return this; } - ClusterFixture proposeStorageNodeWantedState(final int index, State state) { + public ClusterFixture proposeStorageNodeWantedState(final int index, State state) { return proposeStorageNodeWantedState(index, state, "mockdesc"); } - ClusterFixture proposeDistributorWantedState(final int index, State state) { + public ClusterFixture proposeDistributorWantedState(final int index, State state) { final ClusterState stateBefore = rawGeneratedClusterState(); final Node node = new Node(NodeType.DISTRIBUTOR, index); final NodeState nodeState = new NodeState(NodeType.DISTRIBUTOR, state); @@ -131,12 +131,12 @@ class ClusterFixture { return this; } - ClusterFixture disableAutoClusterTakedown() { + public ClusterFixture disableAutoClusterTakedown() { setMinNodesUp(0, 0, 0.0, 0.0); return this; } - ClusterFixture setMinNodesUp(int minDistNodes, int minStorNodes, double minDistRatio, double minStorRatio) { + public ClusterFixture setMinNodesUp(int minDistNodes, int minStorNodes, double minDistRatio, double minStorRatio) { params.minStorageNodesUp(minStorNodes) .minDistributorNodesUp(minDistNodes) .minRatioOfStorageNodesUp(minStorRatio) @@ -144,32 +144,32 @@ class ClusterFixture { return this; } - ClusterFixture setMinNodeRatioPerGroup(double upRatio) { + public ClusterFixture setMinNodeRatioPerGroup(double upRatio) { params.minNodeRatioPerGroup(upRatio); return this; } - ClusterFixture assignDummyRpcAddresses() { + public ClusterFixture assignDummyRpcAddresses() { cluster.getNodeInfo().forEach(ni -> ni.setRpcAddress("tcp/localhost:0")); return this; } - static Map<NodeType, Integer> buildTransitionTimeMap(int distributorTransitionTime, int storageTransitionTime) { + static public Map<NodeType, Integer> buildTransitionTimeMap(int distributorTransitionTime, int storageTransitionTime) { Map<NodeType, Integer> maxTransitionTime = new TreeMap<>(); maxTransitionTime.put(NodeType.DISTRIBUTOR, distributorTransitionTime); maxTransitionTime.put(NodeType.STORAGE, storageTransitionTime); return maxTransitionTime; } - void disableTransientMaintenanceModeOnDown() { + public void disableTransientMaintenanceModeOnDown() { this.params.transitionTimes(0); } - void enableTransientMaintenanceModeOnDown(final int transitionTimeMs) { + public void enableTransientMaintenanceModeOnDown(final int transitionTimeMs) { this.params.transitionTimes(transitionTimeMs); } - ClusterFixture markNodeAsConfigRetired(int nodeIndex) { + public ClusterFixture markNodeAsConfigRetired(int nodeIndex) { Set<ConfiguredNode> configuredNodes = new HashSet<>(cluster.getConfiguredNodes().values()); configuredNodes.remove(new ConfiguredNode(nodeIndex, false)); configuredNodes.add(new ConfiguredNode(nodeIndex, true)); @@ -177,24 +177,24 @@ class ClusterFixture { return this; } - AnnotatedClusterState annotatedGeneratedClusterState() { + public AnnotatedClusterState annotatedGeneratedClusterState() { params.currentTimeInMilllis(timer.getCurrentTimeInMillis()); return ClusterStateGenerator.generatedStateFrom(params); } - ClusterState rawGeneratedClusterState() { + public ClusterState rawGeneratedClusterState() { return annotatedGeneratedClusterState().getClusterState(); } - String generatedClusterState() { + public String generatedClusterState() { return annotatedGeneratedClusterState().getClusterState().toString(); } - String verboseGeneratedClusterState() { + public String verboseGeneratedClusterState() { return annotatedGeneratedClusterState().getClusterState().toString(true); } - static ClusterFixture forFlatCluster(int nodeCount) { + public static ClusterFixture forFlatCluster(int nodeCount) { Collection<ConfiguredNode> nodes = DistributionBuilder.buildConfiguredNodes(nodeCount); Distribution distribution = DistributionBuilder.forFlatCluster(nodeCount); @@ -203,7 +203,7 @@ class ClusterFixture { return new ClusterFixture(cluster, distribution); } - static ClusterFixture forHierarchicCluster(DistributionBuilder.GroupBuilder root) { + public static ClusterFixture forHierarchicCluster(DistributionBuilder.GroupBuilder root) { List<ConfiguredNode> nodes = DistributionBuilder.buildConfiguredNodes(root.totalNodeCount()); Distribution distribution = DistributionBuilder.forHierarchicCluster(root); ContentCluster cluster = new ContentCluster("foo", nodes, distribution, 0, 0.0); @@ -211,19 +211,19 @@ class ClusterFixture { return new ClusterFixture(cluster, distribution); } - ClusterStateGenerator.Params generatorParams() { + public ClusterStateGenerator.Params generatorParams() { return new ClusterStateGenerator.Params().cluster(cluster); } - ContentCluster cluster() { + public ContentCluster cluster() { return this.cluster; } - static Node storageNode(int index) { + public static Node storageNode(int index) { return new Node(NodeType.STORAGE, index); } - static Node distributorNode(int index) { + public static Node distributorNode(int index) { return new Node(NodeType.DISTRIBUTOR, index); } } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleUtil.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleUtil.java new file mode 100644 index 00000000000..31adb93fc49 --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleUtil.java @@ -0,0 +1,16 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vdslib.state.ClusterState; + +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class ClusterStateBundleUtil { + + public static ClusterStateBundle makeBundle(String baselineState, StateMapping... bucketSpaceStates) { + return ClusterStateBundle.of(AnnotatedClusterState.withoutAnnotations(ClusterState.stateFromString(baselineState)), + Stream.of(bucketSpaceStates).collect(Collectors.toMap(sm -> sm.bucketSpace, sm -> sm.state))); + } + +} diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java index a6e62d29ab5..ae59336b5ef 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java @@ -17,8 +17,9 @@ public class DistributionBitCountTest extends FleetControllerTest { private void setUpSystem(String testName) throws Exception { List<ConfiguredNode> configuredNodes = new ArrayList<>(); - for (int i = 0 ; i < 10; i++) + for (int i = 0 ; i < 10; i++) { configuredNodes.add(new ConfiguredNode(i, false)); + } FleetControllerOptions options = new FleetControllerOptions("mycluster", configuredNodes); options.distributionBits = 17; setUpFleetController(false, options); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java index 5fd7c26617e..a2da6bd1c6c 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java @@ -1,6 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.core; +import com.yahoo.compress.CompressionType; +import com.yahoo.compress.Compressor; import com.yahoo.jrt.*; import com.yahoo.jrt.StringValue; import com.yahoo.jrt.slobrok.api.BackOffPolicy; @@ -8,6 +10,10 @@ import com.yahoo.jrt.slobrok.api.Register; import com.yahoo.jrt.slobrok.api.SlobrokList; import com.yahoo.log.LogLevel; import com.yahoo.vdslib.state.*; +import com.yahoo.vespa.clustercontroller.core.rpc.EncodedClusterStateBundle; +import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator; +import com.yahoo.vespa.clustercontroller.core.rpc.RPCUtil; +import com.yahoo.vespa.clustercontroller.core.rpc.SlimeClusterStateBundleCodec; import java.net.InetAddress; import java.net.UnknownHostException; @@ -345,6 +351,14 @@ public class DummyVdsNode { supervisor.addMethod(m); } } + if (stateCommunicationVersion >= RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_VERSION) { + m = new Method(RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_METHOD_NAME, "bix", "", this, "rpc_setDistributionStates"); + m.methodDesc("Set distribution states for cluster and bucket spaces"); + m.paramDesc(0, "compressionType", "Compression type for payload"); + m.paramDesc(1, "uncompressedSize", "Uncompressed size of payload"); + m.paramDesc(2, "payload", "Slime format payload"); + supervisor.addMethod(m); + } } public void rpc_storageConnect(Request req) { @@ -501,4 +515,25 @@ public class DummyVdsNode { req.setError(ErrorCode.METHOD_FAILED, e.getMessage()); } } + + public void rpc_setDistributionStates(Request req) { + try { + if (shouldFailSetSystemStateRequests()) { + req.setError(ErrorCode.GENERAL_ERROR, "Dummy node configured to fail setDistributionStates() calls"); + return; + } + ClusterStateBundle stateBundle = RPCUtil.decodeStateBundleFromSetDistributionStatesRequest(req); + ClusterState newState = stateBundle.getBaselineClusterState(); + synchronized(timer) { + updateStartTimestamps(newState); + systemState.add(0, newState); + timer.notifyAll(); + } + log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new cluster state " + newState); + } catch (Exception e) { + log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setdistributionstates request: " + e.getMessage()); + e.printStackTrace(System.err); + req.setError(ErrorCode.METHOD_FAILED, e.getMessage()); + } + } } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java index b0b833000d2..bda06248d9e 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java @@ -1,9 +1,13 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.core; +import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator; + /** - * + * Options for simulated content nodes that are registered in Slobrok and communicate + * over regular RPC. */ public class DummyVdsNodeOptions { - public int stateCommunicationVersion = 2; // 0 - 4.1, 1 - 4.2-5.0.10, 2 - 5.0.11+ + // 0 - 4.1, 1 - 4.2-5.0.10, 2 - 5.0.11+, 3 - 6.220+ + public int stateCommunicationVersion = RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_VERSION; } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java index 662c36b29f7..19d87b878d4 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java @@ -144,6 +144,7 @@ public abstract class FleetControllerTest implements Waiter { options.minRatioOfStorageNodesUp); NodeStateGatherer stateGatherer = new NodeStateGatherer(timer, timer, log); Communicator communicator = new RPCCommunicator( + RPCCommunicator.createRealSupervisor(), timer, options.fleetControllerIndex, options.nodeStateRequestTimeoutMS, diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenancehenPendingGlobalMergesTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenanceWhenPendingGlobalMergesTest.java index fa92a4d5246..0e2b9557cb9 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenancehenPendingGlobalMergesTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenanceWhenPendingGlobalMergesTest.java @@ -12,7 +12,7 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class MaintenancehenPendingGlobalMergesTest { +public class MaintenanceWhenPendingGlobalMergesTest { private static class Fixture { public MergePendingChecker mockPendingChecker = mock(MergePendingChecker.class); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java new file mode 100644 index 00000000000..d59dbb4933a --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java @@ -0,0 +1,36 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vdslib.distribution.ConfiguredNode; +import com.yahoo.vdslib.state.NodeState; +import com.yahoo.vdslib.state.State; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +public class RpcVersionAutoDowngradeTest extends FleetControllerTest { + + private void setUpFakeCluster(int nodeRpcVersion) throws Exception { + List<ConfiguredNode> configuredNodes = new ArrayList<>(); + for (int i = 0 ; i < 10; i++) { + configuredNodes.add(new ConfiguredNode(i, false)); + } + FleetControllerOptions options = new FleetControllerOptions("mycluster", configuredNodes); + setUpFleetController(false, options); + DummyVdsNodeOptions nodeOptions = new DummyVdsNodeOptions(); + nodeOptions.stateCommunicationVersion = nodeRpcVersion; + List<DummyVdsNode> nodes = setUpVdsNodes(false, nodeOptions, true, configuredNodes); + for (DummyVdsNode node : nodes) { + node.setNodeState(new NodeState(node.getType(), State.UP).setMinUsedBits(20)); + node.connect(); + } + } + + @Test + public void cluster_state_rpc_version_is_auto_downgraded_and_retried_for_older_nodes() throws Exception { + setUpFakeCluster(2); // HEAD is at v3 + waitForState("version:\\d+ distributor:10 storage:10"); + } + +} diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateMapping.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateMapping.java new file mode 100644 index 00000000000..f88802c09ef --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateMapping.java @@ -0,0 +1,20 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vdslib.state.ClusterState; + +public class StateMapping { + + final String bucketSpace; + final ClusterState state; + + private StateMapping(String bucketSpace, ClusterState state) { + this.bucketSpace = bucketSpace; + this.state = state; + } + + public static StateMapping of(String bucketSpace, String state) { + return new StateMapping(bucketSpace, ClusterState.stateFromString(state)); + } + +} diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java index 93aac5c83ed..aa5219147ce 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java @@ -7,7 +7,6 @@ import com.yahoo.vespa.clustercontroller.core.listeners.NodeAddedOrRemovedListen import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler; import org.junit.Test; -import java.util.stream.Collectors; import java.util.stream.Stream; import static org.mockito.Matchers.any; @@ -63,29 +62,10 @@ public class SystemStateBroadcasterTest { return Stream.of(nodes).map(c::getNodeInfo); } - private static class StateMapping { - final String bucketSpace; - final ClusterState state; - - StateMapping(String bucketSpace, ClusterState state) { - this.bucketSpace = bucketSpace; - this.state = state; - } - } - - private static StateMapping mapping(String bucketSpace, String state) { - return new StateMapping(bucketSpace, ClusterState.stateFromString(state)); - } - - private static ClusterStateBundle makeBundle(String baselineState, StateMapping... bucketSpaceStates) { - return ClusterStateBundle.of(AnnotatedClusterState.withoutAnnotations(ClusterState.stateFromString(baselineState)), - Stream.of(bucketSpaceStates).collect(Collectors.toMap(sm -> sm.bucketSpace, sm -> sm.state))); - } - @Test public void always_publish_baseline_cluster_state() { Fixture f = new Fixture(); - ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2"); + ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2"); ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); f.broadcaster.handleNewClusterStates(stateBundle); f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator); @@ -95,7 +75,7 @@ public class SystemStateBroadcasterTest { @Test public void non_observed_startup_timestamps_are_published_per_node_for_baseline_state() { Fixture f = new Fixture(); - ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2"); + ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2"); ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); f.simulateNodePartitionedAwaySilently(cf); f.broadcaster.handleNewClusterStates(stateBundle); @@ -105,16 +85,16 @@ public class SystemStateBroadcasterTest { // Only distributor 0 should observe startup timestamps verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any()); }); - ClusterStateBundle expectedDistr0Bundle = makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700"); + ClusterStateBundle expectedDistr0Bundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700"); verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any()); } @Test public void bucket_space_states_are_published_verbatim_when_no_additional_timestamps_needed() { Fixture f = new Fixture(); - ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2", - mapping("default", "distributor:2 storage:2 .0.s:d"), - mapping("upsidedown", "distributor:2 .0.s:d storage:2")); + ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2", + StateMapping.of("default", "distributor:2 storage:2 .0.s:d"), + StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2")); ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); f.broadcaster.handleNewClusterStates(stateBundle); f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator); @@ -125,9 +105,9 @@ public class SystemStateBroadcasterTest { @Test public void non_observed_startup_timestamps_are_published_per_bucket_space_state() { Fixture f = new Fixture(); - ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2", - mapping("default", "distributor:2 storage:2 .0.s:d"), - mapping("upsidedown", "distributor:2 .0.s:d storage:2")); + ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2", + StateMapping.of("default", "distributor:2 storage:2 .0.s:d"), + StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2")); ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); f.simulateNodePartitionedAwaySilently(cf); f.broadcaster.handleNewClusterStates(stateBundle); @@ -137,9 +117,9 @@ public class SystemStateBroadcasterTest { // Only distributor 0 should observe startup timestamps verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any()); }); - ClusterStateBundle expectedDistr0Bundle = makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700", - mapping("default", "distributor:2 storage:2 .0.s:d .0.t:600 .1.t:700"), - mapping("upsidedown", "distributor:2 .0.s:d storage:2 .0.t:600 .1.t:700")); + ClusterStateBundle expectedDistr0Bundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700", + StateMapping.of("default", "distributor:2 storage:2 .0.s:d .0.t:600 .1.t:700"), + StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2 .0.t:600 .1.t:700")); verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any()); } } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java index 73692dbdbfa..2665175c637 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java @@ -1,24 +1,24 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.core.rpc; -import com.yahoo.jrt.Request; -import com.yahoo.jrt.RequestWaiter; -import com.yahoo.jrt.Target; +import com.yahoo.jrt.*; import com.yahoo.vdslib.state.Node; -import com.yahoo.vdslib.state.NodeType; import com.yahoo.vespa.clustercontroller.core.*; import org.junit.Test; import org.mockito.Mockito; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; import static org.hamcrest.core.IsNot.not; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -34,6 +34,7 @@ public class RPCCommunicatorTest { @Test public void testGenerateNodeStateRequestTimeoutMs() throws Exception { final RPCCommunicator communicator = new RPCCommunicator( + RPCCommunicator.createRealSupervisor(), null /* Timer */, INDEX, NODE_STATE_REQUEST_TIMEOUT_INTERVAL_MAX_MS, @@ -59,7 +60,7 @@ public class RPCCommunicatorTest { @Test public void testGenerateNodeStateRequestTimeoutMsWithUpdates() throws Exception { - final RPCCommunicator communicator = new RPCCommunicator(null /* Timer */, INDEX, 1, 1, 100, 0); + final RPCCommunicator communicator = new RPCCommunicator(RPCCommunicator.createRealSupervisor(), null /* Timer */, INDEX, 1, 1, 100, 0); FleetControllerOptions fleetControllerOptions = new FleetControllerOptions(null /*clustername*/); fleetControllerOptions.nodeStateRequestTimeoutEarliestPercentage = 100; fleetControllerOptions.nodeStateRequestTimeoutLatestPercentage = 100; @@ -73,6 +74,7 @@ public class RPCCommunicatorTest { public void testRoundtripLatency() throws Exception { final Timer timer = new FakeTimer(); final RPCCommunicator communicator = new RPCCommunicator( + RPCCommunicator.createRealSupervisor(), timer, INDEX, NODE_STATE_REQUEST_TIMEOUT_INTERVAL_MAX_MS, @@ -91,4 +93,77 @@ public class RPCCommunicatorTest { eq(ROUNDTRIP_LATENCY_SECONDS + NODE_STATE_REQUEST_TIMEOUT_INTERVAL_MAX_MS/1000.0), (RequestWaiter)any()); } + + private static class Fixture { + final Supervisor mockSupervisor = mock(Supervisor.class); + final Target mockTarget = mock(Target.class); + final Timer timer = new FakeTimer(); + final RPCCommunicator communicator; + final AtomicReference<Request> receivedRequest = new AtomicReference<>(); + final AtomicReference<RequestWaiter> receivedWaiter = new AtomicReference<>(); + @SuppressWarnings("unchecked") // Cannot mock with "compiler-obvious" type safety for generics + final Communicator.Waiter<SetClusterStateRequest> mockWaiter = mock(Communicator.Waiter.class); + + Fixture() { + communicator = new RPCCommunicator( + mockSupervisor, + timer, + INDEX, + NODE_STATE_REQUEST_TIMEOUT_INTERVAL_MAX_MS, + NODE_STATE_REQUEST_TIMEOUT_INTERVAL_STOP_PERCENTAGE, + 100, + ROUNDTRIP_LATENCY_SECONDS); + + when(mockSupervisor.connect(any())).thenReturn(mockTarget); + when(mockTarget.isValid()).thenReturn(true); + doAnswer((invocation) -> { + receivedRequest.set((Request) invocation.getArguments()[0]); + receivedWaiter.set((RequestWaiter) invocation.getArguments()[2]); + return null; + }).when(mockTarget).invokeAsync(any(), anyDouble(), any()); + } + } + + @Test + public void setSystemState_v3_sends_distribution_states_rpc() { + Fixture f = new Fixture(); + ClusterFixture cf = ClusterFixture.forFlatCluster(3).bringEntireClusterUp().assignDummyRpcAddresses(); + ClusterStateBundle sentBundle = ClusterStateBundleUtil.makeBundle("distributor:3 storage:3"); + f.communicator.setSystemState(sentBundle, cf.cluster().getNodeInfo(Node.ofStorage(1)), f.mockWaiter); + + Request req = f.receivedRequest.get(); + assertThat(req, notNullValue()); + assertThat(req.methodName(), equalTo(RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_METHOD_NAME)); + assertTrue(req.parameters().satisfies("bix")); // <compression type>, <uncompressed size>, <payload> + + ClusterStateBundle receivedBundle = RPCUtil.decodeStateBundleFromSetDistributionStatesRequest(req); + assertThat(receivedBundle, equalTo(sentBundle)); + } + + @Test + public void set_distribution_states_v3_rpc_auto_downgrades_to_v2_on_unknown_method_error() { + Fixture f = new Fixture(); + ClusterFixture cf = ClusterFixture.forFlatCluster(3).bringEntireClusterUp().assignDummyRpcAddresses(); + ClusterStateBundle sentBundle = ClusterStateBundleUtil.makeBundle("version:123 distributor:3 storage:3"); + f.communicator.setSystemState(sentBundle, cf.cluster().getNodeInfo(Node.ofStorage(1)), f.mockWaiter); + + RequestWaiter waiter = f.receivedWaiter.get(); + assertThat(waiter, notNullValue()); + Request req = f.receivedRequest.get(); + assertThat(req, notNullValue()); + + req.setError(ErrorCode.NO_SUCH_METHOD, "que?"); + waiter.handleRequestDone(req); + + // This would normally be done in processResponses(), but that code path is not invoked in this test. + cf.cluster().getNodeInfo(Node.ofStorage(1)).setSystemStateVersionAcknowledged(123, false); + + f.receivedRequest.set(null); + // Now when we try again, we should have been downgraded to the legacy setsystemstate2 RPC + f.communicator.setSystemState(sentBundle, cf.cluster().getNodeInfo(Node.ofStorage(1)), f.mockWaiter); + req = f.receivedRequest.get(); + assertThat(req, notNullValue()); + assertThat(req.methodName(), equalTo(RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME)); + } + } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCUtil.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCUtil.java new file mode 100644 index 00000000000..2fdf6b5c8a8 --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCUtil.java @@ -0,0 +1,21 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core.rpc; + +import com.yahoo.compress.CompressionType; +import com.yahoo.compress.Compressor; +import com.yahoo.jrt.Request; +import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle; + +public class RPCUtil { + + public static ClusterStateBundle decodeStateBundleFromSetDistributionStatesRequest(Request req) { + final CompressionType type = CompressionType.valueOf(req.parameters().get(0).asInt8()); + final int uncompressedSize = req.parameters().get(1).asInt32(); + final byte[] compressedPayload = req.parameters().get(2).asData(); + + SlimeClusterStateBundleCodec codec = new SlimeClusterStateBundleCodec(); + Compressor.Compression compression = new Compressor.Compression(type, uncompressedSize, compressedPayload); + return codec.decode(EncodedClusterStateBundle.fromCompressionBuffer(compression)); + } + +} diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java new file mode 100644 index 00000000000..f2fe494ce61 --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java @@ -0,0 +1,56 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core.rpc; + +import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle; +import com.yahoo.vespa.clustercontroller.core.ClusterStateBundleUtil; +import com.yahoo.vespa.clustercontroller.core.StateMapping; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.lessThan; +import static org.junit.Assert.assertThat; + +public class SlimeClusterStateBundleCodecTest { + + private static ClusterStateBundle roundtripEncode(ClusterStateBundle stateBundle) { + SlimeClusterStateBundleCodec codec = new SlimeClusterStateBundleCodec(); + EncodedClusterStateBundle encoded = codec.encode(stateBundle); + return codec.decode(encoded); + } + + @Test + public void baseline_only_bundle_can_be_round_trip_encoded() { + ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2"); + assertThat(roundtripEncode(stateBundle), equalTo(stateBundle)); + } + + @Test + public void multi_space_state_bundle_can_be_round_trip_encoded() { + ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2", + StateMapping.of("default", "distributor:2 storage:2 .0.s:d"), + StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2")); + assertThat(roundtripEncode(stateBundle), equalTo(stateBundle)); + } + + private static ClusterStateBundle makeCompressableBundle() { + StringBuilder allDownStates = new StringBuilder(2048); + for (int i = 0; i < 99; ++i) { + allDownStates.append(" .").append(i).append(".s:d"); + } + // Exact same state set string repeated twice; perfect compression fodder. + return ClusterStateBundleUtil.makeBundle(String.format("distributor:100%s storage:100%s", + allDownStates.toString(), allDownStates.toString())); + } + + @Test + public void encoded_cluster_states_can_be_compressed() { + ClusterStateBundle stateBundle = makeCompressableBundle(); + + SlimeClusterStateBundleCodec codec = new SlimeClusterStateBundleCodec(); + EncodedClusterStateBundle encoded = codec.encode(stateBundle); + assertThat(encoded.getCompression().data().length, lessThan(stateBundle.getBaselineClusterState().toString().length())); + ClusterStateBundle decodedBundle = codec.decode(encoded); + assertThat(decodedBundle, equalTo(stateBundle)); + } + +} |