diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-08-16 14:31:20 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-08-16 14:31:20 +0200 |
commit | 9dab52cdfc1d679629046899c85a6b5139ab5945 (patch) | |
tree | 90e850697f00f5f902ff1a2b5c11d4492c531ceb /container-search/src/main/java/com/yahoo/search/dispatch | |
parent | e32d551e91700add8758cf57d9b91f7624c2bd3a (diff) |
Only use direct dispatch when the local node is responding
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/dispatch')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java | 7 | ||||
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java | 128 |
2 files changed, 128 insertions, 7 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java index e1b9f717d61..ca6445cff44 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/Dispatcher.java @@ -1,6 +1,7 @@ // Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.dispatch; +import com.google.common.annotations.Beta; import com.google.common.collect.ImmutableMap; import com.google.inject.Inject; import com.yahoo.collections.ListMap; @@ -8,6 +9,7 @@ import com.yahoo.component.AbstractComponent; import com.yahoo.compress.CompressionType; import com.yahoo.compress.Compressor; import com.yahoo.data.access.slime.SlimeAdapter; +import com.yahoo.prelude.fastsearch.FS4ResourcePool; import com.yahoo.prelude.fastsearch.FastHit; import com.yahoo.prelude.fastsearch.TimeoutException; import com.yahoo.search.Query; @@ -35,6 +37,7 @@ import java.util.logging.Logger; * * @author bratseth */ +@Beta public class Dispatcher extends AbstractComponent { private final static Logger log = Logger.getLogger(Dispatcher.class.getName()); @@ -49,9 +52,9 @@ public class Dispatcher extends AbstractComponent { private final Compressor compressor = new Compressor(); @Inject - public Dispatcher(DispatchConfig dispatchConfig) { + public Dispatcher(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool) { this.client = new RpcClient(); - this.searchCluster = new SearchCluster(dispatchConfig); + this.searchCluster = new SearchCluster(dispatchConfig, fs4ResourcePool); // Create node rpc connections, indexed by the legacy "partid", which allows us to bridge // between fs4 calls (for search) and rpc calls (for summary fetch) diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java index 4f1b1ebfec0..6eb67b245f2 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java @@ -1,12 +1,33 @@ package com.yahoo.search.dispatch; +import com.google.common.annotations.Beta; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMultimap; +import com.yahoo.search.cluster.ClusterMonitor; +import com.yahoo.search.cluster.NodeManager; +import com.yahoo.search.result.ErrorMessage; import com.yahoo.vespa.config.search.DispatchConfig; +// Only needed until query requests are moved to rpc +import com.yahoo.prelude.Ping; +import com.yahoo.prelude.fastsearch.FastSearcher; +import com.yahoo.yolean.Exceptions; +import com.yahoo.prelude.Pong; +import com.yahoo.prelude.fastsearch.FS4ResourcePool; + import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; import java.util.stream.Collectors; /** @@ -14,18 +35,26 @@ import java.util.stream.Collectors; * * @author bratseth */ -public class SearchCluster { +@Beta +public class SearchCluster implements NodeManager<SearchCluster.Node> { + private static final Logger log = Logger.getLogger(SearchCluster.class.getName()); + private final int size; private final ImmutableMap<Integer, Group> groups; private final ImmutableMultimap<String, Node> nodesByHost; + private final ClusterMonitor<Node> clusterMonitor; - public SearchCluster(DispatchConfig dispatchConfig) { - this(toNodes(dispatchConfig)); + // Only needed until query requests are moved to rpc + private final FS4ResourcePool fs4ResourcePool; + + public SearchCluster(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool) { + this(toNodes(dispatchConfig), fs4ResourcePool); } - public SearchCluster(List<Node> nodes) { + public SearchCluster(List<Node> nodes, FS4ResourcePool fs4ResourcePool) { size = nodes.size(); + this.fs4ResourcePool = fs4ResourcePool; // Create groups ImmutableMap.Builder<Integer, Group> groupsBuilder = new ImmutableMap.Builder<>(); @@ -38,6 +67,12 @@ public class SearchCluster { for (Node node : nodes) nodesByHostBuilder.put(node.hostname(), node); nodesByHost = nodesByHostBuilder.build(); + + // Set up monitoring of the fs4 interface of the nodes + // We can switch to monitoring the rpc interface instead when we move the query phase to rpc + clusterMonitor = new ClusterMonitor<>(this); + for (Node node : nodes) + clusterMonitor.add(node, true); } private static ImmutableList<Node> toNodes(DispatchConfig dispatchConfig) { @@ -58,7 +93,68 @@ public class SearchCluster { * One host may contain multiple nodes (on different ports), so this is a multi-map. */ public ImmutableMultimap<String, Node> nodesByHost() { return nodesByHost; } - + + /** Used by the cluster monitor to manage node status */ + @Override + public void working(Node node) { node.setWorking(true); } + + /** Used by the cluster monitor to manage node status */ + @Override + public void failed(Node node) { node.setWorking(false); } + + /** Used by the cluster monitor to manage node status */ + @Override + public void ping(Node node, Executor executor) { + Pinger pinger = new Pinger(node); + FutureTask<Pong> future = new FutureTask<>(pinger); + + executor.execute(future); + Pong pong; + try { + pong = future.get(clusterMonitor.getConfiguration().getFailLimit(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + pong = new Pong(); + pong.addError(ErrorMessage.createUnspecifiedError("Ping was interrupted: " + node)); + log.log(Level.WARNING, "Exception pinging " + node, e); + } catch (ExecutionException e) { + pong = new Pong(); + pong.addError(ErrorMessage.createUnspecifiedError("Execution was interrupted: " + node)); + log.log(Level.WARNING, "Exception pinging " + node, e); + } catch (TimeoutException e) { + pong = new Pong(); + pong.addError(ErrorMessage.createNoAnswerWhenPingingNode("Ping thread timed out")); + } + future.cancel(true); + + if (pong.badResponse()) + clusterMonitor.failed(node, pong.getError(0)); + else + clusterMonitor.responded(node); + } + + private class Pinger implements Callable<Pong> { + + private final Node node; + + public Pinger(Node node) { + this.node = node; + } + + public Pong call() { + Pong pong; + try { + pong = FastSearcher.ping(new Ping(clusterMonitor.getConfiguration().getRequestTimeout()), + fs4ResourcePool.getBackend(node.hostname(), node.port()), node.toString()); + } catch (RuntimeException e) { + pong = new Pong(); + pong.addError(ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": " + + Exceptions.toMessageString(e))); + } + return pong; + } + + } + public static class Group { private final int id; @@ -86,6 +182,8 @@ public class SearchCluster { private final int port; private final int group; + private final AtomicBoolean working = new AtomicBoolean(); + public Node(String hostname, int port, int group) { this.hostname = hostname; this.port = port; @@ -98,6 +196,26 @@ public class SearchCluster { /** Returns the id of this group this node belongs to */ public int group() { return group; } + private void setWorking(boolean working) { + this.working.lazySet(working); + } + + /** Returns whether this node is currently responding to requests */ + public boolean isWorking() { return working.get(); } + + @Override + public int hashCode() { return Objects.hash(hostname, port); } + + @Override + public boolean equals(Object o) { + if (o == this) return true; + if ( ! (o instanceof Node)) return false; + Node other = (Node)o; + if ( ! Objects.equals(this.hostname, other.hostname)) return false; + if ( ! Objects.equals(this.port, other.port)) return false; + return true; + } + @Override public String toString() { return "search node " + hostname; } |