summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/dispatch
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-08-16 14:31:20 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-08-16 14:31:20 +0200
commit9dab52cdfc1d679629046899c85a6b5139ab5945 (patch)
tree90e850697f00f5f902ff1a2b5c11d4492c531ceb /container-search/src/main/java/com/yahoo/search/dispatch
parente32d551e91700add8758cf57d9b91f7624c2bd3a (diff)
Only use direct dispatch when the local node is responding
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch')
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java7
-rw-r--r--container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java128
2 files changed, 128 insertions, 7 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
index e1b9f717d61..ca6445cff44 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java
@@ -1,6 +1,7 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.dispatch;
+import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.yahoo.collections.ListMap;
@@ -8,6 +9,7 @@ import com.yahoo.component.AbstractComponent;
import com.yahoo.compress.CompressionType;
import com.yahoo.compress.Compressor;
import com.yahoo.data.access.slime.SlimeAdapter;
+import com.yahoo.prelude.fastsearch.FS4ResourcePool;
import com.yahoo.prelude.fastsearch.FastHit;
import com.yahoo.prelude.fastsearch.TimeoutException;
import com.yahoo.search.Query;
@@ -35,6 +37,7 @@ import java.util.logging.Logger;
*
* @author bratseth
*/
+@Beta
public class Dispatcher extends AbstractComponent {
private final static Logger log = Logger.getLogger(Dispatcher.class.getName());
@@ -49,9 +52,9 @@ public class Dispatcher extends AbstractComponent {
private final Compressor compressor = new Compressor();
@Inject
- public Dispatcher(DispatchConfig dispatchConfig) {
+ public Dispatcher(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool) {
this.client = new RpcClient();
- this.searchCluster = new SearchCluster(dispatchConfig);
+ this.searchCluster = new SearchCluster(dispatchConfig, fs4ResourcePool);
// Create node rpc connections, indexed by the legacy "partid", which allows us to bridge
// between fs4 calls (for search) and rpc calls (for summary fetch)
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java
index 4f1b1ebfec0..6eb67b245f2 100644
--- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java
+++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java
@@ -1,12 +1,33 @@
package com.yahoo.search.dispatch;
+import com.google.common.annotations.Beta;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
+import com.yahoo.search.cluster.ClusterMonitor;
+import com.yahoo.search.cluster.NodeManager;
+import com.yahoo.search.result.ErrorMessage;
import com.yahoo.vespa.config.search.DispatchConfig;
+// Only needed until query requests are moved to rpc
+import com.yahoo.prelude.Ping;
+import com.yahoo.prelude.fastsearch.FastSearcher;
+import com.yahoo.yolean.Exceptions;
+import com.yahoo.prelude.Pong;
+import com.yahoo.prelude.fastsearch.FS4ResourcePool;
+
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import java.util.stream.Collectors;
/**
@@ -14,18 +35,26 @@ import java.util.stream.Collectors;
*
* @author bratseth
*/
-public class SearchCluster {
+@Beta
+public class SearchCluster implements NodeManager<SearchCluster.Node> {
+ private static final Logger log = Logger.getLogger(SearchCluster.class.getName());
+
private final int size;
private final ImmutableMap<Integer, Group> groups;
private final ImmutableMultimap<String, Node> nodesByHost;
+ private final ClusterMonitor<Node> clusterMonitor;
- public SearchCluster(DispatchConfig dispatchConfig) {
- this(toNodes(dispatchConfig));
+ // Only needed until query requests are moved to rpc
+ private final FS4ResourcePool fs4ResourcePool;
+
+ public SearchCluster(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool) {
+ this(toNodes(dispatchConfig), fs4ResourcePool);
}
- public SearchCluster(List<Node> nodes) {
+ public SearchCluster(List<Node> nodes, FS4ResourcePool fs4ResourcePool) {
size = nodes.size();
+ this.fs4ResourcePool = fs4ResourcePool;
// Create groups
ImmutableMap.Builder<Integer, Group> groupsBuilder = new ImmutableMap.Builder<>();
@@ -38,6 +67,12 @@ public class SearchCluster {
for (Node node : nodes)
nodesByHostBuilder.put(node.hostname(), node);
nodesByHost = nodesByHostBuilder.build();
+
+ // Set up monitoring of the fs4 interface of the nodes
+ // We can switch to monitoring the rpc interface instead when we move the query phase to rpc
+ clusterMonitor = new ClusterMonitor<>(this);
+ for (Node node : nodes)
+ clusterMonitor.add(node, true);
}
private static ImmutableList<Node> toNodes(DispatchConfig dispatchConfig) {
@@ -58,7 +93,68 @@ public class SearchCluster {
* One host may contain multiple nodes (on different ports), so this is a multi-map.
*/
public ImmutableMultimap<String, Node> nodesByHost() { return nodesByHost; }
-
+
+ /** Used by the cluster monitor to manage node status */
+ @Override
+ public void working(Node node) { node.setWorking(true); }
+
+ /** Used by the cluster monitor to manage node status */
+ @Override
+ public void failed(Node node) { node.setWorking(false); }
+
+ /** Used by the cluster monitor to manage node status */
+ @Override
+ public void ping(Node node, Executor executor) {
+ Pinger pinger = new Pinger(node);
+ FutureTask<Pong> future = new FutureTask<>(pinger);
+
+ executor.execute(future);
+ Pong pong;
+ try {
+ pong = future.get(clusterMonitor.getConfiguration().getFailLimit(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ pong = new Pong();
+ pong.addError(ErrorMessage.createUnspecifiedError("Ping was interrupted: " + node));
+ log.log(Level.WARNING, "Exception pinging " + node, e);
+ } catch (ExecutionException e) {
+ pong = new Pong();
+ pong.addError(ErrorMessage.createUnspecifiedError("Execution was interrupted: " + node));
+ log.log(Level.WARNING, "Exception pinging " + node, e);
+ } catch (TimeoutException e) {
+ pong = new Pong();
+ pong.addError(ErrorMessage.createNoAnswerWhenPingingNode("Ping thread timed out"));
+ }
+ future.cancel(true);
+
+ if (pong.badResponse())
+ clusterMonitor.failed(node, pong.getError(0));
+ else
+ clusterMonitor.responded(node);
+ }
+
+ private class Pinger implements Callable<Pong> {
+
+ private final Node node;
+
+ public Pinger(Node node) {
+ this.node = node;
+ }
+
+ public Pong call() {
+ Pong pong;
+ try {
+ pong = FastSearcher.ping(new Ping(clusterMonitor.getConfiguration().getRequestTimeout()),
+ fs4ResourcePool.getBackend(node.hostname(), node.port()), node.toString());
+ } catch (RuntimeException e) {
+ pong = new Pong();
+ pong.addError(ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": "
+ + Exceptions.toMessageString(e)));
+ }
+ return pong;
+ }
+
+ }
+
public static class Group {
private final int id;
@@ -86,6 +182,8 @@ public class SearchCluster {
private final int port;
private final int group;
+ private final AtomicBoolean working = new AtomicBoolean();
+
public Node(String hostname, int port, int group) {
this.hostname = hostname;
this.port = port;
@@ -98,6 +196,26 @@ public class SearchCluster {
/** Returns the id of this group this node belongs to */
public int group() { return group; }
+ private void setWorking(boolean working) {
+ this.working.lazySet(working);
+ }
+
+ /** Returns whether this node is currently responding to requests */
+ public boolean isWorking() { return working.get(); }
+
+ @Override
+ public int hashCode() { return Objects.hash(hostname, port); }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) return true;
+ if ( ! (o instanceof Node)) return false;
+ Node other = (Node)o;
+ if ( ! Objects.equals(this.hostname, other.hostname)) return false;
+ if ( ! Objects.equals(this.port, other.port)) return false;
+ return true;
+ }
+
@Override
public String toString() { return "search node " + hostname; }