summaryrefslogtreecommitdiffstats
path: root/container-search/src/main/java/com/yahoo/search/statistics/PeakQpsSearcher.java
blob: e4bce05b4f2fa6e086822c0ecd2e9c651f53997b (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
package com.yahoo.search.statistics;

import com.yahoo.collections.Tuple2;
import com.yahoo.concurrent.ThreadLocalDirectory;
import com.yahoo.search.Query;
import com.yahoo.search.Result;
import com.yahoo.search.Searcher;
import com.yahoo.processing.request.CompoundName;
import com.yahoo.search.result.Hit;
import com.yahoo.search.searchchain.Execution;
import com.yahoo.statistics.Callback;
import com.yahoo.statistics.Handle;
import com.yahoo.statistics.Statistics;
import com.yahoo.statistics.Value;

import java.util.*;

/**
 * Aggregate peak qps and expose through meta hits and/or log events.
 *
 * @author Steinar Knutsen
 */
public class PeakQpsSearcher extends Searcher {

    private final ThreadLocalDirectory<Deque<QueryRatePerSecond>, Long> directory;
    private final Value qpsStatistics;
    private final CompoundName propertyName;
    private final boolean useMetaHit;

    /**
     * Meta hit which carries the peak qps and mean qps since the last time this
     * data was requested. The URI is always "meta:qps". The data is stored as
     * Number subclasses in the fields named by the fields PEAK_QPS and MEAN_QPS
     * in the QpsHit class.
     */
    public static class QpsHit extends Hit {

        /** The name of the field containing mean QPS since the last measurement. */
        public static final String MEAN_QPS = "mean_qps";

        /** The name of the field containing peak QPS since the last measurement. */
        public static final String PEAK_QPS = "peak_qps";
        public static final String SCHEME = "meta";

        public QpsHit(Integer peakQps, Double meanQps) {
            super(SCHEME + ":qps");
            setField(PEAK_QPS, peakQps);
            setField(MEAN_QPS, meanQps);
        }

        public boolean isMeta() {
            return true;
        }

        @Override
        public String toString() {
            return "QPS hit: Peak QPS " + getField(PEAK_QPS) + ", mean QPS " + getField(MEAN_QPS) + ".";
        }

    }

    static class QueryRatePerSecond {
        long when;
        int howMany;

        QueryRatePerSecond(long when) {
            this.when = when;
            this.howMany = 0;
        }

        void add(int x) {
            howMany += x;
        }

        void increment() {
            howMany += 1;
        }

        @Override
        public String toString() {
            return "QueryRatePerSecond(" + when + ": " + howMany + ")";
        }
    }

    static class QueryRate implements
            ThreadLocalDirectory.Updater<Deque<QueryRatePerSecond>, Long> {
        @Override
        public Deque<QueryRatePerSecond> update(Deque<QueryRatePerSecond> current, Long when) {
            QueryRatePerSecond last = current.peekLast();
            if (last == null || last.when != when) {
                last = new QueryRatePerSecond(when);
                current.addLast(last);
            }
            last.increment();
            return current;
        }

        @Override
        public Deque<QueryRatePerSecond> createGenerationInstance(Deque<QueryRatePerSecond> previous) {
            if (previous == null) {
                return new ArrayDeque<>();
            } else {
                return new ArrayDeque<>(previous.size());
            }
        }
    }

    private class Fetcher implements Callback {
        @Override
        public void run(Handle h, boolean firstRun) {
            List<Deque<QueryRatePerSecond>> data = directory.fetch();
            List<QueryRatePerSecond> chewed = merge(data);
            for (QueryRatePerSecond qps : chewed) {
                qpsStatistics.put(qps.howMany);
            }
        }
    }

    public PeakQpsSearcher(MeasureQpsConfig config, Statistics manager) {
        directory = createDirectory();
        MeasureQpsConfig.Outputmethod.Enum method = config.outputmethod();
        if (method == MeasureQpsConfig.Outputmethod.METAHIT) {
            useMetaHit = true;
            propertyName = new CompoundName(config.queryproperty());
            qpsStatistics = null;
        } else if (method == MeasureQpsConfig.Outputmethod.STATISTICS) {
            String event = config.eventname();
            if (event == null || event.isEmpty()) {
                event = getId().getName();
                event = event.replace('.', '_');
            }
            qpsStatistics = new Value(event, manager, new Value.Parameters()
                    .setAppendChar('_').setLogMax(true).setLogMean(true)
                    .setLogMin(false).setLogRaw(false).setNameExtension(true)
                    .setCallback(new Fetcher()));
            useMetaHit = false;
            propertyName = null;
        } else {
            throw new IllegalStateException("Config definition out of sync with implementation." +
                                            " No way to create output for method " + method + ".");
        }
    }

    static ThreadLocalDirectory<Deque<QueryRatePerSecond>, Long> createDirectory() {
        return new ThreadLocalDirectory<>(new QueryRate());
    }

    static List<QueryRatePerSecond> merge(List<Deque<QueryRatePerSecond>> measurements) {
        List<QueryRatePerSecond> rates = new ArrayList<>();
        while (measurements.size() > 0) {
            Deque<Deque<QueryRatePerSecond>> consumeFrom = new ArrayDeque<>(measurements.size());
            long current = Long.MAX_VALUE;
            for (ListIterator<Deque<QueryRatePerSecond>> i = measurements.listIterator(); i.hasNext();) {
                Deque<QueryRatePerSecond> deck = i.next();
                if (deck.size() == 0) {
                    i.remove();
                    continue;
                }
                QueryRatePerSecond threadData = deck.peekFirst();
                if (threadData.when < current) {
                    consumeFrom.clear();
                    current = threadData.when;
                    consumeFrom.add(deck);
                } else if (threadData.when == current) {
                    consumeFrom.add(deck);
                }
            }
            if (consumeFrom.size() > 0) {
                rates.add(consume(consumeFrom));
            }
        }
        return rates;
    }

    private static QueryRatePerSecond consume(Deque<Deque<QueryRatePerSecond>> consumeFrom) {
        Deque<QueryRatePerSecond> valueQueue = consumeFrom.pop();
        QueryRatePerSecond value = valueQueue.pop();
        QueryRatePerSecond thisSecond = new QueryRatePerSecond(value.when);
        thisSecond.add(value.howMany);
        while (consumeFrom.size() > 0) {
            valueQueue = consumeFrom.pop();
            value = valueQueue.pop();
            thisSecond.add(value.howMany);
        }
        return thisSecond;

    }

    @Override
    public Result search(Query query, Execution execution) {
        Result r;
        long when = query.getStartTime() / 1000L;
        Hit meta = null;
        directory.update(when);
        if (useMetaHit) {
            if (query.properties().getBoolean(propertyName, false)) {
                List<QueryRatePerSecond> l = merge(directory.fetch());
                Tuple2<Integer, Double> maxAndMean = maxAndMean(l);
                meta = new QpsHit(maxAndMean.first, maxAndMean.second);
            }
        }
        r = execution.search(query);
        if (meta != null) {
            r.hits().add(meta);
        }
        return r;
    }

    private Tuple2<Integer, Double> maxAndMean(List<QueryRatePerSecond> l) {
        int max = Integer.MIN_VALUE;
        double sum = 0.0d;
        if (l.size() == 0) {
            return new Tuple2<>(0, 0.0);
        }
        for (QueryRatePerSecond qps : l) {
            sum += qps.howMany;
            if (qps.howMany > max) {
                max = qps.howMany;
            }
        }
        return new Tuple2<>(max, sum / (double) l.size());
    }

}