1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
|
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "bucket.h"
#include <assert.h>
#include <map>
#include <vespa/vespalib/util/overload.h>
#include <vespa/vespalib/util/visit_ranges.h>
namespace vespalib {
namespace metrics {
namespace {
template<typename T>
std::vector<typename T::aggregator_type>
mergeFromSamples(const StableStore<typename T::sample_type> &source)
{
using Aggregator = typename T::aggregator_type;
using Sample = typename T::sample_type;
using Key = std::pair<MetricId, Point>;
using Map = std::map<Key, Aggregator>;
using MapValue = typename Map::value_type;
Map map;
source.for_each([&map] (const Sample &sample) {
Key id = sample.idx;
auto iter_check = map.emplace(id, sample);
if (!iter_check.second) {
iter_check.first->second.merge(sample);
}
});
std::vector<typename T::aggregator_type> result;
for (const MapValue &entry : map) {
result.push_back(entry.second);
}
return result;
}
template<typename T>
struct IdxComparator {
bool operator() (const T& a, const T& b) { return a.idx < b.idx; }
};
template<typename T>
std::vector<T>
mergeVectors(const std::vector<T> &a,
const std::vector<T> &b)
{
std::vector<T> result;
visit_ranges(overload
{
[&result](visit_ranges_either, const T& x) { result.push_back(x); },
[&result](visit_ranges_both, const T& x, const T& y) {
result.push_back(x);
result.back().merge(y);
}
}, a.begin(), a.end(), b.begin(), b.end(), IdxComparator<T>());
return result;
}
template<typename T>
std::vector<T>
findMissing(const std::vector<T> &already,
const std::vector<T> &complete)
{
std::vector<T> result;
visit_ranges(overload
{
// missing from "complete", should not happen:
[](visit_ranges_first, const T&) { },
// missing this:
[&result](visit_ranges_second, const T& x) { result.push_back(x); },
// already have this:
[](visit_ranges_both, const T&, const T&) { }
},
already.begin(), already.end(),
complete.begin(), complete.end(),
IdxComparator<T>());
return result;
}
} // namespace <unnamed>
void Bucket::merge(const CurrentSamples &samples)
{
counters = mergeFromSamples<Counter>(samples.counterIncrements);
gauges = mergeFromSamples<Gauge>(samples.gaugeMeasurements);
}
void Bucket::merge(const Bucket &other)
{
assert(genCnt < other.genCnt);
genCnt = other.genCnt;
startTime = std::min(startTime, other.startTime);
endTime = std::max(endTime, other.endTime);
std::vector<CounterAggregator> nextCounters = mergeVectors(counters, other.counters);
counters = std::move(nextCounters);
std::vector<GaugeAggregator> nextGauges = mergeVectors(gauges, other.gauges);
gauges = std::move(nextGauges);
}
void Bucket::padMetrics(const Bucket &source)
{
std::vector<CounterAggregator> missingC = findMissing(counters, source.counters);
for (CounterAggregator aggr : missingC) {
aggr.count = 0;
counters.push_back(aggr);
}
std::vector<GaugeAggregator> missingG = findMissing(gauges, source.gauges);
for (GaugeAggregator aggr : missingG) {
aggr.observedCount = 0;
aggr.sumValue = 0;
aggr.minValue = 0;
aggr.maxValue = 0;
gauges.push_back(aggr);
}
}
} // namespace vespalib::metrics
} // namespace vespalib
|