From 05f27f6cfcda786232fa6da47154784dce2483e1 Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Wed, 6 Mar 2019 14:23:53 +0100 Subject: Initial groundwork for cluster state version activation RPC --- .../core/ActivateClusterStateVersionRequest.java | 10 +++++ .../core/ClusterStateVersionSpecificRequest.java | 48 ++++++++++++++++++++++ .../vespa/clustercontroller/core/Communicator.java | 2 + .../core/SetClusterStateRequest.java | 44 ++------------------ .../core/SystemStateBroadcaster.java | 31 +++++++++----- .../rpc/RPCActivateClusterStateVersionRequest.java | 17 ++++++++ .../rpc/RPCActivateClusterStateVersionWaiter.java | 44 ++++++++++++++++++++ .../core/rpc/RPCCommunicator.java | 28 ++++++++++++- .../clustercontroller/core/DummyCommunicator.java | 15 +++++++ .../core/rpc/RPCCommunicatorTest.java | 29 +++++++++---- 10 files changed, 206 insertions(+), 62 deletions(-) create mode 100644 clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java create mode 100644 clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java create mode 100644 clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java create mode 100644 clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java (limited to 'clustercontroller-core/src') diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java new file mode 100644 index 00000000000..b57004eedce --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ActivateClusterStateVersionRequest.java @@ -0,0 +1,10 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +public class ActivateClusterStateVersionRequest extends ClusterStateVersionSpecificRequest { + + public ActivateClusterStateVersionRequest(NodeInfo nodeInfo, int systemStateVersion) { + super(nodeInfo, systemStateVersion); + } + +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java new file mode 100644 index 00000000000..fda6ec19752 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/ClusterStateVersionSpecificRequest.java @@ -0,0 +1,48 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core; + +public abstract class ClusterStateVersionSpecificRequest { + + private final NodeInfo nodeInfo; + private final int clusterStateVersion; + private Reply reply; + + public ClusterStateVersionSpecificRequest(NodeInfo nodeInfo, int clusterStateVersion) { + this.nodeInfo = nodeInfo; + this.clusterStateVersion = clusterStateVersion; + } + + public NodeInfo getNodeInfo() { return nodeInfo; } + + public int getClusterStateVersion() { return clusterStateVersion; } + + public void setReply(Reply reply) { this.reply = reply; } + + public Reply getReply() { return reply; } + + public static class Reply { + + final int returnCode; + final String returnMessage; + + public Reply() { + this(0, null); + } + + public Reply(int returnCode, String returnMessage) { + this.returnCode = returnCode; + this.returnMessage = returnMessage; + } + + /** Returns whether this is an error response */ + public boolean isError() { return returnCode != 0; } + + /** Returns the return code, which is 0 if this request was successful */ + public int getReturnCode() { return returnCode; } + + /** Returns the message returned, or null if none */ + public String getReturnMessage() { return returnMessage; } + + } + +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Communicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Communicator.java index 450513343b0..900eee54cd3 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Communicator.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/Communicator.java @@ -20,6 +20,8 @@ public interface Communicator { void setSystemState(ClusterStateBundle states, NodeInfo node, Waiter waiter); + void activateClusterStateVersion(int clusterStateVersion, NodeInfo node, Waiter waiter); + void shutdown(); } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SetClusterStateRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SetClusterStateRequest.java index 836876b5642..d4e79a4f2b2 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SetClusterStateRequest.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SetClusterStateRequest.java @@ -1,48 +1,10 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.vespa.clustercontroller.core; -public abstract class SetClusterStateRequest { - - private final NodeInfo nodeInfo; - private final int systemStateVersion; - private Reply reply; - - public SetClusterStateRequest(NodeInfo nodeInfo, int systemStateVersion) { - this.nodeInfo = nodeInfo; - this.systemStateVersion = systemStateVersion; - } - - public NodeInfo getNodeInfo() { return nodeInfo; } - - public int getSystemStateVersion() { return systemStateVersion; } - - public void setReply(Reply reply) { this.reply = reply; } - - public Reply getReply() { return reply; } - - public static class Reply { - - final int returnCode; - final String returnMessage; - - public Reply() { - this(0, null); - } - - public Reply(int returnCode, String returnMessage) { - this.returnCode = returnCode; - this.returnMessage = returnMessage; - } - - /** Returns whether this is an error response */ - public boolean isError() { return returnCode != 0; } - - /** Returns the return code, which is 0 if this request was successful */ - public int getReturnCode() { return returnCode; } - - /** Returns the message returned, or null if none */ - public String getReturnMessage() { return returnMessage; } +public abstract class SetClusterStateRequest extends ClusterStateVersionSpecificRequest { + public SetClusterStateRequest(NodeInfo nodeInfo, int clusterStateVersion) { + super(nodeInfo, clusterStateVersion); } } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java index 3683fe342bc..a40a45fd48a 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/SystemStateBroadcaster.java @@ -19,13 +19,13 @@ public class SystemStateBroadcaster { private final Timer timer; private final Object monitor; private ClusterStateBundle clusterStateBundle; - private final List replies = new LinkedList<>(); + private final List setClusterStateReplies = new LinkedList<>(); private final static long minTimeBetweenNodeErrorLogging = 10 * 60 * 1000; private final Map lastErrorReported = new TreeMap<>(); private int lastClusterStateInSync = 0; - private final ClusterStateWaiter waiter = new ClusterStateWaiter(); + private final SetClusterStateWaiter setClusterStateWaiter = new SetClusterStateWaiter(); public SystemStateBroadcaster(Timer timer, Object monitor) { this.timer = timer; @@ -56,19 +56,21 @@ public class SystemStateBroadcaster { long time = timer.getCurrentTimeInMillis(); Long lastReported = lastErrorReported.get(info.getNode()); boolean alreadySeen = (lastReported != null && time - lastReported < minTimeBetweenNodeErrorLogging); - log.log(nodeOk && !alreadySeen ? LogLevel.WARNING : LogLevel.DEBUG, message); - if (!alreadySeen) lastErrorReported.put(info.getNode(), time); + log.log((nodeOk && !alreadySeen) ? LogLevel.WARNING : LogLevel.DEBUG, message); + if (!alreadySeen) { + lastErrorReported.put(info.getNode(), time); + } } public boolean processResponses() { boolean anyResponsesFound = false; synchronized(monitor) { - for(SetClusterStateRequest req : replies) { + for (SetClusterStateRequest req : setClusterStateReplies) { anyResponsesFound = true; NodeInfo info = req.getNodeInfo(); boolean nodeOk = info.getReportedState().getState().oneOf("uir"); - int version = req.getSystemStateVersion(); + int version = req.getClusterStateVersion(); if (req.getReply().isError()) { info.setSystemStateVersionAcknowledged(version, false); @@ -85,7 +87,7 @@ public class SystemStateBroadcaster { lastErrorReported.remove(info.getNode()); } } - replies.clear(); + setClusterStateReplies.clear(); } return anyResponsesFound; } @@ -159,11 +161,11 @@ public class SystemStateBroadcaster { ClusterStateBundle modifiedBundle = clusterStateBundle.cloneWithMapper(state -> buildModifiedClusterState(state, dbContext)); log.log(LogLevel.DEBUG, "Sending modified cluster state version " + baselineState.getVersion() + " to node " + node + ": " + modifiedBundle); - communicator.setSystemState(modifiedBundle, node, waiter); + communicator.setSystemState(modifiedBundle, node, setClusterStateWaiter); } else { log.log(LogLevel.DEBUG, "Sending system state version " + baselineState.getVersion() + " to node " + node + ". (went down time " + node.getWentDownWithStartTime() + ", node start time " + node.getStartTimestamp() + ")"); - communicator.setSystemState(clusterStateBundle, node, waiter); + communicator.setSystemState(clusterStateBundle, node, setClusterStateWaiter); } } @@ -188,13 +190,20 @@ public class SystemStateBroadcaster { return newState; } - private class ClusterStateWaiter implements Communicator.Waiter { + private class SetClusterStateWaiter implements Communicator.Waiter { @Override public void done(SetClusterStateRequest reply) { synchronized (monitor) { - replies.add(reply); + setClusterStateReplies.add(reply); } } } + private class ActivateClusterStateVersionWaiter implements Communicator.Waiter { + @Override + public void done(ActivateClusterStateVersionRequest reply) { + // TODO + } + } + } diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java new file mode 100644 index 00000000000..32d4046c181 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionRequest.java @@ -0,0 +1,17 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core.rpc; + +import com.yahoo.jrt.Request; +import com.yahoo.vespa.clustercontroller.core.ActivateClusterStateVersionRequest; +import com.yahoo.vespa.clustercontroller.core.NodeInfo; + +public class RPCActivateClusterStateVersionRequest extends ActivateClusterStateVersionRequest { + + Request request; + + public RPCActivateClusterStateVersionRequest(NodeInfo nodeInfo, Request request, int clusterStateVersion) { + super(nodeInfo, clusterStateVersion); + this.request = request; + } + +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java new file mode 100644 index 00000000000..f19bb5ad9b8 --- /dev/null +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCActivateClusterStateVersionWaiter.java @@ -0,0 +1,44 @@ +// Copyright 2019 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.vespa.clustercontroller.core.rpc; + +import com.yahoo.jrt.ErrorCode; +import com.yahoo.jrt.Request; +import com.yahoo.jrt.RequestWaiter; +import com.yahoo.vespa.clustercontroller.core.ActivateClusterStateVersionRequest; +import com.yahoo.vespa.clustercontroller.core.Communicator; +import com.yahoo.vespa.clustercontroller.core.NodeInfo; +import com.yahoo.vespa.clustercontroller.core.Timer; + +public class RPCActivateClusterStateVersionWaiter implements RequestWaiter { + + ActivateClusterStateVersionRequest request; + Timer timer; + Communicator.Waiter waiter; + + public RPCActivateClusterStateVersionWaiter(Communicator.Waiter waiter, Timer timer) { + this.timer = timer; + this.waiter = waiter; + } + + public void setRequest(RPCActivateClusterStateVersionRequest request) { + this.request = request; + } + + public ActivateClusterStateVersionRequest.Reply getReply(Request req) { + NodeInfo info = request.getNodeInfo(); + if (req.isError()) { + return new ActivateClusterStateVersionRequest.Reply(req.errorCode(), req.errorMessage()); + } else if (!req.checkReturnTypes("")) { + return new ActivateClusterStateVersionRequest.Reply(ErrorCode.BAD_REPLY, "Got RPC response with invalid return types from " + info); + } + return new ActivateClusterStateVersionRequest.Reply(); + } + + @Override + public void handleRequestDone(Request request) { + ActivateClusterStateVersionRequest.Reply reply = getReply(request); + this.request.setReply(reply); + waiter.done(this.request); + } + +} diff --git a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java index 9089da68e10..a28da4d02a1 100644 --- a/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java +++ b/clustercontroller-core/src/main/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicator.java @@ -15,6 +15,7 @@ import com.yahoo.vdslib.state.NodeState; import com.yahoo.vdslib.state.ClusterState; import com.yahoo.vdslib.state.State; import com.yahoo.log.LogLevel; +import com.yahoo.vespa.clustercontroller.core.ActivateClusterStateVersionRequest; import com.yahoo.vespa.clustercontroller.core.ClusterStateBundle; import com.yahoo.vespa.clustercontroller.core.Communicator; import com.yahoo.vespa.clustercontroller.core.FleetControllerOptions; @@ -41,6 +42,8 @@ public class RPCCommunicator implements Communicator { public static final int LEGACY_SET_SYSTEM_STATE2_RPC_VERSION = 2; public static final String LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME = "setsystemstate2"; + public static final String ACTIVATE_CLUSTER_STATE_VERSION_RPC_METHOD_NAME = "activate_cluster_state_version"; + private final Timer timer; private final Supervisor supervisor; private double nodeStateRequestTimeoutIntervalMaxSeconds; @@ -106,7 +109,7 @@ public class RPCCommunicator implements Communicator { public void getNodeState(NodeInfo node, Waiter externalWaiter) { Target connection = getConnection(node); if ( ! connection.isValid()) { - log.log(LogLevel.DEBUG, "Connection to " + node.getRpcAddress() + " could not be created."); + log.log(LogLevel.DEBUG, () -> String.format("Connection to '%s' could not be created.", node.getRpcAddress())); } NodeState currentState = node.getReportedState(); Request req = new Request("getnodestate3"); @@ -134,7 +137,7 @@ public class RPCCommunicator implements Communicator { Target connection = getConnection(node); if ( ! connection.isValid()) { - log.log(LogLevel.DEBUG, "Connection to " + node.getRpcAddress() + " could not be created."); + log.log(LogLevel.DEBUG, () -> String.format("Connection to '%s' could not be created.", node.getRpcAddress())); return; } int nodeVersion = node.getVersion(); @@ -161,6 +164,27 @@ public class RPCCommunicator implements Communicator { node.setSystemStateVersionSent(baselineState); } + @Override + public void activateClusterStateVersion(int clusterStateVersion, NodeInfo node, Waiter externalWaiter) { + var waiter = new RPCActivateClusterStateVersionWaiter(externalWaiter, timer); + + Target connection = getConnection(node); + if ( ! connection.isValid()) { + log.log(LogLevel.DEBUG, () -> String.format("Connection to '%s' could not be created.", node.getRpcAddress())); + return; + } + + var req = new Request(ACTIVATE_CLUSTER_STATE_VERSION_RPC_METHOD_NAME); + req.parameters().add(new Int32Value(clusterStateVersion)); + + log.log(LogLevel.DEBUG, () -> String.format("Sending '%s' RPC to %s for state version %d", + req.methodName(), node.getRpcAddress(), clusterStateVersion)); + var activationRequest = new RPCActivateClusterStateVersionRequest(node, req, clusterStateVersion); + waiter.setRequest(activationRequest); + + connection.invokeAsync(req, 60, waiter); + } + // protected for testing. protected int generateNodeStateRequestTimeoutMs() { double intervalFraction = Math.random(); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java index 5d200d65516..b1e7158f61c 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/DummyCommunicator.java @@ -47,6 +47,14 @@ public class DummyCommunicator implements Communicator, NodeLookup { } + public class DummyActivateClusterStateVersionRequest extends ActivateClusterStateVersionRequest { + + public DummyActivateClusterStateVersionRequest(NodeInfo nodeInfo, int stateVersion) { + super(nodeInfo, stateVersion); + } + + } + private Map getNodeStateRequests = new TreeMap<>(); public DummyCommunicator(List nodeList, Timer timer) { @@ -98,6 +106,13 @@ public class DummyCommunicator implements Communicator, NodeLookup { } } + @Override + public void activateClusterStateVersion(int clusterStateVersion, NodeInfo node, Waiter waiter) { + var req = new DummyActivateClusterStateVersionRequest(node, clusterStateVersion); + req.setReply(new ActivateClusterStateVersionRequest.Reply()); + waiter.done(req); + } + public void sendAllDeferredDistributorClusterStateAcks() { deferredClusterStateAcks.forEach(reqAndWaiter -> reqAndWaiter.getFirst().done(reqAndWaiter.getSecond())); deferredClusterStateAcks.clear(); diff --git a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java index 7602f0c83a2..61e9d1a90de 100644 --- a/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java +++ b/clustercontroller-core/src/test/java/com/yahoo/vespa/clustercontroller/core/rpc/RPCCommunicatorTest.java @@ -99,7 +99,7 @@ public class RPCCommunicatorTest { (RequestWaiter)any()); } - private static class Fixture { + private static class Fixture { final Supervisor mockSupervisor = mock(Supervisor.class); final Target mockTarget = mock(Target.class); final Timer timer = new FakeTimer(); @@ -107,7 +107,7 @@ public class RPCCommunicatorTest { final AtomicReference receivedRequest = new AtomicReference<>(); final AtomicReference receivedWaiter = new AtomicReference<>(); @SuppressWarnings("unchecked") // Cannot mock with "compiler-obvious" type safety for generics - final Communicator.Waiter mockWaiter = mock(Communicator.Waiter.class); + final Communicator.Waiter mockWaiter = mock(Communicator.Waiter.class); Fixture() { communicator = new RPCCommunicator( @@ -131,9 +131,9 @@ public class RPCCommunicatorTest { @Test public void setSystemState_v3_sends_distribution_states_rpc() { - Fixture f = new Fixture(); - ClusterFixture cf = ClusterFixture.forFlatCluster(3).bringEntireClusterUp().assignDummyRpcAddresses(); - ClusterStateBundle sentBundle = ClusterStateBundleUtil.makeBundle("distributor:3 storage:3"); + var f = new Fixture(); + var cf = ClusterFixture.forFlatCluster(3).bringEntireClusterUp().assignDummyRpcAddresses(); + var sentBundle = ClusterStateBundleUtil.makeBundle("distributor:3 storage:3"); f.communicator.setSystemState(sentBundle, cf.cluster().getNodeInfo(Node.ofStorage(1)), f.mockWaiter); Request req = f.receivedRequest.get(); @@ -147,9 +147,9 @@ public class RPCCommunicatorTest { @Test public void set_distribution_states_v3_rpc_auto_downgrades_to_v2_on_unknown_method_error() { - Fixture f = new Fixture(); - ClusterFixture cf = ClusterFixture.forFlatCluster(3).bringEntireClusterUp().assignDummyRpcAddresses(); - ClusterStateBundle sentBundle = ClusterStateBundleUtil.makeBundle("version:123 distributor:3 storage:3"); + var f = new Fixture(); + var cf = ClusterFixture.forFlatCluster(3).bringEntireClusterUp().assignDummyRpcAddresses(); + var sentBundle = ClusterStateBundleUtil.makeBundle("version:123 distributor:3 storage:3"); f.communicator.setSystemState(sentBundle, cf.cluster().getNodeInfo(Node.ofStorage(1)), f.mockWaiter); RequestWaiter waiter = f.receivedWaiter.get(); @@ -171,4 +171,17 @@ public class RPCCommunicatorTest { assertThat(req.methodName(), equalTo(RPCCommunicator.LEGACY_SET_SYSTEM_STATE2_RPC_METHOD_NAME)); } + @Test + public void activateClusterStateVersion_sends_version_activation_rpc() { + var f = new Fixture(); + var cf = ClusterFixture.forFlatCluster(3).bringEntireClusterUp().assignDummyRpcAddresses(); + f.communicator.activateClusterStateVersion(12345, cf.cluster().getNodeInfo(Node.ofDistributor(1)), f.mockWaiter); + + Request req = f.receivedRequest.get(); + assertThat(req, notNullValue()); + assertThat(req.methodName(), equalTo(RPCCommunicator.ACTIVATE_CLUSTER_STATE_VERSION_RPC_METHOD_NAME)); + assertTrue(req.parameters().satisfies("i")); // + assertThat(req.parameters().get(0).asInt32(), equalTo(12345)); + } + } -- cgit v1.2.3