// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.cluster;
import com.yahoo.concurrent.DaemonThreadFactory;
import com.yahoo.concurrent.ThreadFactoryFactory;
import com.yahoo.search.result.ErrorMessage;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Monitors a cluster of remote nodes.
* The monitor uses an internal thread for node monitoring.
* All public methods of this class are multithread safe.
*
* @author Jon Bratseth
*/
public class ClusterMonitor {
private MonitorConfiguration configuration=new MonitorConfiguration();
private static Logger log=Logger.getLogger(ClusterMonitor.class.getName());
private NodeManager nodeManager;
private MonitorThread monitorThread;
private volatile boolean shutdown = false;
/** A map from Node to corresponding MonitoredNode */
private Map> nodeMonitors=
Collections.synchronizedMap(new java.util.LinkedHashMap>());
public ClusterMonitor(NodeManager manager, String monitorConfigID) {
nodeManager=manager;
monitorThread=new MonitorThread("search.clustermonitor");
monitorThread.start();
log.fine("checkInterval is " + configuration.getCheckInterval()+" ms");
}
/** Returns the configuration of this cluster monitor */
public MonitorConfiguration getConfiguration() { return configuration; }
/**
* Adds a new node for monitoring.
* The object representing the node must
*
* - Have a sensible toString
* - Have a sensible identity (equals and hashCode)
*
*
* @param node the object representing the node
* @param internal whether or not this node is internal to this cluster
*/
public void add(T node,boolean internal) {
BaseNodeMonitor monitor=new TrafficNodeMonitor<>(node,configuration,internal);
// BaseNodeMonitor monitor=new NodeMonitor(node,configuration);
nodeMonitors.put(node,monitor);
}
/**
* Returns the monitor of the given node, or null if this node has not been added
*/
public BaseNodeMonitor getNodeMonitor(T node) {
return nodeMonitors.get(node);
}
/** Called from ClusterSearcher/NodeManager when a node failed */
public synchronized void failed(T node, ErrorMessage error) {
BaseNodeMonitor monitor=nodeMonitors.get(node);
boolean wasWorking=monitor.isWorking();
monitor.failed(error);
if (wasWorking && !monitor.isWorking()) {
nodeManager.failed(node);
}
}
/** Called when a node responded */
public synchronized void responded(T node) {
BaseNodeMonitor monitor = nodeMonitors.get(node);
boolean wasFailing=!monitor.isWorking();
monitor.responded();
if (wasFailing && monitor.isWorking()) {
nodeManager.working(monitor.getNode());
}
}
/**
* Ping all nodes which needs pinging to discover state changes
*/
public void ping(Executor executor) {
for (Iterator> i=nodeMonitorIterator(); i.hasNext(); ) {
BaseNodeMonitor monitor= i.next();
// always ping
// if (monitor.isIdle())
nodeManager.ping(monitor.getNode(),executor); // Cause call to failed or responded
}
}
/** Returns a thread-safe snapshot of the NodeMonitors of all added nodes */
public Iterator> nodeMonitorIterator() {
return nodeMonitors().iterator();
}
/** Returns a thread-safe snapshot of the NodeMonitors of all added nodes */
public List> nodeMonitors() {
synchronized (nodeMonitors) {
return new java.util.ArrayList<>(nodeMonitors.values());
}
}
/** Must be called when this goes out of use */
public void shutdown() {
shutdown = true;
monitorThread.interrupt();
}
private class MonitorThread extends Thread {
MonitorThread(String name) {
super(name);
}
public void run() {
log.fine("Starting cluster monitor thread");
// Pings must happen in a separate thread from this to handle timeouts
// By using a cached thread pool we ensured that 1) a single thread will be used
// for all pings when there are no problems (important because it ensures that
// any thread local connections are reused) 2) a new thread will be started to execute
// new pings when a ping is not responding
Executor pingExecutor=Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("search.ping"));
while (!isInterrupted()) {
try {
Thread.sleep(configuration.getCheckInterval());
log.finest("Activating ping");
ping(pingExecutor);
}
catch (Exception e) {
if (shutdown && e instanceof InterruptedException) {
break;
} else {
log.log(Level.WARNING,"Error in monitor thread",e);
}
}
}
log.fine("Stopped cluster monitor thread");
}
}
}