diff options
Diffstat (limited to 'container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java')
-rw-r--r-- | container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java | 161 |
1 files changed, 161 insertions, 0 deletions
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.prelude.cluster;

import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.yahoo.component.provider.Freezable;
import com.yahoo.container.handler.VipStatus;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.search.result.ErrorMessage;

/**
 * Monitors a cluster of remote nodes. The monitor is run periodically on the
 * scheduled executor supplied by the owning {@link ClusterSearcher}.
 *
 * <p>Nodes are registered with {@link #add(VespaBackEndSearcher)}; once the set of nodes is
 * complete the monitor must be frozen with {@link #freeze()} before {@link #startPingThread()}
 * is called.</p>
 *
 * @author bratseth
 * @author Steinar Knutsen
 */
public class ClusterMonitor implements Runnable, Freezable {

    // The ping thread will start using the system, but we cannot be guaranteed that all components
    // in the system are up. As a workaround for not being able to find out when the system
    // is ready to be used, we wait some time before starting the ping thread
    private static final int pingThreadInitialDelayMs = 3000;

    private static final Logger log = Logger.getLogger(ClusterMonitor.class.getName());

    private final MonitorConfiguration configuration;

    private final ClusterSearcher nodeManager;

    private final Optional<VipStatus> vipStatus;

    /** A map from node to its corresponding NodeMonitor. Keys are compared by identity. */
    private final Map<VespaBackEndSearcher, NodeMonitor> nodeMonitors = new IdentityHashMap<>();

    /** Handle to the scheduled ping task, or null if {@link #startPingThread()} has not been called */
    private ScheduledFuture<?> future;

    private boolean isFrozen = false;

    /**
     * Creates a monitor with no nodes registered.
     *
     * @param manager the cluster searcher which is notified of node state changes and which performs the pings
     * @param monitorConfig the configuration from which the monitor configuration is built
     * @param vipStatus the VIP status to keep updated, or empty to not maintain any VIP status
     */
    ClusterMonitor(ClusterSearcher manager, QrMonitorConfig monitorConfig, Optional<VipStatus> vipStatus) {
        configuration = new MonitorConfiguration(monitorConfig);
        nodeManager = manager;
        this.vipStatus = vipStatus;
        log.fine("checkInterval is " + configuration.getCheckInterval() + " ms");
    }

    /** Returns the configuration of this cluster monitor */
    MonitorConfiguration getConfiguration() {
        return configuration;
    }

    /**
     * Schedules this monitor for periodic execution on the cluster searcher's scheduled executor.
     * The first run happens after a fixed initial delay, subsequent runs at the configured check interval.
     *
     * @throws IllegalStateException if this monitor has not been frozen
     */
    void startPingThread() {
        if ( ! isFrozen())
            throw new IllegalStateException("Do not start the monitoring thread before the set of " +
                                            "nodes to monitor is complete/the ClusterMonitor is frozen.");
        future = nodeManager.getScheduledExecutor().scheduleAtFixedRate(this,
                                                                        pingThreadInitialDelayMs,
                                                                        configuration.getCheckInterval(),
                                                                        TimeUnit.MILLISECONDS);
    }

    /**
     * Adds a new node for monitoring.
     *
     * @param node the node to start monitoring
     * @throws IllegalStateException if this monitor has been frozen
     */
    void add(VespaBackEndSearcher node) {
        if (isFrozen())
            throw new IllegalStateException("Can not add new nodes after ClusterMonitor has been frozen.");
        nodeMonitors.put(node, new NodeMonitor(node));
        updateVipStatus();
    }

    /**
     * Called from ClusterSearcher/NodeManager when a node failed.
     * The cluster searcher is notified only when this causes the node to go from working to not working.
     *
     * @param node the node which failed
     * @param error the error describing the failure
     * @throws NullPointerException if the node has not been added to this monitor
     */
    void failed(VespaBackEndSearcher node, ErrorMessage error) {
        NodeMonitor monitor = monitorOf(node);
        boolean wasWorking = monitor.isWorking();
        monitor.failed(error);
        if (wasWorking && !monitor.isWorking()) {
            log.info("Failed monitoring node '" + node + "' due to '" + error + "'");
            nodeManager.failed(node);
        }
        updateVipStatus();
    }

