aboutsummaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java
blob: bb5accd902263f55bee09eed68aacb78f11788eb (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.prelude.cluster;

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.yahoo.component.provider.Freezable;
import com.yahoo.container.handler.VipStatus;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.search.result.ErrorMessage;

/**
 * Monitors a cluster of remote nodes. The set of nodes is built up through {@link #add} and
 * must be frozen (see {@link #freeze()}) before pinging is started with {@link #startPingThread()}.
 * Pinging runs as a periodic task on the scheduled executor supplied by the owning
 * {@link ClusterSearcher}.
 *
 * @author bratseth
 * @author Steinar Knutsen
 */
public class ClusterMonitor implements Runnable, Freezable {

    // The ping thread will start using the system, but we cannot be guaranteed that all components
    // in the system are up. As a workaround for not being able to find out when the system
    // is ready to be used, we wait some time before starting the ping thread
    private static final int pingThreadInitialDelayMs = 3000;

    private static final Logger log = Logger.getLogger(ClusterMonitor.class.getName());

    private final MonitorConfiguration configuration;

    private final ClusterSearcher nodeManager;

    private final Optional<VipStatus> vipStatus;

    /**
     * A map from each monitored backend searcher to its NodeMonitor.
     * Keys are compared by reference identity (IdentityHashMap).
     */
    private final Map<VespaBackEndSearcher, NodeMonitor> nodeMonitors = new java.util.IdentityHashMap<>();

    /** Handle to the scheduled ping task; null until startPingThread() has been called */
    private ScheduledFuture<?> future;

    private boolean isFrozen = false;

    ClusterMonitor(ClusterSearcher manager, QrMonitorConfig monitorConfig, Optional<VipStatus> vipStatus) {
        configuration = new MonitorConfiguration(monitorConfig);
        nodeManager = manager;
        this.vipStatus = vipStatus;
        log.fine("checkInterval is " + configuration.getCheckInterval() + " ms");
    }

    /** Returns the configuration of this cluster monitor */
    MonitorConfiguration getConfiguration() {
        return configuration;
    }

    /**
     * Schedules this monitor to run at a fixed rate on the node manager's scheduled executor,
     * after an initial delay of {@code pingThreadInitialDelayMs} and then every
     * {@code configuration.getCheckInterval()} milliseconds.
     *
     * @throws IllegalStateException if this monitor has not been frozen yet
     */
    void startPingThread() {
        if ( ! isFrozen())
            throw new IllegalStateException("Do not start the monitoring thread before the set of " +
                                            "nodes to monitor is complete/the ClusterMonitor is frozen.");
        future = nodeManager.getScheduledExecutor().scheduleAtFixedRate(this,
                                                                        pingThreadInitialDelayMs,
                                                                        configuration.getCheckInterval(),
                                                                        TimeUnit.MILLISECONDS);
    }

    /**
     * Adds a new node for monitoring and refreshes the vip status accordingly.
     *
     * @throws IllegalStateException if this monitor has already been frozen
     */
    void add(VespaBackEndSearcher node) {
        if (isFrozen())
            throw new IllegalStateException("Can not add new nodes after ClusterMonitor has been frozen.");
        nodeMonitors.put(node, new NodeMonitor(node));
        updateVipStatus();
    }

    /**
     * Called from ClusterSearcher/NodeManager when a node failed.
     * The node manager is only notified when this failure makes the node go from working
     * to not working. The node must have been registered through {@link #add}; an unknown
     * node results in a NullPointerException.
     */
    void failed(VespaBackEndSearcher node, ErrorMessage error) {
        NodeMonitor monitor = nodeMonitors.get(node);
        boolean wasWorking = monitor.isWorking();
        monitor.failed(error);
        if (wasWorking && !monitor.isWorking()) {
            log.info("Failed monitoring node '" + node + "' due to '" + error + "'");
            nodeManager.failed(node);
        }
        updateVipStatus();
    }

    /**
     * Called when a node responded.
     * The node manager is only notified when this response makes the node go from not working
     * to working. The node must have been registered through {@link #add}; an unknown
     * node results in a NullPointerException.
     */
    void responded(VespaBackEndSearcher node, boolean hasSearchNodesOnline) {
        NodeMonitor monitor = nodeMonitors.get(node);
        boolean wasFailing = !monitor.isWorking();
        monitor.responded(hasSearchNodesOnline);
        if (wasFailing && monitor.isWorking()) {
            log.info("Failed node '" + node + "' started working again.");
            nodeManager.working(node);
        }
        updateVipStatus();
    }

    /**
     * Adds this monitor to the vip rotation if at least one monitored node is working and has
     * search nodes online, and removes it from rotation otherwise.
     * Does nothing when no VipStatus was supplied.
     */
    private void updateVipStatus() {
        if ( ! vipStatus.isPresent()) return;

        if (hasWorkingNodesWithDocumentsOnline()) {
            vipStatus.get().addToRotation(this);
        } else {
            vipStatus.get().removeFromRotation(this);
        }
    }

    /** Returns true if any monitored node is both working and reports search nodes online */
    private boolean hasWorkingNodesWithDocumentsOnline() {
        for (NodeMonitor node : nodeMonitors.values()) {
            if (node.isWorking() && node.searchNodesOnline()) return true;
        }
        return false;
    }

    /**
     * Pings every monitored node through the node manager to discover state changes
     */
    private void ping() throws InterruptedException {
        for (NodeMonitor monitor : nodeMonitors.values()) {
            nodeManager.ping(monitor.getNode());
        }
    }

    /**
     * Runs one ping round. Exceptions are logged rather than propagated, since an exception
     * escaping a task scheduled with scheduleAtFixedRate suppresses all later executions.
     */
    @Override
    public void run() {
        log.finest("Activating ping");
        try {
            ping();
        } catch (InterruptedException e) {
            // Expected when shutdown() cancels the task with interruption: restore the
            // interrupt status for the executor instead of reporting it as an error
            Thread.currentThread().interrupt();
            log.log(Level.FINE, "Monitor thread interrupted, aborting this ping round", e);
        } catch (Exception e) {
            log.log(Level.WARNING, "Error in monitor thread", e);
        }
    }

    /**
     * Cancels the scheduled ping task if it has been started, interrupting a ping round that
     * is in progress. The InterruptedException in the signature is kept for the benefit of
     * existing callers; this implementation does not throw it.
     */
    public void shutdown() throws InterruptedException {
        if (future != null) {
            future.cancel(true);
        }
    }

    /** Marks the set of monitored nodes as complete; after this, add() is rejected */
    @Override
    public void freeze() {
        isFrozen = true;
    }

    @Override
    public boolean isFrozen() {
        return isFrozen;
    }

}