diff options
author | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
---|---|---|
committer | Jon Bratseth <bratseth@yahoo-inc.com> | 2016-06-15 23:09:44 +0200 |
commit | 72231250ed81e10d66bfe70701e64fa5fe50f712 (patch) | |
tree | 2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /container-search/src/main/java/com/yahoo/search/cluster |
Publish
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/cluster')
9 files changed, 1051 insertions, 0 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java new file mode 100644 index 00000000000..de67369a231 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java @@ -0,0 +1,93 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.cluster; + +import java.util.logging.Logger; + +import com.yahoo.search.result.ErrorMessage; + + +/** + * A node monitor is responsible for maintaining the state of a monitored node. + * It has the following properties: + * <ul> + * <li>A node is taken out of operation if it fails</li> + * <li>A node is put back in operation when it responds correctly again + * <i>responseAfterFailLimit</i> times <b>unless</b> + * it has failed <i>failQuarantineLimit</i>. In the latter case it won't + * be put into operation again before that time period has expired</li> + * </ul> + * + * @author bratseth + */ +public abstract class BaseNodeMonitor<T> { + + protected static Logger log=Logger.getLogger(BaseNodeMonitor.class.getName()); + + /** The object representing the monitored node */ + protected T node; + + protected boolean isWorking=true; + + /** Whether this node is quarantined for unstability */ + protected boolean isQuarantined=false; + + /** The last time this node failed, in ms */ + protected long failedAt=0; + + /** The last time this node responded (failed or succeeded), in ms */ + protected long respondedAt=0; + + /** The last time this node responded successfully */ + protected long succeededAt=0; + + /** The configuration of this monitor */ + protected MonitorConfiguration configuration; + + /** Is the node we monitor part of an internal Vespa cluster or not */ + private boolean internal=false; + + public BaseNodeMonitor(boolean internal) { + this.internal=internal; + } + + public T getNode() { 
return node; } + + /** + * Returns whether this node is currently in a state suitable + * for receiving traffic. As far as we know, that is + */ + public boolean isWorking() { return isWorking; } + + public boolean isQuarantined() { return isQuarantined; } + + /** + * Called when this node fails. + * + * @param error a description of the error + */ + public abstract void failed(ErrorMessage error); + + /** + * Called when a response is received from this node. If the node was + * quarantined and it has been in that state for more than QuarantineTime + * milliseconds, it is taken out of quarantine. + * + * if it is not in quarantine but is not working, it may be set to working + * if this method is called at least responseAfterFailLimit times + */ + public abstract void responded(); + + public boolean isIdle() { + return (now()-respondedAt) >= configuration.getIdleLimit(); + } + + protected long now() { + return System.currentTimeMillis(); + } + + /** Thread-safely changes the state of this node if required */ + protected abstract void setWorking(boolean working,String explanation); + + /** Returns whether or not this is monitoring an internal node. Default is false. */ + public boolean isInternal() { return internal; } +} diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java new file mode 100644 index 00000000000..1c50ea5d904 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java @@ -0,0 +1,157 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+package com.yahoo.search.cluster; + + +import com.yahoo.concurrent.DaemonThreadFactory; +import com.yahoo.concurrent.ThreadFactoryFactory; +import com.yahoo.search.result.ErrorMessage; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Monitors of a cluster of remote nodes. + * The monitor uses an internal thread for node monitoring. + * All <i>public</i> methods of this class are multithread safe. + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public class ClusterMonitor<T> { + + private MonitorConfiguration configuration=new MonitorConfiguration(); + + private static Logger log=Logger.getLogger(ClusterMonitor.class.getName()); + + private NodeManager<T> nodeManager; + + private MonitorThread monitorThread; + + private volatile boolean shutdown = false; + + /** A map from Node to corresponding MonitoredNode */ + private Map<T,BaseNodeMonitor<T>> nodeMonitors= + Collections.synchronizedMap(new java.util.LinkedHashMap<T, BaseNodeMonitor<T>>()); + + public ClusterMonitor(NodeManager<T> manager, String monitorConfigID) { + nodeManager=manager; + monitorThread=new MonitorThread("search.clustermonitor"); + monitorThread.start(); + log.fine("checkInterval is " + configuration.getCheckInterval()+" ms"); + } + + /** Returns the configuration of this cluster monitor */ + public MonitorConfiguration getConfiguration() { return configuration; } + + /** + * Adds a new node for monitoring. 
+ * The object representing the node must + * <ul> + * <li>Have a sensible toString</li> + * <li>Have a sensible identity (equals and hashCode)</li> + * </ul> + * + * @param node the object representing the node + * @param internal whether or not this node is internal to this cluster + */ + public void add(T node,boolean internal) { + BaseNodeMonitor<T> monitor=new TrafficNodeMonitor<>(node,configuration,internal); + // BaseNodeMonitor monitor=new NodeMonitor(node,configuration); + nodeMonitors.put(node,monitor); + } + + /** + * Returns the monitor of the given node, or null if this node has not been added + */ + public BaseNodeMonitor<T> getNodeMonitor(T node) { + return nodeMonitors.get(node); + } + + /** Called from ClusterSearcher/NodeManager when a node failed */ + public synchronized void failed(T node, ErrorMessage error) { + BaseNodeMonitor<T> monitor=nodeMonitors.get(node); + boolean wasWorking=monitor.isWorking(); + monitor.failed(error); + if (wasWorking && !monitor.isWorking()) { + nodeManager.failed(node); + } + } + + /** Called when a node responded */ + public synchronized void responded(T node) { + BaseNodeMonitor<T> monitor = nodeMonitors.get(node); + boolean wasFailing=!monitor.isWorking(); + monitor.responded(); + if (wasFailing && monitor.isWorking()) { + nodeManager.working(monitor.getNode()); + } + } + + /** + * Ping all nodes which needs pinging to discover state changes + */ + public void ping(Executor executor) { + for (Iterator<BaseNodeMonitor<T>> i=nodeMonitorIterator(); i.hasNext(); ) { + BaseNodeMonitor<T> monitor= i.next(); + // always ping + // if (monitor.isIdle()) + nodeManager.ping(monitor.getNode(),executor); // Cause call to failed or responded + } + } + + /** Returns a thread-safe snapshot of the NodeMonitors of all added nodes */ + public Iterator<BaseNodeMonitor<T>> nodeMonitorIterator() { + return nodeMonitors().iterator(); + } + + /** Returns a thread-safe snapshot of the NodeMonitors of all added nodes */ + public 
List<BaseNodeMonitor<T>> nodeMonitors() { + synchronized (nodeMonitors) { + return new java.util.ArrayList<>(nodeMonitors.values()); + } + } + + /** Must be called when this goes out of use */ + public void shutdown() { + shutdown = true; + monitorThread.interrupt(); + } + + private class MonitorThread extends Thread { + MonitorThread(String name) { + super(name); + } + + public void run() { + log.fine("Starting cluster monitor thread"); + // Pings must happen in a separate thread from this to handle timeouts + // By using a cached thread pool we ensured that 1) a single thread will be used + // for all pings when there are no problems (important because it ensures that + // any thread local connections are reused) 2) a new thread will be started to execute + // new pings when a ping is not responding + Executor pingExecutor=Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("search.ping")); + while (!isInterrupted()) { + try { + Thread.sleep(configuration.getCheckInterval()); + log.finest("Activating ping"); + ping(pingExecutor); + } + catch (Exception e) { + if (shutdown && e instanceof InterruptedException) { + break; + } else { + log.log(Level.WARNING,"Error in monitor thread",e); + } + } + } + log.fine("Stopped cluster monitor thread"); + } + + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java new file mode 100644 index 00000000000..da3d0d8e20b --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java @@ -0,0 +1,374 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+package com.yahoo.search.cluster; + +import com.yahoo.component.ComponentId; +import com.yahoo.container.protect.Error; +import com.yahoo.log.LogLevel; +import com.yahoo.prelude.Ping; +import com.yahoo.prelude.Pong; +import com.yahoo.yolean.Exceptions; +import com.yahoo.search.Query; +import com.yahoo.search.Result; +import com.yahoo.search.cluster.Hasher.NodeList; +import com.yahoo.search.result.ErrorMessage; +import com.yahoo.search.searchchain.Execution; + +import java.util.List; +import java.util.concurrent.*; + +/** + * Implements clustering (failover and load balancing) over a set of client + * connections to a homogenous cluster of nodes. Searchers which wants to make + * clustered connections to some service should use this. + * <p> + * This replaces the usual searcher methods by ones which have the same contract + * and semantics but which takes an additional parameter which is the Connection + * selected by the cluster searcher which the method should use. Overrides of + * these connection methods <i>must not</i> call the super methods to pass on + * but must use the methods on execution. + * <p> + * The type argument is the class (of any type) representing the connections. + * The connection objects should implement a good toString to ease diagnostics. 
+ * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + * @author <a href="mailto:arnebef@yahoo-inc.com">Arne Bergene Fossaa</a> + */ +public abstract class ClusterSearcher<T> extends PingableSearcher implements NodeManager<T> { + + private Hasher<T> hasher = new Hasher<>(); + private ClusterMonitor<T> monitor = new ClusterMonitor<>(this, "dummy"); + + /** + * Creates a new cluster searcher + * + * @param id + * the id of this searcher + * @param connections + * the connections of the cluster + * @param internal + * whether or not this cluster is internal (part of the same + * installation) + */ + public ClusterSearcher(ComponentId id, List<T> connections, boolean internal) { + this(id, connections, new Hasher<T>(), internal); + } + + public ClusterSearcher(ComponentId id, List<T> connections, Hasher<T> hasher, boolean internal) { + super(id); + this.hasher = hasher; + for (T connection : connections) { + monitor.add(connection, internal); + hasher.add(connection); + } + } + + /** + * Pinging a node by sending a query NodeManager method, called from + * ClusterMonitor + */ + public final @Override void ping(T p, Executor executor) { + log(LogLevel.FINE, "Sending ping to: ", p); + Pinger pinger = new Pinger(p); + FutureTask<Pong> future = new FutureTask<>(pinger); + + executor.execute(future); + Pong pong; + Throwable logThrowable = null; + + try { + pong = future.get(monitor.getConfiguration().getFailLimit(), + TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + pong = new Pong(); + pong.addError(ErrorMessage + .createUnspecifiedError("Ping was interrupted: " + p)); + logThrowable = e; + } catch (ExecutionException e) { + pong = new Pong(); + pong.addError(ErrorMessage + .createUnspecifiedError("Execution was interrupted: " + p)); + logThrowable = e; + } catch (LinkageError e) { // Typically Osgi woes + pong = new Pong(); + pong.addError(ErrorMessage.createErrorInPluginSearcher("Class loading problem",e)); + logThrowable=e; + } catch 
(TimeoutException e) { + pong = new Pong(); + pong.addError(ErrorMessage + .createNoAnswerWhenPingingNode("Ping thread timed out.")); + } + future.cancel(true); + + if (pong.badResponse()) { + monitor.failed(p, pong.getError(0)); + log(LogLevel.FINE, "Failed ping - ", pong); + } else { + monitor.responded(p); + log(LogLevel.FINE, "Answered ping - ", p); + } + + if (logThrowable != null) { // This looks strange, but yes - it is + // needed + String logMsg; + if (logThrowable instanceof TimeoutException) { + logMsg = "Ping timed out for " + getId().getName() + "."; + } else { + StackTraceElement[] trace = logThrowable.getStackTrace(); + String traceAsString = null; + if (trace != null) { + StringBuilder b = new StringBuilder(": "); + for (StackTraceElement k : trace) { + if (k == null) { + b.append("null\n"); + } else { + b.append(k.toString()).append('\n'); + } + } + traceAsString = b.toString(); + } + logMsg = "Caught " + logThrowable.getClass().getName() + + " exception in " + getId().getName() + " ping" + + (trace == null ? ", no stack trace available." : traceAsString); + } + getLogger().warning(logMsg); + } + + } + + /** + * Pings this connection. Pings may be sent "out of band" at any time by the + * monitoring subsystem to determine the status of this connection. If the + * ping fails, it is ok both to set an error in the pong or to throw an + * exception. 
+ */ + protected abstract Pong ping(Ping ping, T connection); + + protected T getFirstConnection(NodeList<T> nodes, int code, int trynum, Query query) { + return nodes.select(code, trynum); + } + + @Override + public final Result search(Query query, Execution execution) { + int tries = 0; + + Hasher.NodeList<T> nodes = getHasher().getNodes(); + + if (nodes.getNodeCount() == 0) + return search(query, execution, ErrorMessage + .createNoBackendsInService("No nodes in service in " + this + " (" + monitor.nodeMonitors().size() + + " was configured, none is responding)")); + + int code = query.hashCode(); + Result result; + T connection = getFirstConnection(nodes, code, tries, query); + do { + // The loop is in case there are other searchers available + // able to produce results + if (connection == null) + return search(query, execution, ErrorMessage + .createNoBackendsInService("No in node could handle " + query + " according to " + + hasher + " in " + this)); + if (timedOut(query)) + return new Result(query, ErrorMessage.createTimeout("No time left for searching")); + + if (query.getTraceLevel() >= 8) + query.trace("Trying " + connection, false, 8); + + result = robustSearch(query, execution, connection); + + if (!shouldRetry(query, result)) + return result; + + if (query.getTraceLevel() >= 6) + query.trace("Error from connection " + connection + " : " + result.hits().getError(), false, 6); + + if (result.hits().getError().getCode() == Error.TIMEOUT.code) + return result; // Retry is unlikely to help + + log(LogLevel.FINER, "No result, checking for timeout."); + tries++; + connection = nodes.select(code, tries); + } while (tries < nodes.getNodeCount()); + + // only error result gets returned here. + return result; + + } + + /** + * Returns whether this query and result should be retried against another + * connection if possible. This default implementation returns true if the + * result contains some error. 
+ */ + protected boolean shouldRetry(Query query, Result result) { + return result.hits().getError() != null; + } + + /** + * This is called (instead of search(quer,execution,connextion) to handle + * searches where no (suitable) backend was available. The default + * implementation returns an error result. + */ + protected Result search(Query query, Execution execution, ErrorMessage message) { + return new Result(query, message); + } + + /** + * Call search(Query,Execution,T) and handle any exceptions returned which + * we do not want to propagate upwards By default this catches all runtime + * exceptions and puts them into the result + */ + protected Result robustSearch(Query query, Execution execution, T connection) { + Result result; + try { + result = search(query, execution, connection); + } catch (RuntimeException e) { //TODO: Exceptions should not be used to signal backend communication errors + log(LogLevel.WARNING, "An exception occurred while invoking backend searcher.", e); + result = new Result(query, ErrorMessage + .createBackendCommunicationError("Failed calling " + + connection + " in " + this + " for " + query + + ": " + Exceptions.toMessageString(e))); + } + + if (result == null) + result = new Result(query, ErrorMessage + .createBackendCommunicationError("No result returned in " + + this + " from " + connection + " for " + query)); + + if (result.hits().getError() != null) { + log(LogLevel.FINE, "FAILED: ", query); + } else if (!result.isCached()) { + log(LogLevel.FINE, "WORKING: ", query); + } else { + log(LogLevel.FINE, "CACHE HIT: ", query); + } + return result; + } + + /** + * Perform the search against the given connection. Return a result + * containing an error or throw an exception on failures. 
+ */ + protected abstract Result search(Query query, Execution execution, T connection); + + public @Override + final void fill(Result result, String summaryClass, Execution execution) { + Query query = result.getQuery(); + Hasher.NodeList<T> nodes = getHasher().getNodes(); + int code = query.hashCode(); + + T connection = nodes.select(code, 0); + if (connection != null) { + if (timedOut(query)) { + result.hits().addError( + ErrorMessage.createTimeout( + "No time left to get summaries for " + + result)); + } else { + // query.setTimeout(getNodeTimeout(query)); + doFill(connection, result, summaryClass, execution); + } + } else { + result.hits().addError( + ErrorMessage.createNoBackendsInService("Could not fill '" + + result + "' in '" + this + "'")); + } + } + + private void doFill(T connection, Result result, String summaryClass, Execution execution) { + try { + fill(result, summaryClass, execution, connection); + } catch (RuntimeException e) { + result.hits().addError( + ErrorMessage + .createBackendCommunicationError("Error filling " + + result + " from " + connection + ": " + + Exceptions.toMessageString(e))); + } + if (result.hits().getError() != null) { + log(LogLevel.FINE, "FAILED: ", result.getQuery()); + } else if (!result.isCached()) { + log(LogLevel.FINE, "WORKING: ", result.getQuery()); + } else { + log(LogLevel.FINE, "CACHE HIT: " + result.getQuery()); + } + } + + /** + * Perform the fill against the given connection. Add an error to the result + * or throw an exception on failures. + */ + protected abstract void fill(Result result, String summaryClass, + Execution execution, T connection); + + /** NodeManager method, called from ClusterMonitor */ + public @Override + void working(T node) { + getHasher().add(node); + } + + /** NodeManager method, called from ClusterMonitor */ + public @Override + void failed(T node) { + getHasher().remove(node); + } + + /** + * Returns the hasher used internally in this. Do not mutate this hasher + * while in use. 
+ */ + public Hasher<T> getHasher() { + return hasher; + } + + /** Returns the monitor of these nodes */ + public ClusterMonitor<T> getMonitor() { + return monitor; + } + + /** Returns true if this query has timed out now */ + protected boolean timedOut(Query query) { + long duration = query.getDurationTime(); + return duration >= query.getTimeout(); + } + + protected void log(java.util.logging.Level level, Object... objects) { + if (!getLogger().isLoggable(level)) + return; + StringBuilder sb = new StringBuilder(); + for (Object object : objects) { + sb.append(object); + } + getLogger().log(level, sb.toString()); + } + + public @Override void deconstruct() { + super.deconstruct(); + monitor.shutdown(); + } + + private class Pinger implements Callable<Pong> { + + private T connection; + + public Pinger(T connection) { + this.connection = connection; + } + + public Pong call() { + Pong pong; + try { + pong = ping(new Ping(monitor.getConfiguration().getRequestTimeout()), connection); + } catch (RuntimeException e) { + pong = new Pong(); + pong.addError( + ErrorMessage.createBackendCommunicationError( + "Exception when pinging " + + connection + ": " + + Exceptions.toMessageString(e))); + } + return pong; + } + + } +} diff --git a/container-search/src/main/java/com/yahoo/search/cluster/Hasher.java b/container-search/src/main/java/com/yahoo/search/cluster/Hasher.java new file mode 100644 index 00000000000..7ef71a7968d --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/cluster/Hasher.java @@ -0,0 +1,130 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.cluster; + +/** + * A hasher load balances between a set of nodes, represented by object ids. + * + * @author Arne B Fossaa + * @author bratseth + * @author Prashanth B. 
/**
 * A hasher load balances between a set of nodes, represented by object ids.
 * Nodes are compared by identity ({@code ==}), not by {@code equals}.
 * <p>
 * The current node list is replaced atomically on every change, so a list
 * returned by {@link #getNodes()} is never modified by later changes.
 *
 * @author Arne B Fossaa
 * @author bratseth
 * @author Prashanth B. Bhat
 */
public class Hasher<T> {

    /** A node together with its load factor */
    public static class NodeFactor<T> {

        private final T node;

        /**
         * The relative weight of the different nodes.
         * Hashing is based on the proportions of the weights.
         */
        private final int load;

        public NodeFactor(T node, int load) {
            this.node = node;
            this.load = load;
        }

        public final T getNode() { return node; }

        public final int getLoad() { return load; }

    }

    /** A list of weighted nodes from which a node can be selected by hash code */
    public static class NodeList<T> {

        private final NodeFactor<T>[] nodes;

        /** The sum of the load factors of all the nodes */
        private final int totalLoadFactor;

        /**
         * Creates a node list.
         *
         * @param nodes the weighted nodes of this list; null is treated as an empty list
         */
        @SuppressWarnings("unchecked")
        public NodeList(NodeFactor<T>[] nodes) {
            this.nodes = (nodes != null) ? nodes : (NodeFactor<T>[]) new NodeFactor[0];
            int total = 0;
            for (NodeFactor<T> node : this.nodes) {
                total += node.getLoad();
            }
            this.totalLoadFactor = total;
        }

        /** Returns the number of nodes in this list */
        public int getNodeCount() {
            return nodes.length;
        }

        /**
         * Selects a node from the given hash code, such that each node is chosen
         * in proportion to its load factor.
         *
         * @param code   the hash code to select by
         * @param trynum the number of positions to step onwards from the node selected by the code,
         *               wrapping around at the end of the list
         * @return the selected node, or null if the total load factor is not positive
         */
        public T select(int code, int trynum) {
            if (totalLoadFactor <= 0) return null;

            // Multiply by a prime number much bigger than the likely number of hosts.
            // The remainder is taken before the absolute value: Math.abs(Integer.MIN_VALUE) is
            // negative, while a remainder is always in the open range (-total, total)
            int hashValue = Math.abs((code * 76103) % totalLoadFactor);

            int sumLoad = 0;
            int targetNode = 0;
            for (; targetNode < nodes.length; targetNode++) {
                sumLoad += nodes[targetNode].getLoad();
                if (sumLoad > hashValue)
                    break;
            }

            // Skip the ones we have tried before, keeping the index in range also for negative values
            int index = (targetNode + trynum) % nodes.length;
            if (index < 0)
                index += nodes.length;
            return nodes[index].getNode();
        }

        /** Returns whether this list contains the given node instance (compared by identity) */
        public boolean hasNode(T node) {
            for (NodeFactor<T> candidate : nodes) {
                if (candidate.getNode() == node)
                    return true;
            }
            return false;
        }

    }

    private volatile NodeList<T> nodes;

    /** Creates a hasher with no nodes */
    public Hasher() {
        this.nodes = new NodeList<T>(null);
    }

    /** Adds a node with load factor 100 */
    public void add(T node) {
        add(node, 100);
    }

    /**
     * Adds a node with a load factor.
     * The load factor is relative to the load of the other added nodes
     * and determines how often this node will be selected compared
     * to the other nodes. Adding a node instance which is already present does nothing.
     */
    public synchronized void add(T node, int load) {
        NodeList<T> current = nodes;
        if (current.hasNode(node)) return;

        NodeFactor<T>[] oldNodes = current.nodes;
        @SuppressWarnings("unchecked")
        NodeFactor<T>[] newNodes = (NodeFactor<T>[]) new NodeFactor[oldNodes.length + 1];
        System.arraycopy(oldNodes, 0, newNodes, 0, oldNodes.length);
        newNodes[oldNodes.length] = new NodeFactor<>(node, load);

        // Atomic switch due to volatile
        nodes = new NodeList<>(newNodes);
    }

    /** Removes a node. Removing a node instance which is not present does nothing. */
    public synchronized void remove(T node) {
        NodeList<T> current = nodes;
        if (!current.hasNode(node)) return;

        NodeFactor<T>[] oldNodes = current.nodes;
        @SuppressWarnings("unchecked")
        NodeFactor<T>[] newNodes = (NodeFactor<T>[]) new NodeFactor[oldNodes.length - 1];
        int next = 0;
        for (NodeFactor<T> candidate : oldNodes) {
            if (candidate.getNode() != node)
                newNodes[next++] = candidate;
        }

        // Atomic switch due to volatile
        nodes = new NodeList<>(newNodes);
    }

    /** Returns the current list of nodes, that is, those added and not since removed */
    public NodeList<T> getNodes() {
        return nodes;
    }

}
// diff --git a/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java b/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java new file mode 100644 index 00000000000..c68b60a743b --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java @@ -0,0 +1,140 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
 * The configuration of a cluster monitor instance.
 * All time values are in milliseconds.
 *
 * @author bratseth
 */
public class MonitorConfiguration {

    /** The interval in ms between consecutive checks of the monitored nodes */
    private long checkInterval = 1000;

    /** The number of times a failed node must respond before getting traffic again */
    private int responseAfterFailLimit = 3;

    /** The number of ms a node is allowed to stay idle before it is pinged */
    private long idleLimit = 3000;

    /** The number of milliseconds to attempt to complete a request before giving up */
    private long requestTimeout = 5000;

    /** The number of milliseconds a node is allowed to fail before we mark it as not working */
    private long failLimit = 5000;

    /** The number of times a node is allowed to fail in one hour before it is quarantined */
    private int failQuarantineLimit = 3;

    /** The number of ms to quarantine an unstable node */
    private long quarantineTime = 1000 * 60 * 60;

    /**
     * Sets the interval between each ping of idle or failing nodes.
     * Default is 1000 ms.
     */
    public void setCheckInterval(long intervalMs) {
        this.checkInterval = intervalMs;
    }

    /**
     * Returns the interval between each ping of idle or failing nodes.
     * Default is 1000 ms.
     */
    public long getCheckInterval() {
        return checkInterval;
    }

    /**
     * Sets the number of times a failed node must respond before it is put
     * back in service. Default is 3.
     */
    public void setResponseAfterFailLimit(int responseAfterFailLimit) {
        this.responseAfterFailLimit = responseAfterFailLimit;
    }

    /**
     * Returns the number of times a failed node must respond before it is put
     * back in service. Default is 3.
     */
    public int getResponseAfterFailLimit() {
        return responseAfterFailLimit;
    }

    /**
     * Sets the number of ms a node (failing or working) is allowed to
     * stay idle before it is pinged. Default is 3000.
     */
    public void setIdleLimit(int idleLimit) {
        this.idleLimit = idleLimit;
    }

    /**
     * Sets the number of ms a node (failing or working) is allowed to
     * stay idle before it is pinged, as a long like the other time values. Default is 3000.
     */
    public void setIdleLimit(long idleLimit) {
        this.idleLimit = idleLimit;
    }

    /**
     * Gets the number of ms a node (failing or working)
     * is allowed to stay idle before it is pinged. Default is 3000.
     */
    public long getIdleLimit() {
        return idleLimit;
    }

    /**
     * Sets the number of milliseconds to attempt to service a request
     * before giving up. Default is 5000 ms.
     */
    public void setRequestTimeout(long requestTimeout) {
        this.requestTimeout = requestTimeout;
    }

    /**
     * Returns the number of milliseconds to attempt to service a request
     * (at different nodes) before giving up. Default is 5000 ms.
     */
    public long getRequestTimeout() {
        return requestTimeout;
    }

    /**
     * Sets the number of milliseconds a node is allowed to fail before we
     * mark it as not working. Default is 5000 ms.
     */
    public void setFailLimit(long failLimit) {
        this.failLimit = failLimit;
    }

    /**
     * Returns the number of milliseconds a node is allowed to fail before we
     * mark it as not working. Default is 5000 ms.
     */
    public long getFailLimit() {
        return failLimit;
    }

    /**
     * Sets the number of times a node must fail in one hour to be placed
     * in quarantine. Once in quarantine it won't be put back in
     * production before quarantineTime has expired even if it is
     * working. Default is 3.
     */
    public void setFailQuarantineLimit(int failQuarantineLimit) {
        this.failQuarantineLimit = failQuarantineLimit;
    }

    /**
     * Returns the number of times a node must fail in one hour to be placed
     * in quarantine. Default is 3.
     */
    public int getFailQuarantineLimit() {
        return failQuarantineLimit;
    }

    /**
     * Sets the number of ms an unstable node is quarantined. Default is
     * 1000*60*60 (one hour).
     */
    public void setQuarantineTime(long quarantineTime) {
        this.quarantineTime = quarantineTime;
    }

    /**
     * Returns the number of ms an unstable node is quarantined. Default is
     * 1000*60*60 (one hour).
     */
    public long getQuarantineTime() {
        return quarantineTime;
    }

    /** Returns a listing of all the values of this, each as "name: value" */
    @Override
    public String toString() {
        return "monitor configuration [" +
               "checkInterval: " + checkInterval +
               " responseAfterFailLimit: " + responseAfterFailLimit +
               " idleLimit: " + idleLimit +
               " requestTimeout: " + requestTimeout +
               " failLimit: " + failLimit +
               " failQuarantineLimit: " + failQuarantineLimit +
               " quarantineTime: " + quarantineTime +
               "]";
    }

}
// diff --git a/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java b/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java new file mode 100644 index 00000000000..7071867c8c7 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java @@ -0,0 +1,23 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
import java.util.concurrent.Executor;

/**
 * The callback interface of a ClusterMonitor: must be implemented by a node
 * collection which wants the state of its nodes monitored.
 *
 * @author <a href="mailto:bratseth@yahoo-inc.com">Jon S Bratseth</a>
 */
public interface NodeManager<T> {

    /**
     * Called when a failed node is working (ready for production) again.
     *
     * @param node the node which is working again
     */
    void working(T node);

    /**
     * Called when a working node fails.
     *
     * @param node the node which failed
     */
    void failed(T node);

    /**
     * Called when a node should be pinged.
     *
     * @param node     the node to ping
     * @param executor the executor made available for carrying out the ping
     */
    void ping(T node, Executor executor);

}
// diff --git a/container-search/src/main/java/com/yahoo/search/cluster/PingableSearcher.java b/container-search/src/main/java/com/yahoo/search/cluster/PingableSearcher.java new file mode 100644 index 00000000000..486473eba8d --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/cluster/PingableSearcher.java @@ -0,0 +1,29 @@
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.cluster; + +import com.yahoo.component.ComponentId; +import com.yahoo.prelude.Ping; +import com.yahoo.prelude.Pong; +import com.yahoo.search.Searcher; +import com.yahoo.search.searchchain.Execution; + +/** + * A searcher to which we can send a ping to probe if it is alive + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public abstract class PingableSearcher extends Searcher { + + public PingableSearcher() { + } + + public PingableSearcher(ComponentId id) { + super(id); + } + + /** Send a ping request downwards to probe if this searcher chain is in functioning order */ + public Pong ping(Ping ping, Execution execution) { + return execution.ping(ping); + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java new file mode 100644 index 00000000000..6464f0101be --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java @@ -0,0 +1,93 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.cluster; + +import com.yahoo.search.result.ErrorMessage; + + +/** + * This node monitor is responsible for maintaining the state of a monitored node. 
+ * It has the following properties: + * <ul> + * <li>A node is taken out of operation if it gives no response in 10 s</li> + * <li>A node is put back in operation when it responds correctly again + * </ul> + * + * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a> + */ +public class TrafficNodeMonitor<T> extends BaseNodeMonitor<T> { + /** + * Creates a new node monitor for a node + */ + public TrafficNodeMonitor(T node,MonitorConfiguration configuration,boolean internal) { + super(internal); + this.node=node; + this.configuration=configuration; + } + + /** Whether or not this has ever responded successfully */ + private boolean atStartUp = true; + + public T getNode() { return node; } + + /** + * Called when this node fails. + * + * @param error A container which should contain a short description + */ + @Override + public void failed(ErrorMessage error) { + respondedAt=now(); + + switch (error.getCode()) { + // TODO: Remove hard coded error messages. + // Refer to docs/errormessages + case 10: + case 11: + // Only count not being able to talk to backend at all + // as errors we care about + if ((respondedAt-succeededAt) > 10000) { + setWorking(false,"Not working for 10 s: " + error.toString()); + } + break; + default: + succeededAt = respondedAt; + break; + } + } + + /** + * Called when a response is received from this node. 
+ */ + public void responded() { + respondedAt=now(); + succeededAt=respondedAt; + atStartUp = false; + + if (!isWorking) { + setWorking(true,"Responds correctly"); + } + } + + /** Thread-safely changes the state of this node if required */ + protected synchronized void setWorking(boolean working,String explanation) { + if (this.isWorking==working) return; // Old news + + if (explanation==null) { + explanation=""; + } else { + explanation=": " + explanation; + } + + if (working) { + log.info("Putting " + node + " in service" + explanation); + } + else { + if (!atStartUp || !isInternal()) + log.warning("Taking " + node + " out of service" + explanation); + failedAt=now(); + } + + this.isWorking=working; + } + +} diff --git a/container-search/src/main/java/com/yahoo/search/cluster/package-info.java b/container-search/src/main/java/com/yahoo/search/cluster/package-info.java new file mode 100644 index 00000000000..b470d8c8150 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/cluster/package-info.java @@ -0,0 +1,12 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +/** + * Standard searchers to compose in <i>source</i> search chains (those containing searchers specific for one source and + * which ends with a call to some provider) which calls a cluster of provider nodes. These searchers provides hashing + * and failover of the provider nodes. + */ +@ExportPackage +@PublicApi +package com.yahoo.search.cluster; + +import com.yahoo.api.annotations.PublicApi; +import com.yahoo.osgi.annotation.ExportPackage; |