aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/cluster
diff options
context:
space:
mode:
authorJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
committerJon Bratseth <bratseth@yahoo-inc.com>2016-06-15 23:09:44 +0200
commit72231250ed81e10d66bfe70701e64fa5fe50f712 (patch)
tree2728bba1131a6f6e5bdf95afec7d7ff9358dac50 /container-search/src/main/java/com/yahoo/search/cluster
Publish
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/cluster')
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java93
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java157
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java374
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/Hasher.java130
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java140
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java23
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/PingableSearcher.java29
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java93
-rw-r--r--container-search/src/main/java/com/yahoo/search/cluster/package-info.java12
9 files changed, 1051 insertions, 0 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java
new file mode 100644
index 00000000000..de67369a231
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/cluster/BaseNodeMonitor.java
@@ -0,0 +1,93 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.cluster;
+
+import java.util.logging.Logger;
+
+import com.yahoo.search.result.ErrorMessage;
+
+
+/**
+ * A node monitor is responsible for maintaining the state of a monitored node.
+ * It has the following properties:
+ * <ul>
+ * <li>A node is taken out of operation if it fails</li>
+ * <li>A node is put back in operation when it responds correctly again
+ * <i>responseAfterFailLimit</i> times <b>unless</b>
+ * it has failed <i>failQuarantineLimit</i>. In the latter case it won't
+ * be put into operation again before that time period has expired</li>
+ * </ul>
+ *
+ * @author bratseth
+ */
+public abstract class BaseNodeMonitor<T> {
+
+ protected static Logger log=Logger.getLogger(BaseNodeMonitor.class.getName());
+
+ /** The object representing the monitored node */
+ protected T node;
+
+ protected boolean isWorking=true;
+
+ /** Whether this node is quarantined for unstability */
+ protected boolean isQuarantined=false;
+
+ /** The last time this node failed, in ms */
+ protected long failedAt=0;
+
+ /** The last time this node responded (failed or succeeded), in ms */
+ protected long respondedAt=0;
+
+ /** The last time this node responded successfully */
+ protected long succeededAt=0;
+
+ /** The configuration of this monitor */
+ protected MonitorConfiguration configuration;
+
+ /** Is the node we monitor part of an internal Vespa cluster or not */
+ private boolean internal=false;
+
+ public BaseNodeMonitor(boolean internal) {
+ this.internal=internal;
+ }
+
+ public T getNode() { return node; }
+
+ /**
+ * Returns whether this node is currently in a state suitable
+ * for receiving traffic. As far as we know, that is
+ */
+ public boolean isWorking() { return isWorking; }
+
+ public boolean isQuarantined() { return isQuarantined; }
+
+ /**
+ * Called when this node fails.
+ *
+ * @param error a description of the error
+ */
+ public abstract void failed(ErrorMessage error);
+
+ /**
+ * Called when a response is received from this node. If the node was
+ * quarantined and it has been in that state for more than QuarantineTime
+ * milliseconds, it is taken out of quarantine.
+ *
+ * if it is not in quarantine but is not working, it may be set to working
+ * if this method is called at least responseAfterFailLimit times
+ */
+ public abstract void responded();
+
+ public boolean isIdle() {
+ return (now()-respondedAt) >= configuration.getIdleLimit();
+ }
+
+ protected long now() {
+ return System.currentTimeMillis();
+ }
+
+ /** Thread-safely changes the state of this node if required */
+ protected abstract void setWorking(boolean working,String explanation);
+
+ /** Returns whether or not this is monitoring an internal node. Default is false. */
+ public boolean isInternal() { return internal; }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
new file mode 100644
index 00000000000..1c50ea5d904
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java
@@ -0,0 +1,157 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.cluster;
+
+
+import com.yahoo.concurrent.DaemonThreadFactory;
+import com.yahoo.concurrent.ThreadFactoryFactory;
+import com.yahoo.search.result.ErrorMessage;
+
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Monitors of a cluster of remote nodes.
+ * The monitor uses an internal thread for node monitoring.
+ * All <i>public</i> methods of this class are multithread safe.
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public class ClusterMonitor<T> {
+
+ private MonitorConfiguration configuration=new MonitorConfiguration();
+
+ private static Logger log=Logger.getLogger(ClusterMonitor.class.getName());
+
+ private NodeManager<T> nodeManager;
+
+ private MonitorThread monitorThread;
+
+ private volatile boolean shutdown = false;
+
+ /** A map from Node to corresponding MonitoredNode */
+ private Map<T,BaseNodeMonitor<T>> nodeMonitors=
+ Collections.synchronizedMap(new java.util.LinkedHashMap<T, BaseNodeMonitor<T>>());
+
+ public ClusterMonitor(NodeManager<T> manager, String monitorConfigID) {
+ nodeManager=manager;
+ monitorThread=new MonitorThread("search.clustermonitor");
+ monitorThread.start();
+ log.fine("checkInterval is " + configuration.getCheckInterval()+" ms");
+ }
+
+ /** Returns the configuration of this cluster monitor */
+ public MonitorConfiguration getConfiguration() { return configuration; }
+
+ /**
+ * Adds a new node for monitoring.
+ * The object representing the node must
+ * <ul>
+ * <li>Have a sensible toString</li>
+ * <li>Have a sensible identity (equals and hashCode)</li>
+ * </ul>
+ *
+ * @param node the object representing the node
+ * @param internal whether or not this node is internal to this cluster
+ */
+ public void add(T node,boolean internal) {
+ BaseNodeMonitor<T> monitor=new TrafficNodeMonitor<>(node,configuration,internal);
+ // BaseNodeMonitor monitor=new NodeMonitor(node,configuration);
+ nodeMonitors.put(node,monitor);
+ }
+
+ /**
+ * Returns the monitor of the given node, or null if this node has not been added
+ */
+ public BaseNodeMonitor<T> getNodeMonitor(T node) {
+ return nodeMonitors.get(node);
+ }
+
+ /** Called from ClusterSearcher/NodeManager when a node failed */
+ public synchronized void failed(T node, ErrorMessage error) {
+ BaseNodeMonitor<T> monitor=nodeMonitors.get(node);
+ boolean wasWorking=monitor.isWorking();
+ monitor.failed(error);
+ if (wasWorking && !monitor.isWorking()) {
+ nodeManager.failed(node);
+ }
+ }
+
+ /** Called when a node responded */
+ public synchronized void responded(T node) {
+ BaseNodeMonitor<T> monitor = nodeMonitors.get(node);
+ boolean wasFailing=!monitor.isWorking();
+ monitor.responded();
+ if (wasFailing && monitor.isWorking()) {
+ nodeManager.working(monitor.getNode());
+ }
+ }
+
+ /**
+ * Ping all nodes which needs pinging to discover state changes
+ */
+ public void ping(Executor executor) {
+ for (Iterator<BaseNodeMonitor<T>> i=nodeMonitorIterator(); i.hasNext(); ) {
+ BaseNodeMonitor<T> monitor= i.next();
+ // always ping
+ // if (monitor.isIdle())
+ nodeManager.ping(monitor.getNode(),executor); // Cause call to failed or responded
+ }
+ }
+
+ /** Returns a thread-safe snapshot of the NodeMonitors of all added nodes */
+ public Iterator<BaseNodeMonitor<T>> nodeMonitorIterator() {
+ return nodeMonitors().iterator();
+ }
+
+ /** Returns a thread-safe snapshot of the NodeMonitors of all added nodes */
+ public List<BaseNodeMonitor<T>> nodeMonitors() {
+ synchronized (nodeMonitors) {
+ return new java.util.ArrayList<>(nodeMonitors.values());
+ }
+ }
+
+ /** Must be called when this goes out of use */
+ public void shutdown() {
+ shutdown = true;
+ monitorThread.interrupt();
+ }
+
+ private class MonitorThread extends Thread {
+ MonitorThread(String name) {
+ super(name);
+ }
+
+ public void run() {
+ log.fine("Starting cluster monitor thread");
+ // Pings must happen in a separate thread from this to handle timeouts
+ // By using a cached thread pool we ensured that 1) a single thread will be used
+ // for all pings when there are no problems (important because it ensures that
+ // any thread local connections are reused) 2) a new thread will be started to execute
+ // new pings when a ping is not responding
+ Executor pingExecutor=Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("search.ping"));
+ while (!isInterrupted()) {
+ try {
+ Thread.sleep(configuration.getCheckInterval());
+ log.finest("Activating ping");
+ ping(pingExecutor);
+ }
+ catch (Exception e) {
+ if (shutdown && e instanceof InterruptedException) {
+ break;
+ } else {
+ log.log(Level.WARNING,"Error in monitor thread",e);
+ }
+ }
+ }
+ log.fine("Stopped cluster monitor thread");
+ }
+
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java
new file mode 100644
index 00000000000..da3d0d8e20b
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterSearcher.java
@@ -0,0 +1,374 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.cluster;
+
+import com.yahoo.component.ComponentId;
+import com.yahoo.container.protect.Error;
+import com.yahoo.log.LogLevel;
+import com.yahoo.prelude.Ping;
+import com.yahoo.prelude.Pong;
+import com.yahoo.yolean.Exceptions;
+import com.yahoo.search.Query;
+import com.yahoo.search.Result;
+import com.yahoo.search.cluster.Hasher.NodeList;
+import com.yahoo.search.result.ErrorMessage;
+import com.yahoo.search.searchchain.Execution;
+
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * Implements clustering (failover and load balancing) over a set of client
+ * connections to a homogenous cluster of nodes. Searchers which wants to make
+ * clustered connections to some service should use this.
+ * <p>
+ * This replaces the usual searcher methods by ones which have the same contract
+ * and semantics but which takes an additional parameter which is the Connection
+ * selected by the cluster searcher which the method should use. Overrides of
+ * these connection methods <i>must not</i> call the super methods to pass on
+ * but must use the methods on execution.
+ * <p>
+ * The type argument is the class (of any type) representing the connections.
+ * The connection objects should implement a good toString to ease diagnostics.
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ * @author <a href="mailto:arnebef@yahoo-inc.com">Arne Bergene Fossaa</a>
+ */
+public abstract class ClusterSearcher<T> extends PingableSearcher implements NodeManager<T> {
+
+ private Hasher<T> hasher = new Hasher<>();
+ private ClusterMonitor<T> monitor = new ClusterMonitor<>(this, "dummy");
+
+ /**
+ * Creates a new cluster searcher
+ *
+ * @param id
+ * the id of this searcher
+ * @param connections
+ * the connections of the cluster
+ * @param internal
+ * whether or not this cluster is internal (part of the same
+ * installation)
+ */
+ public ClusterSearcher(ComponentId id, List<T> connections, boolean internal) {
+ this(id, connections, new Hasher<T>(), internal);
+ }
+
+ public ClusterSearcher(ComponentId id, List<T> connections, Hasher<T> hasher, boolean internal) {
+ super(id);
+ this.hasher = hasher;
+ for (T connection : connections) {
+ monitor.add(connection, internal);
+ hasher.add(connection);
+ }
+ }
+
+ /**
+ * Pinging a node by sending a query NodeManager method, called from
+ * ClusterMonitor
+ */
+ public final @Override void ping(T p, Executor executor) {
+ log(LogLevel.FINE, "Sending ping to: ", p);
+ Pinger pinger = new Pinger(p);
+ FutureTask<Pong> future = new FutureTask<>(pinger);
+
+ executor.execute(future);
+ Pong pong;
+ Throwable logThrowable = null;
+
+ try {
+ pong = future.get(monitor.getConfiguration().getFailLimit(),
+ TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ pong = new Pong();
+ pong.addError(ErrorMessage
+ .createUnspecifiedError("Ping was interrupted: " + p));
+ logThrowable = e;
+ } catch (ExecutionException e) {
+ pong = new Pong();
+ pong.addError(ErrorMessage
+ .createUnspecifiedError("Execution was interrupted: " + p));
+ logThrowable = e;
+ } catch (LinkageError e) { // Typically Osgi woes
+ pong = new Pong();
+ pong.addError(ErrorMessage.createErrorInPluginSearcher("Class loading problem",e));
+ logThrowable=e;
+ } catch (TimeoutException e) {
+ pong = new Pong();
+ pong.addError(ErrorMessage
+ .createNoAnswerWhenPingingNode("Ping thread timed out."));
+ }
+ future.cancel(true);
+
+ if (pong.badResponse()) {
+ monitor.failed(p, pong.getError(0));
+ log(LogLevel.FINE, "Failed ping - ", pong);
+ } else {
+ monitor.responded(p);
+ log(LogLevel.FINE, "Answered ping - ", p);
+ }
+
+ if (logThrowable != null) { // This looks strange, but yes - it is
+ // needed
+ String logMsg;
+ if (logThrowable instanceof TimeoutException) {
+ logMsg = "Ping timed out for " + getId().getName() + ".";
+ } else {
+ StackTraceElement[] trace = logThrowable.getStackTrace();
+ String traceAsString = null;
+ if (trace != null) {
+ StringBuilder b = new StringBuilder(": ");
+ for (StackTraceElement k : trace) {
+ if (k == null) {
+ b.append("null\n");
+ } else {
+ b.append(k.toString()).append('\n');
+ }
+ }
+ traceAsString = b.toString();
+ }
+ logMsg = "Caught " + logThrowable.getClass().getName()
+ + " exception in " + getId().getName() + " ping"
+ + (trace == null ? ", no stack trace available." : traceAsString);
+ }
+ getLogger().warning(logMsg);
+ }
+
+ }
+
+ /**
+ * Pings this connection. Pings may be sent "out of band" at any time by the
+ * monitoring subsystem to determine the status of this connection. If the
+ * ping fails, it is ok both to set an error in the pong or to throw an
+ * exception.
+ */
+ protected abstract Pong ping(Ping ping, T connection);
+
+ protected T getFirstConnection(NodeList<T> nodes, int code, int trynum, Query query) {
+ return nodes.select(code, trynum);
+ }
+
+ @Override
+ public final Result search(Query query, Execution execution) {
+ int tries = 0;
+
+ Hasher.NodeList<T> nodes = getHasher().getNodes();
+
+ if (nodes.getNodeCount() == 0)
+ return search(query, execution, ErrorMessage
+ .createNoBackendsInService("No nodes in service in " + this + " (" + monitor.nodeMonitors().size()
+ + " was configured, none is responding)"));
+
+ int code = query.hashCode();
+ Result result;
+ T connection = getFirstConnection(nodes, code, tries, query);
+ do {
+ // The loop is in case there are other searchers available
+ // able to produce results
+ if (connection == null)
+ return search(query, execution, ErrorMessage
+ .createNoBackendsInService("No in node could handle " + query + " according to " +
+ hasher + " in " + this));
+ if (timedOut(query))
+ return new Result(query, ErrorMessage.createTimeout("No time left for searching"));
+
+ if (query.getTraceLevel() >= 8)
+ query.trace("Trying " + connection, false, 8);
+
+ result = robustSearch(query, execution, connection);
+
+ if (!shouldRetry(query, result))
+ return result;
+
+ if (query.getTraceLevel() >= 6)
+ query.trace("Error from connection " + connection + " : " + result.hits().getError(), false, 6);
+
+ if (result.hits().getError().getCode() == Error.TIMEOUT.code)
+ return result; // Retry is unlikely to help
+
+ log(LogLevel.FINER, "No result, checking for timeout.");
+ tries++;
+ connection = nodes.select(code, tries);
+ } while (tries < nodes.getNodeCount());
+
+ // only error result gets returned here.
+ return result;
+
+ }
+
+ /**
+ * Returns whether this query and result should be retried against another
+ * connection if possible. This default implementation returns true if the
+ * result contains some error.
+ */
+ protected boolean shouldRetry(Query query, Result result) {
+ return result.hits().getError() != null;
+ }
+
+ /**
+ * This is called (instead of search(quer,execution,connextion) to handle
+ * searches where no (suitable) backend was available. The default
+ * implementation returns an error result.
+ */
+ protected Result search(Query query, Execution execution, ErrorMessage message) {
+ return new Result(query, message);
+ }
+
+ /**
+ * Call search(Query,Execution,T) and handle any exceptions returned which
+ * we do not want to propagate upwards By default this catches all runtime
+ * exceptions and puts them into the result
+ */
+ protected Result robustSearch(Query query, Execution execution, T connection) {
+ Result result;
+ try {
+ result = search(query, execution, connection);
+ } catch (RuntimeException e) { //TODO: Exceptions should not be used to signal backend communication errors
+ log(LogLevel.WARNING, "An exception occurred while invoking backend searcher.", e);
+ result = new Result(query, ErrorMessage
+ .createBackendCommunicationError("Failed calling "
+ + connection + " in " + this + " for " + query
+ + ": " + Exceptions.toMessageString(e)));
+ }
+
+ if (result == null)
+ result = new Result(query, ErrorMessage
+ .createBackendCommunicationError("No result returned in "
+ + this + " from " + connection + " for " + query));
+
+ if (result.hits().getError() != null) {
+ log(LogLevel.FINE, "FAILED: ", query);
+ } else if (!result.isCached()) {
+ log(LogLevel.FINE, "WORKING: ", query);
+ } else {
+ log(LogLevel.FINE, "CACHE HIT: ", query);
+ }
+ return result;
+ }
+
+ /**
+ * Perform the search against the given connection. Return a result
+ * containing an error or throw an exception on failures.
+ */
+ protected abstract Result search(Query query, Execution execution, T connection);
+
+ public @Override
+ final void fill(Result result, String summaryClass, Execution execution) {
+ Query query = result.getQuery();
+ Hasher.NodeList<T> nodes = getHasher().getNodes();
+ int code = query.hashCode();
+
+ T connection = nodes.select(code, 0);
+ if (connection != null) {
+ if (timedOut(query)) {
+ result.hits().addError(
+ ErrorMessage.createTimeout(
+ "No time left to get summaries for "
+ + result));
+ } else {
+ // query.setTimeout(getNodeTimeout(query));
+ doFill(connection, result, summaryClass, execution);
+ }
+ } else {
+ result.hits().addError(
+ ErrorMessage.createNoBackendsInService("Could not fill '"
+ + result + "' in '" + this + "'"));
+ }
+ }
+
+ private void doFill(T connection, Result result, String summaryClass, Execution execution) {
+ try {
+ fill(result, summaryClass, execution, connection);
+ } catch (RuntimeException e) {
+ result.hits().addError(
+ ErrorMessage
+ .createBackendCommunicationError("Error filling "
+ + result + " from " + connection + ": "
+ + Exceptions.toMessageString(e)));
+ }
+ if (result.hits().getError() != null) {
+ log(LogLevel.FINE, "FAILED: ", result.getQuery());
+ } else if (!result.isCached()) {
+ log(LogLevel.FINE, "WORKING: ", result.getQuery());
+ } else {
+ log(LogLevel.FINE, "CACHE HIT: " + result.getQuery());
+ }
+ }
+
+ /**
+ * Perform the fill against the given connection. Add an error to the result
+ * or throw an exception on failures.
+ */
+ protected abstract void fill(Result result, String summaryClass,
+ Execution execution, T connection);
+
+ /** NodeManager method, called from ClusterMonitor */
+ public @Override
+ void working(T node) {
+ getHasher().add(node);
+ }
+
+ /** NodeManager method, called from ClusterMonitor */
+ public @Override
+ void failed(T node) {
+ getHasher().remove(node);
+ }
+
+ /**
+ * Returns the hasher used internally in this. Do not mutate this hasher
+ * while in use.
+ */
+ public Hasher<T> getHasher() {
+ return hasher;
+ }
+
+ /** Returns the monitor of these nodes */
+ public ClusterMonitor<T> getMonitor() {
+ return monitor;
+ }
+
+ /** Returns true if this query has timed out now */
+ protected boolean timedOut(Query query) {
+ long duration = query.getDurationTime();
+ return duration >= query.getTimeout();
+ }
+
+ protected void log(java.util.logging.Level level, Object... objects) {
+ if (!getLogger().isLoggable(level))
+ return;
+ StringBuilder sb = new StringBuilder();
+ for (Object object : objects) {
+ sb.append(object);
+ }
+ getLogger().log(level, sb.toString());
+ }
+
+ public @Override void deconstruct() {
+ super.deconstruct();
+ monitor.shutdown();
+ }
+
+ private class Pinger implements Callable<Pong> {
+
+ private T connection;
+
+ public Pinger(T connection) {
+ this.connection = connection;
+ }
+
+ public Pong call() {
+ Pong pong;
+ try {
+ pong = ping(new Ping(monitor.getConfiguration().getRequestTimeout()), connection);
+ } catch (RuntimeException e) {
+ pong = new Pong();
+ pong.addError(
+ ErrorMessage.createBackendCommunicationError(
+ "Exception when pinging "
+ + connection + ": "
+ + Exceptions.toMessageString(e)));
+ }
+ return pong;
+ }
+
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/Hasher.java b/container-search/src/main/java/com/yahoo/search/cluster/Hasher.java
new file mode 100644
index 00000000000..7ef71a7968d
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/cluster/Hasher.java
@@ -0,0 +1,130 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.cluster;
+
+/**
+ * A hasher load balances between a set of nodes, represented by object ids.
+ *
+ * @author Arne B Fossaa
+ * @author bratseth
+ * @author Prashanth B. Bhat
+ */
+public class Hasher<T> {
+
+ public static class NodeFactor<T> {
+ private final T node;
+ /**
+ * The relative weight of the different nodes.
+ * Hashing are based on the proportions of the weights.
+ */
+ private final int load;
+ public NodeFactor(T node, int load) {
+ this.node = node;
+ this.load = load;
+ }
+ public final T getNode() { return node; }
+ public final int getLoad() { return load; }
+ }
+
+ public static class NodeList<T> {
+ private final NodeFactor<T>[] nodes;
+
+ private int totalLoadFactor;
+
+ public NodeList(NodeFactor<T>[] nodes) {
+ this.nodes = nodes;
+ totalLoadFactor = 0;
+ if(nodes != null) {
+ for(NodeFactor<T> node:nodes) {
+ totalLoadFactor += node.getLoad();
+ }
+ }
+ }
+
+ public int getNodeCount() {
+ return nodes.length;
+ }
+
+ public T select(int code, int trynum) {
+ if (totalLoadFactor <= 0) return null;
+
+ // Multiply by a prime number much bigger than the likely number of hosts
+ int hashValue=(Math.abs(code*76103)) % totalLoadFactor;
+ int sumLoad=0;
+ int targetNode=0;
+ for (targetNode=0; targetNode<nodes.length; targetNode++) {
+ sumLoad +=nodes[targetNode].getLoad();
+ if (sumLoad > hashValue)
+ break;
+ }
+ // Skip the ones we have tried before.
+ targetNode += trynum;
+ targetNode %= nodes.length;
+ return nodes[targetNode].getNode();
+ }
+
+ public boolean hasNode(T node) {
+ for(int i = 0;i<nodes.length;i++) {
+ if(node == nodes[i].getNode()) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ }
+
+ private volatile NodeList<T> nodes;
+
+ @SuppressWarnings("unchecked")
+ public Hasher() {
+ this.nodes = new NodeList<T>(new NodeFactor[0]);
+ }
+
+ /** Adds a node with load factor 100 */
+ public void add(T node) {
+ add(node,100);
+ }
+
+ /**
+ * Adds a code with a load factor.
+ * The load factor is relative to the load of the other added nodes
+ * and determines how often this node will be selected compared
+ * to the other nodes
+ */
+ public synchronized void add(T node,int load) {
+ assert(nodes != null);
+ if(!nodes.hasNode(node)) {
+ NodeFactor<T>[] oldNodes = nodes.nodes;
+ @SuppressWarnings("unchecked")
+ NodeFactor<T>[] newNodes = (NodeFactor<T>[]) new NodeFactor[oldNodes.length+ 1];
+ System.arraycopy(oldNodes,0,newNodes,0,oldNodes.length);
+ newNodes[newNodes.length-1] = new NodeFactor<>(node, load);
+
+ //Atomic switch due to volatile
+ nodes = new NodeList<>(newNodes);
+ }
+ }
+
+ /** Removes a node */
+ public synchronized void remove(T node) {
+ if( nodes.hasNode(node)) {
+ NodeFactor<T>[] oldNodes = nodes.nodes;
+ @SuppressWarnings("unchecked")
+ NodeFactor<T>[] newNodes = (NodeFactor<T>[]) new NodeFactor[oldNodes.length - 1];
+ for (int i = 0, j = 0; i < oldNodes.length; i++) {
+ if (oldNodes[i].getNode() != node) {
+ newNodes[j++] = oldNodes[i];
+ }
+ }
+ // An atomic switch due to volatile.
+ nodes = new NodeList<>(newNodes);
+ }
+ }
+
+ /**
+ * Returns a list of nodes that are up.
+ */
+ public NodeList<T> getNodes() {
+ return nodes;
+ }
+}
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java b/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java
new file mode 100644
index 00000000000..c68b60a743b
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/cluster/MonitorConfiguration.java
@@ -0,0 +1,140 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.cluster;
+
+/**
+ * The configuration of a cluster monitor instance
+ *
+ * @author bratseth
+ */
+public class MonitorConfiguration {
+
+ /**
+ * The interval in ms between consecutive checks of the monitored
+ * nodes
+ */
+ private long checkInterval=1000;
+
+ /**
+ * The number of times a failed node must respond before getting
+ * traffic again
+ */
+ private int responseAfterFailLimit=3;
+
+ /**
+ * The number of ms a node is allowed to stay idle before it is
+ * pinged
+ */
+ private long idleLimit=3000;
+
+ /**
+ * The number of milliseconds to attempt to complete a request
+ * before giving up
+ */
+ private long requestTimeout = 5000;
+
+ /**
+ * The number of milliseconds a node is allowed to fail before we
+ * mark it as not working
+ */
+ private long failLimit=5000;
+
+ /**
+ * The number of times a node is allowed to fail in one hour
+ * before it is quarantined for an hour
+ */
+ private int failQuarantineLimit=3;
+
+ /**
+ * The number of ms to quarantine an unstable node
+ */
+ private long quarantineTime=1000*60*60;
+
+ /**
+ * Sets the interval between each ping of idle or failing nodes
+ * Default is 1000ms
+ */
+ public void setCheckInterval(long intervalMs) {
+ this.checkInterval=intervalMs;
+ }
+
+ /**
+ * Returns the interval between each ping of idle or failing nodes
+ * Default is 1000ms
+ */
+ public long getCheckInterval() {
+ return checkInterval;
+ }
+
+ /**
+ * Sets the number of times a failed node must respond before it is put
+ * back in service. Default is 3.
+ */
+ public void setResponseAfterFailLimit(int responseAfterFailLimit) {
+ this.responseAfterFailLimit=responseAfterFailLimit;
+ }
+
+ /**
+ * Sets the number of ms a node (failing or working) is allowed to
+ * stay idle before it is pinged. Default is 3000
+ */
+ public void setIdleLimit(int idleLimit) {
+ this.idleLimit=idleLimit;
+ }
+
+ /**
+ * Gets the number of ms a node (failing or working)
+ * is allowed to stay idle before it is pinged. Default is 3000
+ */
+ public long getIdleLimit() {
+ return idleLimit;
+ }
+
+ /**
+ * Returns the number of milliseconds to attempt to service a request
+ * (at different nodes) before giving up. Default is 5000 ms.
+ */
+ public long getRequestTimeout() { return requestTimeout; }
+
+ /**
+ * Sets the number of milliseconds a node is allowed to fail before we
+ * mark it as not working
+ */
+ public void setFailLimit(long failLimit) { this.failLimit=failLimit; }
+
+ /**
+ * Returns the number of milliseconds a node is allowed to fail before we
+ * mark it as not working
+ */
+ public long getFailLimit() { return failLimit; }
+
+ /**
+ * The number of times a node must fail in one hour to be placed
+ * in quarantine. Once in quarantine it won't be put back in
+ * productuion before quarantineTime has expired even if it is
+ * working. Default is 3
+ */
+ public void setFailQuarantineLimit(int failQuarantineLimit) {
+ this.failQuarantineLimit=failQuarantineLimit;
+ }
+
+ /**
+ * The number of ms an unstable node is quarantined. Default is
+ * 100*60*60
+ */
+ public void setQuarantineTime(long quarantineTime) {
+ this.quarantineTime=quarantineTime;
+ }
+
+ public String toString() {
+ return "monitor configuration [" +
+ "checkInterval: " + checkInterval +
+ " responseAfterFailLimit: " + responseAfterFailLimit +
+ " idleLimit: " + idleLimit +
+ " requestTimeout " + requestTimeout +
+ " feilLimit " + failLimit +
+ " failQuerantineLimit " + failQuarantineLimit +
+ " quarantineTime " + quarantineTime +
+ "]";
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java b/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java
new file mode 100644
index 00000000000..7071867c8c7
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/cluster/NodeManager.java
@@ -0,0 +1,23 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.cluster;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Must be implemented by a node collection which wants
+ * it's node state monitored by a ClusterMonitor
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon S Bratseth</a>
+ */
+public interface NodeManager<T> {
+
+ /** Called when a failed node is working (ready for production) again */
+ public void working(T node);
+
+ /** Called when a working node fails */
+ public void failed(T node);
+
+ /** Called when a node should be pinged */
+ public void ping(T node, Executor executor);
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/PingableSearcher.java b/container-search/src/main/java/com/yahoo/search/cluster/PingableSearcher.java
new file mode 100644
index 00000000000..486473eba8d
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/cluster/PingableSearcher.java
@@ -0,0 +1,29 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.cluster;
+
+import com.yahoo.component.ComponentId;
+import com.yahoo.prelude.Ping;
+import com.yahoo.prelude.Pong;
+import com.yahoo.search.Searcher;
+import com.yahoo.search.searchchain.Execution;
+
+/**
+ * A searcher to which we can send a ping to probe if it is alive
+ *
+ * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a>
+ */
+public abstract class PingableSearcher extends Searcher {
+
+ public PingableSearcher() {
+ }
+
+ public PingableSearcher(ComponentId id) {
+ super(id);
+ }
+
+ /** Send a ping request downwards to probe if this searcher chain is in functioning order */
+ public Pong ping(Ping ping, Execution execution) {
+ return execution.ping(ping);
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java
new file mode 100644
index 00000000000..6464f0101be
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/cluster/TrafficNodeMonitor.java
@@ -0,0 +1,93 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+package com.yahoo.search.cluster;
+
+import com.yahoo.search.result.ErrorMessage;
+
+
+/**
+ * This node monitor is responsible for maintaining the state of a monitored node.
+ * It has the following properties:
+ * <ul>
+ * <li>A node is taken out of operation if it gives no response in 10 s</li>
+ * <li>A node is put back in operation when it responds correctly again
+ * </ul>
+ *
+ * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
+ */
+public class TrafficNodeMonitor<T> extends BaseNodeMonitor<T> {
+ /**
+ * Creates a new node monitor for a node
+ */
+ public TrafficNodeMonitor(T node,MonitorConfiguration configuration,boolean internal) {
+ super(internal);
+ this.node=node;
+ this.configuration=configuration;
+ }
+
+ /** Whether or not this has ever responded successfully */
+ private boolean atStartUp = true;
+
+ public T getNode() { return node; }
+
+ /**
+ * Called when this node fails.
+ *
+ * @param error A container which should contain a short description
+ */
+ @Override
+ public void failed(ErrorMessage error) {
+ respondedAt=now();
+
+ switch (error.getCode()) {
+ // TODO: Remove hard coded error messages.
+ // Refer to docs/errormessages
+ case 10:
+ case 11:
+ // Only count not being able to talk to backend at all
+ // as errors we care about
+ if ((respondedAt-succeededAt) > 10000) {
+ setWorking(false,"Not working for 10 s: " + error.toString());
+ }
+ break;
+ default:
+ succeededAt = respondedAt;
+ break;
+ }
+ }
+
+ /**
+ * Called when a response is received from this node.
+ */
+ public void responded() {
+ respondedAt=now();
+ succeededAt=respondedAt;
+ atStartUp = false;
+
+ if (!isWorking) {
+ setWorking(true,"Responds correctly");
+ }
+ }
+
+ /** Thread-safely changes the state of this node if required */
+ protected synchronized void setWorking(boolean working,String explanation) {
+ if (this.isWorking==working) return; // Old news
+
+ if (explanation==null) {
+ explanation="";
+ } else {
+ explanation=": " + explanation;
+ }
+
+ if (working) {
+ log.info("Putting " + node + " in service" + explanation);
+ }
+ else {
+ if (!atStartUp || !isInternal())
+ log.warning("Taking " + node + " out of service" + explanation);
+ failedAt=now();
+ }
+
+ this.isWorking=working;
+ }
+
+}
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/package-info.java b/container-search/src/main/java/com/yahoo/search/cluster/package-info.java
new file mode 100644
index 00000000000..b470d8c8150
--- /dev/null
+++ b/container-search/src/main/java/com/yahoo/search/cluster/package-info.java
@@ -0,0 +1,12 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+/**
+ * Standard searchers to compose in <i>source</i> search chains (those containing searchers specific for one source and
+ * which ends with a call to some provider) which calls a cluster of provider nodes. These searchers provides hashing
+ * and failover of the provider nodes.
+ */
+@ExportPackage
+@PublicApi
+package com.yahoo.search.cluster;
+
+import com.yahoo.api.annotations.PublicApi;
+import com.yahoo.osgi.annotation.ExportPackage;