summaryrefslogtreecommitdiffstats
path: root/clustercontroller-core
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@oath.com>2018-02-23 15:41:26 +0100
committerTor Brede Vekterli <vekterli@oath.com>2018-02-27 16:05:50 +0100
commit9063ffeccfa036493390bb09741448c232151cdf (patch)
tree10460a25295206050f42fc97f01a8a14966c409f /clustercontroller-core
parent86029a0a0eefacc98b5c86cd8921eed98f4882e6 (diff)
Add v3 RPC for sending cluster states to content nodes
Supports sending states for multiple bucket spaces at once, with possibilities for extending protocol later for sending distribution config etc. Payload is LZ4-compressed when possible. If a receiver node does not understand the v3 protocol, the protocol version is transparently downgraded to legacy v2 RPC.
Diffstat (limited to 'clustercontroller-core')
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java1
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java16
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java2
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/ClusterStateBundleCodec.java12
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EncodedClusterStateBundle.java22
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java34
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java5
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java50
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java62
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleUtil.java16
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java3
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java35
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java8
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java1
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenanceWhenPendingGlobalMergesTest.java (renamed from clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenancehenPendingGlobalMergesTest.java)2
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java36
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateMapping.java20
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java44
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java89
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCUtil.java21
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java56
21 files changed, 451 insertions, 84 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index e100fea780a..cd3007ef3d5 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -178,6 +178,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
options.minRatioOfStorageNodesUp);
NodeStateGatherer stateGatherer = new NodeStateGatherer(timer, timer, log);
Communicator communicator = new RPCCommunicator(
+ RPCCommunicator.createRealSupervisor(),
timer,
options.fleetControllerIndex,
options.nodeStateRequestTimeoutMS,
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
index 15650f0f4aa..bcef7709155 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
@@ -8,6 +8,7 @@ import com.yahoo.vdslib.distribution.Distribution;
import com.yahoo.vdslib.distribution.Group;
import com.yahoo.vdslib.state.*;
import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo;
+import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator;
import java.io.PrintWriter;
import java.io.StringWriter;
@@ -205,7 +206,7 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
}
public int getLatestVersion() {
- return 2;
+ return RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_VERSION;
}
public String getSlobrokAddress() {
@@ -384,7 +385,18 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
log.log(LogLevel.DEBUG, "Node " + toString() + " does not support " + methodName + " call. Version already at 1 and was recently adjusted, so ignoring it.");
return true;
}
- } else if (methodName.equals("getnodestate2") || methodName.equals("setsystemstate2")) {
+ } else if (methodName.equals(RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_METHOD_NAME)) {
+ if (version == RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_VERSION) {
+ log.log(LogLevel.DEBUG, () -> "Node " + toString() + " does not support " + methodName + " call. Downgrading version to 2 (setsystemstate2).");
+ version = RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_VERSION;
+ nextAttemptTime = 0;
+ adjustedVersionTime = timer.getCurrentTimeInMillis();
+ return true;
+ } else if (timer.getCurrentTimeInMillis() - 2000 < adjustedVersionTime) {
+ log.log(LogLevel.DEBUG, () -> "Node " + toString() + " does not support " + methodName + " call. Version already downgraded, so ignoring it.");
+ return true;
+ }
+ } else if (methodName.equals("getnodestate2") || methodName.equals(RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME)) {
if (version > 0) {
log.log(LogLevel.DEBUG, "Node " + toString() + " does not support " + methodName + " call. Setting version to 0.");
version = 0;
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index 45a14ee19cd..b0869b560f6 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -59,8 +59,8 @@ public class SystemStateBroadcaster {
int version = req.getSystemStateVersion();
if (req.getReply().isError()) {
+ info.setSystemStateVersionAcknowledged(version, false);
if (req.getReply().getReturnCode() != Communicator.TRANSIENT_ERROR) {
- info.setSystemStateVersionAcknowledged(version, false);
if (info.getNewestSystemStateVersionSent() == version) {
reportNodeError(nodeOk, info,
"Got error response " + req.getReply().getReturnCode() + ": " + req.getReply().getReturnMessage()
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/ClusterStateBundleCodec.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/ClusterStateBundleCodec.java
new file mode 100644
index 00000000000..4200aec55a9
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/ClusterStateBundleCodec.java
@@ -0,0 +1,12 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.rpc;
+
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
+
+public interface ClusterStateBundleCodec {
+
+ EncodedClusterStateBundle encode(ClusterStateBundle stateBundle);
+
+ ClusterStateBundle decode(EncodedClusterStateBundle encodedClusterStateBundle);
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EncodedClusterStateBundle.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EncodedClusterStateBundle.java
new file mode 100644
index 00000000000..d502c5ce598
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EncodedClusterStateBundle.java
@@ -0,0 +1,22 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.rpc;
+
+import com.yahoo.compress.Compressor;
+
+public class EncodedClusterStateBundle {
+
+ private final Compressor.Compression compression;
+
+ private EncodedClusterStateBundle(Compressor.Compression compression) {
+ this.compression = compression;
+ }
+
+ public static EncodedClusterStateBundle fromCompressionBuffer(Compressor.Compression compression) {
+ return new EncodedClusterStateBundle(compression);
+ }
+
+ public Compressor.Compression getCompression() {
+ return compression;
+ }
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
index 0cc5ef36e3c..8941b2e32ec 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
@@ -25,6 +25,12 @@ public class RPCCommunicator implements Communicator {
public static final Logger log = Logger.getLogger(RPCCommunicator.class.getName());
+ public static final int SET_DISTRIBUTION_STATES_RPC_VERSION = 3;
+ public static final String SET_DISTRIBUTION_STATES_RPC_METHOD_NAME = "setdistributionstates";
+
+ public static final int LEGACY_SET_SYSTEM_STATE2_RPC_VERSION = 2;
+ public static final String LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME = "setsystemstate2";
+
private final Timer timer;
private final Supervisor supervisor;
private double nodeStateRequestTimeoutIntervalMaxSeconds;
@@ -33,7 +39,12 @@ public class RPCCommunicator implements Communicator {
private int nodeStateRequestRoundTripTimeMaxSeconds;
private final int fleetControllerIndex;
+ public static Supervisor createRealSupervisor() {
+ return new Supervisor(new Transport());
+ }
+
public RPCCommunicator(
+ final Supervisor supervisor,
final Timer t,
final int index,
final int nodeStateRequestTimeoutIntervalMaxMs,
@@ -52,7 +63,7 @@ public class RPCCommunicator implements Communicator {
this.nodeStateRequestTimeoutIntervalStartPercentage = nodeStateRequestTimeoutIntervalStartPercentage;
this.nodeStateRequestTimeoutIntervalStopPercentage = nodeStateRequestTimeoutIntervalStopPercentage;
this.nodeStateRequestRoundTripTimeMaxSeconds = nodeStateRequestRoundTripTimeMaxSeconds;
- this.supervisor = new Supervisor(new Transport());
+ this.supervisor = supervisor;
}
public void shutdown() {
@@ -126,6 +137,7 @@ public class RPCCommunicator implements Communicator {
if ( ! connection.isValid()) {
log.log(LogLevel.DEBUG, "Connection to " + node.getRpcAddress() + " could not be created.");
}
+ // TODO remove this deprecated legacy stuff
if (node.getVersion() == 0 && node.getConnectionVersion() > 0) {
doVersion0HandShake(connection, node);
clearOldStoredNodeState(connection, node);
@@ -166,19 +178,31 @@ public class RPCCommunicator implements Communicator {
log.log(LogLevel.DEBUG, "Connection to " + node.getRpcAddress() + " could not be created.");
return;
}
- if (node.getVersion() == 0 && node.getConnectionVersion() > 0) {
+ final int nodeVersion = node.getVersion();
+ // TODO remove this deprecated legacy stuff
+ if (nodeVersion == 0 && node.getConnectionVersion() > 0) {
doVersion0HandShake(connection, node);
clearOldStoredNodeState(connection, node);
}
Request req;
- if (node.getVersion() == 0) {
+ if (nodeVersion == 0) {
req = new Request("setsystemstate");
req.parameters().add(new StringValue(baselineState.toString(true)));
- } else {
- req = new Request("setsystemstate2");
+ } else if (nodeVersion <= 2) {
+ req = new Request(LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME);
req.parameters().add(new StringValue(baselineState.toString(false)));
+ } else {
+ req = new Request(SET_DISTRIBUTION_STATES_RPC_METHOD_NAME);
+ SlimeClusterStateBundleCodec codec = new SlimeClusterStateBundleCodec();
+ EncodedClusterStateBundle encodedBundle = codec.encode(stateBundle);
+ Values v = req.parameters();
+ v.add(new Int8Value(encodedBundle.getCompression().type().getCode()));
+ v.add(new Int32Value(encodedBundle.getCompression().uncompressedSize()));
+ v.add(new DataValue(encodedBundle.getCompression().data()));
}
+ log.log(LogLevel.DEBUG, () -> String.format("Sending '%s' RPC to %s for state version %d",
+ req.methodName(), node.getRpcAddress(), stateBundle.getVersion()));
RPCSetClusterStateRequest stateRequest = new RPCSetClusterStateRequest(node, req, baselineState.getVersion());
waiter.setRequest(stateRequest);
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java
index 39506ddf7fa..96a776a65b8 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java
@@ -30,7 +30,8 @@ public class RPCSetClusterStateWaiter implements RequestWaiter {
public SetClusterStateRequest.Reply getReply(Request req) {
NodeInfo info = request.getNodeInfo();
- if (req.methodName().equals("setsystemstate2")) {
+ if (req.methodName().equals(RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_METHOD_NAME)
+ || req.methodName().equals(RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME)) {
if (req.isError() && req.errorCode() == ErrorCode.NO_SUCH_METHOD) {
if (info.notifyNoSuchMethodError(req.methodName(), timer)) {
return new SetClusterStateRequest.Reply(Communicator.TRANSIENT_ERROR, "Trying lower version");
@@ -39,7 +40,7 @@ public class RPCSetClusterStateWaiter implements RequestWaiter {
if (req.isError()) {
return new SetClusterStateRequest.Reply(req.errorCode(), req.errorMessage());
} else if (!req.checkReturnTypes("")) {
- return new SetClusterStateRequest.Reply(ErrorCode.BAD_REPLY, "Got setsystemstate2 response with invalid return types from " + info);
+ return new SetClusterStateRequest.Reply(ErrorCode.BAD_REPLY, "Got RPC response with invalid return types from " + info);
}
} else {
return new SetClusterStateRequest.Reply(ErrorCode.BAD_REPLY, "Unknown method " + req.methodName());
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java
new file mode 100644
index 00000000000..eb6c2280491
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java
@@ -0,0 +1,50 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.rpc;
+
+import com.yahoo.compress.CompressionType;
+import com.yahoo.compress.Compressor;
+import com.yahoo.slime.*;
+import com.yahoo.vdslib.state.ClusterState;
+import com.yahoo.vespa.clustercontroller.core.AnnotatedClusterState;
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class SlimeClusterStateBundleCodec implements ClusterStateBundleCodec {
+
+ private static final Compressor compressor = new Compressor(CompressionType.LZ4, 3, 0.90, 1024);
+
+ @Override
+ public EncodedClusterStateBundle encode(ClusterStateBundle stateBundle) {
+ Slime slime = new Slime();
+ Cursor root = slime.setObject();
+ Cursor states = root.setObject("states");
+ // TODO add another function that is not toString for this..!
+ states.setString("baseline", stateBundle.getBaselineClusterState().toString());
+ Cursor spaces = states.setObject("spaces");
+ stateBundle.getDerivedBucketSpaceStates().entrySet()
+ .forEach(entry -> spaces.setString(entry.getKey(), entry.getValue().toString()));
+
+ byte[] serialized = BinaryFormat.encode(slime);
+ Compressor.Compression compression = compressor.compress(serialized);
+ return EncodedClusterStateBundle.fromCompressionBuffer(compression);
+ }
+
+ @Override
+ public ClusterStateBundle decode(EncodedClusterStateBundle encodedClusterStateBundle) {
+ byte[] uncompressed = compressor.decompress(encodedClusterStateBundle.getCompression());
+ Slime slime = BinaryFormat.decode(uncompressed);
+ Inspector root = slime.get();
+ Inspector states = root.field("states");
+ ClusterState baseline = ClusterState.stateFromString(states.field("baseline").asString());
+
+ Inspector spaces = states.field("spaces");
+ Map<String, ClusterState> derivedStates = new HashMap<>();
+ spaces.traverse(((ObjectTraverser)(key, value) -> {
+ derivedStates.put(key, ClusterState.stateFromString(value.asString()));
+ }));
+
+ return ClusterStateBundle.of(AnnotatedClusterState.withoutAnnotations(baseline), derivedStates);
+ }
+}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java
index 4862c83f2a6..5692e961cbe 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java
@@ -20,7 +20,7 @@ import java.util.TreeMap;
import static org.mockito.Mockito.mock;
-class ClusterFixture {
+public class ClusterFixture {
public final ContentCluster cluster;
public final Distribution distribution;
public final FakeTimer timer;
@@ -28,7 +28,7 @@ class ClusterFixture {
public final StateChangeHandler nodeStateChangeHandler;
public final ClusterStateGenerator.Params params = new ClusterStateGenerator.Params();
- ClusterFixture(ContentCluster cluster, Distribution distribution) {
+ public ClusterFixture(ContentCluster cluster, Distribution distribution) {
this.cluster = cluster;
this.distribution = distribution;
this.timer = new FakeTimer();
@@ -37,13 +37,13 @@ class ClusterFixture {
this.params.cluster(this.cluster);
}
- StateChangeHandler createNodeStateChangeHandlerForCluster() {
+ public StateChangeHandler createNodeStateChangeHandlerForCluster() {
final int controllerIndex = 0;
MetricUpdater metricUpdater = new MetricUpdater(new NoMetricReporter(), controllerIndex);
return new StateChangeHandler(timer, eventLog, metricUpdater);
}
- ClusterFixture bringEntireClusterUp() {
+ public ClusterFixture bringEntireClusterUp() {
cluster.clusterInfo().getConfiguredNodes().forEach((idx, node) -> {
reportStorageNodeState(idx, State.UP);
reportDistributorNodeState(idx, State.UP);
@@ -51,7 +51,7 @@ class ClusterFixture {
return this;
}
- ClusterFixture markEntireClusterDown() {
+ public ClusterFixture markEntireClusterDown() {
cluster.clusterInfo().getConfiguredNodes().forEach((idx, node) -> {
reportStorageNodeState(idx, State.DOWN);
reportDistributorNodeState(idx, State.DOWN);
@@ -69,7 +69,7 @@ class ClusterFixture {
nodeInfo.setReportedState(nodeState, timer.getCurrentTimeInMillis());
}
- ClusterFixture reportStorageNodeState(final int index, State state, String description) {
+ public ClusterFixture reportStorageNodeState(final int index, State state, String description) {
final Node node = new Node(NodeType.STORAGE, index);
final NodeState nodeState = new NodeState(NodeType.STORAGE, state);
nodeState.setDescription(description);
@@ -77,23 +77,23 @@ class ClusterFixture {
return this;
}
- ClusterFixture reportStorageNodeState(final int index, State state) {
+ public ClusterFixture reportStorageNodeState(final int index, State state) {
return reportStorageNodeState(index, state, "mockdesc");
}
- ClusterFixture reportStorageNodeState(final int index, NodeState nodeState) {
+ public ClusterFixture reportStorageNodeState(final int index, NodeState nodeState) {
doReportNodeState(new Node(NodeType.STORAGE, index), nodeState);
return this;
}
- ClusterFixture reportDistributorNodeState(final int index, State state) {
+ public ClusterFixture reportDistributorNodeState(final int index, State state) {
final Node node = new Node(NodeType.DISTRIBUTOR, index);
final NodeState nodeState = new NodeState(NodeType.DISTRIBUTOR, state);
doReportNodeState(node, nodeState);
return this;
}
- ClusterFixture reportDistributorNodeState(final int index, NodeState nodeState) {
+ public ClusterFixture reportDistributorNodeState(final int index, NodeState nodeState) {
doReportNodeState(new Node(NodeType.DISTRIBUTOR, index), nodeState);
return this;
}
@@ -108,18 +108,18 @@ class ClusterFixture {
nodeStateChangeHandler.proposeNewNodeState(stateBefore, nodeInfo, nodeState);
}
- ClusterFixture proposeStorageNodeWantedState(final int index, State state, String description) {
+ public ClusterFixture proposeStorageNodeWantedState(final int index, State state, String description) {
final Node node = new Node(NodeType.STORAGE, index);
final NodeState nodeState = new NodeState(NodeType.STORAGE, state);
doProposeWantedState(node, nodeState, description);
return this;
}
- ClusterFixture proposeStorageNodeWantedState(final int index, State state) {
+ public ClusterFixture proposeStorageNodeWantedState(final int index, State state) {
return proposeStorageNodeWantedState(index, state, "mockdesc");
}
- ClusterFixture proposeDistributorWantedState(final int index, State state) {
+ public ClusterFixture proposeDistributorWantedState(final int index, State state) {
final ClusterState stateBefore = rawGeneratedClusterState();
final Node node = new Node(NodeType.DISTRIBUTOR, index);
final NodeState nodeState = new NodeState(NodeType.DISTRIBUTOR, state);
@@ -131,12 +131,12 @@ class ClusterFixture {
return this;
}
- ClusterFixture disableAutoClusterTakedown() {
+ public ClusterFixture disableAutoClusterTakedown() {
setMinNodesUp(0, 0, 0.0, 0.0);
return this;
}
- ClusterFixture setMinNodesUp(int minDistNodes, int minStorNodes, double minDistRatio, double minStorRatio) {
+ public ClusterFixture setMinNodesUp(int minDistNodes, int minStorNodes, double minDistRatio, double minStorRatio) {
params.minStorageNodesUp(minStorNodes)
.minDistributorNodesUp(minDistNodes)
.minRatioOfStorageNodesUp(minStorRatio)
@@ -144,32 +144,32 @@ class ClusterFixture {
return this;
}
- ClusterFixture setMinNodeRatioPerGroup(double upRatio) {
+ public ClusterFixture setMinNodeRatioPerGroup(double upRatio) {
params.minNodeRatioPerGroup(upRatio);
return this;
}
- ClusterFixture assignDummyRpcAddresses() {
+ public ClusterFixture assignDummyRpcAddresses() {
cluster.getNodeInfo().forEach(ni -> ni.setRpcAddress("tcp/localhost:0"));
return this;
}
- static Map<NodeType, Integer> buildTransitionTimeMap(int distributorTransitionTime, int storageTransitionTime) {
+ static public Map<NodeType, Integer> buildTransitionTimeMap(int distributorTransitionTime, int storageTransitionTime) {
Map<NodeType, Integer> maxTransitionTime = new TreeMap<>();
maxTransitionTime.put(NodeType.DISTRIBUTOR, distributorTransitionTime);
maxTransitionTime.put(NodeType.STORAGE, storageTransitionTime);
return maxTransitionTime;
}
- void disableTransientMaintenanceModeOnDown() {
+ public void disableTransientMaintenanceModeOnDown() {
this.params.transitionTimes(0);
}
- void enableTransientMaintenanceModeOnDown(final int transitionTimeMs) {
+ public void enableTransientMaintenanceModeOnDown(final int transitionTimeMs) {
this.params.transitionTimes(transitionTimeMs);
}
- ClusterFixture markNodeAsConfigRetired(int nodeIndex) {
+ public ClusterFixture markNodeAsConfigRetired(int nodeIndex) {
Set<ConfiguredNode> configuredNodes = new HashSet<>(cluster.getConfiguredNodes().values());
configuredNodes.remove(new ConfiguredNode(nodeIndex, false));
configuredNodes.add(new ConfiguredNode(nodeIndex, true));
@@ -177,24 +177,24 @@ class ClusterFixture {
return this;
}
- AnnotatedClusterState annotatedGeneratedClusterState() {
+ public AnnotatedClusterState annotatedGeneratedClusterState() {
params.currentTimeInMilllis(timer.getCurrentTimeInMillis());
return ClusterStateGenerator.generatedStateFrom(params);
}
- ClusterState rawGeneratedClusterState() {
+ public ClusterState rawGeneratedClusterState() {
return annotatedGeneratedClusterState().getClusterState();
}
- String generatedClusterState() {
+ public String generatedClusterState() {
return annotatedGeneratedClusterState().getClusterState().toString();
}
- String verboseGeneratedClusterState() {
+ public String verboseGeneratedClusterState() {
return annotatedGeneratedClusterState().getClusterState().toString(true);
}
- static ClusterFixture forFlatCluster(int nodeCount) {
+ public static ClusterFixture forFlatCluster(int nodeCount) {
Collection<ConfiguredNode> nodes = DistributionBuilder.buildConfiguredNodes(nodeCount);
Distribution distribution = DistributionBuilder.forFlatCluster(nodeCount);
@@ -203,7 +203,7 @@ class ClusterFixture {
return new ClusterFixture(cluster, distribution);
}
- static ClusterFixture forHierarchicCluster(DistributionBuilder.GroupBuilder root) {
+ public static ClusterFixture forHierarchicCluster(DistributionBuilder.GroupBuilder root) {
List<ConfiguredNode> nodes = DistributionBuilder.buildConfiguredNodes(root.totalNodeCount());
Distribution distribution = DistributionBuilder.forHierarchicCluster(root);
ContentCluster cluster = new ContentCluster("foo", nodes, distribution, 0, 0.0);
@@ -211,19 +211,19 @@ class ClusterFixture {
return new ClusterFixture(cluster, distribution);
}
- ClusterStateGenerator.Params generatorParams() {
+ public ClusterStateGenerator.Params generatorParams() {
return new ClusterStateGenerator.Params().cluster(cluster);
}
- ContentCluster cluster() {
+ public ContentCluster cluster() {
return this.cluster;
}
- static Node storageNode(int index) {
+ public static Node storageNode(int index) {
return new Node(NodeType.STORAGE, index);
}
- static Node distributorNode(int index) {
+ public static Node distributorNode(int index) {
return new Node(NodeType.DISTRIBUTOR, index);
}
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleUtil.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleUtil.java
new file mode 100644
index 00000000000..31adb93fc49
--- /dev/null
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleUtil.java
@@ -0,0 +1,16 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.vdslib.state.ClusterState;
+
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ClusterStateBundleUtil {
+
+ public static ClusterStateBundle makeBundle(String baselineState, StateMapping... bucketSpaceStates) {
+ return ClusterStateBundle.of(AnnotatedClusterState.withoutAnnotations(ClusterState.stateFromString(baselineState)),
+ Stream.of(bucketSpaceStates).collect(Collectors.toMap(sm -> sm.bucketSpace, sm -> sm.state)));
+ }
+
+}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java
index a6e62d29ab5..ae59336b5ef 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java
@@ -17,8 +17,9 @@ public class DistributionBitCountTest extends FleetControllerTest {
private void setUpSystem(String testName) throws Exception {
List<ConfiguredNode> configuredNodes = new ArrayList<>();
- for (int i = 0 ; i < 10; i++)
+ for (int i = 0 ; i < 10; i++) {
configuredNodes.add(new ConfiguredNode(i, false));
+ }
FleetControllerOptions options = new FleetControllerOptions("mycluster", configuredNodes);
options.distributionBits = 17;
setUpFleetController(false, options);
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
index 5fd7c26617e..a2da6bd1c6c 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;
+import com.yahoo.compress.CompressionType;
+import com.yahoo.compress.Compressor;
import com.yahoo.jrt.*;
import com.yahoo.jrt.StringValue;
import com.yahoo.jrt.slobrok.api.BackOffPolicy;
@@ -8,6 +10,10 @@ import com.yahoo.jrt.slobrok.api.Register;
import com.yahoo.jrt.slobrok.api.SlobrokList;
import com.yahoo.log.LogLevel;
import com.yahoo.vdslib.state.*;
+import com.yahoo.vespa.clustercontroller.core.rpc.EncodedClusterStateBundle;
+import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator;
+import com.yahoo.vespa.clustercontroller.core.rpc.RPCUtil;
+import com.yahoo.vespa.clustercontroller.core.rpc.SlimeClusterStateBundleCodec;
import java.net.InetAddress;
import java.net.UnknownHostException;
@@ -345,6 +351,14 @@ public class DummyVdsNode {
supervisor.addMethod(m);
}
}
+ if (stateCommunicationVersion >= RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_VERSION) {
+ m = new Method(RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_METHOD_NAME, "bix", "", this, "rpc_setDistributionStates");
+ m.methodDesc("Set distribution states for cluster and bucket spaces");
+ m.paramDesc(0, "compressionType", "Compression type for payload");
+ m.paramDesc(1, "uncompressedSize", "Uncompressed size of payload");
+ m.paramDesc(2, "payload", "Slime format payload");
+ supervisor.addMethod(m);
+ }
}
public void rpc_storageConnect(Request req) {
@@ -501,4 +515,25 @@ public class DummyVdsNode {
req.setError(ErrorCode.METHOD_FAILED, e.getMessage());
}
}
+
+ public void rpc_setDistributionStates(Request req) {
+ try {
+ if (shouldFailSetSystemStateRequests()) {
+ req.setError(ErrorCode.GENERAL_ERROR, "Dummy node configured to fail setDistributionStates() calls");
+ return;
+ }
+ ClusterStateBundle stateBundle = RPCUtil.decodeStateBundleFromSetDistributionStatesRequest(req);
+ ClusterState newState = stateBundle.getBaselineClusterState();
+ synchronized(timer) {
+ updateStartTimestamps(newState);
+ systemState.add(0, newState);
+ timer.notifyAll();
+ }
+ log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new cluster state " + newState);
+ } catch (Exception e) {
+ log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setdistributionstates request: " + e.getMessage());
+ e.printStackTrace(System.err);
+ req.setError(ErrorCode.METHOD_FAILED, e.getMessage());
+ }
+ }
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java
index b0b833000d2..bda06248d9e 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java
@@ -1,9 +1,13 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;
+import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator;
+
/**
- *
+ * Options for simulated content nodes that are registered in Slobrok and communicate
+ * over regular RPC.
*/
public class DummyVdsNodeOptions {
- public int stateCommunicationVersion = 2; // 0 - 4.1, 1 - 4.2-5.0.10, 2 - 5.0.11+
+ // 0 - 4.1, 1 - 4.2-5.0.10, 2 - 5.0.11+, 3 - 6.220+
+ public int stateCommunicationVersion = RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_VERSION;
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java
index 662c36b29f7..19d87b878d4 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java
@@ -144,6 +144,7 @@ public abstract class FleetControllerTest implements Waiter {
options.minRatioOfStorageNodesUp);
NodeStateGatherer stateGatherer = new NodeStateGatherer(timer, timer, log);
Communicator communicator = new RPCCommunicator(
+ RPCCommunicator.createRealSupervisor(),
timer,
options.fleetControllerIndex,
options.nodeStateRequestTimeoutMS,
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenancehenPendingGlobalMergesTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenanceWhenPendingGlobalMergesTest.java
index fa92a4d5246..0e2b9557cb9 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenancehenPendingGlobalMergesTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenanceWhenPendingGlobalMergesTest.java
@@ -12,7 +12,7 @@ import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class MaintenancehenPendingGlobalMergesTest {
+public class MaintenanceWhenPendingGlobalMergesTest {
private static class Fixture {
public MergePendingChecker mockPendingChecker = mock(MergePendingChecker.class);
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java
new file mode 100644
index 00000000000..d59dbb4933a
--- /dev/null
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java
@@ -0,0 +1,36 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.vdslib.distribution.ConfiguredNode;
+import com.yahoo.vdslib.state.NodeState;
+import com.yahoo.vdslib.state.State;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class RpcVersionAutoDowngradeTest extends FleetControllerTest {
+
+ private void setUpFakeCluster(int nodeRpcVersion) throws Exception {
+ List<ConfiguredNode> configuredNodes = new ArrayList<>();
+ for (int i = 0 ; i < 10; i++) {
+ configuredNodes.add(new ConfiguredNode(i, false));
+ }
+ FleetControllerOptions options = new FleetControllerOptions("mycluster", configuredNodes);
+ setUpFleetController(false, options);
+ DummyVdsNodeOptions nodeOptions = new DummyVdsNodeOptions();
+ nodeOptions.stateCommunicationVersion = nodeRpcVersion;
+ List<DummyVdsNode> nodes = setUpVdsNodes(false, nodeOptions, true, configuredNodes);
+ for (DummyVdsNode node : nodes) {
+ node.setNodeState(new NodeState(node.getType(), State.UP).setMinUsedBits(20));
+ node.connect();
+ }
+ }
+
+ @Test
+ public void cluster_state_rpc_version_is_auto_downgraded_and_retried_for_older_nodes() throws Exception {
+ setUpFakeCluster(2); // HEAD is at v3
+ waitForState("version:\\d+ distributor:10 storage:10");
+ }
+
+}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateMapping.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateMapping.java
new file mode 100644
index 00000000000..f88802c09ef
--- /dev/null
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateMapping.java
@@ -0,0 +1,20 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.vdslib.state.ClusterState;
+
+public class StateMapping {
+
+ final String bucketSpace;
+ final ClusterState state;
+
+ private StateMapping(String bucketSpace, ClusterState state) {
+ this.bucketSpace = bucketSpace;
+ this.state = state;
+ }
+
+ public static StateMapping of(String bucketSpace, String state) {
+ return new StateMapping(bucketSpace, ClusterState.stateFromString(state));
+ }
+
+}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
index 93aac5c83ed..aa5219147ce 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
@@ -7,7 +7,6 @@ import com.yahoo.vespa.clustercontroller.core.listeners.NodeAddedOrRemovedListen
import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler;
import org.junit.Test;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.mockito.Matchers.any;
@@ -63,29 +62,10 @@ public class SystemStateBroadcasterTest {
return Stream.of(nodes).map(c::getNodeInfo);
}
- private static class StateMapping {
- final String bucketSpace;
- final ClusterState state;
-
- StateMapping(String bucketSpace, ClusterState state) {
- this.bucketSpace = bucketSpace;
- this.state = state;
- }
- }
-
- private static StateMapping mapping(String bucketSpace, String state) {
- return new StateMapping(bucketSpace, ClusterState.stateFromString(state));
- }
-
- private static ClusterStateBundle makeBundle(String baselineState, StateMapping... bucketSpaceStates) {
- return ClusterStateBundle.of(AnnotatedClusterState.withoutAnnotations(ClusterState.stateFromString(baselineState)),
- Stream.of(bucketSpaceStates).collect(Collectors.toMap(sm -> sm.bucketSpace, sm -> sm.state)));
- }
-
@Test
public void always_publish_baseline_cluster_state() {
Fixture f = new Fixture();
- ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2");
+ ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2");
ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
f.broadcaster.handleNewClusterStates(stateBundle);
f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator);
@@ -95,7 +75,7 @@ public class SystemStateBroadcasterTest {
@Test
public void non_observed_startup_timestamps_are_published_per_node_for_baseline_state() {
Fixture f = new Fixture();
- ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2");
+ ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2");
ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
f.simulateNodePartitionedAwaySilently(cf);
f.broadcaster.handleNewClusterStates(stateBundle);
@@ -105,16 +85,16 @@ public class SystemStateBroadcasterTest {
// Only distributor 0 should observe startup timestamps
verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any());
});
- ClusterStateBundle expectedDistr0Bundle = makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700");
+ ClusterStateBundle expectedDistr0Bundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700");
verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any());
}
@Test
public void bucket_space_states_are_published_verbatim_when_no_additional_timestamps_needed() {
Fixture f = new Fixture();
- ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2",
- mapping("default", "distributor:2 storage:2 .0.s:d"),
- mapping("upsidedown", "distributor:2 .0.s:d storage:2"));
+ ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2",
+ StateMapping.of("default", "distributor:2 storage:2 .0.s:d"),
+ StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2"));
ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
f.broadcaster.handleNewClusterStates(stateBundle);
f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator);
@@ -125,9 +105,9 @@ public class SystemStateBroadcasterTest {
@Test
public void non_observed_startup_timestamps_are_published_per_bucket_space_state() {
Fixture f = new Fixture();
- ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2",
- mapping("default", "distributor:2 storage:2 .0.s:d"),
- mapping("upsidedown", "distributor:2 .0.s:d storage:2"));
+ ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2",
+ StateMapping.of("default", "distributor:2 storage:2 .0.s:d"),
+ StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2"));
ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
f.simulateNodePartitionedAwaySilently(cf);
f.broadcaster.handleNewClusterStates(stateBundle);
@@ -137,9 +117,9 @@ public class SystemStateBroadcasterTest {
// Only distributor 0 should observe startup timestamps
verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any());
});
- ClusterStateBundle expectedDistr0Bundle = makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700",
- mapping("default", "distributor:2 storage:2 .0.s:d .0.t:600 .1.t:700"),
- mapping("upsidedown", "distributor:2 .0.s:d storage:2 .0.t:600 .1.t:700"));
+ ClusterStateBundle expectedDistr0Bundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700",
+ StateMapping.of("default", "distributor:2 storage:2 .0.s:d .0.t:600 .1.t:700"),
+ StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2 .0.t:600 .1.t:700"));
verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any());
}
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java
index 73692dbdbfa..2665175c637 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java
@@ -1,24 +1,24 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core.rpc;
-import com.yahoo.jrt.Request;
-import com.yahoo.jrt.RequestWaiter;
-import com.yahoo.jrt.Target;
+import com.yahoo.jrt.*;
import com.yahoo.vdslib.state.Node;
-import com.yahoo.vdslib.state.NodeType;
import com.yahoo.vespa.clustercontroller.core.*;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -34,6 +34,7 @@ public class RPCCommunicatorTest {
@Test
public void testGenerateNodeStateRequestTimeoutMs() throws Exception {
final RPCCommunicator communicator = new RPCCommunicator(
+ RPCCommunicator.createRealSupervisor(),
null /* Timer */,
INDEX,
NODE_STATE_REQUEST_TIMEOUT_INTERVAL_MAX_MS,
@@ -59,7 +60,7 @@ public class RPCCommunicatorTest {
@Test
public void testGenerateNodeStateRequestTimeoutMsWithUpdates() throws Exception {
- final RPCCommunicator communicator = new RPCCommunicator(null /* Timer */, INDEX, 1, 1, 100, 0);
+ final RPCCommunicator communicator = new RPCCommunicator(RPCCommunicator.createRealSupervisor(), null /* Timer */, INDEX, 1, 1, 100, 0);
FleetControllerOptions fleetControllerOptions = new FleetControllerOptions(null /*clustername*/);
fleetControllerOptions.nodeStateRequestTimeoutEarliestPercentage = 100;
fleetControllerOptions.nodeStateRequestTimeoutLatestPercentage = 100;
@@ -73,6 +74,7 @@ public class RPCCommunicatorTest {
public void testRoundtripLatency() throws Exception {
final Timer timer = new FakeTimer();
final RPCCommunicator communicator = new RPCCommunicator(
+ RPCCommunicator.createRealSupervisor(),
timer,
INDEX,
NODE_STATE_REQUEST_TIMEOUT_INTERVAL_MAX_MS,
@@ -91,4 +93,77 @@ public class RPCCommunicatorTest {
eq(ROUNDTRIP_LATENCY_SECONDS + NODE_STATE_REQUEST_TIMEOUT_INTERVAL_MAX_MS/1000.0),
(RequestWaiter)any());
}
+
+ private static class Fixture {
+ final Supervisor mockSupervisor = mock(Supervisor.class);
+ final Target mockTarget = mock(Target.class);
+ final Timer timer = new FakeTimer();
+ final RPCCommunicator communicator;
+ final AtomicReference<Request> receivedRequest = new AtomicReference<>();
+ final AtomicReference<RequestWaiter> receivedWaiter = new AtomicReference<>();
+ @SuppressWarnings("unchecked") // Cannot mock with "compiler-obvious" type safety for generics
+ final Communicator.Waiter<SetClusterStateRequest> mockWaiter = mock(Communicator.Waiter.class);
+
+ Fixture() {
+ communicator = new RPCCommunicator(
+ mockSupervisor,
+ timer,
+ INDEX,
+ NODE_STATE_REQUEST_TIMEOUT_INTERVAL_MAX_MS,
+ NODE_STATE_REQUEST_TIMEOUT_INTERVAL_STOP_PERCENTAGE,
+ 100,
+ ROUNDTRIP_LATENCY_SECONDS);
+
+ when(mockSupervisor.connect(any())).thenReturn(mockTarget);
+ when(mockTarget.isValid()).thenReturn(true);
+ doAnswer((invocation) -> {
+ receivedRequest.set((Request) invocation.getArguments()[0]);
+ receivedWaiter.set((RequestWaiter) invocation.getArguments()[2]);
+ return null;
+ }).when(mockTarget).invokeAsync(any(), anyDouble(), any());
+ }
+ }
+
+ @Test
+ public void setSystemState_v3_sends_distribution_states_rpc() {
+ Fixture f = new Fixture();
+ ClusterFixture cf = ClusterFixture.forFlatCluster(3).bringEntireClusterUp().assignDummyRpcAddresses();
+ ClusterStateBundle sentBundle = ClusterStateBundleUtil.makeBundle("distributor:3 storage:3");
+ f.communicator.setSystemState(sentBundle, cf.cluster().getNodeInfo(Node.ofStorage(1)), f.mockWaiter);
+
+ Request req = f.receivedRequest.get();
+ assertThat(req, notNullValue());
+ assertThat(req.methodName(), equalTo(RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_METHOD_NAME));
+ assertTrue(req.parameters().satisfies("bix")); // <compression type>, <uncompressed size>, <payload>
+
+ ClusterStateBundle receivedBundle = RPCUtil.decodeStateBundleFromSetDistributionStatesRequest(req);
+ assertThat(receivedBundle, equalTo(sentBundle));
+ }
+
+ @Test
+ public void set_distribution_states_v3_rpc_auto_downgrades_to_v2_on_unknown_method_error() {
+ Fixture f = new Fixture();
+ ClusterFixture cf = ClusterFixture.forFlatCluster(3).bringEntireClusterUp().assignDummyRpcAddresses();
+ ClusterStateBundle sentBundle = ClusterStateBundleUtil.makeBundle("version:123 distributor:3 storage:3");
+ f.communicator.setSystemState(sentBundle, cf.cluster().getNodeInfo(Node.ofStorage(1)), f.mockWaiter);
+
+ RequestWaiter waiter = f.receivedWaiter.get();
+ assertThat(waiter, notNullValue());
+ Request req = f.receivedRequest.get();
+ assertThat(req, notNullValue());
+
+ req.setError(ErrorCode.NO_SUCH_METHOD, "que?");
+ waiter.handleRequestDone(req);
+
+ // This would normally be done in processResponses(), but that code path is not invoked in this test.
+ cf.cluster().getNodeInfo(Node.ofStorage(1)).setSystemStateVersionAcknowledged(123, false);
+
+ f.receivedRequest.set(null);
+ // Now when we try again, we should have been downgraded to the legacy setsystemstate2 RPC
+ f.communicator.setSystemState(sentBundle, cf.cluster().getNodeInfo(Node.ofStorage(1)), f.mockWaiter);
+ req = f.receivedRequest.get();
+ assertThat(req, notNullValue());
+ assertThat(req.methodName(), equalTo(RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME));
+ }
+
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCUtil.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCUtil.java
new file mode 100644
index 00000000000..2fdf6b5c8a8
--- /dev/null
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCUtil.java
@@ -0,0 +1,21 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.rpc;
+
+import com.yahoo.compress.CompressionType;
+import com.yahoo.compress.Compressor;
+import com.yahoo.jrt.Request;
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
+
+public class RPCUtil {
+
+ public static ClusterStateBundle decodeStateBundleFromSetDistributionStatesRequest(Request req) {
+ final CompressionType type = CompressionType.valueOf(req.parameters().get(0).asInt8());
+ final int uncompressedSize = req.parameters().get(1).asInt32();
+ final byte[] compressedPayload = req.parameters().get(2).asData();
+
+ SlimeClusterStateBundleCodec codec = new SlimeClusterStateBundleCodec();
+ Compressor.Compression compression = new Compressor.Compression(type, uncompressedSize, compressedPayload);
+ return codec.decode(EncodedClusterStateBundle.fromCompressionBuffer(compression));
+ }
+
+}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java
new file mode 100644
index 00000000000..f2fe494ce61
--- /dev/null
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java
@@ -0,0 +1,56 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.rpc;
+
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundleUtil;
+import com.yahoo.vespa.clustercontroller.core.StateMapping;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertThat;
+
+public class SlimeClusterStateBundleCodecTest {
+
+ private static ClusterStateBundle roundtripEncode(ClusterStateBundle stateBundle) {
+ SlimeClusterStateBundleCodec codec = new SlimeClusterStateBundleCodec();
+ EncodedClusterStateBundle encoded = codec.encode(stateBundle);
+ return codec.decode(encoded);
+ }
+
+ @Test
+ public void baseline_only_bundle_can_be_round_trip_encoded() {
+ ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2");
+ assertThat(roundtripEncode(stateBundle), equalTo(stateBundle));
+ }
+
+ @Test
+ public void multi_space_state_bundle_can_be_round_trip_encoded() {
+ ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2",
+ StateMapping.of("default", "distributor:2 storage:2 .0.s:d"),
+ StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2"));
+ assertThat(roundtripEncode(stateBundle), equalTo(stateBundle));
+ }
+
+ private static ClusterStateBundle makeCompressableBundle() {
+ StringBuilder allDownStates = new StringBuilder(2048);
+ for (int i = 0; i < 99; ++i) {
+ allDownStates.append(" .").append(i).append(".s:d");
+ }
+ // Exact same state set string repeated twice; perfect compression fodder.
+ return ClusterStateBundleUtil.makeBundle(String.format("distributor:100%s storage:100%s",
+ allDownStates.toString(), allDownStates.toString()));
+ }
+
+ @Test
+ public void encoded_cluster_states_can_be_compressed() {
+ ClusterStateBundle stateBundle = makeCompressableBundle();
+
+ SlimeClusterStateBundleCodec codec = new SlimeClusterStateBundleCodec();
+ EncodedClusterStateBundle encoded = codec.encode(stateBundle);
+ assertThat(encoded.getCompression().data().length, lessThan(stateBundle.getBaselineClusterState().toString().length()));
+ ClusterStateBundle decodedBundle = codec.decode(encoded);
+ assertThat(decodedBundle, equalTo(stateBundle));
+ }
+
+}