summaryrefslogtreecommitdiffstats
path: root/metrics-proxy/src/main/java/ai/vespa/metricsproxy/http/application/ApplicationMetricsRetriever.java
blob: 40011a0dc72cc581a66e723d20daacf0bdebf98c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package ai.vespa.metricsproxy.http.application;

import ai.vespa.metricsproxy.metric.model.ConsumerId;
import ai.vespa.metricsproxy.metric.model.MetricsPacket;
import ai.vespa.util.http.VespaHttpClientBuilder;
import com.google.inject.Inject;
import com.yahoo.component.AbstractComponent;
import org.apache.http.client.HttpClient;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;

import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.util.AbstractMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;

import static ai.vespa.metricsproxy.http.ValuesFetcher.DEFAULT_PUBLIC_CONSUMER_ID;
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toMap;

/**
 * This class retrieves metrics from all nodes in the given config, usually all
 * nodes in a Vespa application.
 *
 * Each node is queried through its own {@link NodeMetricsClient}; all clients share one
 * pooled http client, and the queries are run in parallel on a dedicated {@link ForkJoinPool}.
 *
 * @author gjoranv
 */
public class ApplicationMetricsRetriever extends AbstractComponent {

    private static final Logger log = Logger.getLogger(ApplicationMetricsRetriever.class.getName());

    /** Upper bound on the number of threads used for retrieving node metrics in parallel. */
    static final int MAX_THREADS = 20;

    /** Lower bound for the time to wait for one complete retrieval round. */
    static final Duration MIN_TIMEOUT = Duration.ofSeconds(60);

    /** Upper bound for the time to wait for one complete retrieval round. */
    static final Duration MAX_TIMEOUT = Duration.ofSeconds(240);

    // Passed to RequestConfig, which takes timeouts in milliseconds.
    private static final int HTTP_CONNECT_TIMEOUT = 5000;
    private static final int HTTP_SOCKET_TIMEOUT = 30000;

    // Kept as the closeable type so that the connection pool can be released in deconstruct().
    private final CloseableHttpClient httpClient = createHttpClient();
    private final List<NodeMetricsClient> clients;
    private final ForkJoinPool forkJoinPool;

    // Non-final for testing
    private Duration taskTimeout;


    /**
     * Creates a retriever with one client per node in the given config.
     *
     * @param nodesConfig the nodes to retrieve metrics from; may contain no nodes
     */
    @Inject
    public ApplicationMetricsRetriever(MetricsNodesConfig nodesConfig) {
        clients = createNodeClients(nodesConfig);
        // ForkJoinPool rejects a parallelism of 0, so use at least one thread even when no nodes are configured.
        int numThreads = Math.max(1, Math.min(clients.size(), MAX_THREADS));
        taskTimeout = timeout(clients.size(), numThreads);
        forkJoinPool = new ForkJoinPool(numThreads);
    }

    /** Stops the thread pool and closes the shared http client before deconstructing the component. */
    @Override
    public void deconstruct() {
        forkJoinPool.shutdownNow();
        try {
            httpClient.close();
        } catch (IOException e) {
            // Failing to close must not prevent the rest of the teardown.
            log.log(Level.WARNING, "Failed closing http client", e);
        }
        super.deconstruct();
    }

    /**
     * Retrieves metrics for the default public consumer.
     *
     * @return the metrics packet builders retrieved from each node, keyed by node
     * @throws ApplicationMetricsException if the retrieval fails or does not finish within the task timeout
     */
    public Map<Node, List<MetricsPacket.Builder>> getMetrics() {
        return getMetrics(DEFAULT_PUBLIC_CONSUMER_ID);
    }

