summary refs log tree commit diff stats
path: root/clustercontroller-core/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'clustercontroller-core/src/main/java')
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java 1
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java 31
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java 2
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/ClusterStateBundleCodec.java 21
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EncodedClusterStateBundle.java 29
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java 34
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java 5
-rw-r--r-- clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java 58
8 files changed, 163 insertions, 18 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
index e755c2b8117..5c345b6f8a0 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java
@@ -178,6 +178,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd
options.minRatioOfStorageNodesUp);
NodeStateGatherer stateGatherer = new NodeStateGatherer(timer, timer, log);
Communicator communicator = new RPCCommunicator(
+ RPCCommunicator.createRealSupervisor(),
timer,
options.fleetControllerIndex,
options.nodeStateRequestTimeoutMS,
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
index 15650f0f4aa..55331e4bbed 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java
@@ -8,6 +8,7 @@ import com.yahoo.vdslib.distribution.Distribution;
import com.yahoo.vdslib.distribution.Group;
import com.yahoo.vdslib.state.*;
import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo;
+import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator;
import java.io.PrintWriter;
import java.io.StringWriter;
@@ -205,7 +206,7 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
}
public int getLatestVersion() {
- return 2;
+ return RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_VERSION;
}
public String getSlobrokAddress() {
@@ -375,21 +376,23 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
public boolean notifyNoSuchMethodError(String methodName, Timer timer) {
if (methodName.equals("getnodestate3")) {
if (version > 1) {
- log.log(LogLevel.DEBUG, "Node " + toString() + " does not support " + methodName + " call. Setting version to 1.");
- version = 1;
- nextAttemptTime = 0;
- adjustedVersionTime = timer.getCurrentTimeInMillis();
+ downgradeToRpcVersion(1, methodName, timer);
return true;
} else if (timer.getCurrentTimeInMillis() - 2000 < adjustedVersionTime) {
log.log(LogLevel.DEBUG, "Node " + toString() + " does not support " + methodName + " call. Version already at 1 and was recently adjusted, so ignoring it.");
return true;
}
- } else if (methodName.equals("getnodestate2") || methodName.equals("setsystemstate2")) {
+ } else if (methodName.equals(RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_METHOD_NAME)) {
+ if (version == RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_VERSION) {
+ downgradeToRpcVersion(RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_VERSION, methodName, timer);
+ return true;
+ } else if (timer.getCurrentTimeInMillis() - 2000 < adjustedVersionTime) {
+ log.log(LogLevel.DEBUG, () -> "Node " + toString() + " does not support " + methodName + " call. Version already downgraded, so ignoring it.");
+ return true;
+ }
+ } else if (methodName.equals("getnodestate2") || methodName.equals(RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME)) {
if (version > 0) {
- log.log(LogLevel.DEBUG, "Node " + toString() + " does not support " + methodName + " call. Setting version to 0.");
- version = 0;
- nextAttemptTime = 0;
- adjustedVersionTime = timer.getCurrentTimeInMillis();
+ downgradeToRpcVersion(0, methodName, timer);
return true;
} else if (timer.getCurrentTimeInMillis() - 2000 < adjustedVersionTime) {
log.log(LogLevel.DEBUG, "Node " + toString() + " does not support " + methodName + " call. Version already at 0 and was recently adjusted, so ignoring it.");
@@ -400,6 +403,14 @@ abstract public class NodeInfo implements Comparable<NodeInfo> {
return false;
}
+ private void downgradeToRpcVersion(int newVersion, String methodName, Timer timer) {
+ log.log(LogLevel.DEBUG, () -> String.format("Node %s does not support %s call. Downgrading to version %d.",
+ toString(), methodName, newVersion));
+ version = newVersion;
+ nextAttemptTime = 0;
+ adjustedVersionTime = timer.getCurrentTimeInMillis();
+ }
+
public Target getConnection() {
return connection;
}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
index 45a14ee19cd..b0869b560f6 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java
@@ -59,8 +59,8 @@ public class SystemStateBroadcaster {
int version = req.getSystemStateVersion();
if (req.getReply().isError()) {
+ info.setSystemStateVersionAcknowledged(version, false);
if (req.getReply().getReturnCode() != Communicator.TRANSIENT_ERROR) {
- info.setSystemStateVersionAcknowledged(version, false);
if (info.getNewestSystemStateVersionSent() == version) {
reportNodeError(nodeOk, info,
"Got error response " + req.getReply().getReturnCode() + ": " + req.getReply().getReturnMessage()
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/ClusterStateBundleCodec.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/ClusterStateBundleCodec.java
new file mode 100644
index 00000000000..250d7bbe46a
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/ClusterStateBundleCodec.java
@@ -0,0 +1,21 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.rpc;
+
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
+
+/**
+ * Provides opaque encoding and decoding of ClusterStateBundles for transmission over RPC.
+ *
+ * Implementations may choose to compress the encoded representation of the bundle.
+ *
+ * For the results to be correct, the input given to decode() must be exactly equal to the
+ * output returned by encode(). Implementations must ensure that this encoded output alone
+ * is enough to losslessly reconstruct the full ClusterStateBundle that was encoded.
+ */
+public interface ClusterStateBundleCodec {
+
+ EncodedClusterStateBundle encode(ClusterStateBundle stateBundle);
+
+ ClusterStateBundle decode(EncodedClusterStateBundle encodedClusterStateBundle);
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EncodedClusterStateBundle.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EncodedClusterStateBundle.java
new file mode 100644
index 00000000000..784c8c23c87
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EncodedClusterStateBundle.java
@@ -0,0 +1,29 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.rpc;
+
+import com.yahoo.compress.Compressor;
+
+/**
+ * Contains an opaque encoded (possibly compressed) representation of a ClusterStateBundle.
+ *
+ * This bundle can in turn be sent over the wire or serialized by ensuring that all components
+ * of the Compressor.Compression state can be reconstructed by the receiver. In practice this
+ * means sending the Compression's <em>type</em>, <em>uncompressedSize</em> and <em>data</em>.
+ */
+public class EncodedClusterStateBundle {
+
+ private final Compressor.Compression compression;
+
+ private EncodedClusterStateBundle(Compressor.Compression compression) {
+ this.compression = compression;
+ }
+
+ public static EncodedClusterStateBundle fromCompressionBuffer(Compressor.Compression compression) {
+ return new EncodedClusterStateBundle(compression);
+ }
+
+ public Compressor.Compression getCompression() {
+ return compression;
+ }
+
+}
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
index 0cc5ef36e3c..8941b2e32ec 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
@@ -25,6 +25,12 @@ public class RPCCommunicator implements Communicator {
public static final Logger log = Logger.getLogger(RPCCommunicator.class.getName());
+ public static final int SET_DISTRIBUTION_STATES_RPC_VERSION = 3;
+ public static final String SET_DISTRIBUTION_STATES_RPC_METHOD_NAME = "setdistributionstates";
+
+ public static final int LEGACY_SET_SYSTEM_STATE2_RPC_VERSION = 2;
+ public static final String LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME = "setsystemstate2";
+
private final Timer timer;
private final Supervisor supervisor;
private double nodeStateRequestTimeoutIntervalMaxSeconds;
@@ -33,7 +39,12 @@ public class RPCCommunicator implements Communicator {
private int nodeStateRequestRoundTripTimeMaxSeconds;
private final int fleetControllerIndex;
+ public static Supervisor createRealSupervisor() {
+ return new Supervisor(new Transport());
+ }
+
public RPCCommunicator(
+ final Supervisor supervisor,
final Timer t,
final int index,
final int nodeStateRequestTimeoutIntervalMaxMs,
@@ -52,7 +63,7 @@ public class RPCCommunicator implements Communicator {
this.nodeStateRequestTimeoutIntervalStartPercentage = nodeStateRequestTimeoutIntervalStartPercentage;
this.nodeStateRequestTimeoutIntervalStopPercentage = nodeStateRequestTimeoutIntervalStopPercentage;
this.nodeStateRequestRoundTripTimeMaxSeconds = nodeStateRequestRoundTripTimeMaxSeconds;
- this.supervisor = new Supervisor(new Transport());
+ this.supervisor = supervisor;
}
public void shutdown() {
@@ -126,6 +137,7 @@ public class RPCCommunicator implements Communicator {
if ( ! connection.isValid()) {
log.log(LogLevel.DEBUG, "Connection to " + node.getRpcAddress() + " could not be created.");
}
+ // TODO: remove the deprecated legacy version 0 handshake handling below
if (node.getVersion() == 0 && node.getConnectionVersion() > 0) {
doVersion0HandShake(connection, node);
clearOldStoredNodeState(connection, node);
@@ -166,19 +178,31 @@ public class RPCCommunicator implements Communicator {
log.log(LogLevel.DEBUG, "Connection to " + node.getRpcAddress() + " could not be created.");
return;
}
- if (node.getVersion() == 0 && node.getConnectionVersion() > 0) {
+ final int nodeVersion = node.getVersion();
+ // TODO: remove the deprecated legacy version 0 handshake handling below
+ if (nodeVersion == 0 && node.getConnectionVersion() > 0) {
doVersion0HandShake(connection, node);
clearOldStoredNodeState(connection, node);
}
Request req;
- if (node.getVersion() == 0) {
+ if (nodeVersion == 0) {
req = new Request("setsystemstate");
req.parameters().add(new StringValue(baselineState.toString(true)));
- } else {
- req = new Request("setsystemstate2");
+ } else if (nodeVersion <= 2) {
+ req = new Request(LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME);
req.parameters().add(new StringValue(baselineState.toString(false)));
+ } else {
+ req = new Request(SET_DISTRIBUTION_STATES_RPC_METHOD_NAME);
+ SlimeClusterStateBundleCodec codec = new SlimeClusterStateBundleCodec();
+ EncodedClusterStateBundle encodedBundle = codec.encode(stateBundle);
+ Values v = req.parameters();
+ v.add(new Int8Value(encodedBundle.getCompression().type().getCode()));
+ v.add(new Int32Value(encodedBundle.getCompression().uncompressedSize()));
+ v.add(new DataValue(encodedBundle.getCompression().data()));
}
+ log.log(LogLevel.DEBUG, () -> String.format("Sending '%s' RPC to %s for state version %d",
+ req.methodName(), node.getRpcAddress(), stateBundle.getVersion()));
RPCSetClusterStateRequest stateRequest = new RPCSetClusterStateRequest(node, req, baselineState.getVersion());
waiter.setRequest(stateRequest);
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java
index 39506ddf7fa..96a776a65b8 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java
@@ -30,7 +30,8 @@ public class RPCSetClusterStateWaiter implements RequestWaiter {
public SetClusterStateRequest.Reply getReply(Request req) {
NodeInfo info = request.getNodeInfo();
- if (req.methodName().equals("setsystemstate2")) {
+ if (req.methodName().equals(RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_METHOD_NAME)
+ || req.methodName().equals(RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME)) {
if (req.isError() && req.errorCode() == ErrorCode.NO_SUCH_METHOD) {
if (info.notifyNoSuchMethodError(req.methodName(), timer)) {
return new SetClusterStateRequest.Reply(Communicator.TRANSIENT_ERROR, "Trying lower version");
@@ -39,7 +40,7 @@ public class RPCSetClusterStateWaiter implements RequestWaiter {
if (req.isError()) {
return new SetClusterStateRequest.Reply(req.errorCode(), req.errorMessage());
} else if (!req.checkReturnTypes("")) {
- return new SetClusterStateRequest.Reply(ErrorCode.BAD_REPLY, "Got setsystemstate2 response with invalid return types from " + info);
+ return new SetClusterStateRequest.Reply(ErrorCode.BAD_REPLY, "Got RPC response with invalid return types from " + info);
}
} else {
return new SetClusterStateRequest.Reply(ErrorCode.BAD_REPLY, "Unknown method " + req.methodName());
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java
new file mode 100644
index 00000000000..c37bd8313a9
--- /dev/null
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java
@@ -0,0 +1,58 @@
+// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.vespa.clustercontroller.core.rpc;
+
+import com.yahoo.compress.CompressionType;
+import com.yahoo.compress.Compressor;
+import com.yahoo.slime.*;
+import com.yahoo.vdslib.state.ClusterState;
+import com.yahoo.vespa.clustercontroller.core.AnnotatedClusterState;
+import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implementation of ClusterStateBundleCodec which uses structured Slime binary encoding
+ * to implement (de-)serialization of ClusterStateBundle instances. Encoding format is
+ * intentionally extensible so that we may add other information to it later.
+ *
+ * LZ4 compression is transparently applied during encoding and decompression is
+ * subsequently applied during decoding.
+ */
+public class SlimeClusterStateBundleCodec implements ClusterStateBundleCodec {
+
+ private static final Compressor compressor = new Compressor(CompressionType.LZ4, 3, 0.90, 1024);
+
+ @Override
+ public EncodedClusterStateBundle encode(ClusterStateBundle stateBundle) {
+ Slime slime = new Slime();
+ Cursor root = slime.setObject();
+ Cursor states = root.setObject("states");
+// TODO: serialize the states with a dedicated function instead of relying on toString()
+ states.setString("baseline", stateBundle.getBaselineClusterState().toString());
+ Cursor spaces = states.setObject("spaces");
+ stateBundle.getDerivedBucketSpaceStates().entrySet()
+ .forEach(entry -> spaces.setString(entry.getKey(), entry.getValue().toString()));
+
+ byte[] serialized = BinaryFormat.encode(slime);
+ Compressor.Compression compression = compressor.compress(serialized);
+ return EncodedClusterStateBundle.fromCompressionBuffer(compression);
+ }
+
+ @Override
+ public ClusterStateBundle decode(EncodedClusterStateBundle encodedClusterStateBundle) {
+ byte[] uncompressed = compressor.decompress(encodedClusterStateBundle.getCompression());
+ Slime slime = BinaryFormat.decode(uncompressed);
+ Inspector root = slime.get();
+ Inspector states = root.field("states");
+ ClusterState baseline = ClusterState.stateFromString(states.field("baseline").asString());
+
+ Inspector spaces = states.field("spaces");
+ Map<String, ClusterState> derivedStates = new HashMap<>();
+ spaces.traverse(((ObjectTraverser)(key, value) -> {
+ derivedStates.put(key, ClusterState.stateFromString(value.asString()));
+ }));
+
+ return ClusterStateBundle.of(AnnotatedClusterState.withoutAnnotations(baseline), derivedStates);
+ }
+}