// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.cluster;

import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.search.result.ErrorMessage;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Monitors a cluster of remote nodes.
 * The monitor uses an internal thread for node monitoring.
 * All public methods of this class are multithread safe.
 *
 * @param <T> the type of the objects representing the monitored nodes
 * @author bratseth
 */
public class ClusterMonitor<T> {

    private static final Logger log = Logger.getLogger(ClusterMonitor.class.getName());

    private final MonitorConfiguration configuration = new MonitorConfiguration();
    private final NodeManager<T> nodeManager;
    private final MonitorThread monitorThread;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    /** A map from each added node to the TrafficNodeMonitor tracking it, in insertion order */
    private final Map<T, TrafficNodeMonitor<T>> nodeMonitors = Collections.synchronizedMap(new java.util.LinkedHashMap<>());

    /**
     * Creates a cluster monitor.
     *
     * @param manager the node manager which is asked to ping nodes and is told about state changes
     * @param startPingThread whether the internal monitor thread should be started right away;
     *                        if false, {@link #start()} must be called to begin monitoring
     */
    public ClusterMonitor(NodeManager<T> manager, boolean startPingThread) {
        nodeManager = manager;
        monitorThread = new MonitorThread("search.clustermonitor." + manager.name());
        if (startPingThread) {
            monitorThread.start();
        }
    }

    /** Starts the internal monitor thread unless it is currently running */
    public void start() {
        if ( ! monitorThread.isAlive()) {
            monitorThread.start();
        }
    }

    /** Returns the configuration of this cluster monitor */
    public MonitorConfiguration getConfiguration() { return configuration; }

    /** Returns whether {@link #shutdown()} has been called on this */
    public boolean isClosed() { return closed.get(); }

    /**
     * Adds a new node for monitoring.
     * The object representing the node must
     * <ul>
     * <li>Have a sensible toString</li>
     * <li>Have a sensible identity (equals and hashCode)</li>
     * </ul>
     *
     * @param node the object representing the node
     * @param internal whether or not this node is internal to this cluster
     */
    public void add(T node, boolean internal) {
        nodeMonitors.put(node, new TrafficNodeMonitor<>(node, configuration, internal));
    }

    /**
     * Called from ClusterSearcher/NodeManager when a node failed.
     * The node manager is notified only if this call changed the known working state of the node.
     * Calls for nodes which have not been added, and calls after shutdown has started, are ignored.
     *
     * @param node the node which failed
     * @param error a description of the failure
     */
    public synchronized void failed(T node, ErrorMessage error) {
        if (closed.get()) return; // Do not touch state if close has started.
        TrafficNodeMonitor<T> monitor = nodeMonitors.get(node);
        if (monitor == null) { // Not a node we monitor: there is no state to update
            log.fine("Ignoring failure report for unmonitored node " + node);
            return;
        }
        Boolean wasWorking = monitor.isKnownWorking();
        monitor.failed(error);
        // Compare by value: these are boxed Booleans, so != would compare object identity
        if ( ! Objects.equals(wasWorking, monitor.isKnownWorking()))
            nodeManager.failed(node);
    }

    /**
     * Called when a node responded.
     * The node manager is notified only if this call changed the known working state of the node.
     * Calls for nodes which have not been added, and calls after shutdown has started, are ignored.
     *
     * @param node the node which responded
     */
    public synchronized void responded(T node) {
        if (closed.get()) return; // Do not touch state if close has started.
        TrafficNodeMonitor<T> monitor = nodeMonitors.get(node);
        if (monitor == null) { // Not a node we monitor: there is no state to update
            log.fine("Ignoring response report for unmonitored node " + node);
            return;
        }
        Boolean wasWorking = monitor.isKnownWorking();
        monitor.responded();
        // Compare by value: these are boxed Booleans, so != would compare object identity
        if ( ! Objects.equals(wasWorking, monitor.isKnownWorking()))
            nodeManager.working(node);
    }

    /**
     * Ping all nodes which need pinging to discover state changes.
     * Stops early, without signalling iteration completion, if shutdown starts while pinging.
     *
     * @param executor the executor handed to the node manager for carrying out the pings
     */
    public void ping(Executor executor) {
        for (Iterator<BaseNodeMonitor<T>> i = nodeMonitorIterator(); i.hasNext() && ! closed.get(); ) {
            BaseNodeMonitor<T> monitor = i.next();
            nodeManager.ping(this, monitor.getNode(), executor); // Cause call to failed or responded
        }
        if (closed.get()) return; // Do nothing to change state if close has started.
        nodeManager.pingIterationCompleted();
    }

    /** Returns an iterator over a thread-safe snapshot of the NodeMonitors of all added nodes */
    public Iterator<BaseNodeMonitor<T>> nodeMonitorIterator() {
        return nodeMonitors().iterator();
    }

    /** Returns a thread-safe snapshot of the NodeMonitors of all added nodes */
    public List<BaseNodeMonitor<T>> nodeMonitors() {
        return new java.util.ArrayList<>(nodeMonitors.values());
    }

    /**
     * Must be called when this goes out of use.
     * Marks this as closed, forgets all nodes, wakes up the monitor thread and waits for it to finish.
     * If the calling thread is interrupted while waiting, the wait is abandoned and the
     * interrupt status of the calling thread is preserved.
     */
    public void shutdown() {
        closed.set(true);
        synchronized (this) { // Same lock as failed/responded, so no state update is in flight while clearing
            nodeMonitors.clear();
        }
        synchronized (nodeManager) { // The monitor thread waits on nodeManager between ping rounds
            nodeManager.notifyAll();
        }
        try {
            if (monitorThread.isAlive()) {
                monitorThread.join();
            }
        } catch (InterruptedException e) {
            // Stop waiting, but let the caller observe that it was interrupted
            Thread.currentThread().interrupt();
        }
    }

    /** The daemon thread which pings all nodes, then waits for the configured check interval, until closed */
    private class MonitorThread extends Thread {

        MonitorThread(String name) {
            super(name);
            setDaemon(true);
        }

        @Override
        public void run() {
            log.info("Starting cluster monitor thread " + getName());
            // Pings must happen in a separate thread from this to handle timeouts
            // By using a cached thread pool we ensure that 1) a single thread will be used
            // for all pings when there are no problems (important because it ensures that
            // any thread local connections are reused) 2) a new thread will be started to execute
            // new pings when a ping is not responding
            ExecutorService pingExecutor = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("search.ping"));
            while ( ! closed.get()) {
                try {
                    log.finest("Activating ping");
                    ping(pingExecutor);
                    synchronized (nodeManager) { // shutdown() notifies on nodeManager to end this wait early
                        nodeManager.wait(configuration.getCheckInterval());
                    }
                }
                catch (Throwable e) {
                    if (closed.get() && e instanceof InterruptedException) {
                        break; // Interrupted as part of closing: quit silently
                    } else if ( ! (e instanceof Exception) ) {
                        // Errors are not expected to be recoverable: give up monitoring
                        log.log(Level.WARNING, "Error in monitor thread, will quit", e);
                        break;
                    } else {
                        log.log(Level.WARNING, "Exception in monitor thread", e);
                    }
                }
            }
            pingExecutor.shutdown();
            try {
                pingExecutor.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // This thread is about to end anyway; just keep the interrupt status intact
                Thread.currentThread().interrupt();
            }
            log.info("Stopped cluster monitor thread " + getName());
        }

    }

}