diff options
author | Olli Virtanen <olli.virtanen@oath.com> | 2019-03-28 16:14:48 +0100 |
---|---|---|
committer | Olli Virtanen <olli.virtanen@oath.com> | 2019-03-28 16:14:48 +0100 |
commit | 748ad31c704fbd53ec45b659002a72564dbe2c04 (patch) | |
tree | c2e82755e2060af95f873d8e0224315522471755 /container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java | |
parent | 5cb137c5209918bef89cf9fe14628c1078ac78f1 (diff) |
Feature flag to enable protobuf in search protocol as default; protobuf ping
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java | 45 |
1 files changed, 26 insertions, 19 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java index 5c3ef98c523..6c28352f27b 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java @@ -7,12 +7,12 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; import com.yahoo.container.handler.VipStatus; import com.yahoo.net.HostName; +import com.yahoo.prelude.Pong; import com.yahoo.search.cluster.ClusterMonitor; import com.yahoo.search.cluster.NodeManager; +import com.yahoo.search.dispatch.InvokerFactory; import com.yahoo.search.result.ErrorMessage; import com.yahoo.vespa.config.search.DispatchConfig; -import com.yahoo.prelude.Pong; -import com.yahoo.prelude.fastsearch.FS4ResourcePool; import java.util.LinkedHashMap; import java.util.List; @@ -46,6 +46,7 @@ public class SearchCluster implements NodeManager<Node> { private final ImmutableList<Group> orderedGroups; private final ClusterMonitor<Node> clusterMonitor; private final VipStatus vipStatus; + private InvokerFactory pingFactory; /** * A search node on this local machine having the entire corpus, which we therefore @@ -57,13 +58,9 @@ public class SearchCluster implements NodeManager<Node> { */ private final Optional<Node> directDispatchTarget; - // Only needed until query requests are moved to rpc - private final FS4ResourcePool fs4ResourcePool; - - public SearchCluster(String clusterId, DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus) { + public SearchCluster(String clusterId, DispatchConfig dispatchConfig, int containerClusterSize, VipStatus vipStatus) { this.clusterId = clusterId; this.dispatchConfig = dispatchConfig; - this.fs4ResourcePool = fs4ResourcePool; this.vipStatus = vipStatus; List<Node> nodes = toNodes(dispatchConfig); @@ -89,15 +86,20 @@ public class SearchCluster implements NodeManager<Node> { this.directDispatchTarget = findDirectDispatchTarget(HostName.getLocalhost(), size, containerClusterSize, nodesByHost, groups); - // Set up monitoring of the fs4 interface of the nodes - // We can switch to monitoring the rpc interface instead when we move the query phase to rpc this.clusterMonitor = new ClusterMonitor<>(this); - for (Node node : nodes) { - // cluster monitor will only call working() when the - // node transitions from down to up, so we need to - // register the initial (working) state here: - working(node); - clusterMonitor.add(node, true); + } + + public void startClusterMonitoring(InvokerFactory pingFactory) { + this.pingFactory = pingFactory; + + for (var group : orderedGroups) { + for (var node : group.nodes()) { + // cluster monitor will only call working() when the + // node transitions from down to up, so we need to + // register the initial (working) state here: + working(node); + clusterMonitor.add(node, true); + } } } @@ -251,16 +253,21 @@ public class SearchCluster implements NodeManager<Node> { /** Used by the cluster monitor to manage node status */ @Override public void ping(Node node, Executor executor) { - Pinger pinger = new Pinger(node, clusterMonitor, fs4ResourcePool); - FutureTask<Pong> futurePong = new FutureTask<>(pinger); + if (pingFactory == null) // not initialized yet + return; + FutureTask<Pong> futurePong = new FutureTask<>(pingFactory.createPinger(node, clusterMonitor)); executor.execute(futurePong); Pong pong = getPong(futurePong, node); futurePong.cancel(true); - if (pong.badResponse()) + if (pong.badResponse()) { clusterMonitor.failed(node, pong.getError(0)); - else + } else { + if (pong.activeDocuments().isPresent()) { + node.setActiveDocuments(pong.activeDocuments().get()); + } clusterMonitor.responded(node); + } } /** |