aboutsummaryrefslogtreecommitdiffstats
path: root/zkfacade/src/main/java/com/yahoo/vespa/curator/stats/LatencyStats.java
blob: 130c8a6a9878bc03446fb30aa3e01739cd591c9a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.vespa.curator.stats;

import java.time.Duration;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.function.LongSupplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;

import static java.lang.Math.max;
import static java.lang.Math.round;

/**
 * An instance of {@code LatencyStats} keeps track of statistics related to <em>time intervals</em> that
 * start at a particular moment in time and end at a later time.  A typical example is the processing of
 * requests:  Each newly received request starts a new interval, which ends when the response is sent.
 *
 * <p>The statistics only apply to the current <em>time period</em>, and can be retrieved as a
 * {@link LatencyMetrics} instance from e.g. {@link #getLatencyMetrics()}.  This fits well with how Yamas
 * works:  it collects metrics since last collection every minute or so.</p>
 *
 * <p>All mutable state is guarded by a single private monitor, so an instance may be shared between
 * threads.</p>
 *
 * @see LatencyMetrics
 * @author hakon
 */
// @Thread-Safe
public class LatencyStats {

    private static final Logger logger = Logger.getLogger(LatencyStats.class.getName());

    /** Runs of digits in a thread name; compiled once instead of on every started interval. */
    private static final Pattern DIGITS = Pattern.compile("\\d+");

    private final LongSupplier nanoTimeSupplier;

    // All fields below are only accessed while holding monitor (except during construction).
    // NB: Keep these in sync with resetForNewPeriod()
    private final Object monitor = new Object();
    /** Time at which the current period started. */
    private long startOfPeriodNanos;
    /** Time up to which the accumulated statistics below have been brought. */
    private long endOfPeriodNanos;
    /** Sum over time of the number of active intervals, i.e. the integral of the load in the period. */
    private double cumulativeLoadNanos;
    /** Like cumulativeLoadNanos, but broken down by thread name template. */
    private final Map<String, Long> cumulativeLoadNanosByThread = new HashMap<>();
    /** Sum of the latencies of intervals ended in this period. */
    private Duration cumulativeLatency;
    /** Largest latency of any interval ended in this period. */
    private Duration maxLatency;
    private int numIntervalsStarted;
    private int numIntervalsEnded;
    /** Intervals started but not yet ended; these survive period resets. Identity-based membership. */
    private final HashSet<ActiveIntervalInfo> activeIntervals = new HashSet<>();
    /** Largest number of simultaneously active intervals seen in this period. */
    private int maxLoad;

    /** Creates an empty LatencyStats starting the first time period now. */
    public LatencyStats() { this(System::nanoTime); }

    /** Creates an empty LatencyStats reading time from the given supplier of nanoseconds. */
    LatencyStats(LongSupplier nanoTimeSupplier) {
        this.nanoTimeSupplier = nanoTimeSupplier;
        this.endOfPeriodNanos = nanoTimeSupplier.getAsLong();
        resetForNewPeriod();
    }

    /** @see #startNewInterval() */
    public interface ActiveInterval extends AutoCloseable {
        @Override void close();
    }

    /**
     * Starts a new (active) interval.  The caller MUST call {@link ActiveInterval#close()} on the
     * returned instance exactly once, which will end the interval.  Closing it more than once logs a
     * warning; the repeated close is otherwise ignored.
     */
    public ActiveInterval startNewInterval() {
        synchronized (monitor) {
            pushEndOfPeriodToNow();
            ActiveIntervalInfo activeIntervalInfo = new ActiveIntervalInfo(endOfPeriodNanos);
            activeIntervals.add(activeIntervalInfo);
            maxLoad = max(maxLoad, activeIntervals.size());
            ++numIntervalsStarted;
            return () -> endInterval(activeIntervalInfo);
        }
    }

    /** Returns the metrics for the current time period up to now. */
    public LatencyMetrics getLatencyMetrics() {
        synchronized (monitor) {
            pushEndOfPeriodToNow();
            return makeLatencyMetrics();
        }
    }

    /** Returns the metrics for the current time period up to now, and starts a new period. */
    public LatencyMetrics getLatencyMetricsAndStartNewPeriod() {
        synchronized (monitor) {
            pushEndOfPeriodToNow();
            LatencyMetrics metrics = makeLatencyMetrics();
            resetForNewPeriod();
            return metrics;
        }
    }

    /**
     * Bookkeeping for one active interval: its start time and the thread name template of the thread
     * that started it.  Uses identity equality, so each instance is a distinct member of activeIntervals.
     */
    private static class ActiveIntervalInfo {
        private final long startNanos;
        // Poor man's attempt at collapsing thread names into their pool names, as that is the relevant (task) level here.
        private final String threadNameTemplate = DIGITS.matcher(Thread.currentThread().getName()).replaceAll("*");
        public ActiveIntervalInfo(long startOfIntervalNanos) { this.startNanos = startOfIntervalNanos; }
        public long startOfIntervalNanos() { return startNanos; }
    }

