summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/prelude/cluster/ClusterMonitor.java
blob: d183a47a66f8ba373ebe49a4f092c2acfcb54b72 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.prelude.cluster;

import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.yahoo.component.provider.Freezable;
import com.yahoo.container.handler.VipStatus;
import com.yahoo.prelude.fastsearch.VespaBackEndSearcher;
import com.yahoo.search.result.ErrorMessage;

/**
 * Monitors a cluster of remote nodes. Pinging is driven by a periodic task
 * scheduled on the executor supplied by the owning {@link ClusterSearcher};
 * this class is the {@link Runnable} executed by that task.
 *
 * <p>Life cycle: nodes are registered with {@link #add(VespaBackEndSearcher)},
 * then the monitor is frozen with {@link #freeze()}, and only then may
 * {@link #startPingThread()} be called. No nodes can be added after freezing.</p>
 *
 * @author bratseth
 * @author <a href="mailto:steinar@yahoo-inc.com">Steinar Knutsen</a>
 */
public class ClusterMonitor implements Runnable, Freezable {

    private final static Logger log = Logger.getLogger(ClusterMonitor.class.getName());

    /** Delay, in milliseconds, from scheduling the ping task until its first execution. */
    private static final long INITIAL_PING_DELAY_MS = 30 * 1000;

    private final MonitorConfiguration configuration;

    /** The searcher owning this monitor: performs the pings and is told about node state changes. */
    private final ClusterSearcher nodeManager;

    /** Told to add or remove this monitor from rotation whenever a node reports in. */
    private final VipStatus vipStatus;

    /** A map from each monitored node to its corresponding NodeMonitor, keyed by node identity */
    private final Map<VespaBackEndSearcher, NodeMonitor> nodeMonitors = new java.util.IdentityHashMap<>();

    /** Handle to the scheduled ping task; null until {@link #startPingThread()} has been called. */
    ScheduledFuture<?>  future;

    private boolean isFrozen = false;

    /**
     * Creates a monitor which is not yet frozen and monitors no nodes.
     *
     * @param manager the searcher which performs pings and receives state change callbacks
     * @param monitorConfig the config from which the {@link MonitorConfiguration} is built
     * @param vipStatus the status object updated as nodes fail and respond
     */
    ClusterMonitor(final ClusterSearcher manager, final QrMonitorConfig monitorConfig, VipStatus vipStatus) {
        configuration = new MonitorConfiguration(monitorConfig);
        nodeManager = manager;
        this.vipStatus = vipStatus;
        log.fine("checkInterval is " + configuration.getCheckInterval() + " ms");
    }

    /** Returns the configuration of this cluster monitor */
    MonitorConfiguration getConfiguration() {
        return configuration;
    }

    /**
     * Schedules this monitor to run at a fixed rate on the executor of the owning searcher.
     * The first run happens after an initial delay of 30 seconds, subsequent runs at the
     * configured check interval (milliseconds).
     *
     * @throws IllegalStateException if this monitor has not been frozen yet
     */
    void startPingThread() {
        if (!isFrozen()) {
            throw new IllegalStateException(
                    "Do not start the monitoring thread before the set of"
                    +" nodes to monitor is complete/the ClusterMonitor is frozen.");
        }
        future = nodeManager.getScheduledExecutor().scheduleAtFixedRate(
                this, INITIAL_PING_DELAY_MS, configuration.getCheckInterval(), TimeUnit.MILLISECONDS);
    }

    /**
     * Adds a new node for monitoring.
     *
     * @param node the node to monitor
     * @throws IllegalStateException if this monitor has already been frozen
     */
    void add(final VespaBackEndSearcher node) {
        if (isFrozen()) {
            throw new IllegalStateException(
                    "Can not add new nodes after ClusterMonitor has been frozen.");
        }
        final NodeMonitor monitor = new NodeMonitor(node);
        nodeMonitors.put(node, monitor);
    }

