summaryrefslogtreecommitdiffstats
path: root/controller-server/src/main/java/com/yahoo/vespa/hosted/controller/maintenance/DeploymentMetricsMaintainer.java
blob: 1d9f76cddcdff6672b4eb9736a6aa753eda64599 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
// Copyright 2018 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.hosted.controller.maintenance;

import com.yahoo.config.provision.ApplicationId;
import com.yahoo.config.provision.HostName;
import com.yahoo.config.provision.SystemName;
import com.yahoo.vespa.hosted.controller.Application;
import com.yahoo.vespa.hosted.controller.ApplicationController;
import com.yahoo.vespa.hosted.controller.Controller;
import com.yahoo.vespa.hosted.controller.api.application.v4.model.ClusterMetrics;
import com.yahoo.vespa.hosted.controller.api.integration.MetricsService;
import com.yahoo.vespa.hosted.controller.application.Deployment;
import com.yahoo.vespa.hosted.controller.application.DeploymentMetrics;
import com.yahoo.vespa.hosted.controller.application.RotationStatus;

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Retrieve deployment metrics such as QPS and document count from the metric service and
 * update applications with this info. Also stores global rotation status per application and
 * cluster metrics reported by the config server in each reachable zone.
 * <p>
 * Collection is best-effort: a failure for one application or one zone is counted/logged and
 * does not prevent the remaining applications and zones from being updated.
 *
 * @author smorgrav
 * @author mpolden
 */
public class DeploymentMetricsMaintainer extends Maintainer {

    private static final Logger log = Logger.getLogger(DeploymentMetricsMaintainer.class.getName());

    /** Number of threads used to collect metrics for applications concurrently */
    private static final int applicationsToUpdateInParallel = 10;

    /** Maximum time a single run waits for the per-application metric collection to complete */
    private static final Duration applicationUpdateTimeout = Duration.ofMinutes(30);

    private final ApplicationController applications;

    public DeploymentMetricsMaintainer(Controller controller, Duration duration, JobControl jobControl) {
        super(controller, duration, jobControl, DeploymentMetricsMaintainer.class.getSimpleName(),
              SystemName.allOf(Predicate.not(SystemName::isPublic)));
        this.applications = controller.applications();
    }

    @Override
    protected void maintain() {
        AtomicInteger failures = new AtomicInteger(0);
        AtomicReference<Exception> lastException = new AtomicReference<>(null);
        List<Application> applicationList = applications.asList();

        // Run parallel stream inside a custom ForkJoinPool so that we can control the number of threads used
        ForkJoinPool pool = new ForkJoinPool(applicationsToUpdateInParallel);
        pool.submit(() -> {
            applicationList.parallelStream().forEach(application -> {
                try {
                    updateMetrics(application);
                } catch (Exception e) {
                    failures.incrementAndGet();
                    lastException.set(e);
                }
            });
        });
        pool.shutdown();
        try {
            if ( ! pool.awaitTermination(applicationUpdateTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                // Updates still running are not cancelled, but the failure count below may be incomplete
                log.log(Level.WARNING, "Timed out after " + applicationUpdateTimeout +
                                       " waiting for metrics collection of " + applicationList.size() +
                                       " applications to complete");
            }
            if (lastException.get() != null) {
                log.log(Level.INFO, String.format("Failed to query metrics service for %d/%d applications. Retrying in %s. Last error: ",
                                                     failures.get(),
                                                     applicationList.size(),
                                                     maintenanceInterval()),
                        lastException.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // Preserve interrupt status for whoever owns this thread
            throw new RuntimeException(e);
        }
        getClusterMetrics();
    }

    /**
     * Collects application metrics, global rotation status and per-deployment metrics for the given
     * application and stores them. Remote calls are made before taking the application lock so that
     * the lock is only held while writing.
     */
    private void updateMetrics(Application application) {
        var applicationMetrics = controller().metricsService().getApplicationMetrics(application.id());
        applications.lockIfPresent(application.id(), locked ->
                applications.store(locked.with(applicationMetrics)));

        Map<HostName, RotationStatus> rotations = rotationStatus(application);
        applications.lockIfPresent(application.id(), locked ->
                applications.store(locked.withRotationStatus(rotations)));

        for (Deployment deployment : application.deployments().values()) {
            MetricsService.DeploymentMetrics collectedMetrics = controller().metricsService()
                                                                            .getDeploymentMetrics(application.id(), deployment.zone());
            Instant now = controller().clock().instant();
            applications.lockIfPresent(application.id(), locked -> {
                Deployment existingDeployment = locked.get().deployments().get(deployment.zone());
                if (existingDeployment == null) return; // Deployment removed since we started collecting metrics
                DeploymentMetrics newMetrics = existingDeployment.metrics()
                                                                 .withQueriesPerSecond(collectedMetrics.queriesPerSecond())
                                                                 .withWritesPerSecond(collectedMetrics.writesPerSecond())
                                                                 .withDocumentCount(collectedMetrics.documentCount())
                                                                 .withQueryLatencyMillis(collectedMetrics.queryLatencyMillis())
                                                                 .withWriteLatencyMillis(collectedMetrics.writeLatencyMillis())
                                                                 .at(now);
                applications.store(locked.with(existingDeployment.zone(), newMetrics)
                                         .recordActivityAt(now, existingDeployment.zone()));
            });
        }
    }

    /** Get global rotation status for application, or an empty map if it has no rotation */
    private Map<HostName, RotationStatus> rotationStatus(Application application) {
        return applications.rotationRepository().getRotation(application)
                           .map(rotation -> controller().metricsService().getRotationStatus(rotation.name()))
                           .map(rotationStatus -> {
                               Map<HostName, RotationStatus> result = new TreeMap<>();
                               rotationStatus.forEach((hostname, status) -> result.put(hostname, from(status)));
                               return result;
                           })
                           .orElseGet(Collections::emptyMap);
    }

    /** Converts a routing API rotation status to the controller's representation */
    private static RotationStatus from(com.yahoo.vespa.hosted.controller.api.integration.routing.RotationStatus status) {
        switch (status) {
            case IN: return RotationStatus.in;
            case OUT: return RotationStatus.out;
            case UNKNOWN: return RotationStatus.unknown;
            default: throw new IllegalArgumentException("Unknown API value for rotation status: " + status);
        }
    }

    /**
     * Fetches cluster metrics from the config server of each reachable zone and stores them on the
     * corresponding applications. A failure in one zone is logged and does not stop the other zones.
     */
    private void getClusterMetrics() {
        controller().zoneRegistry().zones()
                .reachable().ids()
                .stream().forEach(zoneId -> {
                    try {
                        Map<ApplicationId, List<ClusterMetrics>> allMetrics = controller().configServer().getMetrics(zoneId);
                        allMetrics.forEach((applicationId, clusterMetrics) ->
                                applications.lockIfPresent(applicationId, locked ->
                                        applications.store(locked.with(zoneId, clusterMetrics))));
                    } catch (Exception e) {
                        log.log(Level.INFO, "Failed to get cluster metrics in " + zoneId +
                                            ". Retrying in " + maintenanceInterval(), e);
                    }
                });
    }

}