// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.cluster; import com.yahoo.concurrent.DaemonThreadFactory; import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.search.result.ErrorMessage; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.logging.Level; import java.util.logging.Logger; /** * Monitors of a cluster of remote nodes. * The monitor uses an internal thread for node monitoring. * All public methods of this class are multithread safe. * * @author Jon Bratseth */ public class ClusterMonitor { private MonitorConfiguration configuration=new MonitorConfiguration(); private static Logger log=Logger.getLogger(ClusterMonitor.class.getName()); private NodeManager nodeManager; private MonitorThread monitorThread; private volatile boolean shutdown = false; /** A map from Node to corresponding MonitoredNode */ private Map> nodeMonitors= Collections.synchronizedMap(new java.util.LinkedHashMap>()); public ClusterMonitor(NodeManager manager, String monitorConfigID) { nodeManager=manager; monitorThread=new MonitorThread("search.clustermonitor"); monitorThread.start(); log.fine("checkInterval is " + configuration.getCheckInterval()+" ms"); } /** Returns the configuration of this cluster monitor */ public MonitorConfiguration getConfiguration() { return configuration; } /** * Adds a new node for monitoring. * The object representing the node must *
    *
  • Have a sensible toString
  • *
  • Have a sensible identity (equals and hashCode)
  • *
* * @param node the object representing the node * @param internal whether or not this node is internal to this cluster */ public void add(T node,boolean internal) { BaseNodeMonitor monitor=new TrafficNodeMonitor<>(node,configuration,internal); // BaseNodeMonitor monitor=new NodeMonitor(node,configuration); nodeMonitors.put(node,monitor); } /** * Returns the monitor of the given node, or null if this node has not been added */ public BaseNodeMonitor getNodeMonitor(T node) { return nodeMonitors.get(node); } /** Called from ClusterSearcher/NodeManager when a node failed */ public synchronized void failed(T node, ErrorMessage error) { BaseNodeMonitor monitor=nodeMonitors.get(node); boolean wasWorking=monitor.isWorking(); monitor.failed(error); if (wasWorking && !monitor.isWorking()) { nodeManager.failed(node); } } /** Called when a node responded */ public synchronized void responded(T node) { BaseNodeMonitor monitor = nodeMonitors.get(node); boolean wasFailing=!monitor.isWorking(); monitor.responded(); if (wasFailing && monitor.isWorking()) { nodeManager.working(monitor.getNode()); } } /** * Ping all nodes which needs pinging to discover state changes */ public void ping(Executor executor) { for (Iterator> i=nodeMonitorIterator(); i.hasNext(); ) { BaseNodeMonitor monitor= i.next(); // always ping // if (monitor.isIdle()) nodeManager.ping(monitor.getNode(),executor); // Cause call to failed or responded } } /** Returns a thread-safe snapshot of the NodeMonitors of all added nodes */ public Iterator> nodeMonitorIterator() { return nodeMonitors().iterator(); } /** Returns a thread-safe snapshot of the NodeMonitors of all added nodes */ public List> nodeMonitors() { synchronized (nodeMonitors) { return new java.util.ArrayList<>(nodeMonitors.values()); } } /** Must be called when this goes out of use */ public void shutdown() { shutdown = true; monitorThread.interrupt(); } private class MonitorThread extends Thread { MonitorThread(String name) { super(name); } public void run() { log.fine("Starting cluster monitor thread"); // Pings must happen in a separate thread from this to handle timeouts // By using a cached thread pool we ensured that 1) a single thread will be used // for all pings when there are no problems (important because it ensures that // any thread local connections are reused) 2) a new thread will be started to execute // new pings when a ping is not responding Executor pingExecutor=Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("search.ping")); while (!isInterrupted()) { try { Thread.sleep(configuration.getCheckInterval()); log.finest("Activating ping"); ping(pingExecutor); } catch (Exception e) { if (shutdown && e instanceof InterruptedException) { break; } else { log.log(Level.WARNING,"Error in monitor thread",e); } } } log.fine("Stopped cluster monitor thread"); } } }