diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-01-31 20:46:05 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-01-31 20:46:05 +0000 |
commit | 241612c73b9d9dd00fcf196d9be4bafccc1d305c (patch) | |
tree | f155d73ae8388411289fce9c7e7579d14babe5d1 /container-search/src/main/java/com | |
parent | de5f702fbbce8386b522d1afbc309a2621a387fd (diff) |
Send a ping every second, truly asynchronously, to all nodes that do not have a pending ping.
Diffstat (limited to 'container-search/src/main/java/com')
8 files changed, 81 insertions, 59 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java b/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java index 226e0180d2e..7b10992dff8 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java @@ -30,7 +30,7 @@ public class MonitorConfiguration { * The number of milliseconds to attempt to complete a request * before giving up */ - private long requestTimeout = 5000; + private final long requestTimeout = 5000; /** * The number of milliseconds a node is allowed to fail before we diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java index 5c9928de924..119037d16dd 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcInvokerFactory.java @@ -14,6 +14,7 @@ import com.yahoo.search.dispatch.InvokerFactory; import com.yahoo.search.dispatch.SearchInvoker; import com.yahoo.search.dispatch.searchcluster.Node; import com.yahoo.search.dispatch.searchcluster.PingFactory; +import com.yahoo.search.dispatch.searchcluster.Pinger; import com.yahoo.search.dispatch.searchcluster.SearchCluster; import java.util.Optional; @@ -65,7 +66,7 @@ public class RpcInvokerFactory extends InvokerFactory implements PingFactory { } @Override - public Callable<Pong> createPinger(Node node, ClusterMonitor<Node> monitor) { + public Pinger createPinger(Node node, ClusterMonitor<Node> monitor) { return new RpcPing(node, monitor, rpcResourcePool); } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java index e0f1dc5e675..b6daecfa8c3 100644 --- 
a/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/rpc/RpcPing.java @@ -10,14 +10,12 @@ import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.dispatch.rpc.Client.ProtobufResponse; import com.yahoo.search.dispatch.rpc.Client.ResponseOrError; import com.yahoo.search.dispatch.searchcluster.Node; +import com.yahoo.search.dispatch.searchcluster.Pinger; +import com.yahoo.search.dispatch.searchcluster.PongHandler; import com.yahoo.search.result.ErrorMessage; import com.yahoo.yolean.Exceptions; -import java.util.concurrent.Callable; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - -public class RpcPing implements Callable<Pong> { +public class RpcPing implements Pinger { private static final String RPC_METHOD = "vespa.searchprotocol.ping"; private static final CompressionType PING_COMPRESSION = CompressionType.NONE; @@ -33,32 +31,41 @@ public class RpcPing implements Callable<Pong> { } @Override - public Pong call() throws Exception { + public void ping(PongHandler handler) { try { - var queue = new LinkedBlockingQueue<ResponseOrError<ProtobufResponse>>(1); - - sendPing(queue); - - var responseOrError = queue.poll(clusterMonitor.getConfiguration().getRequestTimeout(), TimeUnit.MILLISECONDS); - if (responseOrError == null) { - return new Pong(ErrorMessage.createNoAnswerWhenPingingNode("Timed out waiting for pong from " + node)); - } else if (responseOrError.error().isPresent()) { - return new Pong(ErrorMessage.createBackendCommunicationError(responseOrError.error().get())); + if (node.sendPing()) { + sendPing(handler); } + } catch (RuntimeException e) { + handler.handle(new Pong( + ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": " + Exceptions.toMessageString(e)))); + node.receivePing(); + } catch (Throwable throwable) { + node.receivePing(); + } + } + + private Pong 
toPong(ResponseOrError<ProtobufResponse> responseOrError) { + if (responseOrError == null) { + return new Pong(ErrorMessage.createNoAnswerWhenPingingNode("Timed out waiting for pong from " + node)); + } else if (responseOrError.error().isPresent()) { + return new Pong(ErrorMessage.createBackendCommunicationError(responseOrError.error().get())); + } + try { return decodeReply(responseOrError.response().get()); - } catch (RuntimeException e) { - return new Pong( - ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": " + Exceptions.toMessageString(e))); + } catch (InvalidProtocolBufferException e) { + return new Pong(ErrorMessage.createBackendCommunicationError(e.getMessage())); } } - private void sendPing(LinkedBlockingQueue<ResponseOrError<ProtobufResponse>> queue) { + private void sendPing(PongHandler handler) { var connection = resourcePool.getConnection(node.key()); var ping = SearchProtocol.MonitorRequest.newBuilder().build().toByteArray(); double timeoutSeconds = ((double) clusterMonitor.getConfiguration().getRequestTimeout()) / 1000.0; Compressor.Compression compressionResult = resourcePool.compressor().compress(PING_COMPRESSION, ping); - connection.request(RPC_METHOD, compressionResult.type(), ping.length, compressionResult.data(), rsp -> queue.add(rsp), timeoutSeconds); + connection.request(RPC_METHOD, compressionResult.type(), ping.length, compressionResult.data(), + rsp -> { node.receivePing(); handler.handle(toPong(rsp));}, timeoutSeconds); } private Pong decodeReply(ProtobufResponse response) throws InvalidProtocolBufferException { diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java index 2f70c37cd48..1802d3907b4 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Node.java @@ -21,6 
+21,7 @@ public class Node { private final AtomicBoolean statusIsKnown = new AtomicBoolean(false); private final AtomicBoolean working = new AtomicBoolean(true); private final AtomicLong activeDocuments = new AtomicLong(0); + private final AtomicBoolean pendingPing = new AtomicBoolean(); public Node(int key, String hostname, int group) { this.key = key; @@ -28,6 +29,9 @@ public class Node { this.group = group; } + public boolean sendPing() { return ! pendingPing.getAndSet(true); } + public void receivePing() { pendingPing.set(false); } + /** Returns the unique and stable distribution key of this node */ public int key() { return key; } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/PingFactory.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/PingFactory.java index b16fa941f68..fe2937d2450 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/PingFactory.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/PingFactory.java @@ -1,13 +1,11 @@ // Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch.searchcluster; -import com.yahoo.prelude.Pong; import com.yahoo.search.cluster.ClusterMonitor; -import java.util.concurrent.Callable; public interface PingFactory { - Callable<Pong> createPinger(Node node, ClusterMonitor<Node> monitor); + Pinger createPinger(Node node, ClusterMonitor<Node> monitor); } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java new file mode 100644 index 00000000000..d6cf610cd65 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/Pinger.java @@ -0,0 +1,12 @@ +// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+package com.yahoo.search.dispatch.searchcluster; + +/** + * Send a ping and ensure that the pong is propagated to the ponghandler. + * Should not wait as this should be done in parallel on all nodes. + * + * @author baldersheim + */ +public interface Pinger { + void ping(PongHandler handler); +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/PongHandler.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/PongHandler.java new file mode 100644 index 00000000000..c0579b5d36e --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/PongHandler.java @@ -0,0 +1,12 @@ +package com.yahoo.search.dispatch.searchcluster; + +import com.yahoo.prelude.Pong; + +/** + * Handle the Pong result of a Ping. + * + * @author baldersheim + */ +public interface PongHandler { + void handle(Pong pong); +} diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 5f211c37917..675db0ee60d 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -10,7 +10,6 @@ import com.yahoo.net.HostName; import com.yahoo.prelude.Pong; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.cluster.NodeManager; -import com.yahoo.search.result.ErrorMessage; import com.yahoo.vespa.config.search.DispatchConfig; import java.util.LinkedHashMap; @@ -18,13 +17,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.OptionalInt; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; -import java.util.concurrent.FutureTask; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.function.Predicate; -import 
java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -263,24 +256,33 @@ public class SearchCluster implements NodeManager<Node> { return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().group() == group.id(); } + private static class PongCallback implements PongHandler { + private final ClusterMonitor<Node> clusterMonitor; + private final Node node; + PongCallback(Node node, ClusterMonitor<Node> clusterMonitor) { + this.node = node; + this.clusterMonitor = clusterMonitor; + } + @Override + public void handle(Pong pong) { + if (pong.badResponse()) { + clusterMonitor.failed(node, pong.error().get()); + } else { + if (pong.activeDocuments().isPresent()) { + node.setActiveDocuments(pong.activeDocuments().get()); + } + clusterMonitor.responded(node); + } + } + } + /** Used by the cluster monitor to manage node status */ @Override public void ping(Node node, Executor executor) { if (pingFactory == null) return; // not initialized yet - FutureTask<Pong> futurePong = new FutureTask<>(pingFactory.createPinger(node, clusterMonitor)); - executor.execute(futurePong); - Pong pong = getPong(futurePong, node); - futurePong.cancel(true); - - if (pong.badResponse()) { - clusterMonitor.failed(node, pong.error().get()); - } else { - if (pong.activeDocuments().isPresent()) { - node.setActiveDocuments(pong.activeDocuments().get()); - } - clusterMonitor.responded(node); - } + Pinger pinger = pingFactory.createPinger(node, clusterMonitor); + pinger.ping(new PongCallback(node, clusterMonitor)); } private void pingIterationCompletedSingleGroup() { @@ -353,20 +355,6 @@ public class SearchCluster implements NodeManager<Node> { return workingNodes + nodesAllowedDown >= nodesInGroup; } - private Pong getPong(FutureTask<Pong> futurePong, Node node) { - try { - return futurePong.get(clusterMonitor.getConfiguration().getFailLimit(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - log.log(Level.WARNING, "Exception 
pinging " + node, e); - return new Pong(ErrorMessage.createUnspecifiedError("Ping was interrupted: " + node)); - } catch (ExecutionException e) { - log.log(Level.WARNING, "Exception pinging " + node, e); - return new Pong(ErrorMessage.createUnspecifiedError("Execution was interrupted: " + node)); - } catch (TimeoutException e) { - return new Pong(ErrorMessage.createNoAnswerWhenPingingNode("Ping thread timed out")); - } - } - /** * Calculate whether a subset of nodes in a group has enough coverage */ |