summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-01-31 20:46:05 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-01-31 20:46:05 +0000
commit241612c73b9d9dd00fcf196d9be4bafccc1d305c (patch)
treef155d73ae8388411289fce9c7e7579d14babe5d1 /container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
parentde5f702fbbce8386b522d1afbc309a2621a387fd (diff)
Send a ping every second, truly asynchronously, to all nodes that do not have any pending pings.
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java56
1 files changed, 22 insertions, 34 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
index 5f211c37917..675db0ee60d 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
@@ -10,7 +10,6 @@ import com.yahoo.net.HostName;
import com.yahoo.prelude.Pong;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.cluster.NodeManager;
-import com.yahoo.search.result.ErrorMessage;
import com.yahoo.vespa.config.search.DispatchConfig;
import java.util.LinkedHashMap;
@@ -18,13 +17,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Predicate;
-import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -263,24 +256,33 @@ public class SearchCluster implements NodeManager<Node> {
return localCorpusDispatchTarget.isPresent() && localCorpusDispatchTarget.get().group() == group.id();
}
+ private static class PongCallback implements PongHandler {
+ private final ClusterMonitor<Node> clusterMonitor;
+ private final Node node;
+ PongCallback(Node node, ClusterMonitor<Node> clusterMonitor) {
+ this.node = node;
+ this.clusterMonitor = clusterMonitor;
+ }
+ @Override
+ public void handle(Pong pong) {
+ if (pong.badResponse()) {
+ clusterMonitor.failed(node, pong.error().get());
+ } else {
+ if (pong.activeDocuments().isPresent()) {
+ node.setActiveDocuments(pong.activeDocuments().get());
+ }
+ clusterMonitor.responded(node);
+ }
+ }
+ }
+
/** Used by the cluster monitor to manage node status */
@Override
public void ping(Node node, Executor executor) {
if (pingFactory == null) return; // not initialized yet
- FutureTask<Pong> futurePong = new FutureTask<>(pingFactory.createPinger(node, clusterMonitor));
- executor.execute(futurePong);
- Pong pong = getPong(futurePong, node);
- futurePong.cancel(true);
-
- if (pong.badResponse()) {
- clusterMonitor.failed(node, pong.error().get());
- } else {
- if (pong.activeDocuments().isPresent()) {
- node.setActiveDocuments(pong.activeDocuments().get());
- }
- clusterMonitor.responded(node);
- }
+ Pinger pinger = pingFactory.createPinger(node, clusterMonitor);
+ pinger.ping(new PongCallback(node, clusterMonitor));
}
private void pingIterationCompletedSingleGroup() {
@@ -353,20 +355,6 @@ public class SearchCluster implements NodeManager<Node> {
return workingNodes + nodesAllowedDown >= nodesInGroup;
}
- private Pong getPong(FutureTask<Pong> futurePong, Node node) {
- try {
- return futurePong.get(clusterMonitor.getConfiguration().getFailLimit(), TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- log.log(Level.WARNING, "Exception pinging " + node, e);
- return new Pong(ErrorMessage.createUnspecifiedError("Ping was interrupted: " + node));
- } catch (ExecutionException e) {
- log.log(Level.WARNING, "Exception pinging " + node, e);
- return new Pong(ErrorMessage.createUnspecifiedError("Execution was interrupted: " + node));
- } catch (TimeoutException e) {
- return new Pong(ErrorMessage.createNoAnswerWhenPingingNode("Ping thread timed out"));
- }
- }
-
/**
* Calculate whether a subset of nodes in a group has enough coverage
*/