diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java |
Publish
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java | 374 |
1 files changed, 374 insertions, 0 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java new file mode 100644 index 00000000000..da3d0d8e20b --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java @@ -0,0 +1,374 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.cluster; + +import com.yahoo.component.ComponentId; +import com.yahoo.container.protect.Error; +import com.yahoo.log.LogLevel; +import com.yahoo.prelude.Ping; +import com.yahoo.prelude.Pong; +import com.yahoo.yolean.Exceptions; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.cluster.Hasher.NodeList; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.search.searchchain.Execution; + +import java.util.List; +import java.util.concurrent.*; + +/** + * Implements clustering (failover and load balancing) over a set of client + * connections to a homogenous cluster of nodes. Searchers which wants to make + * clustered connections to some service should use this. + * <p> + * This replaces the usual searcher methods by ones which have the same contract + * and semantics but which takes an additional parameter which is the Connection + * selected by the cluster searcher which the method should use. Overrides of + * these connection methods <i>must not</i> call the super methods to pass on + * but must use the methods on execution. + * <p> + * The type argument is the class (of any type) representing the connections. + * The connection objects should implement a good toString to ease diagnostics. + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + * @author <a href="mailto:arnebef@yahoo-inc.com">Arne Bergene Fossaa</a> + */ +public abstract class ClusterSearcher<T> extends PingableSearcher implements NodeManager<T> { + + private Hasher<T> hasher = new Hasher<>(); + private ClusterMonitor<T> monitor = new ClusterMonitor<>(this, "dummy"); + + /** + * Creates a new cluster searcher + * + * @param id + * the id of this searcher + * @param connections + * the connections of the cluster + * @param internal + * whether or not this cluster is internal (part of the same + * installation) + */ + public ClusterSearcher(ComponentId id, List<T> connections, boolean internal) { + this(id, connections, new Hasher<T>(), internal); + } + + public ClusterSearcher(ComponentId id, List<T> connections, Hasher<T> hasher, boolean internal) { + super(id); + this.hasher = hasher; + for (T connection : connections) { + monitor.add(connection, internal); + hasher.add(connection); + } + } + + /** + * Pinging a node by sending a query NodeManager method, called from + * ClusterMonitor + */ + public final @Override void ping(T p, Executor executor) { + log(LogLevel.FINE, "Sending ping to: ", p); + Pinger pinger = new Pinger(p); + FutureTask<Pong> future = new FutureTask<>(pinger); + + executor.execute(future); + Pong pong; + Throwable logThrowable = null; + + try { + pong = future.get(monitor.getConfiguration().getFailLimit(), + TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + pong = new Pong(); + pong.addError(ErrorMessage + .createUnspecifiedError("Ping was interrupted: " + p)); + logThrowable = e; + } catch (ExecutionException e) { + pong = new Pong(); + pong.addError(ErrorMessage + .createUnspecifiedError("Execution was interrupted: " + p)); + logThrowable = e; + } catch (LinkageError e) { // Typically Osgi woes + pong = new Pong(); + pong.addError(ErrorMessage.createErrorInPluginSearcher("Class loading problem",e)); + logThrowable=e; + } catch (TimeoutException e) { + pong = new Pong(); + pong.addError(ErrorMessage + .createNoAnswerWhenPingingNode("Ping thread timed out.")); + } + future.cancel(true); + + if (pong.badResponse()) { + monitor.failed(p, pong.getError(0)); + log(LogLevel.FINE, "Failed ping - ", pong); + } else { + monitor.responded(p); + log(LogLevel.FINE, "Answered ping - ", p); + } + + if (logThrowable != null) { // This looks strange, but yes - it is + // needed + String logMsg; + if (logThrowable instanceof TimeoutException) { + logMsg = "Ping timed out for " + getId().getName() + "."; + } else { + StackTraceElement[] trace = logThrowable.getStackTrace(); + String traceAsString = null; + if (trace != null) { + StringBuilder b = new StringBuilder(": "); + for (StackTraceElement k : trace) { + if (k == null) { + b.append("null\n"); + } else { + b.append(k.toString()).append('\n'); + } + } + traceAsString = b.toString(); + } + logMsg = "Caught " + logThrowable.getClass().getName() + + " exception in " + getId().getName() + " ping" + + (trace == null ? ", no stack trace available." : traceAsString); + } + getLogger().warning(logMsg); + } + + } + + /** + * Pings this connection. Pings may be sent "out of band" at any time by the + * monitoring subsystem to determine the status of this connection. If the + * ping fails, it is ok both to set an error in the pong or to throw an + * exception. + */ + protected abstract Pong ping(Ping ping, T connection); + + protected T getFirstConnection(NodeList<T> nodes, int code, int trynum, Query query) { + return nodes.select(code, trynum); + } + + @Override + public final Result search(Query query, Execution execution) { + int tries = 0; + + Hasher.NodeList<T> nodes = getHasher().getNodes(); + + if (nodes.getNodeCount() == 0) + return search(query, execution, ErrorMessage + .createNoBackendsInService("No nodes in service in " + this + " (" + monitor.nodeMonitors().size() + + " was configured, none is responding)")); + + int code = query.hashCode(); + Result result; + T connection = getFirstConnection(nodes, code, tries, query); + do { + // The loop is in case there are other searchers available + // able to produce results + if (connection == null) + return search(query, execution, ErrorMessage + .createNoBackendsInService("No in node could handle " + query + " according to " + + hasher + " in " + this)); + if (timedOut(query)) + return new Result(query, ErrorMessage.createTimeout("No time left for searching")); + + if (query.getTraceLevel() >= 8) + query.trace("Trying " + connection, false, 8); + + result = robustSearch(query, execution, connection); + + if (!shouldRetry(query, result)) + return result; + + if (query.getTraceLevel() >= 6) + query.trace("Error from connection " + connection + " : " + result.hits().getError(), false, 6); + + if (result.hits().getError().getCode() == Error.TIMEOUT.code) + return result; // Retry is unlikely to help + + log(LogLevel.FINER, "No result, checking for timeout."); + tries++; + connection = nodes.select(code, tries); + } while (tries < nodes.getNodeCount()); + + // only error result gets returned here. + return result; + + } + + /** + * Returns whether this query and result should be retried against another + * connection if possible. This default implementation returns true if the + * result contains some error. + */ + protected boolean shouldRetry(Query query, Result result) { + return result.hits().getError() != null; + } + + /** + * This is called (instead of search(quer,execution,connextion) to handle + * searches where no (suitable) backend was available. The default + * implementation returns an error result. + */ + protected Result search(Query query, Execution execution, ErrorMessage message) { + return new Result(query, message); + } + + /** + * Call search(Query,Execution,T) and handle any exceptions returned which + * we do not want to propagate upwards By default this catches all runtime + * exceptions and puts them into the result + */ + protected Result robustSearch(Query query, Execution execution, T connection) { + Result result; + try { + result = search(query, execution, connection); + } catch (RuntimeException e) { //TODO: Exceptions should not be used to signal backend communication errors + log(LogLevel.WARNING, "An exception occurred while invoking backend searcher.", e); + result = new Result(query, ErrorMessage + .createBackendCommunicationError("Failed calling " + + connection + " in " + this + " for " + query + + ": " + Exceptions.toMessageString(e))); + } + + if (result == null) + result = new Result(query, ErrorMessage + .createBackendCommunicationError("No result returned in " + + this + " from " + connection + " for " + query)); + + if (result.hits().getError() != null) { + log(LogLevel.FINE, "FAILED: ", query); + } else if (!result.isCached()) { + log(LogLevel.FINE, "WORKING: ", query); + } else { + log(LogLevel.FINE, "CACHE HIT: ", query); + } + return result; + } + + /** + * Perform the search against the given connection. Return a result + * containing an error or throw an exception on failures. + */ + protected abstract Result search(Query query, Execution execution, T connection); + + public @Override + final void fill(Result result, String summaryClass, Execution execution) { + Query query = result.getQuery(); + Hasher.NodeList<T> nodes = getHasher().getNodes(); + int code = query.hashCode(); + + T connection = nodes.select(code, 0); + if (connection != null) { + if (timedOut(query)) { + result.hits().addError( + ErrorMessage.createTimeout( + "No time left to get summaries for " + + result)); + } else { + // query.setTimeout(getNodeTimeout(query)); + doFill(connection, result, summaryClass, execution); + } + } else { + result.hits().addError( + ErrorMessage.createNoBackendsInService("Could not fill '" + + result + "' in '" + this + "'")); + } + } + + private void doFill(T connection, Result result, String summaryClass, Execution execution) { + try { + fill(result, summaryClass, execution, connection); + } catch (RuntimeException e) { + result.hits().addError( + ErrorMessage + .createBackendCommunicationError("Error filling " + + result + " from " + connection + ": " + + Exceptions.toMessageString(e))); + } + if (result.hits().getError() != null) { + log(LogLevel.FINE, "FAILED: ", result.getQuery()); + } else if (!result.isCached()) { + log(LogLevel.FINE, "WORKING: ", result.getQuery()); + } else { + log(LogLevel.FINE, "CACHE HIT: " + result.getQuery()); + } + } + + /** + * Perform the fill against the given connection. Add an error to the result + * or throw an exception on failures. + */ + protected abstract void fill(Result result, String summaryClass, + Execution execution, T connection); + + /** NodeManager method, called from ClusterMonitor */ + public @Override + void working(T node) { + getHasher().add(node); + } + + /** NodeManager method, called from ClusterMonitor */ + public @Override + void failed(T node) { + getHasher().remove(node); + } + + /** + * Returns the hasher used internally in this. Do not mutate this hasher + * while in use. + */ + public Hasher<T> getHasher() { + return hasher; + } + + /** Returns the monitor of these nodes */ + public ClusterMonitor<T> getMonitor() { + return monitor; + } + + /** Returns true if this query has timed out now */ + protected boolean timedOut(Query query) { + long duration = query.getDurationTime(); + return duration >= query.getTimeout(); + } + + protected void log(java.util.logging.Level level, Object... objects) { + if (!getLogger().isLoggable(level)) + return; + StringBuilder sb = new StringBuilder(); + for (Object object : objects) { + sb.append(object); + } + getLogger().log(level, sb.toString()); + } + + public @Override void deconstruct() { + super.deconstruct(); + monitor.shutdown(); + } + + private class Pinger implements Callable<Pong> { + + private T connection; + + public Pinger(T connection) { + this.connection = connection; + } + + public Pong call() { + Pong pong; + try { + pong = ping(new Ping(monitor.getConfiguration().getRequestTimeout()), connection); + } catch (RuntimeException e) { + pong = new Pong(); + pong.addError( + ErrorMessage.createBackendCommunicationError( + "Exception when pinging " + + connection + ": " + + Exceptions.toMessageString(e))); + } + return pong; + } + + } +} |