diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-02-03 10:47:48 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-02-03 10:47:48 +0000 |
commit | 7e17736281d6e83efc53026fba168a2f0520b6e9 (patch) | |
tree | c50c8d10044687482798cecd221b475fa6884eca /container-search | |
parent | b3468dcd4f2185f82eeaceedc30c220cd96aaeb0 (diff) |
Use sequence numbers and check on Pong reception instead.
Diffstat (limited to 'container-search')
3 files changed, 48 insertions, 14 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java index b6daecfa8c3..b0bcd7d90df 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java @@ -15,33 +15,36 @@ import com.yahoo.search.dispatch.searchcluster.PongHandler; import com.yahoo.search.result.ErrorMessage; import com.yahoo.yolean.Exceptions; -public class RpcPing implements Pinger { +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; +public class RpcPing implements Pinger, Client.ResponseReceiver { + + private static final Logger log = Logger.getLogger(RpcPing.class.getName()); private static final String RPC_METHOD = "vespa.searchprotocol.ping"; private static final CompressionType PING_COMPRESSION = CompressionType.NONE; private final Node node; private final RpcResourcePool resourcePool; private final ClusterMonitor<Node> clusterMonitor; + private final long pingSequenceId; + private final AtomicReference<PongHandler> handler = new AtomicReference<>(); public RpcPing(Node node, ClusterMonitor<Node> clusterMonitor, RpcResourcePool rpcResourcePool) { this.node = node; this.resourcePool = rpcResourcePool; this.clusterMonitor = clusterMonitor; + pingSequenceId = node.createPingSequenceId(); } @Override public void ping(PongHandler handler) { try { - if (node.sendPing()) { - sendPing(handler); - } + sendPing(handler); } catch (RuntimeException e) { handler.handle(new Pong( ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": " + Exceptions.toMessageString(e)))); - node.receivePing(); } catch (Throwable throwable) { - node.receivePing(); } } @@ -60,12 +63,12 @@ public class RpcPing implements Pinger { } private void sendPing(PongHandler handler) { + this.handler.set(handler); var connection = resourcePool.getConnection(node.key()); var ping = SearchProtocol.MonitorRequest.newBuilder().build().toByteArray(); double timeoutSeconds = ((double) clusterMonitor.getConfiguration().getRequestTimeout()) / 1000.0; Compressor.Compression compressionResult = resourcePool.compressor().compress(PING_COMPRESSION, ping); - connection.request(RPC_METHOD, compressionResult.type(), ping.length, compressionResult.data(), - rsp -> { node.receivePing(); handler.handle(toPong(rsp));}, timeoutSeconds); + connection.request(RPC_METHOD, compressionResult.type(), ping.length, compressionResult.data(),this, timeoutSeconds); } private Pong decodeReply(ProtobufResponse response) throws InvalidProtocolBufferException { @@ -83,4 +86,13 @@ public class RpcPing implements Pinger { } } + @Override + public void receive(ResponseOrError<ProtobufResponse> response) { + if (node.isLastReceivedPong(pingSequenceId)) { + handler.get().handle(toPong(response)); + } else { + //TODO Reduce to debug or remove once we have enumerated what happens here. + log.info("Pong " + pingSequenceId + " received too late, latest is " + node.getLastReceivedPongId()); + } + } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java index 82aa0a50e72..f3a4d752d92 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java @@ -21,7 +21,8 @@ public class Node { private final AtomicBoolean statusIsKnown = new AtomicBoolean(false); private final AtomicBoolean working = new AtomicBoolean(true); private final AtomicLong activeDocuments = new AtomicLong(0); - private final AtomicBoolean pendingPing = new AtomicBoolean(); + private final AtomicLong pingSequence = new AtomicLong(0); + private final AtomicLong lastPing = new AtomicLong(0); public Node(int key, String hostname, int group) { this.key = key; @@ -29,11 +30,17 @@ public class Node { this.group = group; } - /** Only send ping if this method return true. If not the is a ping outstanding. */ - public boolean sendPing() { return ! pendingPing.getAndSet(true); } - - /** Need to be called when a pong is called to allow next ping to go through. */ - public void receivePing() { pendingPing.set(false); } + /** Give a monotonically increasing sequence number.*/ + public long createPingSequenceId() { return pingSequence.incrementAndGet(); } + /** Checks if this pong is received in line and accepted, or out of band and should be ignored..*/ + public boolean isLastReceivedPong(long pingId ) { + long last = lastPing.get(); + while ((pingId > last) && ! lastPing.compareAndSet(last, pingId)) { + last = pingSequence.get(); + } + return last < pingId; + } + public long getLastReceivedPongId() { return lastPing.get(); } /** Returns the unique and stable distribution key of this node */ public int key() { return key; } diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java index 69e3c5cab89..bc32297d123 100644 --- a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java +++ b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java @@ -20,6 +20,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -311,4 +312,18 @@ public class SearchClusterTest { verifyThatVipStatusUpRequireOnlyOneOnlineNode(3, 3); } + @Test + public void requireThatPingSequenceIsUpHeld() { + Node node = new Node(1, "n", 1); + assertEquals(1, node.createPingSequenceId()); + assertEquals(2, node.createPingSequenceId()); + assertEquals(0, node.getLastReceivedPongId()); + assertTrue(node.isLastReceivedPong(2)); + assertEquals(2, node.getLastReceivedPongId()); + assertFalse(node.isLastReceivedPong(1)); + assertFalse(node.isLastReceivedPong(2)); + assertTrue(node.isLastReceivedPong(3)); + assertEquals(3, node.getLastReceivedPongId()); + } + } |