    /**
     * Called when a node responded.
     * The cluster searcher is notified only when this causes the node to go from not working to working.
     *
     * @param node the node which responded
     * @param hasSearchNodesOnline forwarded as-is to the monitor of the node
     * @throws NullPointerException if the node has not been added to this monitor
     */
    void responded(VespaBackEndSearcher node, boolean hasSearchNodesOnline) {
        NodeMonitor monitor = monitorOf(node);
        boolean wasFailing = !monitor.isWorking();
        monitor.responded(hasSearchNodesOnline);
        if (wasFailing && monitor.isWorking()) {
            log.info("Failed node '" + node + "' started working again.");
            nodeManager.working(node);
        }
        updateVipStatus();
    }

    /**
     * Returns the monitor of the given node, failing with a descriptive message
     * rather than a bare NullPointerException if the node is not monitored.
     */
    private NodeMonitor monitorOf(VespaBackEndSearcher node) {
        return Objects.requireNonNull(nodeMonitors.get(node),
                                      () -> "Node '" + node + "' is not monitored by this cluster monitor");
    }

    /**
     * Adds this cluster to, or removes it from, rotation depending on whether any node is working
     * with search nodes online. Does nothing if there is no VIP status to maintain, or as long as
     * the status of some node is still unknown.
     */
    private void updateVipStatus() {
        if ( ! vipStatus.isPresent()) return;
        if ( ! hasInformationAboutAllNodes()) return;

        VipStatus status = vipStatus.get();
        String clusterId = nodeManager.getId().stringValue();
        if (hasWorkingNodesWithDocumentsOnline()) {
            status.addToRotation(clusterId);
        } else {
            status.removeFromRotation(clusterId);
        }
    }

    /** Returns true if the status of every monitored node is known (trivially true when there are no nodes) */
    private boolean hasInformationAboutAllNodes() {
        return nodeMonitors.values().stream().allMatch(NodeMonitor::statusIsKnown);
    }

    /** Returns true if at least one monitored node is working and reports search nodes online */
    private boolean hasWorkingNodesWithDocumentsOnline() {
        return nodeMonitors.values().stream().anyMatch(node -> node.isWorking() && node.searchNodesOnline());
    }

    /**
     * Asks the cluster searcher to ping every monitored node to discover state changes.
     *
     * @throws InterruptedException if pinging is interrupted
     */
    private void ping() throws InterruptedException {
        for (NodeMonitor monitor : nodeMonitors.values()) {
            nodeManager.ping(monitor.getNode());
        }
    }

    /**
     * Runs one round of pinging. No exception escapes from this method, as that would
     * suppress all subsequent executions of a task scheduled at fixed rate.
     */
    @Override
    public void run() {
        log.finest("Activating ping");
        try {
            ping();
        } catch (InterruptedException e) {
            // Interruption is the expected result of shutdown() cancelling the task:
            // restore the interrupt status for the executing thread instead of swallowing it
            Thread.currentThread().interrupt();
            log.log(Level.FINE, "Monitor thread interrupted while pinging", e);
        } catch (Exception e) {
            log.log(Level.WARNING, "Error in monitor thread", e);
        }
    }

    /** Cancels the scheduled ping task, interrupting it if it is running. Does nothing if it was never started. */
    public void shutdown() {
        if (future != null) {
            future.cancel(true);
        }
    }

    /** Marks the set of monitored nodes as complete; no nodes can be added after this */
    @Override
    public void freeze() {
        isFrozen = true;
    }

    /** Returns whether {@link #freeze()} has been called */
    @Override
    public boolean isFrozen() {
        return isFrozen;
    }

}