aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-core/src/main/java/com/yahoo
diff options
context:
space:
mode:
author Tor Brede Vekterli <vekterli@oath.com> 2018-02-23 15:41:26 +0100
committer Tor Brede Vekterli <vekterli@oath.com> 2018-02-27 16:05:50 +0100
commit 9063ffeccfa036493390bb09741448c232151cdf (patch)
tree 10460a25295206050f42fc97f01a8a14966c409f /clustercontroller-core/src/main/java/com/yahoo
parent 86029a0a0eefacc98b5c86cd8921eed98f4882e6 (diff)
Add v3 RPC for sending cluster states to content nodes
Supports sending states for multiple bucket spaces at once and leaves room for extending the protocol later, e.g. to also send distribution config. The payload is LZ4-compressed when possible. If a receiver node does not understand the v3 protocol, the protocol version is transparently downgraded to the legacy v2 RPC.
Diffstat (limited to 'clustercontroller-core/src/main/java/com/yahoo')
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java | 1
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java | 16
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java | 2
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/ClusterStateBundleCodec.java | 12
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EncodedClusterStateBundle.java | 22
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java | 34
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java | 5
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java | 50
8 files changed, 132 insertions, 10 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index e100fea780a..cd3007ef3d5 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -178,6 +178,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
options.minRatioOfStorageNodesUp);
NodeStateGatherer stateGatherer = new NodeStateGatherer(timer, timer, log);
Communicator communicator = new RPCCommunicator(
+ RPCCommunicator.createRealSupervisor(),
timer,
options.fleetControllerIndex,
options.nodeStateRequestTimeoutMS,
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
index 15650f0f4aa..bcef7709155 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
@@ -8,6 +8,7 @@ import com.yahoo.vdslib.distribution.Distribution;
import com.yahoo.vdslib.distribution.Group;
import com.yahoo.vdslib.state.*;
import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo;
+import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator;
import java.io.PrintWriter;
import java.io.StringWriter;
@@ -205,7 +206,7 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
}
public int getLatestVersion() {
- return 2;
+ return RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_VERSION;
}
public String getSlobrokAddress() {
@@ -384,7 +385,18 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
log.log(LogLevel.DEBUG, "Node " + toString() + " does not support " + methodName + " call. Version already at 1 and was recently adjusted, so ignoring it.");
return true;
}
- } else if (methodName.equals("getnodestate2") || methodName.equals("setsystemstate2")) {
+ } else if (methodName.equals(RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_METHOD_NAME)) {
+ if (version == RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_VERSION) {
+ log.log(LogLevel.DEBUG, () -> "Node " + toString() + " does not support " + methodName + " call. Downgrading version to 2 (setsystemstate2).");
+ version = RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_VERSION;
+ nextAttemptTime = 0;
+ adjustedVersionTime = timer.getCurrentTimeInMillis();
+ return true;
+ } else if (timer.getCurrentTimeInMillis() - 2000 < adjustedVersionTime) {
+ log.log(LogLevel.DEBUG, () -> "Node " + toString() + " does not support " + methodName + " call. Version already downgraded, so ignoring it.");
+ return true;
+ }
+ } else if (methodName.equals("getnodestate2") || methodName.equals(RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME)) {
if (version > 0) {
log.log(LogLevel.DEBUG, "Node " + toString() + " does not support " + methodName + " call. Setting version to 0.");
version = 0;
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index 45a14ee19cd..b0869b560f6 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -59,8 +59,8 @@ public class SystemStateBroadcaster {
int version = req.getSystemStateVersion();
if (req.getReply().isError()) {
+ info.setSystemStateVersionAcknowledged(version, false);
if (req.getReply().getReturnCode() != Communicator.TRANSIENT_ERROR) {
- info.setSystemStateVersionAcknowledged(version, false);
if (info.getNewestSystemStateVersionSent() == version) {
reportNodeError(nodeOk, info,
"Got error response " + req.getReply().getReturnCode() + ": " + req.getReply().getReturnMessage()
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/ClusterStateBundleCodec.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/ClusterStateBundleCodec.java
new file mode 100644
index 00000000000..4200aec55a9
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/ClusterStateBundleCodec.java
@@ -0,0 +1,12 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.rpc;
+
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
+/** Converts a ClusterStateBundle to an EncodedClusterStateBundle and back again. */
+public interface ClusterStateBundleCodec {
+ /** Produces the encoded representation of the given state bundle. */
+ EncodedClusterStateBundle encode(ClusterStateBundle stateBundle);
+ /** Rebuilds a state bundle from a previously encoded representation. */
+ ClusterStateBundle decode(EncodedClusterStateBundle encodedClusterStateBundle);
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EncodedClusterStateBundle.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EncodedClusterStateBundle.java
new file mode 100644
index 00000000000..d502c5ce598
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EncodedClusterStateBundle.java
@@ -0,0 +1,22 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.rpc;
+
+import com.yahoo.compress.Compressor;
+/** Wrapper around the Compressor.Compression that holds an encoded cluster state bundle. */
+public class EncodedClusterStateBundle {
+ // The wrapped compression result; set once by the constructor and never reassigned.
+ private final Compressor.Compression compression;
+ // Private constructor: instances are obtained through fromCompressionBuffer().
+ private EncodedClusterStateBundle(Compressor.Compression compression) {
+ this.compression = compression;
+ }
+ /** Wraps the given compression result as-is; the argument is stored without copying or validation. */
+ public static EncodedClusterStateBundle fromCompressionBuffer(Compressor.Compression compression) {
+ return new EncodedClusterStateBundle(compression);
+ }
+ /** Returns the same compression object that was passed to fromCompressionBuffer(). */
+ public Compressor.Compression getCompression() {
+ return compression;
+ }
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
index 0cc5ef36e3c..8941b2e32ec 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
@@ -25,6 +25,12 @@ public class RPCCommunicator implements Communicator {
public static final Logger log = Logger.getLogger(RPCCommunicator.class.getName());
+ public static final int SET_DISTRIBUTION_STATES_RPC_VERSION = 3;
+ public static final String SET_DISTRIBUTION_STATES_RPC_METHOD_NAME = "setdistributionstates";
+
+ public static final int LEGACY_SET_SYSTEM_STATE2_RPC_VERSION = 2;
+ public static final String LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME = "setsystemstate2";
+
private final Timer timer;
private final Supervisor supervisor;
private double nodeStateRequestTimeoutIntervalMaxSeconds;
@@ -33,7 +39,12 @@ public class RPCCommunicator implements Communicator {
private int nodeStateRequestRoundTripTimeMaxSeconds;
private final int fleetControllerIndex;
+ public static Supervisor createRealSupervisor() {
+ return new Supervisor(new Transport()); // every call builds a new Transport and a new Supervisor on top of it
+ }
+
public RPCCommunicator(
+ final Supervisor supervisor,
final Timer t,
final int index,
final int nodeStateRequestTimeoutIntervalMaxMs,
@@ -52,7 +63,7 @@ public class RPCCommunicator implements Communicator {
this.nodeStateRequestTimeoutIntervalStartPercentage = nodeStateRequestTimeoutIntervalStartPercentage;
this.nodeStateRequestTimeoutIntervalStopPercentage = nodeStateRequestTimeoutIntervalStopPercentage;
this.nodeStateRequestRoundTripTimeMaxSeconds = nodeStateRequestRoundTripTimeMaxSeconds;
- this.supervisor = new Supervisor(new Transport());
+ this.supervisor = supervisor;
}
public void shutdown() {
@@ -126,6 +137,7 @@ public class RPCCommunicator implements Communicator {
if ( ! connection.isValid()) {
log.log(LogLevel.DEBUG, "Connection to " + node.getRpcAddress() + " could not be created.");
}
+ // TODO remove this deprecated legacy stuff
if (node.getVersion() == 0 && node.getConnectionVersion() > 0) {
doVersion0HandShake(connection, node);
clearOldStoredNodeState(connection, node);
@@ -166,19 +178,31 @@ public class RPCCommunicator implements Communicator {
log.log(LogLevel.DEBUG, "Connection to " + node.getRpcAddress() + " could not be created.");
return;
}
- if (node.getVersion() == 0 && node.getConnectionVersion() > 0) {
+ final int nodeVersion = node.getVersion();
+ // TODO remove this deprecated legacy stuff
+ if (nodeVersion == 0 && node.getConnectionVersion() > 0) {
doVersion0HandShake(connection, node);
clearOldStoredNodeState(connection, node);
}
Request req;
- if (node.getVersion() == 0) {
+ if (nodeVersion == 0) {
req = new Request("setsystemstate");
req.parameters().add(new StringValue(baselineState.toString(true)));
- } else {
- req = new Request("setsystemstate2");
+ } else if (nodeVersion <= 2) {
+ req = new Request(LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME);
req.parameters().add(new StringValue(baselineState.toString(false)));
+ } else {
+ req = new Request(SET_DISTRIBUTION_STATES_RPC_METHOD_NAME);
+ SlimeClusterStateBundleCodec codec = new SlimeClusterStateBundleCodec();
+ EncodedClusterStateBundle encodedBundle = codec.encode(stateBundle);
+ Values v = req.parameters();
+ v.add(new Int8Value(encodedBundle.getCompression().type().getCode()));
+ v.add(new Int32Value(encodedBundle.getCompression().uncompressedSize()));
+ v.add(new DataValue(encodedBundle.getCompression().data()));
}
+ log.log(LogLevel.DEBUG, () -> String.format("Sending '%s' RPC to %s for state version %d",
+ req.methodName(), node.getRpcAddress(), stateBundle.getVersion()));
RPCSetClusterStateRequest stateRequest = new RPCSetClusterStateRequest(node, req, baselineState.getVersion());
waiter.setRequest(stateRequest);
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java
index 39506ddf7fa..96a776a65b8 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java
@@ -30,7 +30,8 @@ public class RPCSetClusterStateWaiter implements RequestWaiter {
public SetClusterStateRequest.Reply getReply(Request req) {
NodeInfo info = request.getNodeInfo();
- if (req.methodName().equals("setsystemstate2")) {
+ if (req.methodName().equals(RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_METHOD_NAME)
+ || req.methodName().equals(RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME)) {
if (req.isError() && req.errorCode() == ErrorCode.NO_SUCH_METHOD) {
if (info.notifyNoSuchMethodError(req.methodName(), timer)) {
return new SetClusterStateRequest.Reply(Communicator.TRANSIENT_ERROR, "Trying lower version");
@@ -39,7 +40,7 @@ public class RPCSetClusterStateWaiter implements RequestWaiter {
if (req.isError()) {
return new SetClusterStateRequest.Reply(req.errorCode(), req.errorMessage());
} else if (!req.checkReturnTypes("")) {
- return new SetClusterStateRequest.Reply(ErrorCode.BAD_REPLY, "Got setsystemstate2 response with invalid return types from " + info);
+ return new SetClusterStateRequest.Reply(ErrorCode.BAD_REPLY, "Got RPC response with invalid return types from " + info);
}
} else {
return new SetClusterStateRequest.Reply(ErrorCode.BAD_REPLY, "Unknown method " + req.methodName());
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java
new file mode 100644
index 00000000000..eb6c2280491
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java
@@ -0,0 +1,50 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.rpc;
+
+import com.yahoo.compress.CompressionType;
+import com.yahoo.compress.Compressor;
+import com.yahoo.slime.*;
+import com.yahoo.vdslib.state.ClusterState;
+import com.yahoo.vespa.clustercontroller.core.AnnotatedClusterState;
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
+
+import java.util.HashMap;
+import java.util.Map;
+/** ClusterStateBundleCodec that stores a bundle as binary Slime and passes the bytes through an LZ4-configured Compressor. */
+public class SlimeClusterStateBundleCodec implements ClusterStateBundleCodec {
+ // NOTE(review): 3, 0.90 and 1024 are presumably compression level, ratio threshold and minimum size - confirm against Compressor.
+ private static final Compressor compressor = new Compressor(CompressionType.LZ4, 3, 0.90, 1024);
+ /** Writes {"states": {"baseline": text, "spaces": {key: text, ...}}} as binary Slime and compresses the resulting bytes. */
+ @Override
+ public EncodedClusterStateBundle encode(ClusterStateBundle stateBundle) {
+ Slime slime = new Slime();
+ Cursor root = slime.setObject();
+ Cursor states = root.setObject("states");
+ // TODO: serialize states through a dedicated function instead of relying on toString().
+ states.setString("baseline", stateBundle.getBaselineClusterState().toString());
+ Cursor spaces = states.setObject("spaces");
+ stateBundle.getDerivedBucketSpaceStates().entrySet()
+ .forEach(entry -> spaces.setString(entry.getKey(), entry.getValue().toString()));
+ // Turn the Slime tree into bytes first; the compressor operates on the serialized form.
+ byte[] serialized = BinaryFormat.encode(slime);
+ Compressor.Compression compression = compressor.compress(serialized);
+ return EncodedClusterStateBundle.fromCompressionBuffer(compression);
+ }
+ /** Reads back the layout written by encode(): decompresses, parses the Slime tree and rebuilds the bundle from its state strings. */
+ @Override
+ public ClusterStateBundle decode(EncodedClusterStateBundle encodedClusterStateBundle) {
+ byte[] uncompressed = compressor.decompress(encodedClusterStateBundle.getCompression());
+ Slime slime = BinaryFormat.decode(uncompressed);
+ Inspector root = slime.get();
+ Inspector states = root.field("states");
+ ClusterState baseline = ClusterState.stateFromString(states.field("baseline").asString());
+ // Every field under "spaces" is parsed into a ClusterState and stored under that field's name.
+ Inspector spaces = states.field("spaces");
+ Map<String, ClusterState> derivedStates = new HashMap<>();
+ spaces.traverse(((ObjectTraverser)(key, value) -> {
+ derivedStates.put(key, ClusterState.stateFromString(value.asString()));
+ }));
+ // The baseline is wrapped via withoutAnnotations(); the derived states are handed over as plain ClusterState values.
+ return ClusterStateBundle.of(AnnotatedClusterState.withoutAnnotations(baseline), derivedStates);
+ }
+}