    /**
     * Retrieves metrics for the given consumer from all nodes in parallel.
     *
     * A node that cannot be reached does not fail the call; it is mapped to an empty list.
     *
     * @param consumer the consumer to retrieve metrics for
     * @return the metrics packet builders retrieved from each node, keyed by node
     * @throws ApplicationMetricsException if the retrieval fails or does not finish within the task timeout
     */
    public Map<Node, List<MetricsPacket.Builder>> getMetrics(ConsumerId consumer) {
        log.log(Level.FINE, () -> "Retrieving metrics from " + clients.size() + " nodes.");
        // The parallel stream is started from within a task submitted to our own pool,
        // with the intention that the work runs on that pool's threads.
        var forkJoinTask = forkJoinPool.submit(() -> clients.parallelStream()
                .map(client -> getNodeMetrics(client, consumer))
                .collect(toMap(Map.Entry::getKey, Map.Entry::getValue)));

        try {
            var metricsByNode = forkJoinTask.get(taskTimeout.toMillis(), TimeUnit.MILLISECONDS);

            log.log(Level.FINE, () -> "Finished retrieving metrics from " + clients.size() + " nodes.");
            return metricsByNode;

        } catch (Exception e) {
            // Nobody will read the result anymore, so make a best-effort attempt at cancelling the task.
            // This is a no-op if the task has already completed.
            forkJoinTask.cancel(true);
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the caller, as the exception is wrapped below.
                Thread.currentThread().interrupt();
            }
            throw new ApplicationMetricsException("Failed retrieving metrics.", e);
        }
    }

    /**
     * Retrieves the metrics from a single node.
     *
     * @return an entry of the client's node and its metrics, or an empty list if the retrieval failed
     */
    private Map.Entry<Node, List<MetricsPacket.Builder>> getNodeMetrics(NodeMetricsClient client, ConsumerId consumer) {
        try {
            return new AbstractMap.SimpleEntry<>(client.node, client.getMetrics(consumer));
        } catch (Exception e) {
            // One failing node should not fail the whole retrieval, so log and fall through to an empty result.
            log.log(Level.WARNING, "Could not retrieve metrics from " + client.node.metricsUri(consumer), e);
        }
        return new AbstractMap.SimpleEntry<>(client.node, emptyList());
    }

    /** Creates one metrics client, using the shared http client, for each node in the given config. */
    private List<NodeMetricsClient> createNodeClients(MetricsNodesConfig nodesConfig) {
        return nodesConfig.node().stream()
                .map(Node::new)
                .map(node -> new NodeMetricsClient(httpClient, node, Clock.systemUTC()))
                .collect(Collectors.toList());
    }

    /** Creates the http client, with a pooling connection manager, that is shared by all node clients. */
    private static CloseableHttpClient createHttpClient() {
        return VespaHttpClientBuilder.create(PoolingHttpClientConnectionManager::new)
                .setUserAgent("application-metrics-retriever")
                .setDefaultRequestConfig(RequestConfig.custom()
                                                 .setConnectTimeout(HTTP_CONNECT_TIMEOUT)
                                                 .setSocketTimeout(HTTP_SOCKET_TIMEOUT)
                                                 .build())
                .build();
    }

    /**
     * Returns the time to wait for a complete retrieval round: 20 seconds per client per thread,
     * but never less than {@link #MIN_TIMEOUT} or more than {@link #MAX_TIMEOUT}.
     *
     * @param clients the number of nodes to retrieve metrics from
     * @param numThreads the number of threads retrieving in parallel
     * @return the timeout; {@link #MIN_TIMEOUT} if numThreads is less than 1
     */
    static Duration timeout(int clients, int numThreads) {
        // Avoid division by zero when there are no threads (i.e. no clients).
        if (numThreads < 1) return MIN_TIMEOUT;

        // Multiply as long to rule out int overflow for large client counts.
        long secondsForAllClients = 20L * clients / numThreads;
        Duration timeout = Duration.ofSeconds(Long.max(MIN_TIMEOUT.toSeconds(), secondsForAllClients));
        return timeout.compareTo(MAX_TIMEOUT) > 0 ? MAX_TIMEOUT : timeout;
    }

    // For testing only!
    void setTaskTimeout(Duration taskTimeout) {
        this.taskTimeout = taskTimeout;
    }

}