aboutsummaryrefslogtreecommitdiffstats
path: root/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
diff options
context:
space:
mode:
Diffstat (limited to 'clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java')
-rw-r--r--clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java34
1 files changed, 29 insertions, 5 deletions
diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
index 0cc5ef36e3c..8941b2e32ec 100644
--- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
+++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java
@@ -25,6 +25,12 @@ public class RPCCommunicator implements Communicator {
public static final Logger log = Logger.getLogger(RPCCommunicator.class.getName());
+ public static final int SET_DISTRIBUTION_STATES_RPC_VERSION = 3;
+ public static final String SET_DISTRIBUTION_STATES_RPC_METHOD_NAME = "setdistributionstates";
+
+ public static final int LEGACY_SET_SYSTEM_STATE2_RPC_VERSION = 2;
+ public static final String LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME = "setsystemstate2";
+
private final Timer timer;
private final Supervisor supervisor;
private double nodeStateRequestTimeoutIntervalMaxSeconds;
@@ -33,7 +39,12 @@ public class RPCCommunicator implements Communicator {
private int nodeStateRequestRoundTripTimeMaxSeconds;
private final int fleetControllerIndex;
+ public static Supervisor createRealSupervisor() {
+ return new Supervisor(new Transport());
+ }
+
public RPCCommunicator(
+ final Supervisor supervisor,
final Timer t,
final int index,
final int nodeStateRequestTimeoutIntervalMaxMs,
@@ -52,7 +63,7 @@ public class RPCCommunicator implements Communicator {
this.nodeStateRequestTimeoutIntervalStartPercentage = nodeStateRequestTimeoutIntervalStartPercentage;
this.nodeStateRequestTimeoutIntervalStopPercentage = nodeStateRequestTimeoutIntervalStopPercentage;
this.nodeStateRequestRoundTripTimeMaxSeconds = nodeStateRequestRoundTripTimeMaxSeconds;
- this.supervisor = new Supervisor(new Transport());
+ this.supervisor = supervisor;
}
public void shutdown() {
@@ -126,6 +137,7 @@ public class RPCCommunicator implements Communicator {
if ( ! connection.isValid()) {
log.log(LogLevel.DEBUG, "Connection to " + node.getRpcAddress() + " could not be created.");
}
+ // TODO remove this deprecated legacy stuff
if (node.getVersion() == 0 && node.getConnectionVersion() > 0) {
doVersion0HandShake(connection, node);
clearOldStoredNodeState(connection, node);
@@ -166,19 +178,31 @@ public class RPCCommunicator implements Communicator {
log.log(LogLevel.DEBUG, "Connection to " + node.getRpcAddress() + " could not be created.");
return;
}
- if (node.getVersion() == 0 && node.getConnectionVersion() > 0) {
+ final int nodeVersion = node.getVersion();
+ // TODO remove this deprecated legacy stuff
+ if (nodeVersion == 0 && node.getConnectionVersion() > 0) {
doVersion0HandShake(connection, node);
clearOldStoredNodeState(connection, node);
}
Request req;
- if (node.getVersion() == 0) {
+ if (nodeVersion == 0) {
req = new Request("setsystemstate");
req.parameters().add(new StringValue(baselineState.toString(true)));
- } else {
- req = new Request("setsystemstate2");
+ } else if (nodeVersion <= 2) {
+ req = new Request(LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME);
req.parameters().add(new StringValue(baselineState.toString(false)));
+ } else {
+ req = new Request(SET_DISTRIBUTION_STATES_RPC_METHOD_NAME);
+ SlimeClusterStateBundleCodec codec = new SlimeClusterStateBundleCodec();
+ EncodedClusterStateBundle encodedBundle = codec.encode(stateBundle);
+ Values v = req.parameters();
+ v.add(new Int8Value(encodedBundle.getCompression().type().getCode()));
+ v.add(new Int32Value(encodedBundle.getCompression().uncompressedSize()));
+ v.add(new DataValue(encodedBundle.getCompression().data()));
}
+ log.log(LogLevel.DEBUG, () -> String.format("Sending '%s' RPC to %s for state version %d",
+ req.methodName(), node.getRpcAddress(), stateBundle.getVersion()));
RPCSetClusterStateRequest stateRequest = new RPCSetClusterStateRequest(node, req, baselineState.getVersion());
waiter.setRequest(stateRequest);