diff options
author | Geir Storli <geirstorli@yahoo.no> | 2018-02-28 11:12:51 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-02-28 11:12:51 +0100 |
commit | 29b39068a3ffd22f906e72fdf2eedb9fe4d9d096 (patch) | |
tree | cddcbd87902828e5136513ac807954220debed2a | |
parent | a35db3bc1005adb1d49335acef98aba1761368b9 (diff) | |
parent | 955fe835a6a37b7673c286ea2954f1d2359b3c87 (diff) |
Merge pull request #5164 from vespa-engine/vekterli/add-v3-cluster-state-propagation-rpc
Add v3 RPC for sending cluster states to content nodes
21 files changed, 490 insertions, 92 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java index e755c2b8117..5c345b6f8a0 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java @@ -178,6 +178,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd options.minRatioOfStorageNodesUp); NodeStateGatherer stateGatherer = new NodeStateGatherer(timer, timer, log); Communicator communicator = new RPCCommunicator( + RPCCommunicator.createRealSupervisor(), timer, options.fleetControllerIndex, options.nodeStateRequestTimeoutMS, diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java index 15650f0f4aa..55331e4bbed 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java @@ -8,6 +8,7 @@ import com.yahoo.vdslib.distribution.Distribution; import com.yahoo.vdslib.distribution.Group; import com.yahoo.vdslib.state.*; import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo; +import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator; import java.io.PrintWriter; import java.io.StringWriter; @@ -205,7 +206,7 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { } public int getLatestVersion() { - return 2; + return RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_VERSION; } public String getSlobrokAddress() { @@ -375,21 +376,23 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { public boolean notifyNoSuchMethodError(String methodName, Timer timer) { if (methodName.equals("getnodestate3")) { if (version > 1) { - log.log(LogLevel.DEBUG, "Node " + toString() + " does not support " + methodName + " call. Setting version to 1."); - version = 1; - nextAttemptTime = 0; - adjustedVersionTime = timer.getCurrentTimeInMillis(); + downgradeToRpcVersion(1, methodName, timer); return true; } else if (timer.getCurrentTimeInMillis() - 2000 < adjustedVersionTime) { log.log(LogLevel.DEBUG, "Node " + toString() + " does not support " + methodName + " call. Version already at 1 and was recently adjusted, so ignoring it."); return true; } - } else if (methodName.equals("getnodestate2") || methodName.equals("setsystemstate2")) { + } else if (methodName.equals(RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_METHOD_NAME)) { + if (version == RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_VERSION) { + downgradeToRpcVersion(RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_VERSION, methodName, timer); + return true; + } else if (timer.getCurrentTimeInMillis() - 2000 < adjustedVersionTime) { + log.log(LogLevel.DEBUG, () -> "Node " + toString() + " does not support " + methodName + " call. Version already downgraded, so ignoring it."); + return true; + } + } else if (methodName.equals("getnodestate2") || methodName.equals(RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME)) { if (version > 0) { - log.log(LogLevel.DEBUG, "Node " + toString() + " does not support " + methodName + " call. Setting version to 0."); - version = 0; - nextAttemptTime = 0; - adjustedVersionTime = timer.getCurrentTimeInMillis(); + downgradeToRpcVersion(0, methodName, timer); return true; } else if (timer.getCurrentTimeInMillis() - 2000 < adjustedVersionTime) { log.log(LogLevel.DEBUG, "Node " + toString() + " does not support " + methodName + " call. Version already at 0 and was recently adjusted, so ignoring it."); @@ -400,6 +403,14 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { return false; } + private void downgradeToRpcVersion(int newVersion, String methodName, Timer timer) { + log.log(LogLevel.DEBUG, () -> String.format("Node %s does not support %s call. Downgrading to version %d.", + toString(), methodName, newVersion)); + version = newVersion; + nextAttemptTime = 0; + adjustedVersionTime = timer.getCurrentTimeInMillis(); + } + public Target getConnection() { return connection; } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java index 45a14ee19cd..b0869b560f6 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java @@ -59,8 +59,8 @@ public class SystemStateBroadcaster { int version = req.getSystemStateVersion(); if (req.getReply().isError()) { + info.setSystemStateVersionAcknowledged(version, false); if (req.getReply().getReturnCode() != Communicator.TRANSIENT_ERROR) { - info.setSystemStateVersionAcknowledged(version, false); if (info.getNewestSystemStateVersionSent() == version) { reportNodeError(nodeOk, info, "Got error response " + req.getReply().getReturnCode() + ": " + req.getReply().getReturnMessage() diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/ClusterStateBundleCodec.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/ClusterStateBundleCodec.java new file mode 100644 index 00000000000..250d7bbe46a --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/ClusterStateBundleCodec.java @@ -0,0 +1,21 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core.rpc; + +import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle; + +/** + * Provides opaque encoding and decoding of ClusterStateBundles for transmission over RPC. + * + * Implementations may choose to compress the encoded representation of the bundle. + * + * It is important that the input given to decode() is exactly equal to that given from + * encode() for the results to be correct. Implementations must ensure that this information + * is enough to losslessly reconstruct the full encoded ClusterStateBundle. + */ +public interface ClusterStateBundleCodec { + + EncodedClusterStateBundle encode(ClusterStateBundle stateBundle); + + ClusterStateBundle decode(EncodedClusterStateBundle encodedClusterStateBundle); + +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EncodedClusterStateBundle.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EncodedClusterStateBundle.java new file mode 100644 index 00000000000..784c8c23c87 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EncodedClusterStateBundle.java @@ -0,0 +1,29 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core.rpc; + +import com.yahoo.compress.Compressor; + +/** + * Contains an opaque encoded (possibly compressed) representation of a ClusterStateBundle. + * + * This bundle can in turn be sent over the wire or serialized by ensuring that all components + * of the Compressor.Compression state can be reconstructed by the receiver. In practice this + * means sending the Compression's <em>type</em>, <em>uncompressedSize</em> and <em>data</em>. + */ +public class EncodedClusterStateBundle { + + private final Compressor.Compression compression; + + private EncodedClusterStateBundle(Compressor.Compression compression) { + this.compression = compression; + } + + public static EncodedClusterStateBundle fromCompressionBuffer(Compressor.Compression compression) { + return new EncodedClusterStateBundle(compression); + } + + public Compressor.Compression getCompression() { + return compression; + } + +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java index 0cc5ef36e3c..8941b2e32ec 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java @@ -25,6 +25,12 @@ public class RPCCommunicator implements Communicator { public static final Logger log = Logger.getLogger(RPCCommunicator.class.getName()); + public static final int SET_DISTRIBUTION_STATES_RPC_VERSION = 3; + public static final String SET_DISTRIBUTION_STATES_RPC_METHOD_NAME = "setdistributionstates"; + + public static final int LEGACY_SET_SYSTEM_STATE2_RPC_VERSION = 2; + public static final String LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME = "setsystemstate2"; + private final Timer timer; private final Supervisor supervisor; private double nodeStateRequestTimeoutIntervalMaxSeconds; @@ -33,7 +39,12 @@ public class RPCCommunicator implements Communicator { private int nodeStateRequestRoundTripTimeMaxSeconds; private final int fleetControllerIndex; + public static Supervisor createRealSupervisor() { + return new Supervisor(new Transport()); + } + public RPCCommunicator( + final Supervisor supervisor, final Timer t, final int index, final int nodeStateRequestTimeoutIntervalMaxMs, @@ -52,7 +63,7 @@ public class RPCCommunicator implements Communicator { this.nodeStateRequestTimeoutIntervalStartPercentage = nodeStateRequestTimeoutIntervalStartPercentage; this.nodeStateRequestTimeoutIntervalStopPercentage = nodeStateRequestTimeoutIntervalStopPercentage; this.nodeStateRequestRoundTripTimeMaxSeconds = nodeStateRequestRoundTripTimeMaxSeconds; - this.supervisor = new Supervisor(new Transport()); + this.supervisor = supervisor; } public void shutdown() { @@ -126,6 +137,7 @@ public class RPCCommunicator implements Communicator { if ( ! connection.isValid()) { log.log(LogLevel.DEBUG, "Connection to " + node.getRpcAddress() + " could not be created."); } + // TODO remove this deprecated legacy stuff if (node.getVersion() == 0 && node.getConnectionVersion() > 0) { doVersion0HandShake(connection, node); clearOldStoredNodeState(connection, node); @@ -166,19 +178,31 @@ public class RPCCommunicator implements Communicator { log.log(LogLevel.DEBUG, "Connection to " + node.getRpcAddress() + " could not be created."); return; } - if (node.getVersion() == 0 && node.getConnectionVersion() > 0) { + final int nodeVersion = node.getVersion(); + // TODO remove this deprecated legacy stuff + if (nodeVersion == 0 && node.getConnectionVersion() > 0) { doVersion0HandShake(connection, node); clearOldStoredNodeState(connection, node); } Request req; - if (node.getVersion() == 0) { + if (nodeVersion == 0) { req = new Request("setsystemstate"); req.parameters().add(new StringValue(baselineState.toString(true))); - } else { - req = new Request("setsystemstate2"); + } else if (nodeVersion <= 2) { + req = new Request(LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME); req.parameters().add(new StringValue(baselineState.toString(false))); + } else { + req = new Request(SET_DISTRIBUTION_STATES_RPC_METHOD_NAME); + SlimeClusterStateBundleCodec codec = new SlimeClusterStateBundleCodec(); + EncodedClusterStateBundle encodedBundle = codec.encode(stateBundle); + Values v = req.parameters(); + v.add(new Int8Value(encodedBundle.getCompression().type().getCode())); + v.add(new Int32Value(encodedBundle.getCompression().uncompressedSize())); + v.add(new DataValue(encodedBundle.getCompression().data())); } + log.log(LogLevel.DEBUG, () -> String.format("Sending '%s' RPC to %s for state version %d", + req.methodName(), node.getRpcAddress(), stateBundle.getVersion())); RPCSetClusterStateRequest stateRequest = new RPCSetClusterStateRequest(node, req, baselineState.getVersion()); waiter.setRequest(stateRequest); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java index 39506ddf7fa..96a776a65b8 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java @@ -30,7 +30,8 @@ public class RPCSetClusterStateWaiter implements RequestWaiter { public SetClusterStateRequest.Reply getReply(Request req) { NodeInfo info = request.getNodeInfo(); - if (req.methodName().equals("setsystemstate2")) { + if (req.methodName().equals(RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_METHOD_NAME) + || req.methodName().equals(RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME)) { if (req.isError() && req.errorCode() == ErrorCode.NO_SUCH_METHOD) { if (info.notifyNoSuchMethodError(req.methodName(), timer)) { return new SetClusterStateRequest.Reply(Communicator.TRANSIENT_ERROR, "Trying lower version"); @@ -39,7 +40,7 @@ public class RPCSetClusterStateWaiter implements RequestWaiter { if (req.isError()) { return new SetClusterStateRequest.Reply(req.errorCode(), req.errorMessage()); } else if (!req.checkReturnTypes("")) { - return new SetClusterStateRequest.Reply(ErrorCode.BAD_REPLY, "Got setsystemstate2 response with invalid return types from " + info); + return new SetClusterStateRequest.Reply(ErrorCode.BAD_REPLY, "Got RPC response with invalid return types from " + info); } } else { return new SetClusterStateRequest.Reply(ErrorCode.BAD_REPLY, "Unknown method " + req.methodName()); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java new file mode 100644 index 00000000000..c37bd8313a9 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java @@ -0,0 +1,58 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core.rpc; + +import com.yahoo.compress.CompressionType; +import com.yahoo.compress.Compressor; +import com.yahoo.slime.*; +import com.yahoo.vdslib.state.ClusterState; +import com.yahoo.vespa.clustercontroller.core.AnnotatedClusterState; +import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle; + +import java.util.HashMap; +import java.util.Map; + +/** + * Implementation of ClusterStateBundleCodec which uses structured Slime binary encoding + * to implement (de-)serialization of ClusterStateBundle instances. Encoding format is + * intentionally extensible so that we may add other information to it later. + * + * LZ4 compression is transparently applied during encoding and decompression is + * subsequently applied during decoding. + */ +public class SlimeClusterStateBundleCodec implements ClusterStateBundleCodec { + + private static final Compressor compressor = new Compressor(CompressionType.LZ4, 3, 0.90, 1024); + + @Override + public EncodedClusterStateBundle encode(ClusterStateBundle stateBundle) { + Slime slime = new Slime(); + Cursor root = slime.setObject(); + Cursor states = root.setObject("states"); + // TODO add another function that is not toString for this..! + states.setString("baseline", stateBundle.getBaselineClusterState().toString()); + Cursor spaces = states.setObject("spaces"); + stateBundle.getDerivedBucketSpaceStates().entrySet() + .forEach(entry -> spaces.setString(entry.getKey(), entry.getValue().toString())); + + byte[] serialized = BinaryFormat.encode(slime); + Compressor.Compression compression = compressor.compress(serialized); + return EncodedClusterStateBundle.fromCompressionBuffer(compression); + } + + @Override + public ClusterStateBundle decode(EncodedClusterStateBundle encodedClusterStateBundle) { + byte[] uncompressed = compressor.decompress(encodedClusterStateBundle.getCompression()); + Slime slime = BinaryFormat.decode(uncompressed); + Inspector root = slime.get(); + Inspector states = root.field("states"); + ClusterState baseline = ClusterState.stateFromString(states.field("baseline").asString()); + + Inspector spaces = states.field("spaces"); + Map<String, ClusterState> derivedStates = new HashMap<>(); + spaces.traverse(((ObjectTraverser)(key, value) -> { + derivedStates.put(key, ClusterState.stateFromString(value.asString())); + })); + + return ClusterStateBundle.of(AnnotatedClusterState.withoutAnnotations(baseline), derivedStates); + } +} diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java index 4862c83f2a6..5692e961cbe 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java @@ -20,7 +20,7 @@ import java.util.TreeMap; import static org.mockito.Mockito.mock; -class ClusterFixture { +public class ClusterFixture { public final ContentCluster cluster; public final Distribution distribution; public final FakeTimer timer; @@ -28,7 +28,7 @@ class ClusterFixture { public final StateChangeHandler nodeStateChangeHandler; public final ClusterStateGenerator.Params params = new ClusterStateGenerator.Params(); - ClusterFixture(ContentCluster cluster, Distribution distribution) { + public ClusterFixture(ContentCluster cluster, Distribution distribution) { this.cluster = cluster; this.distribution = distribution; this.timer = new FakeTimer(); @@ -37,13 +37,13 @@ class ClusterFixture { this.params.cluster(this.cluster); } - StateChangeHandler createNodeStateChangeHandlerForCluster() { + public StateChangeHandler createNodeStateChangeHandlerForCluster() { final int controllerIndex = 0; MetricUpdater metricUpdater = new MetricUpdater(new NoMetricReporter(), controllerIndex); return new StateChangeHandler(timer, eventLog, metricUpdater); } - ClusterFixture bringEntireClusterUp() { + public ClusterFixture bringEntireClusterUp() { cluster.clusterInfo().getConfiguredNodes().forEach((idx, node) -> { reportStorageNodeState(idx, State.UP); reportDistributorNodeState(idx, State.UP); @@ -51,7 +51,7 @@ class ClusterFixture { return this; } - ClusterFixture markEntireClusterDown() { + public ClusterFixture markEntireClusterDown() { cluster.clusterInfo().getConfiguredNodes().forEach((idx, node) -> { reportStorageNodeState(idx, State.DOWN); reportDistributorNodeState(idx, State.DOWN); @@ -69,7 +69,7 @@ class ClusterFixture { nodeInfo.setReportedState(nodeState, timer.getCurrentTimeInMillis()); } - ClusterFixture reportStorageNodeState(final int index, State state, String description) { + public ClusterFixture reportStorageNodeState(final int index, State state, String description) { final Node node = new Node(NodeType.STORAGE, index); final NodeState nodeState = new NodeState(NodeType.STORAGE, state); nodeState.setDescription(description); @@ -77,23 +77,23 @@ class ClusterFixture { return this; } - ClusterFixture reportStorageNodeState(final int index, State state) { + public ClusterFixture reportStorageNodeState(final int index, State state) { return reportStorageNodeState(index, state, "mockdesc"); } - ClusterFixture reportStorageNodeState(final int index, NodeState nodeState) { + public ClusterFixture reportStorageNodeState(final int index, NodeState nodeState) { doReportNodeState(new Node(NodeType.STORAGE, index), nodeState); return this; } - ClusterFixture reportDistributorNodeState(final int index, State state) { + public ClusterFixture reportDistributorNodeState(final int index, State state) { final Node node = new Node(NodeType.DISTRIBUTOR, index); final NodeState nodeState = new NodeState(NodeType.DISTRIBUTOR, state); doReportNodeState(node, nodeState); return this; } - ClusterFixture reportDistributorNodeState(final int index, NodeState nodeState) { + public ClusterFixture reportDistributorNodeState(final int index, NodeState nodeState) { doReportNodeState(new Node(NodeType.DISTRIBUTOR, index), nodeState); return this; } @@ -108,18 +108,18 @@ class ClusterFixture { nodeStateChangeHandler.proposeNewNodeState(stateBefore, nodeInfo, nodeState); } - ClusterFixture proposeStorageNodeWantedState(final int index, State state, String description) { + public ClusterFixture proposeStorageNodeWantedState(final int index, State state, String description) { final Node node = new Node(NodeType.STORAGE, index); final NodeState nodeState = new NodeState(NodeType.STORAGE, state); doProposeWantedState(node, nodeState, description); return this; } - ClusterFixture proposeStorageNodeWantedState(final int index, State state) { + public ClusterFixture proposeStorageNodeWantedState(final int index, State state) { return proposeStorageNodeWantedState(index, state, "mockdesc"); } - ClusterFixture proposeDistributorWantedState(final int index, State state) { + public ClusterFixture proposeDistributorWantedState(final int index, State state) { final ClusterState stateBefore = rawGeneratedClusterState(); final Node node = new Node(NodeType.DISTRIBUTOR, index); final NodeState nodeState = new NodeState(NodeType.DISTRIBUTOR, state); @@ -131,12 +131,12 @@ class ClusterFixture { return this; } - ClusterFixture disableAutoClusterTakedown() { + public ClusterFixture disableAutoClusterTakedown() { setMinNodesUp(0, 0, 0.0, 0.0); return this; } - ClusterFixture setMinNodesUp(int minDistNodes, int minStorNodes, double minDistRatio, double minStorRatio) { + public ClusterFixture setMinNodesUp(int minDistNodes, int minStorNodes, double minDistRatio, double minStorRatio) { params.minStorageNodesUp(minStorNodes) .minDistributorNodesUp(minDistNodes) .minRatioOfStorageNodesUp(minStorRatio) @@ -144,32 +144,32 @@ class ClusterFixture { return this; } - ClusterFixture setMinNodeRatioPerGroup(double upRatio) { + public ClusterFixture setMinNodeRatioPerGroup(double upRatio) { params.minNodeRatioPerGroup(upRatio); return this; } - ClusterFixture assignDummyRpcAddresses() { + public ClusterFixture assignDummyRpcAddresses() { cluster.getNodeInfo().forEach(ni -> ni.setRpcAddress("tcp/localhost:0")); return this; } - static Map<NodeType, Integer> buildTransitionTimeMap(int distributorTransitionTime, int storageTransitionTime) { + static public Map<NodeType, Integer> buildTransitionTimeMap(int distributorTransitionTime, int storageTransitionTime) { Map<NodeType, Integer> maxTransitionTime = new TreeMap<>(); maxTransitionTime.put(NodeType.DISTRIBUTOR, distributorTransitionTime); maxTransitionTime.put(NodeType.STORAGE, storageTransitionTime); return maxTransitionTime; } - void disableTransientMaintenanceModeOnDown() { + public void disableTransientMaintenanceModeOnDown() { this.params.transitionTimes(0); } - void enableTransientMaintenanceModeOnDown(final int transitionTimeMs) { + public void enableTransientMaintenanceModeOnDown(final int transitionTimeMs) { this.params.transitionTimes(transitionTimeMs); } - ClusterFixture markNodeAsConfigRetired(int nodeIndex) { + public ClusterFixture markNodeAsConfigRetired(int nodeIndex) { Set<ConfiguredNode> configuredNodes = new HashSet<>(cluster.getConfiguredNodes().values()); configuredNodes.remove(new ConfiguredNode(nodeIndex, false)); configuredNodes.add(new ConfiguredNode(nodeIndex, true)); @@ -177,24 +177,24 @@ class ClusterFixture { return this; } - AnnotatedClusterState annotatedGeneratedClusterState() { + public AnnotatedClusterState annotatedGeneratedClusterState() { params.currentTimeInMilllis(timer.getCurrentTimeInMillis()); return ClusterStateGenerator.generatedStateFrom(params); } - ClusterState rawGeneratedClusterState() { + public ClusterState rawGeneratedClusterState() { return annotatedGeneratedClusterState().getClusterState(); } - String generatedClusterState() { + public String generatedClusterState() { return annotatedGeneratedClusterState().getClusterState().toString(); } - String verboseGeneratedClusterState() { + public String verboseGeneratedClusterState() { return annotatedGeneratedClusterState().getClusterState().toString(true); } - static ClusterFixture forFlatCluster(int nodeCount) { + public static ClusterFixture forFlatCluster(int nodeCount) { Collection<ConfiguredNode> nodes = DistributionBuilder.buildConfiguredNodes(nodeCount); Distribution distribution = DistributionBuilder.forFlatCluster(nodeCount); @@ -203,7 +203,7 @@ class ClusterFixture { return new ClusterFixture(cluster, distribution); } - static ClusterFixture forHierarchicCluster(DistributionBuilder.GroupBuilder root) { + public static ClusterFixture forHierarchicCluster(DistributionBuilder.GroupBuilder root) { List<ConfiguredNode> nodes = DistributionBuilder.buildConfiguredNodes(root.totalNodeCount()); Distribution distribution = DistributionBuilder.forHierarchicCluster(root); ContentCluster cluster = new ContentCluster("foo", nodes, distribution, 0, 0.0); @@ -211,19 +211,19 @@ class ClusterFixture { return new ClusterFixture(cluster, distribution); } - ClusterStateGenerator.Params generatorParams() { + public ClusterStateGenerator.Params generatorParams() { return new ClusterStateGenerator.Params().cluster(cluster); } - ContentCluster cluster() { + public ContentCluster cluster() { return this.cluster; } - static Node storageNode(int index) { + public static Node storageNode(int index) { return new Node(NodeType.STORAGE, index); } - static Node distributorNode(int index) { + public static Node distributorNode(int index) { return new Node(NodeType.DISTRIBUTOR, index); } } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleUtil.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleUtil.java new file mode 100644 index 00000000000..e097874682a --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleUtil.java @@ -0,0 +1,20 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vdslib.state.ClusterState; + +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Helper functions for constructing a ClusterStateBundle for a baseline state and zero or more + * explicit bucket space states. + */ +public class ClusterStateBundleUtil { + + public static ClusterStateBundle makeBundle(String baselineState, StateMapping... bucketSpaceStates) { + return ClusterStateBundle.of(AnnotatedClusterState.withoutAnnotations(ClusterState.stateFromString(baselineState)), + Stream.of(bucketSpaceStates).collect(Collectors.toMap(sm -> sm.bucketSpace, sm -> sm.state))); + } + +} diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java index a6e62d29ab5..ae59336b5ef 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java @@ -17,8 +17,9 @@ public class DistributionBitCountTest extends FleetControllerTest { private void setUpSystem(String testName) throws Exception { List<ConfiguredNode> configuredNodes = new ArrayList<>(); - for (int i = 0 ; i < 10; i++) + for (int i = 0 ; i < 10; i++) { configuredNodes.add(new ConfiguredNode(i, false)); + } FleetControllerOptions options = new FleetControllerOptions("mycluster", configuredNodes); options.distributionBits = 17; setUpFleetController(false, options); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java index 5fd7c26617e..a2da6bd1c6c 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java @@ -1,6 +1,8 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.core; +import com.yahoo.compress.CompressionType; +import com.yahoo.compress.Compressor; import com.yahoo.jrt.*; import com.yahoo.jrt.StringValue; import com.yahoo.jrt.slobrok.api.BackOffPolicy; @@ -8,6 +10,10 @@ import com.yahoo.jrt.slobrok.api.Register; import com.yahoo.jrt.slobrok.api.SlobrokList; import com.yahoo.log.LogLevel; import com.yahoo.vdslib.state.*; +import com.yahoo.vespa.clustercontroller.core.rpc.EncodedClusterStateBundle; +import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator; +import com.yahoo.vespa.clustercontroller.core.rpc.RPCUtil; +import com.yahoo.vespa.clustercontroller.core.rpc.SlimeClusterStateBundleCodec; import java.net.InetAddress; import java.net.UnknownHostException; @@ -345,6 +351,14 @@ public class DummyVdsNode { supervisor.addMethod(m); } } + if (stateCommunicationVersion >= RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_VERSION) { + m = new Method(RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_METHOD_NAME, "bix", "", this, "rpc_setDistributionStates"); + m.methodDesc("Set distribution states for cluster and bucket spaces"); + m.paramDesc(0, "compressionType", "Compression type for payload"); + m.paramDesc(1, "uncompressedSize", "Uncompressed size of payload"); + m.paramDesc(2, "payload", "Slime format payload"); + supervisor.addMethod(m); + } } public void rpc_storageConnect(Request req) { @@ -501,4 +515,25 @@ public class DummyVdsNode { req.setError(ErrorCode.METHOD_FAILED, e.getMessage()); } } + + public void rpc_setDistributionStates(Request req) { + try { + if (shouldFailSetSystemStateRequests()) { + req.setError(ErrorCode.GENERAL_ERROR, "Dummy node configured to fail setDistributionStates() calls"); + return; + } + ClusterStateBundle stateBundle = RPCUtil.decodeStateBundleFromSetDistributionStatesRequest(req); + ClusterState newState = stateBundle.getBaselineClusterState(); + synchronized(timer) { + updateStartTimestamps(newState); + systemState.add(0, newState); + timer.notifyAll(); + } + log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new cluster state " + newState); + } catch (Exception e) { + log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setdistributionstates request: " + e.getMessage()); + e.printStackTrace(System.err); + req.setError(ErrorCode.METHOD_FAILED, e.getMessage()); + } + } } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java index b0b833000d2..bda06248d9e 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java @@ -1,9 +1,13 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.core; +import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator; + /** - * + * Options for simulated content nodes that are registered in Slobrok and communicate + * over regular RPC. */ public class DummyVdsNodeOptions { - public int stateCommunicationVersion = 2; // 0 - 4.1, 1 - 4.2-5.0.10, 2 - 5.0.11+ + // 0 - 4.1, 1 - 4.2-5.0.10, 2 - 5.0.11+, 3 - 6.220+ + public int stateCommunicationVersion = RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_VERSION; } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java index 662c36b29f7..19d87b878d4 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java @@ -144,6 +144,7 @@ public abstract class FleetControllerTest implements Waiter { options.minRatioOfStorageNodesUp); NodeStateGatherer stateGatherer = new NodeStateGatherer(timer, timer, log); Communicator communicator = new RPCCommunicator( + RPCCommunicator.createRealSupervisor(), timer, options.fleetControllerIndex, options.nodeStateRequestTimeoutMS, diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenancehenPendingGlobalMergesTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenanceWhenPendingGlobalMergesTest.java index fa92a4d5246..0e2b9557cb9 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenancehenPendingGlobalMergesTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenanceWhenPendingGlobalMergesTest.java @@ -12,7 +12,7 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -public class MaintenancehenPendingGlobalMergesTest { +public class MaintenanceWhenPendingGlobalMergesTest { private static class Fixture { public MergePendingChecker mockPendingChecker = mock(MergePendingChecker.class); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java new file mode 100644 index 00000000000..d59dbb4933a --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java @@ -0,0 +1,36 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vdslib.distribution.ConfiguredNode; +import com.yahoo.vdslib.state.NodeState; +import com.yahoo.vdslib.state.State; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +public class RpcVersionAutoDowngradeTest extends FleetControllerTest { + + private void setUpFakeCluster(int nodeRpcVersion) throws Exception { + List<ConfiguredNode> configuredNodes = new ArrayList<>(); + for (int i = 0 ; i < 10; i++) { + configuredNodes.add(new ConfiguredNode(i, false)); + } + FleetControllerOptions options = new FleetControllerOptions("mycluster", configuredNodes); + setUpFleetController(false, options); + DummyVdsNodeOptions nodeOptions = new DummyVdsNodeOptions(); + nodeOptions.stateCommunicationVersion = nodeRpcVersion; + List<DummyVdsNode> nodes = setUpVdsNodes(false, nodeOptions, true, configuredNodes); + for (DummyVdsNode node : nodes) { + node.setNodeState(new NodeState(node.getType(), State.UP).setMinUsedBits(20)); + node.connect(); + } + } + + @Test + public void cluster_state_rpc_version_is_auto_downgraded_and_retried_for_older_nodes() throws Exception { + setUpFakeCluster(2); // HEAD is at v3 + waitForState("version:\\d+ distributor:10 storage:10"); + } + +} diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateMapping.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateMapping.java new file mode 100644 index 00000000000..f6eaf86e8f4 --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateMapping.java @@ -0,0 +1,24 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +import com.yahoo.vdslib.state.ClusterState; + +/** + * Tuple representing a mapping from a named bucket space to the derived ClusterState + * for that space. + */ +public class StateMapping { + + final String bucketSpace; + final ClusterState state; + + private StateMapping(String bucketSpace, ClusterState state) { + this.bucketSpace = bucketSpace; + this.state = state; + } + + public static StateMapping of(String bucketSpace, String state) { + return new StateMapping(bucketSpace, ClusterState.stateFromString(state)); + } + +} diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java index 93aac5c83ed..aa5219147ce 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java @@ -7,7 +7,6 @@ import com.yahoo.vespa.clustercontroller.core.listeners.NodeAddedOrRemovedListen import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler; import org.junit.Test; -import java.util.stream.Collectors; import java.util.stream.Stream; import static org.mockito.Matchers.any; @@ -63,29 +62,10 @@ public class SystemStateBroadcasterTest { return Stream.of(nodes).map(c::getNodeInfo); } - private static class StateMapping { - final String bucketSpace; - final ClusterState state; - - StateMapping(String bucketSpace, ClusterState state) { - this.bucketSpace = bucketSpace; - this.state = state; - } - } - - private static StateMapping mapping(String bucketSpace, String state) { - return new StateMapping(bucketSpace, ClusterState.stateFromString(state)); - } - - private static ClusterStateBundle makeBundle(String baselineState, StateMapping... bucketSpaceStates) { - return ClusterStateBundle.of(AnnotatedClusterState.withoutAnnotations(ClusterState.stateFromString(baselineState)), - Stream.of(bucketSpaceStates).collect(Collectors.toMap(sm -> sm.bucketSpace, sm -> sm.state))); - } - @Test public void always_publish_baseline_cluster_state() { Fixture f = new Fixture(); - ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2"); + ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2"); ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); f.broadcaster.handleNewClusterStates(stateBundle); f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator); @@ -95,7 +75,7 @@ public class SystemStateBroadcasterTest { @Test public void non_observed_startup_timestamps_are_published_per_node_for_baseline_state() { Fixture f = new Fixture(); - ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2"); + ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2"); ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); f.simulateNodePartitionedAwaySilently(cf); f.broadcaster.handleNewClusterStates(stateBundle); @@ -105,16 +85,16 @@ public class SystemStateBroadcasterTest { // Only distributor 0 should observe startup timestamps verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any()); }); - ClusterStateBundle expectedDistr0Bundle = makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700"); + ClusterStateBundle expectedDistr0Bundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700"); verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any()); } @Test public void bucket_space_states_are_published_verbatim_when_no_additional_timestamps_needed() { Fixture f = new Fixture(); - ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2", - mapping("default", "distributor:2 storage:2 .0.s:d"), - mapping("upsidedown", "distributor:2 .0.s:d storage:2")); + ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2", + StateMapping.of("default", "distributor:2 storage:2 .0.s:d"), + StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2")); ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); f.broadcaster.handleNewClusterStates(stateBundle); f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator); @@ -125,9 +105,9 @@ public class SystemStateBroadcasterTest { @Test public void non_observed_startup_timestamps_are_published_per_bucket_space_state() { Fixture f = new Fixture(); - ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2", - mapping("default", "distributor:2 storage:2 .0.s:d"), - mapping("upsidedown", "distributor:2 .0.s:d storage:2")); + ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2", + StateMapping.of("default", "distributor:2 storage:2 .0.s:d"), + StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2")); ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses(); f.simulateNodePartitionedAwaySilently(cf); f.broadcaster.handleNewClusterStates(stateBundle); @@ -137,9 +117,9 @@ public class SystemStateBroadcasterTest { // Only distributor 0 should observe startup timestamps verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any()); }); - ClusterStateBundle expectedDistr0Bundle = makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700", - mapping("default", "distributor:2 storage:2 .0.s:d .0.t:600 .1.t:700"), - mapping("upsidedown", "distributor:2 .0.s:d storage:2 .0.t:600 .1.t:700")); + ClusterStateBundle expectedDistr0Bundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700", + StateMapping.of("default", "distributor:2 storage:2 .0.s:d .0.t:600 .1.t:700"), + StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2 .0.t:600 .1.t:700")); verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any()); } } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java index 73692dbdbfa..2665175c637 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java @@ -1,24 +1,24 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.core.rpc; -import com.yahoo.jrt.Request; -import com.yahoo.jrt.RequestWaiter; -import com.yahoo.jrt.Target; +import com.yahoo.jrt.*; import com.yahoo.vdslib.state.Node; -import com.yahoo.vdslib.state.NodeType; import com.yahoo.vespa.clustercontroller.core.*; import org.junit.Test; import org.mockito.Mockito; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.notNullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; import static org.hamcrest.core.IsNot.not; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -34,6 +34,7 @@ public class RPCCommunicatorTest { @Test public void testGenerateNodeStateRequestTimeoutMs() throws Exception { final RPCCommunicator communicator = new RPCCommunicator( + RPCCommunicator.createRealSupervisor(), null /* Timer */, INDEX, NODE_STATE_REQUEST_TIMEOUT_INTERVAL_MAX_MS, @@ -59,7 +60,7 @@ public class RPCCommunicatorTest { @Test public void testGenerateNodeStateRequestTimeoutMsWithUpdates() throws Exception { - final RPCCommunicator communicator = new RPCCommunicator(null /* Timer */, INDEX, 1, 1, 100, 0); + final RPCCommunicator communicator = new RPCCommunicator(RPCCommunicator.createRealSupervisor(), null /* Timer */, INDEX, 1, 1, 100, 0); FleetControllerOptions fleetControllerOptions = new FleetControllerOptions(null /*clustername*/); fleetControllerOptions.nodeStateRequestTimeoutEarliestPercentage = 100; fleetControllerOptions.nodeStateRequestTimeoutLatestPercentage = 100; @@ -73,6 +74,7 @@ public class RPCCommunicatorTest { public void testRoundtripLatency() throws Exception { final Timer timer = new FakeTimer(); final RPCCommunicator communicator = new RPCCommunicator( + RPCCommunicator.createRealSupervisor(), timer, INDEX, NODE_STATE_REQUEST_TIMEOUT_INTERVAL_MAX_MS, @@ -91,4 +93,77 @@ public class RPCCommunicatorTest { eq(ROUNDTRIP_LATENCY_SECONDS + NODE_STATE_REQUEST_TIMEOUT_INTERVAL_MAX_MS/1000.0), (RequestWaiter)any()); } + + private static class Fixture { + final Supervisor mockSupervisor = mock(Supervisor.class); + final Target mockTarget = mock(Target.class); + final Timer timer = new FakeTimer(); + final RPCCommunicator communicator; + final AtomicReference<Request> receivedRequest = new AtomicReference<>(); + final AtomicReference<RequestWaiter> receivedWaiter = new AtomicReference<>(); + @SuppressWarnings("unchecked") // Cannot mock with "compiler-obvious" type safety for generics + final Communicator.Waiter<SetClusterStateRequest> mockWaiter = mock(Communicator.Waiter.class); + + Fixture() { + communicator = new RPCCommunicator( + mockSupervisor, + timer, + INDEX, + NODE_STATE_REQUEST_TIMEOUT_INTERVAL_MAX_MS, + NODE_STATE_REQUEST_TIMEOUT_INTERVAL_STOP_PERCENTAGE, + 100, + ROUNDTRIP_LATENCY_SECONDS); + + when(mockSupervisor.connect(any())).thenReturn(mockTarget); + when(mockTarget.isValid()).thenReturn(true); + doAnswer((invocation) -> { + receivedRequest.set((Request) invocation.getArguments()[0]); + receivedWaiter.set((RequestWaiter) invocation.getArguments()[2]); + return null; + }).when(mockTarget).invokeAsync(any(), anyDouble(), any()); + } + } + + @Test + public void setSystemState_v3_sends_distribution_states_rpc() { + Fixture f = new Fixture(); + ClusterFixture cf = ClusterFixture.forFlatCluster(3).bringEntireClusterUp().assignDummyRpcAddresses(); + ClusterStateBundle sentBundle = ClusterStateBundleUtil.makeBundle("distributor:3 storage:3"); + f.communicator.setSystemState(sentBundle, cf.cluster().getNodeInfo(Node.ofStorage(1)), f.mockWaiter); + + Request req = f.receivedRequest.get(); + assertThat(req, notNullValue()); + assertThat(req.methodName(), equalTo(RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_METHOD_NAME)); + assertTrue(req.parameters().satisfies("bix")); // <compression type>, <uncompressed size>, <payload> + + ClusterStateBundle receivedBundle = RPCUtil.decodeStateBundleFromSetDistributionStatesRequest(req); + assertThat(receivedBundle, equalTo(sentBundle)); + } + + @Test + public void set_distribution_states_v3_rpc_auto_downgrades_to_v2_on_unknown_method_error() { + Fixture f = new Fixture(); + ClusterFixture cf = ClusterFixture.forFlatCluster(3).bringEntireClusterUp().assignDummyRpcAddresses(); + ClusterStateBundle sentBundle = ClusterStateBundleUtil.makeBundle("version:123 distributor:3 storage:3"); + f.communicator.setSystemState(sentBundle, cf.cluster().getNodeInfo(Node.ofStorage(1)), f.mockWaiter); + + RequestWaiter waiter = f.receivedWaiter.get(); + assertThat(waiter, notNullValue()); + Request req = f.receivedRequest.get(); + assertThat(req, notNullValue()); + + req.setError(ErrorCode.NO_SUCH_METHOD, "que?"); + waiter.handleRequestDone(req); + + // This would normally be done in processResponses(), but that code path is not invoked in this test. + cf.cluster().getNodeInfo(Node.ofStorage(1)).setSystemStateVersionAcknowledged(123, false); + + f.receivedRequest.set(null); + // Now when we try again, we should have been downgraded to the legacy setsystemstate2 RPC + f.communicator.setSystemState(sentBundle, cf.cluster().getNodeInfo(Node.ofStorage(1)), f.mockWaiter); + req = f.receivedRequest.get(); + assertThat(req, notNullValue()); + assertThat(req.methodName(), equalTo(RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME)); + } + } diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCUtil.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCUtil.java new file mode 100644 index 00000000000..2fdf6b5c8a8 --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCUtil.java @@ -0,0 +1,21 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core.rpc; + +import com.yahoo.compress.CompressionType; +import com.yahoo.compress.Compressor; +import com.yahoo.jrt.Request; +import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle; + +public class RPCUtil { + + public static ClusterStateBundle decodeStateBundleFromSetDistributionStatesRequest(Request req) { + final CompressionType type = CompressionType.valueOf(req.parameters().get(0).asInt8()); + final int uncompressedSize = req.parameters().get(1).asInt32(); + final byte[] compressedPayload = req.parameters().get(2).asData(); + + SlimeClusterStateBundleCodec codec = new SlimeClusterStateBundleCodec(); + Compressor.Compression compression = new Compressor.Compression(type, uncompressedSize, compressedPayload); + return codec.decode(EncodedClusterStateBundle.fromCompressionBuffer(compression)); + } + +} diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java new file mode 100644 index 00000000000..f2fe494ce61 --- /dev/null +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java @@ -0,0 +1,56 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core.rpc; + +import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle; +import com.yahoo.vespa.clustercontroller.core.ClusterStateBundleUtil; +import com.yahoo.vespa.clustercontroller.core.StateMapping; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.lessThan; +import static org.junit.Assert.assertThat; + +public class SlimeClusterStateBundleCodecTest { + + private static ClusterStateBundle roundtripEncode(ClusterStateBundle stateBundle) { + SlimeClusterStateBundleCodec codec = new SlimeClusterStateBundleCodec(); + EncodedClusterStateBundle encoded = codec.encode(stateBundle); + return codec.decode(encoded); + } + + @Test + public void baseline_only_bundle_can_be_round_trip_encoded() { + ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2"); + assertThat(roundtripEncode(stateBundle), equalTo(stateBundle)); + } + + @Test + public void multi_space_state_bundle_can_be_round_trip_encoded() { + ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2", + StateMapping.of("default", "distributor:2 storage:2 .0.s:d"), + StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2")); + assertThat(roundtripEncode(stateBundle), equalTo(stateBundle)); + } + + private static ClusterStateBundle makeCompressableBundle() { + StringBuilder allDownStates = new StringBuilder(2048); + for (int i = 0; i < 99; ++i) { + allDownStates.append(" .").append(i).append(".s:d"); + } + // Exact same state set string repeated twice; perfect compression fodder. + return ClusterStateBundleUtil.makeBundle(String.format("distributor:100%s storage:100%s", + allDownStates.toString(), allDownStates.toString())); + } + + @Test + public void encoded_cluster_states_can_be_compressed() { + ClusterStateBundle stateBundle = makeCompressableBundle(); + + SlimeClusterStateBundleCodec codec = new SlimeClusterStateBundleCodec(); + EncodedClusterStateBundle encoded = codec.encode(stateBundle); + assertThat(encoded.getCompression().data().length, lessThan(stateBundle.getBaselineClusterState().toString().length())); + ClusterStateBundle decodedBundle = codec.decode(encoded); + assertThat(decodedBundle, equalTo(stateBundle)); + } + +} |