aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorGeir Storli <geirstorli@yahoo.no>2018-02-28 11:12:51 +0100
committerGitHub <noreply@github.com>2018-02-28 11:12:51 +0100
commit29b39068a3ffd22f906e72fdf2eedb9fe4d9d096 (patch)
treecddcbd87902828e5136513ac807954220debed2a
parenta35db3bc1005adb1d49335acef98aba1761368b9 (diff)
parent955fe835a6a37b7673c286ea2954f1d2359b3c87 (diff)
Merge pull request #5164 from vespa-engine/vekterli/add-v3-cluster-state-propagation-rpc
Add v3 RPC for sending cluster states to content nodes
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java1
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java31
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java2
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/ClusterStateBundleCodec.java21
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EncodedClusterStateBundle.java29
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java34
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java5
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java58
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java62
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleUtil.java20
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java3
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java35
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java8
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java1
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenanceWhenPendingGlobalMergesTest.java (renamed from clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenancehenPendingGlobalMergesTest.java)2
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java36
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateMapping.java24
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java44
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java89
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCUtil.java21
-rw-r--r--clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java56
21 files changed, 490 insertions, 92 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index e755c2b8117..5c345b6f8a0 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -178,6 +178,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
options.minRatioOfStorageNodesUp);
NodeStateGatherer stateGatherer = new NodeStateGatherer(timer, timer, log);
Communicator communicator = new RPCCommunicator(
+ RPCCommunicator.createRealSupervisor(),
timer,
options.fleetControllerIndex,
options.nodeStateRequestTimeoutMS,
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
index 15650f0f4aa..55331e4bbed 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
@@ -8,6 +8,7 @@ import com.yahoo.vdslib.distribution.Distribution;
import com.yahoo.vdslib.distribution.Group;
import com.yahoo.vdslib.state.*;
import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo;
+import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator;
import java.io.PrintWriter;
import java.io.StringWriter;
@@ -205,7 +206,7 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
}
public int getLatestVersion() {
- return 2;
+ return RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_VERSION;
}
public String getSlobrokAddress() {
@@ -375,21 +376,23 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
public boolean notifyNoSuchMethodError(String methodName, Timer timer) {
if (methodName.equals("getnodestate3")) {
if (version > 1) {
- log.log(LogLevel.DEBUG, "Node " + toString() + " does not support " + methodName + " call. Setting version to 1.");
- version = 1;
- nextAttemptTime = 0;
- adjustedVersionTime = timer.getCurrentTimeInMillis();
+ downgradeToRpcVersion(1, methodName, timer);
return true;
} else if (timer.getCurrentTimeInMillis() - 2000 < adjustedVersionTime) {
log.log(LogLevel.DEBUG, "Node " + toString() + " does not support " + methodName + " call. Version already at 1 and was recently adjusted, so ignoring it.");
return true;
}
- } else if (methodName.equals("getnodestate2") || methodName.equals("setsystemstate2")) {
+ } else if (methodName.equals(RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_METHOD_NAME)) {
+ if (version == RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_VERSION) {
+ downgradeToRpcVersion(RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_VERSION, methodName, timer);
+ return true;
+ } else if (timer.getCurrentTimeInMillis() - 2000 < adjustedVersionTime) {
+ log.log(LogLevel.DEBUG, () -> "Node " + toString() + " does not support " + methodName + " call. Version already downgraded, so ignoring it.");
+ return true;
+ }
+ } else if (methodName.equals("getnodestate2") || methodName.equals(RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME)) {
if (version > 0) {
- log.log(LogLevel.DEBUG, "Node " + toString() + " does not support " + methodName + " call. Setting version to 0.");
- version = 0;
- nextAttemptTime = 0;
- adjustedVersionTime = timer.getCurrentTimeInMillis();
+ downgradeToRpcVersion(0, methodName, timer);
return true;
} else if (timer.getCurrentTimeInMillis() - 2000 < adjustedVersionTime) {
log.log(LogLevel.DEBUG, "Node " + toString() + " does not support " + methodName + " call. Version already at 0 and was recently adjusted, so ignoring it.");
@@ -400,6 +403,14 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
return false;
}
+ private void downgradeToRpcVersion(int newVersion, String methodName, Timer timer) {
+ log.log(LogLevel.DEBUG, () -> String.format("Node %s does not support %s call. Downgrading to version %d.",
+ toString(), methodName, newVersion));
+ version = newVersion;
+ nextAttemptTime = 0;
+ adjustedVersionTime = timer.getCurrentTimeInMillis();
+ }
+
public Target getConnection() {
return connection;
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index 45a14ee19cd..b0869b560f6 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -59,8 +59,8 @@ public class SystemStateBroadcaster {
int version = req.getSystemStateVersion();
if (req.getReply().isError()) {
+ info.setSystemStateVersionAcknowledged(version, false);
if (req.getReply().getReturnCode() != Communicator.TRANSIENT_ERROR) {
- info.setSystemStateVersionAcknowledged(version, false);
if (info.getNewestSystemStateVersionSent() == version) {
reportNodeError(nodeOk, info,
"Got error response " + req.getReply().getReturnCode() + ": " + req.getReply().getReturnMessage()
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/ClusterStateBundleCodec.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/ClusterStateBundleCodec.java
new file mode 100644
index 00000000000..250d7bbe46a
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/ClusterStateBundleCodec.java
@@ -0,0 +1,21 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.rpc;
+
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
+
+/**
+ * Provides opaque encoding and decoding of ClusterStateBundles for transmission over RPC.
+ *
+ * Implementations may choose to compress the encoded representation of the bundle.
+ *
+ * It is important that the input given to decode() is exactly equal to that given from
+ * encode() for the results to be correct. Implementations must ensure that this information
+ * is enough to losslessly reconstruct the full encoded ClusterStateBundle.
+ */
+public interface ClusterStateBundleCodec {
+
+ EncodedClusterStateBundle encode(ClusterStateBundle stateBundle);
+
+ ClusterStateBundle decode(EncodedClusterStateBundle encodedClusterStateBundle);
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EncodedClusterStateBundle.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EncodedClusterStateBundle.java
new file mode 100644
index 00000000000..784c8c23c87
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EncodedClusterStateBundle.java
@@ -0,0 +1,29 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.rpc;
+
+import com.yahoo.compress.Compressor;
+
+/**
+ * Contains an opaque encoded (possibly compressed) representation of a ClusterStateBundle.
+ *
+ * This bundle can in turn be sent over the wire or serialized by ensuring that all components
+ * of the Compressor.Compression state can be reconstructed by the receiver. In practice this
+ * means sending the Compression's <em>type</em>, <em>uncompressedSize</em> and <em>data</em>.
+ */
+public class EncodedClusterStateBundle {
+
+ private final Compressor.Compression compression;
+
+ private EncodedClusterStateBundle(Compressor.Compression compression) {
+ this.compression = compression;
+ }
+
+ public static EncodedClusterStateBundle fromCompressionBuffer(Compressor.Compression compression) {
+ return new EncodedClusterStateBundle(compression);
+ }
+
+ public Compressor.Compression getCompression() {
+ return compression;
+ }
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
index 0cc5ef36e3c..8941b2e32ec 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
@@ -25,6 +25,12 @@ public class RPCCommunicator implements Communicator {
public static final Logger log = Logger.getLogger(RPCCommunicator.class.getName());
+ public static final int SET_DISTRIBUTION_STATES_RPC_VERSION = 3;
+ public static final String SET_DISTRIBUTION_STATES_RPC_METHOD_NAME = "setdistributionstates";
+
+ public static final int LEGACY_SET_SYSTEM_STATE2_RPC_VERSION = 2;
+ public static final String LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME = "setsystemstate2";
+
private final Timer timer;
private final Supervisor supervisor;
private double nodeStateRequestTimeoutIntervalMaxSeconds;
@@ -33,7 +39,12 @@ public class RPCCommunicator implements Communicator {
private int nodeStateRequestRoundTripTimeMaxSeconds;
private final int fleetControllerIndex;
+ public static Supervisor createRealSupervisor() {
+ return new Supervisor(new Transport());
+ }
+
public RPCCommunicator(
+ final Supervisor supervisor,
final Timer t,
final int index,
final int nodeStateRequestTimeoutIntervalMaxMs,
@@ -52,7 +63,7 @@ public class RPCCommunicator implements Communicator {
this.nodeStateRequestTimeoutIntervalStartPercentage = nodeStateRequestTimeoutIntervalStartPercentage;
this.nodeStateRequestTimeoutIntervalStopPercentage = nodeStateRequestTimeoutIntervalStopPercentage;
this.nodeStateRequestRoundTripTimeMaxSeconds = nodeStateRequestRoundTripTimeMaxSeconds;
- this.supervisor = new Supervisor(new Transport());
+ this.supervisor = supervisor;
}
public void shutdown() {
@@ -126,6 +137,7 @@ public class RPCCommunicator implements Communicator {
if ( ! connection.isValid()) {
log.log(LogLevel.DEBUG, "Connection to " + node.getRpcAddress() + " could not be created.");
}
+ // TODO remove this deprecated legacy stuff
if (node.getVersion() == 0 && node.getConnectionVersion() > 0) {
doVersion0HandShake(connection, node);
clearOldStoredNodeState(connection, node);
@@ -166,19 +178,31 @@ public class RPCCommunicator implements Communicator {
log.log(LogLevel.DEBUG, "Connection to " + node.getRpcAddress() + " could not be created.");
return;
}
- if (node.getVersion() == 0 && node.getConnectionVersion() > 0) {
+ final int nodeVersion = node.getVersion();
+ // TODO remove this deprecated legacy stuff
+ if (nodeVersion == 0 && node.getConnectionVersion() > 0) {
doVersion0HandShake(connection, node);
clearOldStoredNodeState(connection, node);
}
Request req;
- if (node.getVersion() == 0) {
+ if (nodeVersion == 0) {
req = new Request("setsystemstate");
req.parameters().add(new StringValue(baselineState.toString(true)));
- } else {
- req = new Request("setsystemstate2");
+ } else if (nodeVersion <= 2) {
+ req = new Request(LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME);
req.parameters().add(new StringValue(baselineState.toString(false)));
+ } else {
+ req = new Request(SET_DISTRIBUTION_STATES_RPC_METHOD_NAME);
+ SlimeClusterStateBundleCodec codec = new SlimeClusterStateBundleCodec();
+ EncodedClusterStateBundle encodedBundle = codec.encode(stateBundle);
+ Values v = req.parameters();
+ v.add(new Int8Value(encodedBundle.getCompression().type().getCode()));
+ v.add(new Int32Value(encodedBundle.getCompression().uncompressedSize()));
+ v.add(new DataValue(encodedBundle.getCompression().data()));
}
+ log.log(LogLevel.DEBUG, () -> String.format("Sending '%s' RPC to %s for state version %d",
+ req.methodName(), node.getRpcAddress(), stateBundle.getVersion()));
RPCSetClusterStateRequest stateRequest = new RPCSetClusterStateRequest(node, req, baselineState.getVersion());
waiter.setRequest(stateRequest);
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java
index 39506ddf7fa..96a776a65b8 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java
@@ -30,7 +30,8 @@ public class RPCSetClusterStateWaiter implements RequestWaiter {
public SetClusterStateRequest.Reply getReply(Request req) {
NodeInfo info = request.getNodeInfo();
- if (req.methodName().equals("setsystemstate2")) {
+ if (req.methodName().equals(RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_METHOD_NAME)
+ || req.methodName().equals(RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME)) {
if (req.isError() && req.errorCode() == ErrorCode.NO_SUCH_METHOD) {
if (info.notifyNoSuchMethodError(req.methodName(), timer)) {
return new SetClusterStateRequest.Reply(Communicator.TRANSIENT_ERROR, "Trying lower version");
@@ -39,7 +40,7 @@ public class RPCSetClusterStateWaiter implements RequestWaiter {
if (req.isError()) {
return new SetClusterStateRequest.Reply(req.errorCode(), req.errorMessage());
} else if (!req.checkReturnTypes("")) {
- return new SetClusterStateRequest.Reply(ErrorCode.BAD_REPLY, "Got setsystemstate2 response with invalid return types from " + info);
+ return new SetClusterStateRequest.Reply(ErrorCode.BAD_REPLY, "Got RPC response with invalid return types from " + info);
}
} else {
return new SetClusterStateRequest.Reply(ErrorCode.BAD_REPLY, "Unknown method " + req.methodName());
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java
new file mode 100644
index 00000000000..c37bd8313a9
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java
@@ -0,0 +1,58 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.rpc;
+
+import com.yahoo.compress.CompressionType;
+import com.yahoo.compress.Compressor;
+import com.yahoo.slime.*;
+import com.yahoo.vdslib.state.ClusterState;
+import com.yahoo.vespa.clustercontroller.core.AnnotatedClusterState;
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implementation of ClusterStateBundleCodec which uses structured Slime binary encoding
+ * to implement (de-)serialization of ClusterStateBundle instances. Encoding format is
+ * intentionally extensible so that we may add other information to it later.
+ *
+ * LZ4 compression is transparently applied during encoding and decompression is
+ * subsequently applied during decoding.
+ */
+public class SlimeClusterStateBundleCodec implements ClusterStateBundleCodec {
+
+ private static final Compressor compressor = new Compressor(CompressionType.LZ4, 3, 0.90, 1024);
+
+ @Override
+ public EncodedClusterStateBundle encode(ClusterStateBundle stateBundle) {
+ Slime slime = new Slime();
+ Cursor root = slime.setObject();
+ Cursor states = root.setObject("states");
+ // TODO add another function that is not toString for this..!
+ states.setString("baseline", stateBundle.getBaselineClusterState().toString());
+ Cursor spaces = states.setObject("spaces");
+ stateBundle.getDerivedBucketSpaceStates().entrySet()
+ .forEach(entry -> spaces.setString(entry.getKey(), entry.getValue().toString()));
+
+ byte[] serialized = BinaryFormat.encode(slime);
+ Compressor.Compression compression = compressor.compress(serialized);
+ return EncodedClusterStateBundle.fromCompressionBuffer(compression);
+ }
+
+ @Override
+ public ClusterStateBundle decode(EncodedClusterStateBundle encodedClusterStateBundle) {
+ byte[] uncompressed = compressor.decompress(encodedClusterStateBundle.getCompression());
+ Slime slime = BinaryFormat.decode(uncompressed);
+ Inspector root = slime.get();
+ Inspector states = root.field("states");
+ ClusterState baseline = ClusterState.stateFromString(states.field("baseline").asString());
+
+ Inspector spaces = states.field("spaces");
+ Map<String, ClusterState> derivedStates = new HashMap<>();
+ spaces.traverse(((ObjectTraverser)(key, value) -> {
+ derivedStates.put(key, ClusterState.stateFromString(value.asString()));
+ }));
+
+ return ClusterStateBundle.of(AnnotatedClusterState.withoutAnnotations(baseline), derivedStates);
+ }
+}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java
index 4862c83f2a6..5692e961cbe 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterFixture.java
@@ -20,7 +20,7 @@ import java.util.TreeMap;
import static org.mockito.Mockito.mock;
-class ClusterFixture {
+public class ClusterFixture {
public final ContentCluster cluster;
public final Distribution distribution;
public final FakeTimer timer;
@@ -28,7 +28,7 @@ class ClusterFixture {
public final StateChangeHandler nodeStateChangeHandler;
public final ClusterStateGenerator.Params params = new ClusterStateGenerator.Params();
- ClusterFixture(ContentCluster cluster, Distribution distribution) {
+ public ClusterFixture(ContentCluster cluster, Distribution distribution) {
this.cluster = cluster;
this.distribution = distribution;
this.timer = new FakeTimer();
@@ -37,13 +37,13 @@ class ClusterFixture {
this.params.cluster(this.cluster);
}
- StateChangeHandler createNodeStateChangeHandlerForCluster() {
+ public StateChangeHandler createNodeStateChangeHandlerForCluster() {
final int controllerIndex = 0;
MetricUpdater metricUpdater = new MetricUpdater(new NoMetricReporter(), controllerIndex);
return new StateChangeHandler(timer, eventLog, metricUpdater);
}
- ClusterFixture bringEntireClusterUp() {
+ public ClusterFixture bringEntireClusterUp() {
cluster.clusterInfo().getConfiguredNodes().forEach((idx, node) -> {
reportStorageNodeState(idx, State.UP);
reportDistributorNodeState(idx, State.UP);
@@ -51,7 +51,7 @@ class ClusterFixture {
return this;
}
- ClusterFixture markEntireClusterDown() {
+ public ClusterFixture markEntireClusterDown() {
cluster.clusterInfo().getConfiguredNodes().forEach((idx, node) -> {
reportStorageNodeState(idx, State.DOWN);
reportDistributorNodeState(idx, State.DOWN);
@@ -69,7 +69,7 @@ class ClusterFixture {
nodeInfo.setReportedState(nodeState, timer.getCurrentTimeInMillis());
}
- ClusterFixture reportStorageNodeState(final int index, State state, String description) {
+ public ClusterFixture reportStorageNodeState(final int index, State state, String description) {
final Node node = new Node(NodeType.STORAGE, index);
final NodeState nodeState = new NodeState(NodeType.STORAGE, state);
nodeState.setDescription(description);
@@ -77,23 +77,23 @@ class ClusterFixture {
return this;
}
- ClusterFixture reportStorageNodeState(final int index, State state) {
+ public ClusterFixture reportStorageNodeState(final int index, State state) {
return reportStorageNodeState(index, state, "mockdesc");
}
- ClusterFixture reportStorageNodeState(final int index, NodeState nodeState) {
+ public ClusterFixture reportStorageNodeState(final int index, NodeState nodeState) {
doReportNodeState(new Node(NodeType.STORAGE, index), nodeState);
return this;
}
- ClusterFixture reportDistributorNodeState(final int index, State state) {
+ public ClusterFixture reportDistributorNodeState(final int index, State state) {
final Node node = new Node(NodeType.DISTRIBUTOR, index);
final NodeState nodeState = new NodeState(NodeType.DISTRIBUTOR, state);
doReportNodeState(node, nodeState);
return this;
}
- ClusterFixture reportDistributorNodeState(final int index, NodeState nodeState) {
+ public ClusterFixture reportDistributorNodeState(final int index, NodeState nodeState) {
doReportNodeState(new Node(NodeType.DISTRIBUTOR, index), nodeState);
return this;
}
@@ -108,18 +108,18 @@ class ClusterFixture {
nodeStateChangeHandler.proposeNewNodeState(stateBefore, nodeInfo, nodeState);
}
- ClusterFixture proposeStorageNodeWantedState(final int index, State state, String description) {
+ public ClusterFixture proposeStorageNodeWantedState(final int index, State state, String description) {
final Node node = new Node(NodeType.STORAGE, index);
final NodeState nodeState = new NodeState(NodeType.STORAGE, state);
doProposeWantedState(node, nodeState, description);
return this;
}
- ClusterFixture proposeStorageNodeWantedState(final int index, State state) {
+ public ClusterFixture proposeStorageNodeWantedState(final int index, State state) {
return proposeStorageNodeWantedState(index, state, "mockdesc");
}
- ClusterFixture proposeDistributorWantedState(final int index, State state) {
+ public ClusterFixture proposeDistributorWantedState(final int index, State state) {
final ClusterState stateBefore = rawGeneratedClusterState();
final Node node = new Node(NodeType.DISTRIBUTOR, index);
final NodeState nodeState = new NodeState(NodeType.DISTRIBUTOR, state);
@@ -131,12 +131,12 @@ class ClusterFixture {
return this;
}
- ClusterFixture disableAutoClusterTakedown() {
+ public ClusterFixture disableAutoClusterTakedown() {
setMinNodesUp(0, 0, 0.0, 0.0);
return this;
}
- ClusterFixture setMinNodesUp(int minDistNodes, int minStorNodes, double minDistRatio, double minStorRatio) {
+ public ClusterFixture setMinNodesUp(int minDistNodes, int minStorNodes, double minDistRatio, double minStorRatio) {
params.minStorageNodesUp(minStorNodes)
.minDistributorNodesUp(minDistNodes)
.minRatioOfStorageNodesUp(minStorRatio)
@@ -144,32 +144,32 @@ class ClusterFixture {
return this;
}
- ClusterFixture setMinNodeRatioPerGroup(double upRatio) {
+ public ClusterFixture setMinNodeRatioPerGroup(double upRatio) {
params.minNodeRatioPerGroup(upRatio);
return this;
}
- ClusterFixture assignDummyRpcAddresses() {
+ public ClusterFixture assignDummyRpcAddresses() {
cluster.getNodeInfo().forEach(ni -> ni.setRpcAddress("tcp/localhost:0"));
return this;
}
- static Map<NodeType, Integer> buildTransitionTimeMap(int distributorTransitionTime, int storageTransitionTime) {
+ static public Map<NodeType, Integer> buildTransitionTimeMap(int distributorTransitionTime, int storageTransitionTime) {
Map<NodeType, Integer> maxTransitionTime = new TreeMap<>();
maxTransitionTime.put(NodeType.DISTRIBUTOR, distributorTransitionTime);
maxTransitionTime.put(NodeType.STORAGE, storageTransitionTime);
return maxTransitionTime;
}
- void disableTransientMaintenanceModeOnDown() {
+ public void disableTransientMaintenanceModeOnDown() {
this.params.transitionTimes(0);
}
- void enableTransientMaintenanceModeOnDown(final int transitionTimeMs) {
+ public void enableTransientMaintenanceModeOnDown(final int transitionTimeMs) {
this.params.transitionTimes(transitionTimeMs);
}
- ClusterFixture markNodeAsConfigRetired(int nodeIndex) {
+ public ClusterFixture markNodeAsConfigRetired(int nodeIndex) {
Set<ConfiguredNode> configuredNodes = new HashSet<>(cluster.getConfiguredNodes().values());
configuredNodes.remove(new ConfiguredNode(nodeIndex, false));
configuredNodes.add(new ConfiguredNode(nodeIndex, true));
@@ -177,24 +177,24 @@ class ClusterFixture {
return this;
}
- AnnotatedClusterState annotatedGeneratedClusterState() {
+ public AnnotatedClusterState annotatedGeneratedClusterState() {
params.currentTimeInMilllis(timer.getCurrentTimeInMillis());
return ClusterStateGenerator.generatedStateFrom(params);
}
- ClusterState rawGeneratedClusterState() {
+ public ClusterState rawGeneratedClusterState() {
return annotatedGeneratedClusterState().getClusterState();
}
- String generatedClusterState() {
+ public String generatedClusterState() {
return annotatedGeneratedClusterState().getClusterState().toString();
}
- String verboseGeneratedClusterState() {
+ public String verboseGeneratedClusterState() {
return annotatedGeneratedClusterState().getClusterState().toString(true);
}
- static ClusterFixture forFlatCluster(int nodeCount) {
+ public static ClusterFixture forFlatCluster(int nodeCount) {
Collection<ConfiguredNode> nodes = DistributionBuilder.buildConfiguredNodes(nodeCount);
Distribution distribution = DistributionBuilder.forFlatCluster(nodeCount);
@@ -203,7 +203,7 @@ class ClusterFixture {
return new ClusterFixture(cluster, distribution);
}
- static ClusterFixture forHierarchicCluster(DistributionBuilder.GroupBuilder root) {
+ public static ClusterFixture forHierarchicCluster(DistributionBuilder.GroupBuilder root) {
List<ConfiguredNode> nodes = DistributionBuilder.buildConfiguredNodes(root.totalNodeCount());
Distribution distribution = DistributionBuilder.forHierarchicCluster(root);
ContentCluster cluster = new ContentCluster("foo", nodes, distribution, 0, 0.0);
@@ -211,19 +211,19 @@ class ClusterFixture {
return new ClusterFixture(cluster, distribution);
}
- ClusterStateGenerator.Params generatorParams() {
+ public ClusterStateGenerator.Params generatorParams() {
return new ClusterStateGenerator.Params().cluster(cluster);
}
- ContentCluster cluster() {
+ public ContentCluster cluster() {
return this.cluster;
}
- static Node storageNode(int index) {
+ public static Node storageNode(int index) {
return new Node(NodeType.STORAGE, index);
}
- static Node distributorNode(int index) {
+ public static Node distributorNode(int index) {
return new Node(NodeType.DISTRIBUTOR, index);
}
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleUtil.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleUtil.java
new file mode 100644
index 00000000000..e097874682a
--- /dev/null
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/ClusterStateBundleUtil.java
@@ -0,0 +1,20 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.vdslib.state.ClusterState;
+
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Helper functions for constructing a ClusterStateBundle for a baseline state and zero or more
+ * explicit bucket space states.
+ */
+public class ClusterStateBundleUtil {
+
+ public static ClusterStateBundle makeBundle(String baselineState, StateMapping... bucketSpaceStates) {
+ return ClusterStateBundle.of(AnnotatedClusterState.withoutAnnotations(ClusterState.stateFromString(baselineState)),
+ Stream.of(bucketSpaceStates).collect(Collectors.toMap(sm -> sm.bucketSpace, sm -> sm.state)));
+ }
+
+}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java
index a6e62d29ab5..ae59336b5ef 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DistributionBitCountTest.java
@@ -17,8 +17,9 @@ public class DistributionBitCountTest extends FleetControllerTest {
private void setUpSystem(String testName) throws Exception {
List<ConfiguredNode> configuredNodes = new ArrayList<>();
- for (int i = 0 ; i < 10; i++)
+ for (int i = 0 ; i < 10; i++) {
configuredNodes.add(new ConfiguredNode(i, false));
+ }
FleetControllerOptions options = new FleetControllerOptions("mycluster", configuredNodes);
options.distributionBits = 17;
setUpFleetController(false, options);
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
index 5fd7c26617e..a2da6bd1c6c 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNode.java
@@ -1,6 +1,8 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;
+import com.yahoo.compress.CompressionType;
+import com.yahoo.compress.Compressor;
import com.yahoo.jrt.*;
import com.yahoo.jrt.StringValue;
import com.yahoo.jrt.slobrok.api.BackOffPolicy;
@@ -8,6 +10,10 @@ import com.yahoo.jrt.slobrok.api.Register;
import com.yahoo.jrt.slobrok.api.SlobrokList;
import com.yahoo.log.LogLevel;
import com.yahoo.vdslib.state.*;
+import com.yahoo.vespa.clustercontroller.core.rpc.EncodedClusterStateBundle;
+import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator;
+import com.yahoo.vespa.clustercontroller.core.rpc.RPCUtil;
+import com.yahoo.vespa.clustercontroller.core.rpc.SlimeClusterStateBundleCodec;
import java.net.InetAddress;
import java.net.UnknownHostException;
@@ -345,6 +351,14 @@ public class DummyVdsNode {
supervisor.addMethod(m);
}
}
+ if (stateCommunicationVersion >= RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_VERSION) {
+ m = new Method(RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_METHOD_NAME, "bix", "", this, "rpc_setDistributionStates");
+ m.methodDesc("Set distribution states for cluster and bucket spaces");
+ m.paramDesc(0, "compressionType", "Compression type for payload");
+ m.paramDesc(1, "uncompressedSize", "Uncompressed size of payload");
+ m.paramDesc(2, "payload", "Slime format payload");
+ supervisor.addMethod(m);
+ }
}
public void rpc_storageConnect(Request req) {
@@ -501,4 +515,25 @@ public class DummyVdsNode {
req.setError(ErrorCode.METHOD_FAILED, e.getMessage());
}
}
+
+ public void rpc_setDistributionStates(Request req) {
+ try {
+ if (shouldFailSetSystemStateRequests()) {
+ req.setError(ErrorCode.GENERAL_ERROR, "Dummy node configured to fail setDistributionStates() calls");
+ return;
+ }
+ ClusterStateBundle stateBundle = RPCUtil.decodeStateBundleFromSetDistributionStatesRequest(req);
+ ClusterState newState = stateBundle.getBaselineClusterState();
+ synchronized(timer) {
+ updateStartTimestamps(newState);
+ systemState.add(0, newState);
+ timer.notifyAll();
+ }
+ log.log(LogLevel.DEBUG, "Dummy node " + this + ": Got new cluster state " + newState);
+ } catch (Exception e) {
+ log.log(LogLevel.ERROR, "Dummy node " + this + ": An error occured when answering setdistributionstates request: " + e.getMessage());
+ e.printStackTrace(System.err);
+ req.setError(ErrorCode.METHOD_FAILED, e.getMessage());
+ }
+ }
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java
index b0b833000d2..bda06248d9e 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyVdsNodeOptions.java
@@ -1,9 +1,13 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core;
+import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator;
+
/**
- *
+ * Options for simulated content nodes that are registered in Slobrok and communicate
+ * over regular RPC.
*/
public class DummyVdsNodeOptions {
- public int stateCommunicationVersion = 2; // 0 - 4.1, 1 - 4.2-5.0.10, 2 - 5.0.11+
+ // 0 - 4.1, 1 - 4.2-5.0.10, 2 - 5.0.11+, 3 - 6.220+
+ public int stateCommunicationVersion = RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_VERSION;
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java
index 662c36b29f7..19d87b878d4 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/FleetControllerTest.java
@@ -144,6 +144,7 @@ public abstract class FleetControllerTest implements Waiter {
options.minRatioOfStorageNodesUp);
NodeStateGatherer stateGatherer = new NodeStateGatherer(timer, timer, log);
Communicator communicator = new RPCCommunicator(
+ RPCCommunicator.createRealSupervisor(),
timer,
options.fleetControllerIndex,
options.nodeStateRequestTimeoutMS,
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenancehenPendingGlobalMergesTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenanceWhenPendingGlobalMergesTest.java
index fa92a4d5246..0e2b9557cb9 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenancehenPendingGlobalMergesTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/MaintenanceWhenPendingGlobalMergesTest.java
@@ -12,7 +12,7 @@ import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class MaintenancehenPendingGlobalMergesTest {
+public class MaintenanceWhenPendingGlobalMergesTest {
private static class Fixture {
public MergePendingChecker mockPendingChecker = mock(MergePendingChecker.class);
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java
new file mode 100644
index 00000000000..d59dbb4933a
--- /dev/null
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/RpcVersionAutoDowngradeTest.java
@@ -0,0 +1,36 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.vdslib.distribution.ConfiguredNode;
+import com.yahoo.vdslib.state.NodeState;
+import com.yahoo.vdslib.state.State;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class RpcVersionAutoDowngradeTest extends FleetControllerTest {
+
+ private void setUpFakeCluster(int nodeRpcVersion) throws Exception {
+ List<ConfiguredNode> configuredNodes = new ArrayList<>();
+ for (int i = 0 ; i < 10; i++) {
+ configuredNodes.add(new ConfiguredNode(i, false));
+ }
+ FleetControllerOptions options = new FleetControllerOptions("mycluster", configuredNodes);
+ setUpFleetController(false, options);
+ DummyVdsNodeOptions nodeOptions = new DummyVdsNodeOptions();
+ nodeOptions.stateCommunicationVersion = nodeRpcVersion;
+ List<DummyVdsNode> nodes = setUpVdsNodes(false, nodeOptions, true, configuredNodes);
+ for (DummyVdsNode node : nodes) {
+ node.setNodeState(new NodeState(node.getType(), State.UP).setMinUsedBits(20));
+ node.connect();
+ }
+ }
+
+ @Test
+ public void cluster_state_rpc_version_is_auto_downgraded_and_retried_for_older_nodes() throws Exception {
+ setUpFakeCluster(2); // HEAD is at v3
+ waitForState("version:\\d+ distributor:10 storage:10");
+ }
+
+}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateMapping.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateMapping.java
new file mode 100644
index 00000000000..f6eaf86e8f4
--- /dev/null
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/StateMapping.java
@@ -0,0 +1,24 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core;
+
+import com.yahoo.vdslib.state.ClusterState;
+
+/**
+ * Tuple representing a mapping from a named bucket space to the derived ClusterState
+ * for that space.
+ */
+public class StateMapping {
+
+ final String bucketSpace;
+ final ClusterState state;
+
+ private StateMapping(String bucketSpace, ClusterState state) {
+ this.bucketSpace = bucketSpace;
+ this.state = state;
+ }
+
+ public static StateMapping of(String bucketSpace, String state) {
+ return new StateMapping(bucketSpace, ClusterState.stateFromString(state));
+ }
+
+}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
index 93aac5c83ed..aa5219147ce 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcasterTest.java
@@ -7,7 +7,6 @@ import com.yahoo.vespa.clustercontroller.core.listeners.NodeAddedOrRemovedListen
import com.yahoo.vespa.clustercontroller.core.listeners.NodeStateOrHostInfoChangeHandler;
import org.junit.Test;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.mockito.Matchers.any;
@@ -63,29 +62,10 @@ public class SystemStateBroadcasterTest {
return Stream.of(nodes).map(c::getNodeInfo);
}
- private static class StateMapping {
- final String bucketSpace;
- final ClusterState state;
-
- StateMapping(String bucketSpace, ClusterState state) {
- this.bucketSpace = bucketSpace;
- this.state = state;
- }
- }
-
- private static StateMapping mapping(String bucketSpace, String state) {
- return new StateMapping(bucketSpace, ClusterState.stateFromString(state));
- }
-
- private static ClusterStateBundle makeBundle(String baselineState, StateMapping... bucketSpaceStates) {
- return ClusterStateBundle.of(AnnotatedClusterState.withoutAnnotations(ClusterState.stateFromString(baselineState)),
- Stream.of(bucketSpaceStates).collect(Collectors.toMap(sm -> sm.bucketSpace, sm -> sm.state)));
- }
-
@Test
public void always_publish_baseline_cluster_state() {
Fixture f = new Fixture();
- ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2");
+ ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2");
ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
f.broadcaster.handleNewClusterStates(stateBundle);
f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator);
@@ -95,7 +75,7 @@ public class SystemStateBroadcasterTest {
@Test
public void non_observed_startup_timestamps_are_published_per_node_for_baseline_state() {
Fixture f = new Fixture();
- ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2");
+ ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2");
ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
f.simulateNodePartitionedAwaySilently(cf);
f.broadcaster.handleNewClusterStates(stateBundle);
@@ -105,16 +85,16 @@ public class SystemStateBroadcasterTest {
// Only distributor 0 should observe startup timestamps
verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any());
});
- ClusterStateBundle expectedDistr0Bundle = makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700");
+ ClusterStateBundle expectedDistr0Bundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700");
verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any());
}
@Test
public void bucket_space_states_are_published_verbatim_when_no_additional_timestamps_needed() {
Fixture f = new Fixture();
- ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2",
- mapping("default", "distributor:2 storage:2 .0.s:d"),
- mapping("upsidedown", "distributor:2 .0.s:d storage:2"));
+ ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2",
+ StateMapping.of("default", "distributor:2 storage:2 .0.s:d"),
+ StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2"));
ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
f.broadcaster.handleNewClusterStates(stateBundle);
f.broadcaster.broadcastNewState(dbContextFrom(cf.cluster()), f.mockCommunicator);
@@ -125,9 +105,9 @@ public class SystemStateBroadcasterTest {
@Test
public void non_observed_startup_timestamps_are_published_per_bucket_space_state() {
Fixture f = new Fixture();
- ClusterStateBundle stateBundle = makeBundle("distributor:2 storage:2",
- mapping("default", "distributor:2 storage:2 .0.s:d"),
- mapping("upsidedown", "distributor:2 .0.s:d storage:2"));
+ ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2",
+ StateMapping.of("default", "distributor:2 storage:2 .0.s:d"),
+ StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2"));
ClusterFixture cf = ClusterFixture.forFlatCluster(2).bringEntireClusterUp().assignDummyRpcAddresses();
f.simulateNodePartitionedAwaySilently(cf);
f.broadcaster.handleNewClusterStates(stateBundle);
@@ -137,9 +117,9 @@ public class SystemStateBroadcasterTest {
// Only distributor 0 should observe startup timestamps
verify(f.mockCommunicator).setSystemState(eq(stateBundle), eq(nodeInfo), any());
});
- ClusterStateBundle expectedDistr0Bundle = makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700",
- mapping("default", "distributor:2 storage:2 .0.s:d .0.t:600 .1.t:700"),
- mapping("upsidedown", "distributor:2 .0.s:d storage:2 .0.t:600 .1.t:700"));
+ ClusterStateBundle expectedDistr0Bundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2 .0.t:600 .1.t:700",
+ StateMapping.of("default", "distributor:2 storage:2 .0.s:d .0.t:600 .1.t:700"),
+ StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2 .0.t:600 .1.t:700"));
verify(f.mockCommunicator).setSystemState(eq(expectedDistr0Bundle), eq(cf.cluster().getNodeInfo(Node.ofDistributor(0))), any());
}
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java
index 73692dbdbfa..2665175c637 100644
--- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java
@@ -1,24 +1,24 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.clustercontroller.core.rpc;
-import com.yahoo.jrt.Request;
-import com.yahoo.jrt.RequestWaiter;
-import com.yahoo.jrt.Target;
+import com.yahoo.jrt.*;
import com.yahoo.vdslib.state.Node;
-import com.yahoo.vdslib.state.NodeType;
import com.yahoo.vespa.clustercontroller.core.*;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsNot.not;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -34,6 +34,7 @@ public class RPCCommunicatorTest {
@Test
public void testGenerateNodeStateRequestTimeoutMs() throws Exception {
final RPCCommunicator communicator = new RPCCommunicator(
+ RPCCommunicator.createRealSupervisor(),
null /* Timer */,
INDEX,
NODE_STATE_REQUEST_TIMEOUT_INTERVAL_MAX_MS,
@@ -59,7 +60,7 @@ public class RPCCommunicatorTest {
@Test
public void testGenerateNodeStateRequestTimeoutMsWithUpdates() throws Exception {
- final RPCCommunicator communicator = new RPCCommunicator(null /* Timer */, INDEX, 1, 1, 100, 0);
+ final RPCCommunicator communicator = new RPCCommunicator(RPCCommunicator.createRealSupervisor(), null /* Timer */, INDEX, 1, 1, 100, 0);
FleetControllerOptions fleetControllerOptions = new FleetControllerOptions(null /*clustername*/);
fleetControllerOptions.nodeStateRequestTimeoutEarliestPercentage = 100;
fleetControllerOptions.nodeStateRequestTimeoutLatestPercentage = 100;
@@ -73,6 +74,7 @@ public class RPCCommunicatorTest {
public void testRoundtripLatency() throws Exception {
final Timer timer = new FakeTimer();
final RPCCommunicator communicator = new RPCCommunicator(
+ RPCCommunicator.createRealSupervisor(),
timer,
INDEX,
NODE_STATE_REQUEST_TIMEOUT_INTERVAL_MAX_MS,
@@ -91,4 +93,77 @@ public class RPCCommunicatorTest {
eq(ROUNDTRIP_LATENCY_SECONDS + NODE_STATE_REQUEST_TIMEOUT_INTERVAL_MAX_MS/1000.0),
(RequestWaiter)any());
}
+
+ private static class Fixture {
+ final Supervisor mockSupervisor = mock(Supervisor.class);
+ final Target mockTarget = mock(Target.class);
+ final Timer timer = new FakeTimer();
+ final RPCCommunicator communicator;
+ final AtomicReference<Request> receivedRequest = new AtomicReference<>();
+ final AtomicReference<RequestWaiter> receivedWaiter = new AtomicReference<>();
+ @SuppressWarnings("unchecked") // Cannot mock with "compiler-obvious" type safety for generics
+ final Communicator.Waiter<SetClusterStateRequest> mockWaiter = mock(Communicator.Waiter.class);
+
+ Fixture() {
+ communicator = new RPCCommunicator(
+ mockSupervisor,
+ timer,
+ INDEX,
+ NODE_STATE_REQUEST_TIMEOUT_INTERVAL_MAX_MS,
+ NODE_STATE_REQUEST_TIMEOUT_INTERVAL_STOP_PERCENTAGE,
+ 100,
+ ROUNDTRIP_LATENCY_SECONDS);
+
+ when(mockSupervisor.connect(any())).thenReturn(mockTarget);
+ when(mockTarget.isValid()).thenReturn(true);
+ doAnswer((invocation) -> {
+ receivedRequest.set((Request) invocation.getArguments()[0]);
+ receivedWaiter.set((RequestWaiter) invocation.getArguments()[2]);
+ return null;
+ }).when(mockTarget).invokeAsync(any(), anyDouble(), any());
+ }
+ }
+
+ @Test
+ public void setSystemState_v3_sends_distribution_states_rpc() {
+ Fixture f = new Fixture();
+ ClusterFixture cf = ClusterFixture.forFlatCluster(3).bringEntireClusterUp().assignDummyRpcAddresses();
+ ClusterStateBundle sentBundle = ClusterStateBundleUtil.makeBundle("distributor:3 storage:3");
+ f.communicator.setSystemState(sentBundle, cf.cluster().getNodeInfo(Node.ofStorage(1)), f.mockWaiter);
+
+ Request req = f.receivedRequest.get();
+ assertThat(req, notNullValue());
+ assertThat(req.methodName(), equalTo(RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_METHOD_NAME));
+ assertTrue(req.parameters().satisfies("bix")); // <compression type>, <uncompressed size>, <payload>
+
+ ClusterStateBundle receivedBundle = RPCUtil.decodeStateBundleFromSetDistributionStatesRequest(req);
+ assertThat(receivedBundle, equalTo(sentBundle));
+ }
+
+ @Test
+ public void set_distribution_states_v3_rpc_auto_downgrades_to_v2_on_unknown_method_error() {
+ Fixture f = new Fixture();
+ ClusterFixture cf = ClusterFixture.forFlatCluster(3).bringEntireClusterUp().assignDummyRpcAddresses();
+ ClusterStateBundle sentBundle = ClusterStateBundleUtil.makeBundle("version:123 distributor:3 storage:3");
+ f.communicator.setSystemState(sentBundle, cf.cluster().getNodeInfo(Node.ofStorage(1)), f.mockWaiter);
+
+ RequestWaiter waiter = f.receivedWaiter.get();
+ assertThat(waiter, notNullValue());
+ Request req = f.receivedRequest.get();
+ assertThat(req, notNullValue());
+
+ req.setError(ErrorCode.NO_SUCH_METHOD, "que?");
+ waiter.handleRequestDone(req);
+
+ // This would normally be done in processResponses(), but that code path is not invoked in this test.
+ cf.cluster().getNodeInfo(Node.ofStorage(1)).setSystemStateVersionAcknowledged(123, false);
+
+ f.receivedRequest.set(null);
+ // Now when we try again, we should have been downgraded to the legacy setsystemstate2 RPC
+ f.communicator.setSystemState(sentBundle, cf.cluster().getNodeInfo(Node.ofStorage(1)), f.mockWaiter);
+ req = f.receivedRequest.get();
+ assertThat(req, notNullValue());
+ assertThat(req.methodName(), equalTo(RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME));
+ }
+
}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCUtil.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCUtil.java
new file mode 100644
index 00000000000..2fdf6b5c8a8
--- /dev/null
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCUtil.java
@@ -0,0 +1,21 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.rpc;
+
+import com.yahoo.compress.CompressionType;
+import com.yahoo.compress.Compressor;
+import com.yahoo.jrt.Request;
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
+
+public class RPCUtil {
+
+ public static ClusterStateBundle decodeStateBundleFromSetDistributionStatesRequest(Request req) {
+ final CompressionType type = CompressionType.valueOf(req.parameters().get(0).asInt8());
+ final int uncompressedSize = req.parameters().get(1).asInt32();
+ final byte[] compressedPayload = req.parameters().get(2).asData();
+
+ SlimeClusterStateBundleCodec codec = new SlimeClusterStateBundleCodec();
+ Compressor.Compression compression = new Compressor.Compression(type, uncompressedSize, compressedPayload);
+ return codec.decode(EncodedClusterStateBundle.fromCompressionBuffer(compression));
+ }
+
+}
diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java
new file mode 100644
index 00000000000..f2fe494ce61
--- /dev/null
+++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodecTest.java
@@ -0,0 +1,56 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.rpc;
+
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundleUtil;
+import com.yahoo.vespa.clustercontroller.core.StateMapping;
+import org.junit.Test;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertThat;
+
+public class SlimeClusterStateBundleCodecTest {
+
+ private static ClusterStateBundle roundtripEncode(ClusterStateBundle stateBundle) {
+ SlimeClusterStateBundleCodec codec = new SlimeClusterStateBundleCodec();
+ EncodedClusterStateBundle encoded = codec.encode(stateBundle);
+ return codec.decode(encoded);
+ }
+
+ @Test
+ public void baseline_only_bundle_can_be_round_trip_encoded() {
+ ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2");
+ assertThat(roundtripEncode(stateBundle), equalTo(stateBundle));
+ }
+
+ @Test
+ public void multi_space_state_bundle_can_be_round_trip_encoded() {
+ ClusterStateBundle stateBundle = ClusterStateBundleUtil.makeBundle("distributor:2 storage:2",
+ StateMapping.of("default", "distributor:2 storage:2 .0.s:d"),
+ StateMapping.of("upsidedown", "distributor:2 .0.s:d storage:2"));
+ assertThat(roundtripEncode(stateBundle), equalTo(stateBundle));
+ }
+
+ private static ClusterStateBundle makeCompressableBundle() {
+ StringBuilder allDownStates = new StringBuilder(2048);
+ for (int i = 0; i < 99; ++i) {
+ allDownStates.append(" .").append(i).append(".s:d");
+ }
+ // Exact same state set string repeated twice; perfect compression fodder.
+ return ClusterStateBundleUtil.makeBundle(String.format("distributor:100%s storage:100%s",
+ allDownStates.toString(), allDownStates.toString()));
+ }
+
+ @Test
+ public void encoded_cluster_states_can_be_compressed() {
+ ClusterStateBundle stateBundle = makeCompressableBundle();
+
+ SlimeClusterStateBundleCodec codec = new SlimeClusterStateBundleCodec();
+ EncodedClusterStateBundle encoded = codec.encode(stateBundle);
+ assertThat(encoded.getCompression().data().length, lessThan(stateBundle.getBaselineClusterState().toString().length()));
+ ClusterStateBundle decodedBundle = codec.decode(encoded);
+ assertThat(decodedBundle, equalTo(stateBundle));
+ }
+
+}