diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-08-17 14:43:37 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-08-17 14:43:37 +0200 |
commit | 391f3b2c945004ad9596171413b86374bc6a573a (patch) | |
tree | 2dc506dfdd1c7d8345cb277e8872e99f3b6966bd /container-search/src/main/java/com/yahoo/search | |
parent | 3d4ea53f113dfa62ad759f87843a0f80a133169b (diff) |
Add min_activedocs_coverage to dispatch config + cleanup
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search')
4 files changed, 65 insertions, 83 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java index d5ab94e7ec1..f660f39656d 100644 --- a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java @@ -76,20 +76,16 @@ public abstract class ClusterSearcher<T> extends PingableSearcher implements Nod try { pong = future.get(monitor.getConfiguration().getFailLimit(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { - pong = new Pong(); - pong.addError(ErrorMessage.createUnspecifiedError("Ping was interrupted: " + p)); + pong = new Pong(ErrorMessage.createUnspecifiedError("Ping was interrupted: " + p)); logThrowable = e; } catch (ExecutionException e) { - pong = new Pong(); - pong.addError(ErrorMessage.createUnspecifiedError("Execution was interrupted: " + p)); + pong = new Pong(ErrorMessage.createUnspecifiedError("Execution was interrupted: " + p)); logThrowable = e; } catch (LinkageError e) { // Typically Osgi woes - pong = new Pong(); - pong.addError(ErrorMessage.createErrorInPluginSearcher("Class loading problem",e)); + pong = new Pong(ErrorMessage.createErrorInPluginSearcher("Class loading problem",e)); logThrowable = e; } catch (TimeoutException e) { - pong = new Pong(); - pong.addError(ErrorMessage.createNoAnswerWhenPingingNode("Ping thread timed out.")); + pong = new Pong(ErrorMessage.createNoAnswerWhenPingingNode("Ping thread timed out.")); } future.cancel(true); @@ -331,18 +327,13 @@ public abstract class ClusterSearcher<T> extends PingableSearcher implements Nod } public Pong call() { - Pong pong; try { - pong = ping(new Ping(monitor.getConfiguration().getRequestTimeout()), connection); + return ping(new Ping(monitor.getConfiguration().getRequestTimeout()), connection); } catch (RuntimeException e) { - pong = new Pong(); - pong.addError( - ErrorMessage.createBackendCommunicationError( - "Exception when pinging " - + connection + ": " - + Exceptions.toMessageString(e))); + return new Pong(ErrorMessage.createBackendCommunicationError("Exception when pinging " + + connection + ": " + + Exceptions.toMessageString(e))); } - return pong; } } diff --git a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java index 4e5d78e30f4..c6581b14942 100644 --- a/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java +++ b/container-search/src/main/java/com/yahoo/search/dispatch/SearchCluster.java @@ -26,6 +26,7 @@ import java.util.concurrent.FutureTask; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -39,7 +40,9 @@ import java.util.stream.Collectors; public class SearchCluster implements NodeManager<SearchCluster.Node> { private static final Logger log = Logger.getLogger(SearchCluster.class.getName()); - + + /** The min active docs a group must have to be considered up, as a % of the average active docs of the other groups */ + private double minActivedocsCoverage; private final int size; private final ImmutableMap<Integer, Group> groups; private final ImmutableMultimap<String, Node> nodesByHost; @@ -49,10 +52,11 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { private final FS4ResourcePool fs4ResourcePool; public SearchCluster(DispatchConfig dispatchConfig, FS4ResourcePool fs4ResourcePool) { - this(toNodes(dispatchConfig), fs4ResourcePool); + this(dispatchConfig.min_activedocs_coverage(), toNodes(dispatchConfig), fs4ResourcePool); } - public SearchCluster(List<Node> nodes, FS4ResourcePool fs4ResourcePool) { + public SearchCluster(double minActivedocsCoverage, List<Node> nodes, FS4ResourcePool fs4ResourcePool) { + this.minActivedocsCoverage = minActivedocsCoverage; size = nodes.size(); this.fs4ResourcePool = fs4ResourcePool; @@ -106,30 +110,29 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { @Override public void ping(Node node, Executor executor) { Pinger pinger = new Pinger(node); - FutureTask<Pong> future = new FutureTask<>(pinger); + FutureTask<Pong> futurePong = new FutureTask<>(pinger); + executor.execute(futurePong); + Pong pong = getPong(futurePong, node); + futurePong.cancel(true); - executor.execute(future); - Pong pong; + if (pong.badResponse()) + clusterMonitor.failed(node, pong.getError(0)); + else + clusterMonitor.responded(node); + } + + private Pong getPong(FutureTask<Pong> futurePong, Node node) { try { - pong = future.get(clusterMonitor.getConfiguration().getFailLimit(), TimeUnit.MILLISECONDS); + return futurePong.get(clusterMonitor.getConfiguration().getFailLimit(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { - pong = new Pong(); - pong.addError(ErrorMessage.createUnspecifiedError("Ping was interrupted: " + node)); log.log(Level.WARNING, "Exception pinging " + node, e); + return new Pong(ErrorMessage.createUnspecifiedError("Ping was interrupted: " + node)); } catch (ExecutionException e) { - pong = new Pong(); - pong.addError(ErrorMessage.createUnspecifiedError("Execution was interrupted: " + node)); log.log(Level.WARNING, "Exception pinging " + node, e); + return new Pong(ErrorMessage.createUnspecifiedError("Execution was interrupted: " + node)); } catch (TimeoutException e) { - pong = new Pong(); - pong.addError(ErrorMessage.createNoAnswerWhenPingingNode("Ping thread timed out")); + return new Pong(ErrorMessage.createNoAnswerWhenPingingNode("Ping thread timed out")); } - future.cancel(true); - - if (pong.badResponse()) - clusterMonitor.failed(node, pong.getError(0)); - else - clusterMonitor.responded(node); } private class Pinger implements Callable<Pong> { @@ -141,16 +144,15 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { } public Pong call() { - Pong pong; try { - pong = FastSearcher.ping(new Ping(clusterMonitor.getConfiguration().getRequestTimeout()), - fs4ResourcePool.getBackend(node.hostname(), node.port()), node.toString()); + Pong pong = FastSearcher.ping(new Ping(clusterMonitor.getConfiguration().getRequestTimeout()), + fs4ResourcePool.getBackend(node.hostname(), node.port()), node.toString()); + // TODO: Update active docs + return pong; } catch (RuntimeException e) { - pong = new Pong(); - pong.addError(ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": " - + Exceptions.toMessageString(e))); + return new Pong(ErrorMessage.createBackendCommunicationError("Exception when pinging " + node + ": " + + Exceptions.toMessageString(e))); } - return pong; } } @@ -160,6 +162,8 @@ public class SearchCluster implements NodeManager<SearchCluster.Node> { private final int id; private final ImmutableList<Node> nodes; + private AtomicLong activeDocs = new AtomicLong(0); + public Group(int id, List<Node> nodes) { this.id = id; this.nodes = ImmutableList.copyOf(nodes); diff --git a/container-search/src/main/java/com/yahoo/search/federation/ForwardingSearcher.java b/container-search/src/main/java/com/yahoo/search/federation/ForwardingSearcher.java index b43798113de..7e8f3553cab 100644 --- a/container-search/src/main/java/com/yahoo/search/federation/ForwardingSearcher.java +++ b/container-search/src/main/java/com/yahoo/search/federation/ForwardingSearcher.java @@ -25,65 +25,55 @@ import com.yahoo.search.searchchain.Execution; */ @After("*") public class ForwardingSearcher extends PingableSearcher { + private final ComponentSpecification target; - public ForwardingSearcher(final SearchchainForwardConfig config) { + public ForwardingSearcher(SearchchainForwardConfig config) { if (config.target() == null) { - throw new RuntimeException( - "Configuration value searchchain-forward.target was null."); + throw new RuntimeException("Configuration value searchchain-forward.target was null."); } try { target = new ComponentSpecification(config.target()); } catch (RuntimeException e) { - throw new RuntimeException( - "Failed constructing the component specification from searchchain-forward.target: " - + config.target(), e); + throw new RuntimeException("Failed constructing the component specification from searchchain-forward.target: " + + config.target(), e); } } @Override - public Result search(final Query query, final Execution execution) { + public Result search(Query query, Execution execution) { Execution next = createForward(execution); - if (next == null) { + if (next == null) return badResult(query); - } else { + else return next.search(query); - } } - private Result badResult(final Query query) { - final ErrorMessage error = noSearchchain(); - return new Result(query, error); + private Result badResult(Query query) { + return new Result(query, noSearchchain()); } @Override - public Pong ping(final Ping ping, final Execution execution) { + public Pong ping(Ping ping, Execution execution) { Execution next = createForward(execution); - - if (next == null) { + if (next == null) return badPong(); - } else { + else return next.ping(ping); - } } private Pong badPong() { - final Pong pong = new Pong(); - pong.addError(noSearchchain()); - return pong; + return new Pong(noSearchchain()); } @Override - public void fill(final Result result, final String summaryClass, - final Execution execution) { + public void fill(Result result, String summaryClass, Execution execution) { Execution next = createForward(execution); - if (next == null) { + if (next == null) badFill(result.hits()); - return; - } else { + else next.fill(result, summaryClass); - } } private void badFill(HitGroup hits) { @@ -91,11 +81,8 @@ public class ForwardingSearcher extends PingableSearcher { } private Execution createForward(Execution execution) { - Chain<Searcher> targetChain = execution.context().searchChainRegistry() - .getComponent(target); - if (targetChain == null) { - return null; - } + Chain<Searcher> targetChain = execution.context().searchChainRegistry().getComponent(target); + if (targetChain == null) return null; return new Execution(targetChain, execution.context()); } diff --git a/container-search/src/main/java/com/yahoo/search/federation/http/HTTPSearcher.java b/container-search/src/main/java/com/yahoo/search/federation/http/HTTPSearcher.java index 65ce7b3647c..d8321a579b3 100644 --- a/container-search/src/main/java/com/yahoo/search/federation/http/HTTPSearcher.java +++ b/container-search/src/main/java/com/yahoo/search/federation/http/HTTPSearcher.java @@ -483,11 +483,12 @@ public abstract class HTTPSearcher extends ClusterSearcher<Connection> { } } catch (MalformedURLException | URISyntaxException e) { pong.addError(ErrorMessage.createIllegalQuery("Malformed ping uri '" + uri + "': " + - Exceptions.toMessageString(e))); + Exceptions.toMessageString(e))); } catch (RuntimeException e) { - log.log(Level.WARNING,"Unexpected exception while attempting to ping " + connection + " using uri '" + uri + "'",e); + log.log(Level.WARNING,"Unexpected exception while attempting to ping " + connection + + " using uri '" + uri + "'",e); pong.addError(ErrorMessage.createIllegalQuery("Unexpected problem with ping uri '" + uri + "': " + - Exceptions.toMessageString(e))); + Exceptions.toMessageString(e))); } if (uri == null) return pong; @@ -497,16 +498,15 @@ public abstract class HTTPSearcher extends ClusterSearcher<Connection> { response = getPingResponse(uri, ping); checkPing(response, pong); } catch (IOException e) { - //We do not have a valid ping + // We do not have a valid ping pong.addError(ErrorMessage.createBackendCommunicationError( - "Exception thrown when pinging with url '" + uri + "': " + Exceptions.toMessageString(e))); + "Exception thrown when pinging with url '" + uri + "': " + Exceptions.toMessageString(e))); } catch (TimeoutException e) { - pong.addError(ErrorMessage.createTimeout("Timeout for ping " - + uri + " in " + this + ": " + e.getMessage())); + pong.addError(ErrorMessage.createTimeout("Timeout for ping " + uri + " in " + this + ": " + e.getMessage())); } catch (RuntimeException e) { log.log(Level.WARNING,"Unexpected exception while attempting to ping " + connection + " using uri '" + uri + "'",e); pong.addError(ErrorMessage.createIllegalQuery("Unexpected problem with ping uri '" + uri + "': " + - Exceptions.toMessageString(e))); + Exceptions.toMessageString(e))); } finally { if (response != null) { cleanupHttpEntity(response.getEntity()); |