summaryrefslogtreecommitdiffstats
path: root/container-search/src
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-02-03 10:47:48 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-02-03 10:47:48 +0000
commit7e17736281d6e83efc53026fba168a2f0520b6e9 (patch)
treec50c8d10044687482798cecd221b475fa6884eca /container-search/src
parentb3468dcd4f2185f82eeaceedc30c220cd96aaeb0 (diff)
Use sequence numbers and check on Pong reception instead.
Diffstat (limited to 'container-search/src')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java28
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java19
-rw-r--r--container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java15
3 files changed, 48 insertions, 14 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
index b6daecfa8c3..b0bcd7d90df 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java
@@ -15,33 +15,36 @@ import com.yahoo.search.dispatch.searchcluster.PongHandler;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.yolean.Exceptions;
-public class RpcPing implements Pinger {
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Logger;
+public class RpcPing implements Pinger, Client.ResponseReceiver {
+
+ private static final Logger log = Logger.getLogger(RpcPing.class.getName());
private static final String RPC_METHOD = "vespa.searchprotocol.ping";
private static final CompressionType PING_COMPRESSION = CompressionType.NONE;
private final Node node;
private final RpcResourcePool resourcePool;
private final ClusterMonitor<Node> clusterMonitor;
+ private final long pingSequenceId;
+ private final AtomicReference<PongHandler> handler = new AtomicReference<>();
public RpcPing(Node node, ClusterMonitor<Node> clusterMonitor, RpcResourcePool rpcResourcePool) {
this.node = node;
this.resourcePool = rpcResourcePool;
this.clusterMonitor = clusterMonitor;
+ pingSequenceId = node.createPingSequenceId();
}
@Override
public void ping(PongHandler handler) {
try {
- if (node.sendPing()) {
- sendPing(handler);
- }
+ sendPing(handler);
} catch (RuntimeException e) {
handler.handle(new Pong(
ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": " + Exceptions.toMessageString(e))));
- node.receivePing();
} catch (Throwable throwable) {
- node.receivePing();
}
}
@@ -60,12 +63,12 @@ public class RpcPing implements Pinger {
}
private void sendPing(PongHandler handler) {
+ this.handler.set(handler);
var connection = resourcePool.getConnection(node.key());
var ping = SearchProtocol.MonitorRequest.newBuilder().build().toByteArray();
double timeoutSeconds = ((double) clusterMonitor.getConfiguration().getRequestTimeout()) / 1000.0;
Compressor.Compression compressionResult = resourcePool.compressor().compress(PING_COMPRESSION, ping);
- connection.request(RPC_METHOD, compressionResult.type(), ping.length, compressionResult.data(),
- rsp -> { node.receivePing(); handler.handle(toPong(rsp));}, timeoutSeconds);
+ connection.request(RPC_METHOD, compressionResult.type(), ping.length, compressionResult.data(),this, timeoutSeconds);
}
private Pong decodeReply(ProtobufResponse response) throws InvalidProtocolBufferException {
@@ -83,4 +86,13 @@ public class RpcPing implements Pinger {
}
}
+ @Override
+ public void receive(ResponseOrError<ProtobufResponse> response) {
+ if (node.isLastReceivedPong(pingSequenceId)) {
+ handler.get().handle(toPong(response));
+ } else {
+ //TODO Reduce to debug or remove once we have enumerated what happens here.
+ log.info("Pong " + pingSequenceId + " received too late, latest is " + node.getLastReceivedPongId());
+ }
+ }
}
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java
index 82aa0a50e72..f3a4d752d92 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java
@@ -21,7 +21,8 @@ public class Node {
private final AtomicBoolean statusIsKnown = new AtomicBoolean(false);
private final AtomicBoolean working = new AtomicBoolean(true);
private final AtomicLong activeDocuments = new AtomicLong(0);
- private final AtomicBoolean pendingPing = new AtomicBoolean();
+ private final AtomicLong pingSequence = new AtomicLong(0);
+ private final AtomicLong lastPing = new AtomicLong(0);
public Node(int key, String hostname, int group) {
this.key = key;
@@ -29,11 +30,17 @@ public class Node {
this.group = group;
}
- /** Only send ping if this method return true. If not the is a ping outstanding. */
- public boolean sendPing() { return ! pendingPing.getAndSet(true); }
-
- /** Need to be called when a pong is called to allow next ping to go through. */
- public void receivePing() { pendingPing.set(false); }
+ /** Give a monotonically increasing sequence number.*/
+ public long createPingSequenceId() { return pingSequence.incrementAndGet(); }
+ /** Checks if this pong is received in line and accepted, or out of band and should be ignored..*/
+ public boolean isLastReceivedPong(long pingId ) {
+ long last = lastPing.get();
+ while ((pingId > last) && ! lastPing.compareAndSet(last, pingId)) {
+ last = pingSequence.get();
+ }
+ return last < pingId;
+ }
+ public long getLastReceivedPongId() { return lastPing.get(); }
/** Returns the unique and stable distribution key of this node */
public int key() { return key; }
diff --git a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java
index 69e3c5cab89..bc32297d123 100644
--- a/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java
+++ b/container-search/src/test/java/com/yahoo/search/dispatch/searchcluster/SearchClusterTest.java
@@ -20,6 +20,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -311,4 +312,18 @@ public class SearchClusterTest {
verifyThatVipStatusUpRequireOnlyOneOnlineNode(3, 3);
}
+ @Test
+ public void requireThatPingSequenceIsUpHeld() {
+ Node node = new Node(1, "n", 1);
+ assertEquals(1, node.createPingSequenceId());
+ assertEquals(2, node.createPingSequenceId());
+ assertEquals(0, node.getLastReceivedPongId());
+ assertTrue(node.isLastReceivedPong(2));
+ assertEquals(2, node.getLastReceivedPongId());
+ assertFalse(node.isLastReceivedPong(1));
+ assertFalse(node.isLastReceivedPong(2));
+ assertTrue(node.isLastReceivedPong(3));
+ assertEquals(3, node.getLastReceivedPongId());
+ }
+
}