diff options
Diffstat (limited to 'clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java')
-rw-r--r-- | clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java | 34 |
1 files changed, 29 insertions, 5 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java index 0cc5ef36e3c..8941b2e32ec 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java @@ -25,6 +25,12 @@ public class RPCCommunicator implements Communicator { public static final Logger log = Logger.getLogger(RPCCommunicator.class.getName()); + public static final int SET_DISTRIBUTION_STATES_RPC_VERSION = 3; + public static final String SET_DISTRIBUTION_STATES_RPC_METHOD_NAME = "setdistributionstates"; + + public static final int LEGACY_SET_SYSTEM_STATE2_RPC_VERSION = 2; + public static final String LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME = "setsystemstate2"; + private final Timer timer; private final Supervisor supervisor; private double nodeStateRequestTimeoutIntervalMaxSeconds; @@ -33,7 +39,12 @@ public class RPCCommunicator implements Communicator { private int nodeStateRequestRoundTripTimeMaxSeconds; private final int fleetControllerIndex; + public static Supervisor createRealSupervisor() { + return new Supervisor(new Transport()); + } + public RPCCommunicator( + final Supervisor supervisor, final Timer t, final int index, final int nodeStateRequestTimeoutIntervalMaxMs, @@ -52,7 +63,7 @@ public class RPCCommunicator implements Communicator { this.nodeStateRequestTimeoutIntervalStartPercentage = nodeStateRequestTimeoutIntervalStartPercentage; this.nodeStateRequestTimeoutIntervalStopPercentage = nodeStateRequestTimeoutIntervalStopPercentage; this.nodeStateRequestRoundTripTimeMaxSeconds = nodeStateRequestRoundTripTimeMaxSeconds; - this.supervisor = new Supervisor(new Transport()); + this.supervisor = supervisor; } public void shutdown() { @@ -126,6 +137,7 @@ public class RPCCommunicator implements Communicator { if ( ! connection.isValid()) { log.log(LogLevel.DEBUG, "Connection to " + node.getRpcAddress() + " could not be created."); } + // TODO remove this deprecated legacy stuff if (node.getVersion() == 0 && node.getConnectionVersion() > 0) { doVersion0HandShake(connection, node); clearOldStoredNodeState(connection, node); @@ -166,19 +178,31 @@ public class RPCCommunicator implements Communicator { log.log(LogLevel.DEBUG, "Connection to " + node.getRpcAddress() + " could not be created."); return; } - if (node.getVersion() == 0 && node.getConnectionVersion() > 0) { + final int nodeVersion = node.getVersion(); + // TODO remove this deprecated legacy stuff + if (nodeVersion == 0 && node.getConnectionVersion() > 0) { doVersion0HandShake(connection, node); clearOldStoredNodeState(connection, node); } Request req; - if (node.getVersion() == 0) { + if (nodeVersion == 0) { req = new Request("setsystemstate"); req.parameters().add(new StringValue(baselineState.toString(true))); - } else { - req = new Request("setsystemstate2"); + } else if (nodeVersion <= 2) { + req = new Request(LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME); req.parameters().add(new StringValue(baselineState.toString(false))); + } else { + req = new Request(SET_DISTRIBUTION_STATES_RPC_METHOD_NAME); + SlimeClusterStateBundleCodec codec = new SlimeClusterStateBundleCodec(); + EncodedClusterStateBundle encodedBundle = codec.encode(stateBundle); + Values v = req.parameters(); + v.add(new Int8Value(encodedBundle.getCompression().type().getCode())); + v.add(new Int32Value(encodedBundle.getCompression().uncompressedSize())); + v.add(new DataValue(encodedBundle.getCompression().data())); } + log.log(LogLevel.DEBUG, () -> String.format("Sending '%s' RPC to %s for state version %d", + req.methodName(), node.getRpcAddress(), stateBundle.getVersion())); RPCSetClusterStateRequest stateRequest = new RPCSetClusterStateRequest(node, req, baselineState.getVersion()); waiter.setRequest(stateRequest); |