    /**
     * Starts a new period at endOfPeriodNanos, clearing all per-period statistics.  Active intervals
     * are kept, and they immediately define the new period's initial max load.
     */
    private void resetForNewPeriod() {
        startOfPeriodNanos = endOfPeriodNanos;
        cumulativeLoadNanos = 0.0;
        cumulativeLoadNanosByThread.clear();
        cumulativeLatency = Duration.ZERO;
        maxLatency = Duration.ZERO;
        numIntervalsStarted = 0;
        numIntervalsEnded = 0;
        maxLoad = activeIntervals.size();
    }

    /**
     * Advances endOfPeriodNanos to now.  Before that, it credits every currently active interval with
     * the time elapsed since the previous push, both in total and per thread name template.
     */
    private void pushEndOfPeriodToNow() {
        long currentNanos = nanoTimeSupplier.getAsLong();
        long elapsedNanos = currentNanos - endOfPeriodNanos;
        cumulativeLoadNanos += activeIntervals.size() * elapsedNanos;
        for (ActiveIntervalInfo activeInterval : activeIntervals) {
            cumulativeLoadNanosByThread.merge(activeInterval.threadNameTemplate, elapsedNanos, Long::sum);
        }
        endOfPeriodNanos = currentNanos;
    }

    /**
     * Ends the given interval and records its latency.  If the interval was already ended, no
     * statistics are recorded and a warning with a stack trace is logged instead.
     */
    private void endInterval(ActiveIntervalInfo activeInterval) {
        boolean wasRemoved;
        synchronized (monitor) {
            pushEndOfPeriodToNow();
            wasRemoved = activeIntervals.remove(activeInterval);
            if (wasRemoved) {
                // Only the first close() ends the interval: a repeated close() must neither be counted
                // again nor contribute its (ever-growing) age as a latency.
                Duration latency = Duration.ofNanos(endOfPeriodNanos - activeInterval.startOfIntervalNanos());
                cumulativeLatency = cumulativeLatency.plus(latency);
                if (latency.compareTo(maxLatency) > 0) {
                    maxLatency = latency;
                }
                ++numIntervalsEnded;
            }
        }

        if (!wasRemoved) {
            // Exception made to dump stack trace.
            logger.log(Level.WARNING, "Interval of latency stats was closed twice", new IllegalStateException());
        }
    }

    /** Returns the metrics for the startOfPeriodNanos and endOfPeriodNanos period. */
    private LatencyMetrics makeLatencyMetrics() {
        // Average latency of the intervals ended in this period.
        Duration latency = numIntervalsEnded <= 0 ?
                Duration.ZERO :
                Duration.ofNanos(round(cumulativeLatency.toNanos() / (double) numIntervalsEnded));

        // The oldest still-active interval may already exceed the largest completed latency.
        Optional<Duration> maxLatencyFromActiveIntervals = activeIntervals.stream()
                .map(ActiveIntervalInfo::startOfIntervalNanos)
                .min(Comparator.naturalOrder())
                .map(startOfIntervalNanos -> Duration.ofNanos(endOfPeriodNanos - startOfIntervalNanos));
        Duration maxActiveLatency = maxLatencyFromActiveIntervals
                .filter(latencyCandidate -> latencyCandidate.compareTo(maxLatency) > 0)
                .orElse(maxLatency);

        final double startHz, endHz, load;
        final Map<String, Double> loadByThread = new HashMap<>();
        long periodNanos = endOfPeriodNanos - startOfPeriodNanos;
        if (periodNanos > 0) {
            double periodSeconds = periodNanos / 1_000_000_000.0;
            startHz = numIntervalsStarted / periodSeconds;
            endHz = numIntervalsEnded / periodSeconds;
            load = cumulativeLoadNanos / periodNanos;
            cumulativeLoadNanosByThread.forEach((name, threadLoad) -> {
                if (threadLoad > 0) loadByThread.put(name, threadLoad / (double) periodNanos);
            });
        } else {
            // Empty period: report the instantaneous load.  Sum per template so that the per-thread
            // loads add up to the total load, as they do in the non-empty case above.
            startHz = endHz = 0.0;
            load = activeIntervals.size();
            for (ActiveIntervalInfo activeInterval : activeIntervals) {
                loadByThread.merge(activeInterval.threadNameTemplate, 1.0, Double::sum);
            }
        }

        return new LatencyMetrics(latency,
                                  maxLatency,
                                  maxActiveLatency,
                                  startHz,
                                  endHz,
                                  loadByThread,
                                  load,
                                  maxLoad,
                                  activeIntervals.size());
    }
}