diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-01-31 20:46:05 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-01-31 20:46:05 +0000 |
commit | 241612c73b9d9dd00fcf196d9be4bafccc1d305c (patch) | |
tree | f155d73ae8388411289fce9c7e7579d14babe5d1 /container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java | |
parent | de5f702fbbce8386b522d1afbc309a2621a387fd (diff) |
Send ping every second truly async to all nodes who does not have any pending pings.
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java | 56 |
1 files changed, 22 insertions, 34 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 5f211c37917..675db0ee60d 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -10,7 +10,6 @@ import com.yahoo.net.HostName; import com.yahoo.prelude.Pong; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.cluster.NodeManager; -import com.yahoo.search.result.ErrorMessage; import com.yahoo.vespa.config.search.DispatchConfig; import java.util.LinkedHashMap; @@ -18,13 +17,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.OptionalInt; -import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; -import java.util.concurrent.FutureTask; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.function.Predicate; -import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -263,24 +256,33 @@ public class SearchCluster implements NodeManager<Node> { return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().group() == group.id(); } + private static class PongCallback implements PongHandler { + private final ClusterMonitor<Node> clusterMonitor; + private final Node node; + PongCallback(Node node, ClusterMonitor<Node> clusterMonitor) { + this.node = node; + this.clusterMonitor = clusterMonitor; + } + @Override + public void handle(Pong pong) { + if (pong.badResponse()) { + clusterMonitor.failed(node, pong.error().get()); + } else { + if (pong.activeDocuments().isPresent()) { + node.setActiveDocuments(pong.activeDocuments().get()); + } + clusterMonitor.responded(node); + } + } + } + /** Used by the cluster monitor to manage node status */ @Override public void ping(Node node, Executor executor) { if (pingFactory == null) return; // not initialized yet - FutureTask<Pong> futurePong = new FutureTask<>(pingFactory.createPinger(node, clusterMonitor)); - executor.execute(futurePong); - Pong pong = getPong(futurePong, node); - futurePong.cancel(true); - - if (pong.badResponse()) { - clusterMonitor.failed(node, pong.error().get()); - } else { - if (pong.activeDocuments().isPresent()) { - node.setActiveDocuments(pong.activeDocuments().get()); - } - clusterMonitor.responded(node); - } + Pinger pinger = pingFactory.createPinger(node, clusterMonitor); + pinger.ping(new PongCallback(node, clusterMonitor)); } private void pingIterationCompletedSingleGroup() { @@ -353,20 +355,6 @@ public class SearchCluster implements NodeManager<Node> { return workingNodes + nodesAllowedDown >= nodesInGroup; } - private Pong getPong(FutureTask<Pong> futurePong, Node node) { - try { - return futurePong.get(clusterMonitor.getConfiguration().getFailLimit(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - log.log(Level.WARNING, "Exception pinging " + node, e); - return new Pong(ErrorMessage.createUnspecifiedError("Ping was interrupted: " + node)); - } catch (ExecutionException e) { - log.log(Level.WARNING, "Exception pinging " + node, e); - return new Pong(ErrorMessage.createUnspecifiedError("Execution was interrupted: " + node)); - } catch (TimeoutException e) { - return new Pong(ErrorMessage.createNoAnswerWhenPingingNode("Ping thread timed out")); - } - } - /** * Calculate whether a subset of nodes in a group has enough coverage */ |