summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-07-15 10:58:58 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2020-07-15 10:58:58 +0000
commitf4a30a48b9e8586e763e6e98e038e00099ee2b2f (patch)
tree681df0c84722fbec7e31854920beb29725490384 /storage
parent59332ffc463715515f6e15d87834f0646e461ab9 (diff)
Add per-operation metrics for puts and removes that are part of merges
Move all merge-related metrics out into a separate wrapper for convenience.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/common/metricstest.cpp12
-rw-r--r--storage/src/tests/persistence/mergehandlertest.cpp2
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp16
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormetrics.h9
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp28
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h32
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp78
8 files changed, 104 insertions, 74 deletions
diff --git a/storage/src/tests/common/metricstest.cpp b/storage/src/tests/common/metricstest.cpp
index d698cbb5e05..1e0144e9efb 100644
--- a/storage/src/tests/common/metricstest.cpp
+++ b/storage/src/tests/common/metricstest.cpp
@@ -175,16 +175,16 @@ void MetricsTest::createFakeLoad()
thread.internalJoin.count.inc(3 * n);
thread.mergeBuckets.count.inc(2 * n);
- thread.bytesMerged.inc(1000 * n);
thread.getBucketDiff.count.inc(4 * n);
thread.getBucketDiffReply.inc(4 * n);
thread.applyBucketDiff.count.inc(4 * n);
thread.applyBucketDiffReply.inc(4 * n);
- thread.mergeLatencyTotal.addValue(300 * n);
- thread.mergeMetadataReadLatency.addValue(20 * n);
- thread.mergeDataReadLatency.addValue(40 * n);
- thread.mergeDataWriteLatency.addValue(50 * n);
- thread.mergeAverageDataReceivedNeeded.addValue(0.8);
+ thread.merge_handler_metrics.bytesMerged.inc(1000 * n);
+ thread.merge_handler_metrics.mergeLatencyTotal.addValue(300 * n);
+ thread.merge_handler_metrics.mergeMetadataReadLatency.addValue(20 * n);
+ thread.merge_handler_metrics.mergeDataReadLatency.addValue(40 * n);
+ thread.merge_handler_metrics.mergeDataWriteLatency.addValue(50 * n);
+ thread.merge_handler_metrics.mergeAverageDataReceivedNeeded.addValue(0.8);
}
}
for (uint32_t i=0; i<_visitorMetrics->threads.size(); ++i) {
diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp
index 262906a4baf..df0ea3e6680 100644
--- a/storage/src/tests/persistence/mergehandlertest.cpp
+++ b/storage/src/tests/persistence/mergehandlertest.cpp
@@ -901,7 +901,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) {
EXPECT_EQ("", doTestSPIException(handler, providerWrapper, invoker, *it));
// Casual, in-place testing of bug 6752085.
// This will fail if we give NaN to the metric in question.
- EXPECT_TRUE(std::isfinite(getEnv()._metrics.mergeAverageDataReceivedNeeded.getLast()));
+ EXPECT_TRUE(std::isfinite(getEnv()._metrics.merge_handler_metrics.mergeAverageDataReceivedNeeded.getLast()));
}
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt b/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt
index d394ec3a5cf..182bf04b1cf 100644
--- a/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt
+++ b/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt
@@ -5,6 +5,7 @@ vespa_add_library(storage_filestorpersistence OBJECT
filestorhandlerimpl.cpp
filestormanager.cpp
filestormetrics.cpp
+ merge_handler_metrics.cpp
mergestatus.cpp
modifiedbucketchecker.cpp
DEPENDS
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp
index 0be046b2e9e..a80b17d4f19 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp
@@ -152,23 +152,9 @@ FileStorThreadMetrics::FileStorThreadMetrics(const std::string& name, const std:
mergeBuckets("mergebuckets", "Number of times buckets have been merged.", this),
getBucketDiff("getbucketdiff", "Number of getbucketdiff commands that have been processed.", this),
applyBucketDiff("applybucketdiff", "Number of applybucketdiff commands that have been processed.", this),
- bytesMerged("bytesmerged", {}, "Total number of bytes merged into this node.", this),
getBucketDiffReply("getbucketdiffreply", {}, "Number of getbucketdiff replies that have been processed.", this),
applyBucketDiffReply("applybucketdiffreply", {}, "Number of applybucketdiff replies that have been processed.", this),
- mergeLatencyTotal("mergelatencytotal", {},
- "Latency of total merge operation, from master node receives "
- "it, until merge is complete and master node replies.", this),
- mergeMetadataReadLatency("mergemetadatareadlatency", {},
- "Latency of time used in a merge step to check metadata of "
- "current node to see what data it has.", this),
- mergeDataReadLatency("mergedatareadlatency", {},
- "Latency of time used in a merge step to read data other "
- "nodes need.", this),
- mergeDataWriteLatency("mergedatawritelatency", {},
- "Latency of time used in a merge step to write data needed to "
- "current node.", this),
- mergeAverageDataReceivedNeeded("mergeavgdatareceivedneeded", {}, "Amount of data transferred from previous node "
- "in chain that we needed to apply locally.", this),
+ merge_handler_metrics(this),
batchingSize("batchingsize", {}, "Number of operations batched per bucket (only counts "
"batches of size > 1)", this)
{ }
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h
index 3bc7fa33660..be1c5c48213 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h
@@ -10,6 +10,7 @@
#pragma once
+#include "merge_handler_metrics.h"
#include <vespa/metrics/metrics.h>
#include <vespa/documentapi/loadtypes/loadtypeset.h>
@@ -99,15 +100,9 @@ struct FileStorThreadMetrics : public metrics::MetricSet
Op mergeBuckets;
Op getBucketDiff;
Op applyBucketDiff;
-
- metrics::LongCountMetric bytesMerged;
metrics::LongCountMetric getBucketDiffReply;
metrics::LongCountMetric applyBucketDiffReply;
- metrics::DoubleAverageMetric mergeLatencyTotal;
- metrics::DoubleAverageMetric mergeMetadataReadLatency;
- metrics::DoubleAverageMetric mergeDataReadLatency;
- metrics::DoubleAverageMetric mergeDataWriteLatency;
- metrics::DoubleAverageMetric mergeAverageDataReceivedNeeded;
+ MergeHandlerMetrics merge_handler_metrics;
metrics::LongAverageMetric batchingSize;
FileStorThreadMetrics(const std::string& name, const std::string& desc, const metrics::LoadTypeSet& lt);
diff --git a/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp b/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp
new file mode 100644
index 00000000000..d4e82b8a64f
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp
@@ -0,0 +1,28 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "merge_handler_metrics.h"
+
+namespace storage {
+
+MergeHandlerMetrics::MergeHandlerMetrics(metrics::MetricSet* owner)
+ : bytesMerged("bytesmerged", {}, "Total number of bytes merged into this node.", owner),
+ mergeLatencyTotal("mergelatencytotal", {},
+ "Latency of total merge operation, from master node receives "
+ "it, until merge is complete and master node replies.", owner),
+ mergeMetadataReadLatency("mergemetadatareadlatency", {},
+ "Latency of time used in a merge step to check metadata of "
+ "current node to see what data it has.", owner),
+ mergeDataReadLatency("mergedatareadlatency", {},
+ "Latency of time used in a merge step to read data other "
+ "nodes need.", owner),
+ mergeDataWriteLatency("mergedatawritelatency", {},
+ "Latency of time used in a merge step to write data needed to "
+ "current node.", owner),
+ mergeAverageDataReceivedNeeded("mergeavgdatareceivedneeded", {}, "Amount of data transferred from previous node "
+ "in chain that we needed to apply locally.", owner),
+ put_latency("put_latency", {}, "Latency of individual puts that are part of merge operations", owner),
+ remove_latency("remove_latency", {}, "Latency of individual removes that are part of merge operations", owner)
+{}
+
+MergeHandlerMetrics::~MergeHandlerMetrics() = default;
+
+}
diff --git a/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h b/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h
new file mode 100644
index 00000000000..ffa3cf204a3
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h
@@ -0,0 +1,32 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/metrics/metrics.h>
+
+namespace storage {
+
+// Provides a convenient wrapper for all MergeHandler-related metrics.
+// This is _not_ its own MetricSet; metrics are owned by an explicitly provided
+// parent. This is to prevent metric paths from changing, as external aggregation
+// depends on the existing paths.
+struct MergeHandlerMetrics {
+ metrics::LongCountMetric bytesMerged;
+ // Aggregate metrics:
+ metrics::DoubleAverageMetric mergeLatencyTotal;
+ metrics::DoubleAverageMetric mergeMetadataReadLatency;
+ metrics::DoubleAverageMetric mergeDataReadLatency;
+ metrics::DoubleAverageMetric mergeDataWriteLatency;
+ metrics::DoubleAverageMetric mergeAverageDataReceivedNeeded;
+ // Individual operation metrics. These capture both count and latency sum, so
+ // no need for explicit count metric on the side.
+ metrics::DoubleAverageMetric put_latency;
+ metrics::DoubleAverageMetric remove_latency;
+ // Iteration over metadata and document payload data is already covered by
+ // the merge[Meta]Data(Read|Write)Latency metrics, so not repeated here. Can be
+ // explicitly added if deemed required.
+
+ explicit MergeHandlerMetrics(metrics::MetricSet* owner);
+ ~MergeHandlerMetrics();
+};
+
+}
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 3483b15dd0e..70894858887 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -5,7 +5,6 @@
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/document/fieldset/fieldsets.h>
-#include <vespa/storage/common/bucketoperationlogger.h>
#include <vespa/vespalib/objects/nbostream.h>
#include <vespa/vespalib/util/exceptions.h>
#include <algorithm>
@@ -34,7 +33,7 @@ MergeHandler::MergeHandler(spi::PersistenceProvider& spi,
namespace {
-int getDeleteFlag() {
+constexpr int getDeleteFlag() {
// Referred into old slotfile code before. Where should this number come from?
return 2;
}
@@ -71,8 +70,7 @@ checkResult(const spi::Result& result,
}
-class IteratorGuard
-{
+class IteratorGuard {
spi::PersistenceProvider& _spi;
spi::IteratorId _iteratorId;
spi::Context& _context;
@@ -84,15 +82,13 @@ public:
_iteratorId(iteratorId),
_context(context)
{}
- ~IteratorGuard()
- {
+ ~IteratorGuard() {
assert(_iteratorId != 0);
_spi.destroyIterator(_iteratorId, _context);
}
};
-struct IndirectDocEntryTimestampPredicate
-{
+struct IndirectDocEntryTimestampPredicate {
bool operator()(const spi::DocEntry::UP& e1,
const spi::DocEntry::UP& e2) const
{
@@ -106,8 +102,7 @@ struct IndirectDocEntryTimestampPredicate
}
};
-struct DiffEntryTimestampPredicate
-{
+struct DiffEntryTimestampPredicate {
bool operator()(const api::ApplyBucketDiffCommand::Entry& e,
const api::Timestamp timestamp) const
{
@@ -221,7 +216,6 @@ MergeHandler::buildBucketInfoList(
bucket.toString().c_str(),
providerInfo.toString().c_str(),
dbInfo.toString().c_str());
- DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId());
}
entry->setBucketInfo(providerInfo);
@@ -538,16 +532,14 @@ MergeHandler::applyDiffEntry(const spi::Bucket& bucket,
// Regular put entry
Document::SP doc(deserializeDiffDocument(e, repo));
DocumentId docId = doc->getId();
- checkResult(_spi.put(bucket, timestamp, std::move(doc), context),
- bucket,
- docId,
- "put");
+ framework::MilliSecTimer start_time(_env._component.getClock());
+ checkResult(_spi.put(bucket, timestamp, std::move(doc), context), bucket, docId, "put");
+ _env._metrics.merge_handler_metrics.put_latency.addValue(start_time.getElapsedTimeAsDouble());
} else {
DocumentId docId(e._docName);
- checkResult(_spi.remove(bucket, timestamp, docId, context),
- bucket,
- docId,
- "remove");
+ framework::MilliSecTimer start_time(_env._component.getClock());
+ checkResult(_spi.remove(bucket, timestamp, docId, context), bucket, docId, "remove");
+ _env._metrics.merge_handler_metrics.remove_latency.addValue(start_time.getElapsedTimeAsDouble());
}
}
@@ -660,10 +652,10 @@ MergeHandler::applyDiffLocally(
}
if (byteCount + notNeededByteCount != 0) {
- _env._metrics.mergeAverageDataReceivedNeeded.addValue(
+ _env._metrics.merge_handler_metrics.mergeAverageDataReceivedNeeded.addValue(
static_cast<double>(byteCount) / (byteCount + notNeededByteCount));
}
- _env._metrics.bytesMerged.inc(byteCount);
+ _env._metrics.merge_handler_metrics.bytesMerged.inc(byteCount);
LOG(debug, "Merge(%s): Applied %u entries locally from ApplyBucketDiff.",
bucket.toString().c_str(), addedCount);
@@ -870,7 +862,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status,
if (applyDiffNeedLocalData(cmd->getDiff(), 0, true)) {
framework::MilliSecTimer startTime(_env._component.getClock());
fetchLocalData(bucket, cmd->getLoadType(), cmd->getDiff(), 0, context);
- _env._metrics.mergeDataReadLatency.addValue(
+ _env._metrics.merge_handler_metrics.mergeDataReadLatency.addValue(
startTime.getElapsedTimeAsDouble());
}
status.pendingId = cmd->getMsgId();
@@ -966,7 +958,7 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP
"Bucket not found in buildBucketInfo step");
return tracker;
}
- _env._metrics.mergeMetadataReadLatency.addValue(s->startTime.getElapsedTimeAsDouble());
+ _env._metrics.merge_handler_metrics.mergeMetadataReadLatency.addValue(s->startTime.getElapsedTimeAsDouble());
LOG(spam, "Sending GetBucketDiff %" PRIu64 " for %s to next node %u "
"with diff of %u entries.",
cmd2->getMsgId(),
@@ -1039,19 +1031,19 @@ namespace {
result.push_back(b);
++j;
} else {
- // If we find equal timestamped entries that are not the
- // same.. Flag an error. But there is nothing we can do
- // about it. Note it as if it is the same entry so we
- // dont try to merge them.
+ // If we find equal timestamped entries that are not the
+ // same.. Flag an error. But there is nothing we can do
+ // about it. Note it as if it is the same entry so we
+ // dont try to merge them.
if (!(a == b)) {
if (a._gid == b._gid && a._flags == b._flags) {
if ((a._flags & getDeleteFlag()) != 0 &&
(b._flags & getDeleteFlag()) != 0)
{
- // Unfortunately this can happen, for instance
- // if a remove comes to a bucket out of sync
- // and reuses different headers in the two
- // versions.
+ // Unfortunately this can happen, for instance
+ // if a remove comes to a bucket out of sync
+ // and reuses different headers in the two
+ // versions.
LOG(debug, "Found entries with equal timestamps of "
"the same gid who both are remove "
"entries: %s <-> %s.",
@@ -1071,9 +1063,9 @@ namespace {
} else if ((a._flags & getDeleteFlag())
!= (b._flags & getDeleteFlag()))
{
- // If we find one remove and one put entry on the
- // same timestamp we are going to keep the remove
- // entry to make the copies consistent.
+ // If we find one remove and one put entry on the
+ // same timestamp we are going to keep the remove
+ // entry to make the copies consistent.
const api::GetBucketDiffCommand::Entry& deletedEntry(
(a._flags & getDeleteFlag()) != 0 ? a : b);
result.push_back(deletedEntry);
@@ -1146,7 +1138,7 @@ MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker
LOG(error, "Diffing %s found suspect entries.",
bucket.toString().c_str());
}
- _env._metrics.mergeMetadataReadLatency.addValue(
+ _env._metrics.merge_handler_metrics.mergeMetadataReadLatency.addValue(
startTime.getElapsedTimeAsDouble());
// If last node in merge chain, we can send reply straight away
@@ -1243,7 +1235,6 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply,
LOG(warning, "Got GetBucketDiffReply for %s which we have no "
"merge state for.",
bucket.toString().c_str());
- DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId());
return;
}
@@ -1252,7 +1243,6 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply,
LOG(warning, "Got GetBucketDiffReply for %s which had message "
"id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.",
bucket.toString().c_str(), reply.getMsgId(), s.pendingId);
- DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId());
return;
}
api::StorageReply::SP replyToSend;
@@ -1280,7 +1270,7 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply,
// We have sent something on, and shouldn't reply now.
clearState = false;
} else {
- _env._metrics.mergeLatencyTotal.addValue(
+ _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue(
s.startTime.getElapsedTimeAsDouble());
}
}
@@ -1327,7 +1317,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
if (applyDiffNeedLocalData(cmd.getDiff(), index, !lastInChain)) {
framework::MilliSecTimer startTime(_env._component.getClock());
fetchLocalData(bucket, cmd.getLoadType(), cmd.getDiff(), index, tracker->context());
- _env._metrics.mergeDataReadLatency.addValue(startTime.getElapsedTimeAsDouble());
+ _env._metrics.merge_handler_metrics.mergeDataReadLatency.addValue(startTime.getElapsedTimeAsDouble());
} else {
LOG(spam, "Merge(%s): Moving %zu entries, didn't need "
"local data on node %u (%u).",
@@ -1340,7 +1330,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
framework::MilliSecTimer startTime(_env._component.getClock());
api::BucketInfo info(applyDiffLocally(bucket, cmd.getLoadType(),
cmd.getDiff(), index, tracker->context()));
- _env._metrics.mergeDataWriteLatency.addValue(
+ _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(
startTime.getElapsedTimeAsDouble());
} else {
LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u).",
@@ -1409,7 +1399,6 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,
LOG(warning, "Got ApplyBucketDiffReply for %s which we have no "
"merge state for.",
bucket.toString().c_str());
- DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId());
return;
}
@@ -1418,7 +1407,6 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,
LOG(warning, "Got ApplyBucketDiffReply for %s which had message "
"id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.",
bucket.toString().c_str(), reply.getMsgId(), s.pendingId);
- DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId());
return;
}
bool clearState = true;
@@ -1436,7 +1424,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,
framework::MilliSecTimer startTime(_env._component.getClock());
fetchLocalData(bucket, reply.getLoadType(), diff, index,
s.context);
- _env._metrics.mergeDataReadLatency.addValue(
+ _env._metrics.merge_handler_metrics.mergeDataReadLatency.addValue(
startTime.getElapsedTimeAsDouble());
}
if (applyDiffHasLocallyNeededData(diff, index)) {
@@ -1444,7 +1432,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,
api::BucketInfo info(
applyDiffLocally(bucket, reply.getLoadType(), diff,
index, s.context));
- _env._metrics.mergeDataWriteLatency.addValue(
+ _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(
startTime.getElapsedTimeAsDouble());
} else {
LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u)",
@@ -1491,7 +1479,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply,
// We have sent something on and shouldn't reply now.
clearState = false;
} else {
- _env._metrics.mergeLatencyTotal.addValue(
+ _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue(
s.startTime.getElapsedTimeAsDouble());
}
}