diff options
Diffstat (limited to 'clustercontroller-core/src/main/java')
8 files changed, 163 insertions, 18 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java index e755c2b8117..5c345b6f8a0 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/FleetController.java @@ -178,6 +178,7 @@ public class FleetController implements NodeStateOrHostInfoChangeHandler, NodeAd options.minRatioOfStorageNodesUp); NodeStateGatherer stateGatherer = new NodeStateGatherer(timer, timer, log); Communicator communicator = new RPCCommunicator( + RPCCommunicator.createRealSupervisor(), timer, options.fleetControllerIndex, options.nodeStateRequestTimeoutMS, diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java index 15650f0f4aa..55331e4bbed 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/NodeInfo.java @@ -8,6 +8,7 @@ import com.yahoo.vdslib.distribution.Distribution; import com.yahoo.vdslib.distribution.Group; import com.yahoo.vdslib.state.*; import com.yahoo.vespa.clustercontroller.core.hostinfo.HostInfo; +import com.yahoo.vespa.clustercontroller.core.rpc.RPCCommunicator; import java.io.PrintWriter; import java.io.StringWriter; @@ -205,7 +206,7 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { } public int getLatestVersion() { - return 2; + return RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_VERSION; } public String getSlobrokAddress() { @@ -375,21 +376,23 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { public boolean notifyNoSuchMethodError(String methodName, Timer timer) { if (methodName.equals("getnodestate3")) { if (version > 1) { - log.log(LogLevel.DEBUG, "Node " + toString() + " does not support " + methodName + " call. Setting version to 1."); - version = 1; - nextAttemptTime = 0; - adjustedVersionTime = timer.getCurrentTimeInMillis(); + downgradeToRpcVersion(1, methodName, timer); return true; } else if (timer.getCurrentTimeInMillis() - 2000 < adjustedVersionTime) { log.log(LogLevel.DEBUG, "Node " + toString() + " does not support " + methodName + " call. Version already at 1 and was recently adjusted, so ignoring it."); return true; } - } else if (methodName.equals("getnodestate2") || methodName.equals("setsystemstate2")) { + } else if (methodName.equals(RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_METHOD_NAME)) { + if (version == RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_VERSION) { + downgradeToRpcVersion(RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_VERSION, methodName, timer); + return true; + } else if (timer.getCurrentTimeInMillis() - 2000 < adjustedVersionTime) { + log.log(LogLevel.DEBUG, () -> "Node " + toString() + " does not support " + methodName + " call. Version already downgraded, so ignoring it."); + return true; + } + } else if (methodName.equals("getnodestate2") || methodName.equals(RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME)) { if (version > 0) { - log.log(LogLevel.DEBUG, "Node " + toString() + " does not support " + methodName + " call. Setting version to 0."); - version = 0; - nextAttemptTime = 0; - adjustedVersionTime = timer.getCurrentTimeInMillis(); + downgradeToRpcVersion(0, methodName, timer); return true; } else if (timer.getCurrentTimeInMillis() - 2000 < adjustedVersionTime) { log.log(LogLevel.DEBUG, "Node " + toString() + " does not support " + methodName + " call. Version already at 0 and was recently adjusted, so ignoring it."); @@ -400,6 +403,14 @@ abstract public class NodeInfo implements Comparable<NodeInfo> { return false; } + private void downgradeToRpcVersion(int newVersion, String methodName, Timer timer) { + log.log(LogLevel.DEBUG, () -> String.format("Node %s does not support %s call. Downgrading to version %d.", + toString(), methodName, newVersion)); + version = newVersion; + nextAttemptTime = 0; + adjustedVersionTime = timer.getCurrentTimeInMillis(); + } + public Target getConnection() { return connection; } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java index 45a14ee19cd..b0869b560f6 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java @@ -59,8 +59,8 @@ public class SystemStateBroadcaster { int version = req.getSystemStateVersion(); if (req.getReply().isError()) { + info.setSystemStateVersionAcknowledged(version, false); if (req.getReply().getReturnCode() != Communicator.TRANSIENT_ERROR) { - info.setSystemStateVersionAcknowledged(version, false); if (info.getNewestSystemStateVersionSent() == version) { reportNodeError(nodeOk, info, "Got error response " + req.getReply().getReturnCode() + ": " + req.getReply().getReturnMessage() diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/ClusterStateBundleCodec.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/ClusterStateBundleCodec.java new file mode 100644 index 00000000000..250d7bbe46a --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/ClusterStateBundleCodec.java @@ -0,0 +1,21 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core.rpc; + +import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle; + +/** + * Provides opaque encoding and decoding of ClusterStateBundles for transmission over RPC. + * + * Implementations may choose to compress the encoded representation of the bundle. + * + * It is important that the input given to decode() is exactly equal to that given from + * encode() for the results to be correct. Implementations must ensure that this information + * is enough to losslessly reconstruct the full encoded ClusterStateBundle. + */ +public interface ClusterStateBundleCodec { + + EncodedClusterStateBundle encode(ClusterStateBundle stateBundle); + + ClusterStateBundle decode(EncodedClusterStateBundle encodedClusterStateBundle); + +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EncodedClusterStateBundle.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EncodedClusterStateBundle.java new file mode 100644 index 00000000000..784c8c23c87 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/EncodedClusterStateBundle.java @@ -0,0 +1,29 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core.rpc; + +import com.yahoo.compress.Compressor; + +/** + * Contains an opaque encoded (possibly compressed) representation of a ClusterStateBundle. + * + * This bundle can in turn be sent over the wire or serialized by ensuring that all components + * of the Compressor.Compression state can be reconstructed by the receiver. In practice this + * means sending the Compression's <em>type</em>, <em>uncompressedSize</em> and <em>data</em>. + */ +public class EncodedClusterStateBundle { + + private final Compressor.Compression compression; + + private EncodedClusterStateBundle(Compressor.Compression compression) { + this.compression = compression; + } + + public static EncodedClusterStateBundle fromCompressionBuffer(Compressor.Compression compression) { + return new EncodedClusterStateBundle(compression); + } + + public Compressor.Compression getCompression() { + return compression; + } + +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java index 0cc5ef36e3c..8941b2e32ec 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java @@ -25,6 +25,12 @@ public class RPCCommunicator implements Communicator { public static final Logger log = Logger.getLogger(RPCCommunicator.class.getName()); + public static final int SET_DISTRIBUTION_STATES_RPC_VERSION = 3; + public static final String SET_DISTRIBUTION_STATES_RPC_METHOD_NAME = "setdistributionstates"; + + public static final int LEGACY_SET_SYSTEM_STATE2_RPC_VERSION = 2; + public static final String LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME = "setsystemstate2"; + private final Timer timer; private final Supervisor supervisor; private double nodeStateRequestTimeoutIntervalMaxSeconds; @@ -33,7 +39,12 @@ public class RPCCommunicator implements Communicator { private int nodeStateRequestRoundTripTimeMaxSeconds; private final int fleetControllerIndex; + public static Supervisor createRealSupervisor() { + return new Supervisor(new Transport()); + } + public RPCCommunicator( + final Supervisor supervisor, final Timer t, final int index, final int nodeStateRequestTimeoutIntervalMaxMs, @@ -52,7 +63,7 @@ public class RPCCommunicator implements Communicator { this.nodeStateRequestTimeoutIntervalStartPercentage = nodeStateRequestTimeoutIntervalStartPercentage; this.nodeStateRequestTimeoutIntervalStopPercentage = nodeStateRequestTimeoutIntervalStopPercentage; this.nodeStateRequestRoundTripTimeMaxSeconds = nodeStateRequestRoundTripTimeMaxSeconds; - this.supervisor = new Supervisor(new Transport()); + this.supervisor = supervisor; } public void shutdown() { @@ -126,6 +137,7 @@ public class RPCCommunicator implements Communicator { if ( ! connection.isValid()) { log.log(LogLevel.DEBUG, "Connection to " + node.getRpcAddress() + " could not be created."); } + // TODO remove this deprecated legacy stuff if (node.getVersion() == 0 && node.getConnectionVersion() > 0) { doVersion0HandShake(connection, node); clearOldStoredNodeState(connection, node); @@ -166,19 +178,31 @@ public class RPCCommunicator implements Communicator { log.log(LogLevel.DEBUG, "Connection to " + node.getRpcAddress() + " could not be created."); return; } - if (node.getVersion() == 0 && node.getConnectionVersion() > 0) { + final int nodeVersion = node.getVersion(); + // TODO remove this deprecated legacy stuff + if (nodeVersion == 0 && node.getConnectionVersion() > 0) { doVersion0HandShake(connection, node); clearOldStoredNodeState(connection, node); } Request req; - if (node.getVersion() == 0) { + if (nodeVersion == 0) { req = new Request("setsystemstate"); req.parameters().add(new StringValue(baselineState.toString(true))); - } else { - req = new Request("setsystemstate2"); + } else if (nodeVersion <= 2) { + req = new Request(LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME); req.parameters().add(new StringValue(baselineState.toString(false))); + } else { + req = new Request(SET_DISTRIBUTION_STATES_RPC_METHOD_NAME); + SlimeClusterStateBundleCodec codec = new SlimeClusterStateBundleCodec(); + EncodedClusterStateBundle encodedBundle = codec.encode(stateBundle); + Values v = req.parameters(); + v.add(new Int8Value(encodedBundle.getCompression().type().getCode())); + v.add(new Int32Value(encodedBundle.getCompression().uncompressedSize())); + v.add(new DataValue(encodedBundle.getCompression().data())); } + log.log(LogLevel.DEBUG, () -> String.format("Sending '%s' RPC to %s for state version %d", + req.methodName(), node.getRpcAddress(), stateBundle.getVersion())); RPCSetClusterStateRequest stateRequest = new RPCSetClusterStateRequest(node, req, baselineState.getVersion()); waiter.setRequest(stateRequest); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java index 39506ddf7fa..96a776a65b8 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCSetClusterStateWaiter.java @@ -30,7 +30,8 @@ public class RPCSetClusterStateWaiter implements RequestWaiter { public SetClusterStateRequest.Reply getReply(Request req) { NodeInfo info = request.getNodeInfo(); - if (req.methodName().equals("setsystemstate2")) { + if (req.methodName().equals(RPCCommunicator.SET_DISTRIBUTION_STATES_RPC_METHOD_NAME) + || req.methodName().equals(RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME)) { if (req.isError() && req.errorCode() == ErrorCode.NO_SUCH_METHOD) { if (info.notifyNoSuchMethodError(req.methodName(), timer)) { return new SetClusterStateRequest.Reply(Communicator.TRANSIENT_ERROR, "Trying lower version"); @@ -39,7 +40,7 @@ public class RPCSetClusterStateWaiter implements RequestWaiter { if (req.isError()) { return new SetClusterStateRequest.Reply(req.errorCode(), req.errorMessage()); } else if (!req.checkReturnTypes("")) { - return new SetClusterStateRequest.Reply(ErrorCode.BAD_REPLY, "Got setsystemstate2 response with invalid return types from " + info); + return new SetClusterStateRequest.Reply(ErrorCode.BAD_REPLY, "Got RPC response with invalid return types from " + info); } } else { return new SetClusterStateRequest.Reply(ErrorCode.BAD_REPLY, "Unknown method " + req.methodName()); diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java new file mode 100644 index 00000000000..c37bd8313a9 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/SlimeClusterStateBundleCodec.java @@ -0,0 +1,58 @@ +// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core.rpc; + +import com.yahoo.compress.CompressionType; +import com.yahoo.compress.Compressor; +import com.yahoo.slime.*; +import com.yahoo.vdslib.state.ClusterState; +import com.yahoo.vespa.clustercontroller.core.AnnotatedClusterState; +import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle; + +import java.util.HashMap; +import java.util.Map; + +/** + * Implementation of ClusterStateBundleCodec which uses structured Slime binary encoding + * to implement (de-)serialization of ClusterStateBundle instances. Encoding format is + * intentionally extensible so that we may add other information to it later. + * + * LZ4 compression is transparently applied during encoding and decompression is + * subsequently applied during decoding. + */ +public class SlimeClusterStateBundleCodec implements ClusterStateBundleCodec { + + private static final Compressor compressor = new Compressor(CompressionType.LZ4, 3, 0.90, 1024); + + @Override + public EncodedClusterStateBundle encode(ClusterStateBundle stateBundle) { + Slime slime = new Slime(); + Cursor root = slime.setObject(); + Cursor states = root.setObject("states"); + // TODO add another function that is not toString for this..! + states.setString("baseline", stateBundle.getBaselineClusterState().toString()); + Cursor spaces = states.setObject("spaces"); + stateBundle.getDerivedBucketSpaceStates().entrySet() + .forEach(entry -> spaces.setString(entry.getKey(), entry.getValue().toString())); + + byte[] serialized = BinaryFormat.encode(slime); + Compressor.Compression compression = compressor.compress(serialized); + return EncodedClusterStateBundle.fromCompressionBuffer(compression); + } + + @Override + public ClusterStateBundle decode(EncodedClusterStateBundle encodedClusterStateBundle) { + byte[] uncompressed = compressor.decompress(encodedClusterStateBundle.getCompression()); + Slime slime = BinaryFormat.decode(uncompressed); + Inspector root = slime.get(); + Inspector states = root.field("states"); + ClusterState baseline = ClusterState.stateFromString(states.field("baseline").asString()); + + Inspector spaces = states.field("spaces"); + Map<String, ClusterState> derivedStates = new HashMap<>(); + spaces.traverse(((ObjectTraverser)(key, value) -> { + derivedStates.put(key, ClusterState.stateFromString(value.asString())); + })); + + return ClusterStateBundle.of(AnnotatedClusterState.withoutAnnotations(baseline), derivedStates); + } +} |