// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. package com.yahoo.search.cluster; import com.yahoo.concurrent.ThreadFactoryFactory; import com.yahoo.search.result.ErrorMessage; import com.yahoo.yolean.UncheckedInterruptedException; import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.logging.Level; import java.util.logging.Logger; /** * Monitors of a cluster of remote nodes. * The monitor uses an internal thread for node monitoring. * All public methods of this class are multithread safe. * * @author bratseth */ public class ClusterMonitor { private static final Logger log = Logger.getLogger(ClusterMonitor.class.getName()); private final MonitorConfiguration configuration = new MonitorConfiguration(); private final NodeManager nodeManager; private final MonitorThread monitorThread; private final AtomicBoolean closed = new AtomicBoolean(false); /** A map from Node to corresponding MonitoredNode */ private final Map> nodeMonitors = Collections.synchronizedMap(new LinkedHashMap<>()); // Used during reconfiguration to ensure async RPC calls are complete. private final Set nodesToRemove = new LinkedHashSet<>(); // Used during reconfiguration to ensure all nodes have data. private final Set nodesToUpdate = new LinkedHashSet<>(); // Used for reconfiguration, and during shutdown. private boolean skipNextWait = false; public ClusterMonitor(NodeManager manager, boolean startPingThread) { nodeManager = manager; monitorThread = new MonitorThread("search.clustermonitor." + manager.name()); if (startPingThread) { monitorThread.start(); } } /** Updates the monitored set of nodes, and waits for 1. data on new nodes, and 2. RPC completion of removed nodes. */ public synchronized void reconfigure(Collection nodes) { if ( ! monitorThread.isAlive()) throw new IllegalStateException("monitor thread must be alive for reconfiguration"); nodesToUpdate.addAll(nodes); nodesToRemove.addAll(nodeMonitors.keySet()); nodesToRemove.removeAll(nodes); for (T node : nodes) if ( ! nodeMonitors.containsKey(node)) add(node, true); synchronized (nodeManager) { skipNextWait = true; nodeManager.notifyAll(); } try { while ( ! nodesToRemove.isEmpty() || ! nodesToUpdate.isEmpty()) wait(1); } catch (InterruptedException e) { throw new UncheckedInterruptedException(e, true); } nodeManager.pingIterationCompleted(); } public void start() { if ( ! monitorThread.isAlive()) { monitorThread.start(); } } /** Returns the configuration of this cluster monitor */ public MonitorConfiguration getConfiguration() { return configuration; } public boolean isClosed() { return closed.get(); } /** * Adds a new node for monitoring. * The object representing the node must *
    *
  • Have a sensible toString
  • *
  • Have a sensible identity (equals and hashCode)
  • *
* * @param node the object representing the node * @param internal whether this node is internal to this cluster */ public void add(T node, boolean internal) { nodeMonitors.put(node, new TrafficNodeMonitor<>(node, configuration, internal)); } /** Called from ClusterSearcher/NodeManager when a node failed */ public synchronized void failed(T node, ErrorMessage error) { updateMonitoredNode(node, monitor -> monitor.failed(error), nodeManager::failed); } /** Called when a node responded */ public synchronized void responded(T node) { updateMonitoredNode(node, TrafficNodeMonitor::responded, nodeManager::working); } private void updateMonitoredNode(T node, Consumer> monitorUpdate, Consumer nodeUpdate) { TrafficNodeMonitor monitor = nodeMonitors.get(node); // Don't touch state during shutdown. if (closed.get()) monitor = null; // Node was removed during reconfiguration, and should no longer be monitored. if (nodesToRemove.remove(node)) { nodeMonitors.remove(node); monitor = null; } // Update monitor state only when it actually changes. if (monitor != null) { Boolean wasWorking = monitor.isKnownWorking(); monitorUpdate.accept(monitor); if (wasWorking != monitor.isKnownWorking()) nodeUpdate.accept(node); } // If the node was added in a recent reconfiguration, we now have its required data. nodesToUpdate.remove(node); } /** * Ping all nodes which needs pinging to discover state changes */ public synchronized void ping(Executor executor) { for (var monitor : nodeMonitors()) { if (closed.get()) return; // Do nothing to change state if close has started. if (nodesToRemove.remove(monitor.getNode())) { nodeMonitors.remove(monitor.getNode()); continue; } nodeManager.ping(this, monitor.getNode(), executor); } nodeManager.pingIterationCompleted(); } /** Returns a thread-safe snapshot of the NodeMonitors of all added nodes */ public Iterator> nodeMonitorIterator() { return nodeMonitors().iterator(); } /** Returns a thread-safe snapshot of the NodeMonitors of all added nodes */ public List> nodeMonitors() { return List.copyOf(nodeMonitors.values()); } /** Must be called when this goes out of use */ public void shutdown() { closed.set(true); synchronized (this) { nodeMonitors.clear(); } synchronized (nodeManager) { skipNextWait = true; nodeManager.notifyAll(); } try { if (monitorThread.isAlive()) { monitorThread.join(); } } catch (InterruptedException e) {} } private class MonitorThread extends Thread { MonitorThread(String name) { super(name); setDaemon(true); } public void run() { log.info("Starting cluster monitor thread " + getName()); // Pings must happen in a separate thread from this to handle timeouts // By using a cached thread pool we ensured that 1) a single thread will be used // for all pings when there are no problems (important because it ensures that // any thread local connections are reused) 2) a new thread will be started to execute // new pings when a ping is not responding ExecutorService pingExecutor = Executors.newCachedThreadPool(ThreadFactoryFactory.getDaemonThreadFactory("search.ping")); while (!closed.get()) { try { log.finest("Activating ping"); ping(pingExecutor); synchronized (nodeManager) { if ( ! skipNextWait) nodeManager.wait(configuration.getCheckInterval()); skipNextWait = false; } } catch (Throwable e) { if (closed.get() && e instanceof InterruptedException) { break; } else if ( ! (e instanceof Exception) ) { log.log(Level.WARNING,"Error in monitor thread, will quit", e); break; } else { log.log(Level.WARNING,"Exception in monitor thread", e); } } } pingExecutor.shutdown(); try { if ( ! pingExecutor.awaitTermination(10, TimeUnit.SECONDS)) { log.warning("Timeout waiting for ping executor to terminate"); } } catch (InterruptedException e) { } log.info("Stopped cluster monitor thread " + getName()); } } }