diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java | 157 |
1 files changed, 157 insertions, 0 deletions
diff --git a/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java new file mode 100644 index 00000000000..1c50ea5d904 --- /dev/null +++ b/container-search/src/main/java/com/yahoo/search/cluster/ClusterMonitor.java @@ -0,0 +1,157 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +package com.yahoo.search.cluster; + + +import com.yahoo.concurrent.DaemonThreadFactory; +import com.yahoo.concurrent.ThreadFactoryFactory; +import com.yahoo.search.result.ErrorMessage; + +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Monitors of a cluster of remote nodes. + * The monitor uses an internal thread for node monitoring. + * All <i>public</i> methods of this class are multithread safe. + * + * @author <a href="mailto:bratseth@yahoo-inc.com">Jon Bratseth</a> + */ +public class ClusterMonitor<T> { + + private MonitorConfiguration configuration=new MonitorConfiguration(); + + private static Logger log=Logger.getLogger(ClusterMonitor.class.getName()); + + private NodeManager<T> nodeManager; + + private MonitorThread monitorThread; + + private volatile boolean shutdown = false; + + /** A map from Node to corresponding MonitoredNode */ + private Map<T,BaseNodeMonitor<T>> nodeMonitors= + Collections.synchronizedMap(new java.util.LinkedHashMap<T, BaseNodeMonitor<T>>()); + + public ClusterMonitor(NodeManager<T> manager, String monitorConfigID) { + nodeManager=manager; + monitorThread=new MonitorThread("search.clustermonitor"); + monitorThread.start(); + log.fine("checkInterval is " + configuration.getCheckInterval()+" ms"); + } + + /** Returns the configuration of this cluster monitor */ + public MonitorConfiguration getConfiguration() { return configuration; } + + /** + * Adds a new node for monitoring. + * The object representing the node must + * <ul> + * <li>Have a sensible toString</li> + * <li>Have a sensible identity (equals and hashCode)</li> + * </ul> + * + * @param node the object representing the node + * @param internal whether or not this node is internal to this cluster + */ + public void add(T node,boolean internal) { + BaseNodeMonitor<T> monitor=new TrafficNodeMonitor<>(node,configuration,internal); + // BaseNodeMonitor monitor=new NodeMonitor(node,configuration); + nodeMonitors.put(node,monitor); + } + + /** + * Returns the monitor of the given node, or null if this node has not been added + */ + public BaseNodeMonitor<T> getNodeMonitor(T node) { + return nodeMonitors.get(node); + } + + /** Called from ClusterSearcher/NodeManager when a node failed */ + public synchronized void failed(T node, ErrorMessage error) { + BaseNodeMonitor<T> monitor=nodeMonitors.get(node); + boolean wasWorking=monitor.isWorking(); + monitor.failed(error); + if (wasWorking && !monitor.isWorking()) { + nodeManager.failed(node); + } + } + + /** Called when a node responded */ + public synchronized void responded(T node) { + BaseNodeMonitor<T> monitor = nodeMonitors.get(node); + boolean wasFailing=!monitor.isWorking(); + monitor.responded(); + if (wasFailing && monitor.isWorking()) { + nodeManager.working(monitor.getNode()); + } + } + + /** + * Ping all nodes which needs pinging to discover state changes + */ + public void ping(Executor executor) { + for (Iterator<BaseNodeMonitor<T>> i=nodeMonitorIterator(); i.hasNext(); ) { + BaseNodeMonitor<T> monitor= i.next(); + // always ping + // if (monitor.isIdle()) + nodeManager.ping(monitor.getNode(),executor); // Cause call to failed or responded + } + } + + /** Returns a thread-safe snapshot of the NodeMonitors of all added nodes */ + public Iterator<BaseNodeMonitor<T>> nodeMonitorIterator() { + return nodeMonitors().iterator(); + } + + /** Returns a thread-safe snapshot of the NodeMonitors of all added nodes */ + public List<BaseNodeMonitor<T>> nodeMonitors() { + synchronized (nodeMonitors) { + return new java.util.ArrayList<>(nodeMonitors.values()); + } + } + + /** Must be called when this goes out of use */ + public void shutdown() { + shutdown = true; + monitorThread.interrupt(); + } + + private class MonitorThread extends Thread { + MonitorThread(String name) { + super(name); + } + + public void run() { + log.fine("Starting cluster monitor thread"); + // Pings must happen in a separate thread from this to handle timeouts + // By using a cached thread pool we ensured that 1) a single thread will be used + // for all pings when there are no problems (important because it ensures that + // any thread local connections are reused) 2) a new thread will be started to execute + // new pings when a ping is not responding + Executor pingExecutor=Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("search.ping")); + while (!isInterrupted()) { + try { + Thread.sleep(configuration.getCheckInterval()); + log.finest("Activating ping"); + ping(pingExecutor); + } + catch (Exception e) { + if (shutdown && e instanceof InterruptedException) { + break; + } else { + log.log(Level.WARNING,"Error in monitor thread",e); + } + } + } + log.fine("Stopped cluster monitor thread"); + } + + } + +} |