    /**
     * Called from ClusterSearcher/NodeManager when a node failed.
     * The owning searcher is only notified when this failure makes the node monitor
     * go from working to not working. The VIP status is re-evaluated on every call.
     *
     * @param node the node which failed; must previously have been registered with
     *             {@link #add(VespaBackEndSearcher)} (an unregistered node causes a NullPointerException)
     * @param error the error describing the failure
     */
    void failed(final VespaBackEndSearcher node, final ErrorMessage error) {
        final NodeMonitor monitor = nodeMonitors.get(node);
        final boolean wasWorking = monitor.isWorking();
        monitor.failed(error);
        if (wasWorking && !monitor.isWorking()) {
            // was warning, see VESPA-1922
            log.info("Failed monitoring node '" + node + "' due to '" + error + "'");
            nodeManager.failed(node);
        }
        updateVipStatus();
    }

    /**
     * Called when a node responded.
     * The owning searcher is only notified when this response makes the node monitor
     * go from not working to working. The VIP status is re-evaluated on every call.
     *
     * @param node the node which responded; must previously have been registered with
     *             {@link #add(VespaBackEndSearcher)} (an unregistered node causes a NullPointerException)
     * @param hasDocumentsOnline passed on to the node monitor's {@code responded} method
     */
    void responded(final VespaBackEndSearcher node, boolean hasDocumentsOnline) {
        final NodeMonitor monitor = nodeMonitors.get(node);
        final boolean wasFailing = !monitor.isWorking();
        monitor.responded(hasDocumentsOnline);
        if (wasFailing && monitor.isWorking()) {
            log.info("Failed node '" + node + "' started working again.");
            nodeManager.working(monitor.getNode());
        }
        updateVipStatus();
    }

    /**
     * Adds this monitor to rotation if at least one monitored node is working and has
     * search nodes online, and removes it from rotation otherwise.
     */
    private void updateVipStatus() {
        if (hasWorkingNodeWithDocumentsOnline()) {
            vipStatus.addToRotation(this);
        } else {
            vipStatus.removeFromRotation(this);
        }
    }

    /** Returns true if any monitored node is both working and reports search nodes online */
    private boolean hasWorkingNodeWithDocumentsOnline() {
        for (NodeMonitor node : nodeMonitors.values()) {
            if (node.isWorking() && node.searchNodesOnline()) {
                return true;
            }
        }
        return false;
    }

    /**
     * Asks the owning searcher to ping every monitored node, to discover state changes.
     *
     * @throws InterruptedException if pinging is interrupted
     */
    private void ping() throws InterruptedException {
        for (final NodeMonitor monitor : nodeMonitors.values()) {
            nodeManager.ping(monitor.getNode());
        }
    }

    /**
     * Runs one round of pinging. No exception is allowed to escape: an exception thrown
     * from a task scheduled with scheduleAtFixedRate suppresses all its subsequent executions.
     */
    @Override
    public void run() {
        log.finest("Activating ping");
        try {
            ping();
        } catch (final InterruptedException e) {
            // Interruption is the expected result of shutdown() cancelling a running ping:
            // restore the interrupt status rather than swallowing it, and do not report it as an error.
            Thread.currentThread().interrupt();
            log.log(Level.FINE, "Monitor thread interrupted while pinging", e);
        } catch (final Exception e) {
            log.log(Level.WARNING, "Error in monitor thread", e);
        }
    }

    /**
     * Cancels the scheduled ping task, interrupting it if it is currently running.
     * Does nothing if pinging was never started.
     *
     * @throws InterruptedException never thrown by this implementation; the declaration
     *         is kept so existing callers continue to compile
     */
    public void shutdown() throws InterruptedException {
        if (future != null) {
            future.cancel(true);
        }
    }

    /** Marks the set of monitored nodes as complete; after this no more nodes can be added. */
    @Override
    public void freeze() {
        isFrozen = true;
    }

    /** Returns whether {@link #freeze()} has been called on this monitor */
    @Override
    public boolean isFrozen() {
        return isFrozen;
    }
}