summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
diff options
context:
space:
mode:
authorOlli Virtanen <olli.virtanen@oath.com>2019-03-28 16:14:48 +0100
committerOlli Virtanen <olli.virtanen@oath.com>2019-03-28 16:14:48 +0100
commit748ad31c704fbd53ec45b659002a72564dbe2c04 (patch)
treec2e82755e2060af95f873d8e0224315522471755 /container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
parent5cb137c5209918bef89cf9fe14628c1078ac78f1 (diff)
Feature flag to enable protobuf in search protocol as default; protobuf ping
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java45
1 files changed, 26 insertions, 19 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
index 5c3ef98c523..6c28352f27b 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/searchcluster/SearchCluster.java
@@ -7,12 +7,12 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.yahoo.container.handler.VipStatus;
import com.yahoo.net.HostName;
+import com.yahoo.prelude.Pong;
import com.yahoo.search.cluster.ClusterMonitor;
import com.yahoo.search.cluster.NodeManager;
+import com.yahoo.search.dispatch.InvokerFactory;
import com.yahoo.search.result.ErrorMessage;
import com.yahoo.vespa.config.search.DispatchConfig;
-import com.yahoo.prelude.Pong;
-import com.yahoo.prelude.fastsearch.FS4ResourcePool;
import java.util.LinkedHashMap;
import java.util.List;
@@ -46,6 +46,7 @@ public class SearchCluster implements NodeManager<Node> {
private final ImmutableList<Group> orderedGroups;
private final ClusterMonitor<Node> clusterMonitor;
private final VipStatus vipStatus;
+ private InvokerFactory pingFactory;
/**
* A search node on this local machine having the entire corpus, which we therefore
@@ -57,13 +58,9 @@ public class SearchCluster implements NodeManager<Node> {
*/
private final Optional<Node> directDispatchTarget;
- // Only needed until query requests are moved to rpc
- private final FS4ResourcePool fs4ResourcePool;
-
- public SearchCluster(String clusterId, DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool, int containerClusterSize, VipStatus vipStatus) {
+ public SearchCluster(String clusterId, DispatchConfig dispatchConfig, int containerClusterSize, VipStatus vipStatus) {
this.clusterId = clusterId;
this.dispatchConfig = dispatchConfig;
- this.fs4ResourcePool = fs4ResourcePool;
this.vipStatus = vipStatus;
List<Node> nodes = toNodes(dispatchConfig);
@@ -89,15 +86,20 @@ public class SearchCluster implements NodeManager<Node> {
this.directDispatchTarget = findDirectDispatchTarget(HostName.getLocalhost(), size, containerClusterSize,
nodesByHost, groups);
- // Set up monitoring of the fs4 interface of the nodes
- // We can switch to monitoring the rpc interface instead when we move the query phase to rpc
this.clusterMonitor = new ClusterMonitor<>(this);
- for (Node node : nodes) {
- // cluster monitor will only call working() when the
- // node transitions from down to up, so we need to
- // register the initial (working) state here:
- working(node);
- clusterMonitor.add(node, true);
+ }
+
+ public void startClusterMonitoring(InvokerFactory pingFactory) {
+ this.pingFactory = pingFactory;
+
+ for (var group : orderedGroups) {
+ for (var node : group.nodes()) {
+ // cluster monitor will only call working() when the
+ // node transitions from down to up, so we need to
+ // register the initial (working) state here:
+ working(node);
+ clusterMonitor.add(node, true);
+ }
}
}
@@ -251,16 +253,21 @@ public class SearchCluster implements NodeManager<Node> {
/** Used by the cluster monitor to manage node status */
@Override
public void ping(Node node, Executor executor) {
- Pinger pinger = new Pinger(node, clusterMonitor, fs4ResourcePool);
- FutureTask<Pong> futurePong = new FutureTask<>(pinger);
+ if (pingFactory == null) // not initialized yet
+ return;
+ FutureTask<Pong> futurePong = new FutureTask<>(pingFactory.createPinger(node, clusterMonitor));
executor.execute(futurePong);
Pong pong = getPong(futurePong, node);
futurePong.cancel(true);
- if (pong.badResponse())
+ if (pong.badResponse()) {
clusterMonitor.failed(node, pong.getError(0));
- else
+ } else {
+ if (pong.activeDocuments().isPresent()) {
+ node.setActiveDocuments(pong.activeDocuments().get());
+ }
clusterMonitor.responded(node);
+ }
}
/**