summaryrefslogtreecommitdiffstats
path: root/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp
blob: 52d95e9a3ed3f7aebf955c933ade4c853da34761 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "filestormetrics.h"
#include <vespa/metrics/loadmetric.hpp>
#include <vespa/metrics/summetric.hpp>
#include <sstream>

namespace storage {

using metrics::MetricSet;
using metrics::LoadTypeSet;

// Metric set for a single operation type in a filestor thread: number of
// processed requests, latency of successful ones, and failure count.
// `name` is stored in _name so clone() can reconstruct an equivalent
// instance with the same human-readable description.
FileStorThreadMetrics::Op::Op(const std::string& id, const std::string& name, MetricSet* owner)
    : MetricSet(id, "", name + " load in filestor thread", owner),
      _name(name),
      count("count", "yamasdefault", "Number of requests processed.", this),
      latency("latency", "yamasdefault", "Latency of successful requests.", this),
      failed("failed", "yamasdefault", "Number of failed requests.", this)
{ }

FileStorThreadMetrics::Op::~Op() = default;

/**
 * Clone this metric set. Inactive copies are delegated to the generic
 * MetricSet implementation; active copies get a freshly constructed Op with
 * this set's current values assigned into it.
 */
MetricSet *
FileStorThreadMetrics::Op::clone(std::vector<Metric::UP>& ownerList,
                                 CopyType copyType,
                                 MetricSet* owner,
                                 bool includeUnused) const
{
    if (copyType == INACTIVE) {
        return MetricSet::clone(ownerList, INACTIVE, owner, includeUnused);
    }
    // static_cast instead of the old C-style cast: assignValues() hands back
    // its target through a base pointer, and the target is known to be an Op.
    return static_cast<Op*>((new Op(getName(), _name, owner))->assignValues(*this));
}

// Decorates a base operation metric set (instantiated for Op and
// OpWithNotFound at the bottom of this file) with a request-size metric.
template <typename BaseOp>
FileStorThreadMetrics::OpWithRequestSize<BaseOp>::OpWithRequestSize(const std::string& id, const std::string& name, MetricSet* owner)
        : BaseOp(id, name, owner),
          request_size("request_size", "", "Size of requests, in bytes", this)
{
}

template <typename BaseOp>
FileStorThreadMetrics::OpWithRequestSize<BaseOp>::~OpWithRequestSize() = default;

// FIXME the clone semantics are very non-intuitive, which has led to the
// copy&paste pattern repeated for every Op subclass in this file.
template <typename BaseOp>
MetricSet*
FileStorThreadMetrics::OpWithRequestSize<BaseOp>::clone(
        std::vector<Metric::UP>& ownerList,
        CopyType copyType,
        MetricSet* owner,
        bool includeUnused) const
{
    // Active copies get a new instance populated with our current values;
    // anything else is handled generically by the base class.
    if (copyType != INACTIVE) {
        auto* active = new OpWithRequestSize<BaseOp>(this->getName(), this->_name, owner);
        return static_cast<OpWithRequestSize<BaseOp>*>(active->assignValues(*this));
    }
    return MetricSet::clone(ownerList, INACTIVE, owner, includeUnused);
}

// Extends Op with a counter for requests that failed because the source
// document did not exist.
FileStorThreadMetrics::OpWithNotFound::OpWithNotFound(const std::string& id, const std::string& name, MetricSet* owner)
    : Op(id, name, owner),
      notFound("not_found", "", "Number of requests that could not be "
               "completed due to source document not found.", this)
{ }

FileStorThreadMetrics::OpWithNotFound::~OpWithNotFound() = default;

/**
 * Clone this metric set. Inactive copies are delegated to MetricSet; active
 * copies get a freshly constructed OpWithNotFound carrying over the current
 * values.
 */
MetricSet *
FileStorThreadMetrics::OpWithNotFound::clone(std::vector<Metric::UP>& ownerList,
                                             CopyType copyType,
                                             MetricSet* owner,
                                             bool includeUnused) const
{
    if (copyType == INACTIVE) {
        return MetricSet::clone(ownerList, INACTIVE, owner, includeUnused);
    }
    // static_cast instead of the old C-style cast; the freshly constructed
    // target returned by assignValues() is known to be an OpWithNotFound.
    return static_cast<OpWithNotFound*>(
            (new OpWithNotFound(getName(), _name, owner))->assignValues(*this));
}

// Update operation metrics: adds the latency of the source document read
// performed as part of the update on top of the inherited request metrics.
FileStorThreadMetrics::Update::Update(MetricSet* owner)
    : OpWithRequestSize("update", "Update", owner),
      latencyRead("latency_read", "", "Latency of the source read in the request.", this)
{ }

FileStorThreadMetrics::Update::~Update() = default;

/**
 * Clone this metric set. Inactive copies are delegated to MetricSet; active
 * copies get a freshly constructed Update carrying over the current values.
 */
MetricSet *
FileStorThreadMetrics::Update::clone(std::vector<Metric::UP>& ownerList,
                                     CopyType copyType,
                                     MetricSet* owner,
                                     bool includeUnused) const
{
    if (copyType == INACTIVE) {
        return MetricSet::clone(ownerList, INACTIVE, owner, includeUnused);
    }
    // static_cast instead of the old C-style cast; the freshly constructed
    // target returned by assignValues() is known to be an Update.
    return static_cast<Update*>((new Update(owner))->assignValues(*this));
}

// Visitor operation metrics: adds the number of entries read per iterate
// call on top of the basic Op metrics.
FileStorThreadMetrics::Visitor::Visitor(MetricSet* owner)
    : Op("visit", "Visit", owner),
      documentsPerIterate("docs", "", "Number of entries read per iterate call", this)
{ }

FileStorThreadMetrics::Visitor::~Visitor() = default;

/**
 * Clone this metric set. Inactive copies are delegated to MetricSet; active
 * copies get a freshly constructed Visitor carrying over the current values.
 */
MetricSet *
FileStorThreadMetrics::Visitor::clone(std::vector<Metric::UP>& ownerList,
                                      CopyType copyType,
                                      MetricSet* owner,
                                      bool includeUnused) const
{
    if (copyType == INACTIVE) {
        return MetricSet::clone(ownerList, INACTIVE, owner, includeUnused);
    }
    // static_cast instead of the old C-style cast; the freshly constructed
    // target returned by assignValues() is known to be a Visitor.
    return static_cast<Visitor*>((new Visitor(owner))->assignValues(*this));
}

// Full metric set for one filestor thread. Per-operation metric sets are
// wrapped in LoadMetric(lt, ...) so each is tracked per load type; the
// remaining entries are plain counters/latencies registered directly on this
// set. Initializer order must match the member declaration order in the
// header, so do not reorder entries here.
FileStorThreadMetrics::FileStorThreadMetrics(const std::string& name, const std::string& desc, const LoadTypeSet& lt)
    : MetricSet(name, "filestor partofsum", desc),
      operations("operations", "", "Number of operations processed.", this),
      failedOperations("failedoperations", "", "Number of operations throwing exceptions.", this),
      put(lt, OpWithRequestSize<Op>("put", "Put"), this),
      get(lt, OpWithRequestSize<OpWithNotFound>("get", "Get"), this),
      remove(lt, OpWithRequestSize<OpWithNotFound>("remove", "Remove"), this),
      removeLocation(lt, Op("remove_location", "Remove location"), this),
      statBucket(lt, Op("stat_bucket", "Stat bucket"), this),
      update(lt, Update(), this),
      revert(lt, OpWithNotFound("revert", "Revert"), this),
      createIterator("createiterator", "", this),
      visit(lt, Visitor(), this),
      // NOTE(review): the second argument here is Op's `name` (used as
      // "<name> load in filestor thread"), yet a full sentence is passed —
      // looks like it was meant as a description; confirm against the header.
      multiOp(lt, Op("multioperations", "The number of multioperations that have been created"), this),
      createBuckets("createbuckets", "Number of buckets that has been created.", this),
      deleteBuckets("deletebuckets", "Number of buckets that has been deleted.", this),
      repairs("bucketverified", "Number of times buckets have been checked.", this),
      repairFixed("bucketfixed", "", "Number of times bucket has been fixed because of corruption", this),
      recheckBucketInfo("recheckbucketinfo",
                        "Number of times bucket info has been explicitly "
                        "rechecked due to buckets being marked modified by "
                        "the persistence provider",
                        this),
      splitBuckets("splitbuckets", "Number of times buckets have been split.", this),
      joinBuckets("joinbuckets", "Number of times buckets have been joined.", this),
      setBucketStates("setbucketstates", "Number of times buckets have been activated or deactivated.", this),
      movedBuckets("movedbuckets", "Number of buckets moved between disks", this),
      readBucketList("readbucketlist", "Number of read bucket list requests", this),
      readBucketInfo("readbucketinfo", "Number of read bucket info requests", this),
      internalJoin("internaljoin", "Number of joins to join buckets on multiple disks during "
                                   "storage initialization.", this),
      mergeBuckets("mergebuckets", "Number of times buckets have been merged.", this),
      getBucketDiff("getbucketdiff", "Number of getbucketdiff commands that have been processed.", this),
      applyBucketDiff("applybucketdiff", "Number of applybucketdiff commands that have been processed.", this),
      bytesMerged("bytesmerged", "", "Total number of bytes merged into this node.", this),
      getBucketDiffReply("getbucketdiffreply", "", "Number of getbucketdiff replies that have been processed.", this),
      applyBucketDiffReply("applybucketdiffreply", "", "Number of applybucketdiff replies that have been processed.", this),
      mergeLatencyTotal("mergelatencytotal", "",
             "Latency of total merge operation, from master node receives "
             "it, until merge is complete and master node replies.", this),
      mergeMetadataReadLatency("mergemetadatareadlatency", "",
             "Latency of time used in a merge step to check metadata of "
             "current node to see what data it has.", this),
      mergeDataReadLatency("mergedatareadlatency", "",
             "Latency of time used in a merge step to read data other "
             "nodes need.", this),
      mergeDataWriteLatency("mergedatawritelatency", "",
            "Latency of time used in a merge step to write data needed to "
            "current node.", this),
      mergeAverageDataReceivedNeeded("mergeavgdatareceivedneeded", "", "Amount of data transferred from previous node "
                                     "in chain that we needed to apply locally.", this),
      batchingSize("batchingsize", "", "Number of operations batched per bucket (only counts "
                   "batches of size > 1)", this)
{ }

FileStorThreadMetrics::~FileStorThreadMetrics() = default;

// Metric set for one filestor stripe: currently only the average time an
// operation spends waiting in the input queue, tracked per load type.
FileStorStripeMetrics::FileStorStripeMetrics(const std::string& name, const std::string& description,
                                             const LoadTypeSet& loadTypes)
    : MetricSet(name, "partofsum", description),
      averageQueueWaitingTime(loadTypes,
                              metrics::DoubleAverageMetric("averagequeuewait", "",
                                                           "Average time an operation spends in input queue."),
                              this)
{
}

FileStorStripeMetrics::~FileStorStripeMetrics() = default;

// Metric set for one disk: sums over all its threads/stripes plus queue and
// lock contention metrics. The per-thread/per-stripe children themselves are
// created later in initDiskMetrics().
FileStorDiskMetrics::FileStorDiskMetrics(const std::string& name, const std::string& description,
                                         const metrics::LoadTypeSet& loadTypes, MetricSet* owner)
    : MetricSet(name, "partofsum", description, owner),
      sumThreads("allthreads", "sum", "", this),
      sumStripes("allstripes", "sum", "", this),
      averageQueueWaitingTime(loadTypes,
                              metrics::DoubleAverageMetric("averagequeuewait", "",
                                                           "Average time an operation spends in input queue."),
                              this),
      queueSize("queuesize", "", "Size of input message queue.", this),
      pendingMerges("pendingmerge", "", "Number of buckets currently being merged.", this),
      waitingForLockHitRate("waitingforlockrate", "",
              "Amount of times a filestor thread has needed to wait for "
              "lock to take next message in queue.", this),
      lockWaitTime("lockwaittime", "", "Amount of time waiting used waiting for lock.", this)
{
    // Presumably suppresses emitting these metrics while their value is zero
    // — confirm against the metrics library's unsetOnZeroValue() contract.
    pendingMerges.unsetOnZeroValue();
    waitingForLockHitRate.unsetOnZeroValue();
}

FileStorDiskMetrics::~FileStorDiskMetrics() = default;

/**
 * Create the per-thread and per-stripe child metric sets for this disk,
 * register each one, and wire it into the corresponding sum metric.
 */
void
FileStorDiskMetrics::initDiskMetrics(const LoadTypeSet& loadTypes, uint32_t numStripes, uint32_t threadsPerDisk)
{
    // Rebuild the per-thread metric sets from scratch.
    threads.clear();
    threads.resize(threadsPerDisk);
    for (uint32_t threadIdx = 0; threadIdx < threadsPerDisk; ++threadIdx) {
        std::ostringstream threadName;
        threadName << "thread" << threadIdx;
        std::ostringstream threadDesc;
        threadDesc << "Thread " << threadIdx << '/' << threadsPerDisk;
        auto& threadEntry = threads[threadIdx];
        threadEntry = std::make_shared<FileStorThreadMetrics>(threadName.str(), threadDesc.str(), loadTypes);
        registerMetric(*threadEntry);
        sumThreads.addMetricToSum(*threadEntry);
    }
    // Same procedure for the per-stripe metric sets.
    stripes.clear();
    stripes.resize(numStripes);
    for (uint32_t stripeIdx = 0; stripeIdx < numStripes; ++stripeIdx) {
        std::ostringstream stripeName;
        stripeName << "stripe" << stripeIdx;
        std::ostringstream stripeDesc;
        stripeDesc << "Stripe " << stripeIdx << '/' << numStripes;
        auto& stripeEntry = stripes[stripeIdx];
        stripeEntry = std::make_shared<FileStorStripeMetrics>(stripeName.str(), stripeDesc.str(), loadTypes);
        registerMetric(*stripeEntry);
        sumStripes.addMetricToSum(*stripeEntry);
    }
}

// Top-level filestor metric set: a sum over all disks plus event counters.
// The LoadTypeSet parameter is deliberately unnamed/unused here; load types
// are only needed when the per-disk metrics are built in initDiskMetrics().
FileStorMetrics::FileStorMetrics(const LoadTypeSet&)
    : MetricSet("filestor", "filestor", ""),
      sum("alldisks", "sum", "", this),
      directoryEvents("directoryevents", "", "Number of directory events received.", this),
      partitionEvents("partitionevents", "", "Number of partition events received.", this),
      diskEvents("diskevents", "", "Number of disk events received.", this)
{ }

FileStorMetrics::~FileStorMetrics() = default;

/**
 * Create the per-disk child metric sets, wire each into the all-disk sum,
 * and recursively build their thread/stripe children.
 *
 * @throws vespalib::IllegalStateException if called more than once.
 */
void FileStorMetrics::initDiskMetrics(uint16_t numDisks, const LoadTypeSet& loadTypes, uint32_t numStripes, uint32_t threadsPerDisk)
{
    if (!disks.empty()) {
        throw vespalib::IllegalStateException("Can't initialize disks twice", VESPA_STRLOC);
    }
    // `disks` is guaranteed empty at this point (or we threw above), so the
    // previous redundant clear() has been dropped.
    disks.resize(numDisks);
    for (uint32_t i = 0; i < numDisks; ++i) {
        // Currently FileStorHandlerImpl expects metrics to exist for
        // disks that are not in use too.
        std::ostringstream desc;
        std::ostringstream name;
        name << "disk_" << i;
        desc << "Disk " << i;
        disks[i] = std::make_shared<FileStorDiskMetrics>(name.str(), desc.str(), loadTypes, this);
        sum.addMetricToSum(*disks[i]);
        disks[i]->initDiskMetrics(loadTypes, numStripes, threadsPerDisk);
    }
}

}

// Explicit instantiations of the LoadMetric/SumMetric wrappers used above.
// The template definitions come from the .hpp headers included at the top of
// this file, so the instances are emitted in this translation unit.
template class metrics::LoadMetric<storage::FileStorThreadMetrics::Op>;
template class metrics::LoadMetric<storage::FileStorThreadMetrics::OpWithNotFound>;
template class metrics::LoadMetric<storage::FileStorThreadMetrics::Update>;
template class metrics::LoadMetric<storage::FileStorThreadMetrics::Visitor>;
template class metrics::LoadMetric<storage::FileStorThreadMetrics::OpWithRequestSize<storage::FileStorThreadMetrics::Op>>;
template class metrics::LoadMetric<storage::FileStorThreadMetrics::OpWithRequestSize<storage::FileStorThreadMetrics::OpWithNotFound>>;
template class metrics::SumMetric<storage::FileStorThreadMetrics::Op>;
template class metrics::SumMetric<storage::FileStorThreadMetrics::OpWithNotFound>;
template class metrics::SumMetric<storage::FileStorThreadMetrics::Update>;
template class metrics::SumMetric<storage::FileStorThreadMetrics::Visitor>;