diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-07-15 10:58:58 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-07-15 10:58:58 +0000 |
commit | f4a30a48b9e8586e763e6e98e038e00099ee2b2f (patch) | |
tree | 681df0c84722fbec7e31854920beb29725490384 /storage | |
parent | 59332ffc463715515f6e15d87834f0646e461ab9 (diff) |
Add per-operation metrics for puts and removes that are part of merges
Move all merge-related metrics out into a separate wrapper for convenience.
Diffstat (limited to 'storage')
8 files changed, 104 insertions, 74 deletions
diff --git a/storage/src/tests/common/metricstest.cpp b/storage/src/tests/common/metricstest.cpp index d698cbb5e05..1e0144e9efb 100644 --- a/storage/src/tests/common/metricstest.cpp +++ b/storage/src/tests/common/metricstest.cpp @@ -175,16 +175,16 @@ void MetricsTest::createFakeLoad() thread.internalJoin.count.inc(3 * n); thread.mergeBuckets.count.inc(2 * n); - thread.bytesMerged.inc(1000 * n); thread.getBucketDiff.count.inc(4 * n); thread.getBucketDiffReply.inc(4 * n); thread.applyBucketDiff.count.inc(4 * n); thread.applyBucketDiffReply.inc(4 * n); - thread.mergeLatencyTotal.addValue(300 * n); - thread.mergeMetadataReadLatency.addValue(20 * n); - thread.mergeDataReadLatency.addValue(40 * n); - thread.mergeDataWriteLatency.addValue(50 * n); - thread.mergeAverageDataReceivedNeeded.addValue(0.8); + thread.merge_handler_metrics.bytesMerged.inc(1000 * n); + thread.merge_handler_metrics.mergeLatencyTotal.addValue(300 * n); + thread.merge_handler_metrics.mergeMetadataReadLatency.addValue(20 * n); + thread.merge_handler_metrics.mergeDataReadLatency.addValue(40 * n); + thread.merge_handler_metrics.mergeDataWriteLatency.addValue(50 * n); + thread.merge_handler_metrics.mergeAverageDataReceivedNeeded.addValue(0.8); } } for (uint32_t i=0; i<_visitorMetrics->threads.size(); ++i) { diff --git a/storage/src/tests/persistence/mergehandlertest.cpp b/storage/src/tests/persistence/mergehandlertest.cpp index 262906a4baf..df0ea3e6680 100644 --- a/storage/src/tests/persistence/mergehandlertest.cpp +++ b/storage/src/tests/persistence/mergehandlertest.cpp @@ -901,7 +901,7 @@ TEST_F(MergeHandlerTest, apply_bucket_diff_spi_failures) { EXPECT_EQ("", doTestSPIException(handler, providerWrapper, invoker, *it)); // Casual, in-place testing of bug 6752085. // This will fail if we give NaN to the metric in question. - EXPECT_TRUE(std::isfinite(getEnv()._metrics.mergeAverageDataReceivedNeeded.getLast())); + EXPECT_TRUE(std::isfinite(getEnv()._metrics.merge_handler_metrics.mergeAverageDataReceivedNeeded.getLast())); } } diff --git a/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt b/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt index d394ec3a5cf..182bf04b1cf 100644 --- a/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt +++ b/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt @@ -5,6 +5,7 @@ vespa_add_library(storage_filestorpersistence OBJECT filestorhandlerimpl.cpp filestormanager.cpp filestormetrics.cpp + merge_handler_metrics.cpp mergestatus.cpp modifiedbucketchecker.cpp DEPENDS diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp index 0be046b2e9e..a80b17d4f19 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp @@ -152,23 +152,9 @@ FileStorThreadMetrics::FileStorThreadMetrics(const std::string& name, const std: mergeBuckets("mergebuckets", "Number of times buckets have been merged.", this), getBucketDiff("getbucketdiff", "Number of getbucketdiff commands that have been processed.", this), applyBucketDiff("applybucketdiff", "Number of applybucketdiff commands that have been processed.", this), - bytesMerged("bytesmerged", {}, "Total number of bytes merged into this node.", this), getBucketDiffReply("getbucketdiffreply", {}, "Number of getbucketdiff replies that have been processed.", this), applyBucketDiffReply("applybucketdiffreply", {}, "Number of applybucketdiff replies that have been processed.", this), - mergeLatencyTotal("mergelatencytotal", {}, - "Latency of total merge operation, from master node receives " - "it, until merge is complete and master node replies.", this), - mergeMetadataReadLatency("mergemetadatareadlatency", {}, - "Latency of time used in a merge step to check metadata of " - "current node to see what data it has.", this), - mergeDataReadLatency("mergedatareadlatency", {}, - "Latency of time used in a merge step to read data other " - "nodes need.", this), - mergeDataWriteLatency("mergedatawritelatency", {}, - "Latency of time used in a merge step to write data needed to " - "current node.", this), - mergeAverageDataReceivedNeeded("mergeavgdatareceivedneeded", {}, "Amount of data transferred from previous node " - "in chain that we needed to apply locally.", this), + merge_handler_metrics(this), batchingSize("batchingsize", {}, "Number of operations batched per bucket (only counts " "batches of size > 1)", this) { } diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h index 3bc7fa33660..be1c5c48213 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h @@ -10,6 +10,7 @@ #pragma once +#include "merge_handler_metrics.h" #include <vespa/metrics/metrics.h> #include <vespa/documentapi/loadtypes/loadtypeset.h> @@ -99,15 +100,9 @@ struct FileStorThreadMetrics : public metrics::MetricSet Op mergeBuckets; Op getBucketDiff; Op applyBucketDiff; - - metrics::LongCountMetric bytesMerged; metrics::LongCountMetric getBucketDiffReply; metrics::LongCountMetric applyBucketDiffReply; - metrics::DoubleAverageMetric mergeLatencyTotal; - metrics::DoubleAverageMetric mergeMetadataReadLatency; - metrics::DoubleAverageMetric mergeDataReadLatency; - metrics::DoubleAverageMetric mergeDataWriteLatency; - metrics::DoubleAverageMetric mergeAverageDataReceivedNeeded; + MergeHandlerMetrics merge_handler_metrics; metrics::LongAverageMetric batchingSize; FileStorThreadMetrics(const std::string& name, const std::string& desc, const metrics::LoadTypeSet& lt); diff --git a/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp b/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp new file mode 100644 index 00000000000..d4e82b8a64f --- /dev/null +++ b/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.cpp @@ -0,0 +1,28 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "merge_handler_metrics.h" + +namespace storage { + +MergeHandlerMetrics::MergeHandlerMetrics(metrics::MetricSet* owner) + : bytesMerged("bytesmerged", {}, "Total number of bytes merged into this node.", owner), + mergeLatencyTotal("mergelatencytotal", {}, + "Latency of total merge operation, from master node receives " + "it, until merge is complete and master node replies.", owner), + mergeMetadataReadLatency("mergemetadatareadlatency", {}, + "Latency of time used in a merge step to check metadata of " + "current node to see what data it has.", owner), + mergeDataReadLatency("mergedatareadlatency", {}, + "Latency of time used in a merge step to read data other " + "nodes need.", owner), + mergeDataWriteLatency("mergedatawritelatency", {}, + "Latency of time used in a merge step to write data needed to " + "current node.", owner), + mergeAverageDataReceivedNeeded("mergeavgdatareceivedneeded", {}, "Amount of data transferred from previous node " + "in chain that we needed to apply locally.", owner), + put_latency("put_latency", {}, "Latency of individual puts that are part of merge operations", owner), + remove_latency("remove_latency", {}, "Latency of individual removes that are part of merge operations", owner) +{} + +MergeHandlerMetrics::~MergeHandlerMetrics() = default; + +} diff --git a/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h b/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h new file mode 100644 index 00000000000..ffa3cf204a3 --- /dev/null +++ b/storage/src/vespa/storage/persistence/filestorage/merge_handler_metrics.h @@ -0,0 +1,32 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/metrics/metrics.h> + +namespace storage { + +// Provides a convenient wrapper for all MergeHandler-related metrics. +// This is _not_ its own MetricSet; metrics are owned by an explicitly provided +// parent. This is to prevent metric paths from changing, as external aggregation +// depends on the existing paths. +struct MergeHandlerMetrics { + metrics::LongCountMetric bytesMerged; + // Aggregate metrics: + metrics::DoubleAverageMetric mergeLatencyTotal; + metrics::DoubleAverageMetric mergeMetadataReadLatency; + metrics::DoubleAverageMetric mergeDataReadLatency; + metrics::DoubleAverageMetric mergeDataWriteLatency; + metrics::DoubleAverageMetric mergeAverageDataReceivedNeeded; + // Individual operation metrics. These capture both count and latency sum, so + // no need for explicit count metric on the side. + metrics::DoubleAverageMetric put_latency; + metrics::DoubleAverageMetric remove_latency; + // Iteration over metadata and document payload data is already covered by + // the merge[Meta]Data(Read|Write)Latency metrics, so not repeated here. Can be + // explicitly added if deemed required. + + explicit MergeHandlerMetrics(metrics::MetricSet* owner); + ~MergeHandlerMetrics(); +}; + +} diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 3483b15dd0e..70894858887 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -5,7 +5,6 @@ #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/document/fieldset/fieldsets.h> -#include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/vespalib/objects/nbostream.h> #include <vespa/vespalib/util/exceptions.h> #include <algorithm> @@ -34,7 +33,7 @@ MergeHandler::MergeHandler(spi::PersistenceProvider& spi, namespace { -int getDeleteFlag() { +constexpr int getDeleteFlag() { // Referred into old slotfile code before. Where should this number come from? return 2; } @@ -71,8 +70,7 @@ checkResult(const spi::Result& result, } -class IteratorGuard -{ +class IteratorGuard { spi::PersistenceProvider& _spi; spi::IteratorId _iteratorId; spi::Context& _context; @@ -84,15 +82,13 @@ public: _iteratorId(iteratorId), _context(context) {} - ~IteratorGuard() - { + ~IteratorGuard() { assert(_iteratorId != 0); _spi.destroyIterator(_iteratorId, _context); } }; -struct IndirectDocEntryTimestampPredicate -{ +struct IndirectDocEntryTimestampPredicate { bool operator()(const spi::DocEntry::UP& e1, const spi::DocEntry::UP& e2) const { @@ -106,8 +102,7 @@ struct IndirectDocEntryTimestampPredicate } }; -struct DiffEntryTimestampPredicate -{ +struct DiffEntryTimestampPredicate { bool operator()(const api::ApplyBucketDiffCommand::Entry& e, const api::Timestamp timestamp) const { @@ -221,7 +216,6 @@ MergeHandler::buildBucketInfoList( bucket.toString().c_str(), providerInfo.toString().c_str(), dbInfo.toString().c_str()); - DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId()); } entry->setBucketInfo(providerInfo); @@ -538,16 +532,14 @@ MergeHandler::applyDiffEntry(const spi::Bucket& bucket, // Regular put entry Document::SP doc(deserializeDiffDocument(e, repo)); DocumentId docId = doc->getId(); - checkResult(_spi.put(bucket, timestamp, std::move(doc), context), - bucket, - docId, - "put"); + framework::MilliSecTimer start_time(_env._component.getClock()); + checkResult(_spi.put(bucket, timestamp, std::move(doc), context), bucket, docId, "put"); + _env._metrics.merge_handler_metrics.put_latency.addValue(start_time.getElapsedTimeAsDouble()); } else { DocumentId docId(e._docName); - checkResult(_spi.remove(bucket, timestamp, docId, context), - bucket, - docId, - "remove"); + framework::MilliSecTimer start_time(_env._component.getClock()); + checkResult(_spi.remove(bucket, timestamp, docId, context), bucket, docId, "remove"); + _env._metrics.merge_handler_metrics.remove_latency.addValue(start_time.getElapsedTimeAsDouble()); } } @@ -660,10 +652,10 @@ MergeHandler::applyDiffLocally( } if (byteCount + notNeededByteCount != 0) { - _env._metrics.mergeAverageDataReceivedNeeded.addValue( + _env._metrics.merge_handler_metrics.mergeAverageDataReceivedNeeded.addValue( static_cast<double>(byteCount) / (byteCount + notNeededByteCount)); } - _env._metrics.bytesMerged.inc(byteCount); + _env._metrics.merge_handler_metrics.bytesMerged.inc(byteCount); LOG(debug, "Merge(%s): Applied %u entries locally from ApplyBucketDiff.", bucket.toString().c_str(), addedCount); @@ -870,7 +862,7 @@ MergeHandler::processBucketMerge(const spi::Bucket& bucket, MergeStatus& status, if (applyDiffNeedLocalData(cmd->getDiff(), 0, true)) { framework::MilliSecTimer startTime(_env._component.getClock()); fetchLocalData(bucket, cmd->getLoadType(), cmd->getDiff(), 0, context); - _env._metrics.mergeDataReadLatency.addValue( + _env._metrics.merge_handler_metrics.mergeDataReadLatency.addValue( startTime.getElapsedTimeAsDouble()); } status.pendingId = cmd->getMsgId(); @@ -966,7 +958,7 @@ MergeHandler::handleMergeBucket(api::MergeBucketCommand& cmd, MessageTracker::UP "Bucket not found in buildBucketInfo step"); return tracker; } - _env._metrics.mergeMetadataReadLatency.addValue(s->startTime.getElapsedTimeAsDouble()); + _env._metrics.merge_handler_metrics.mergeMetadataReadLatency.addValue(s->startTime.getElapsedTimeAsDouble()); LOG(spam, "Sending GetBucketDiff %" PRIu64 " for %s to next node %u " "with diff of %u entries.", cmd2->getMsgId(), @@ -1039,19 +1031,19 @@ namespace { result.push_back(b); ++j; } else { - // If we find equal timestamped entries that are not the - // same.. Flag an error. But there is nothing we can do - // about it. Note it as if it is the same entry so we - // dont try to merge them. + // If we find equal timestamped entries that are not the + // same.. Flag an error. But there is nothing we can do + // about it. Note it as if it is the same entry so we + // dont try to merge them. if (!(a == b)) { if (a._gid == b._gid && a._flags == b._flags) { if ((a._flags & getDeleteFlag()) != 0 && (b._flags & getDeleteFlag()) != 0) { - // Unfortunately this can happen, for instance - // if a remove comes to a bucket out of sync - // and reuses different headers in the two - // versions. + // Unfortunately this can happen, for instance + // if a remove comes to a bucket out of sync + // and reuses different headers in the two + // versions. LOG(debug, "Found entries with equal timestamps of " "the same gid who both are remove " "entries: %s <-> %s.", @@ -1071,9 +1063,9 @@ namespace { } else if ((a._flags & getDeleteFlag()) != (b._flags & getDeleteFlag())) { - // If we find one remove and one put entry on the - // same timestamp we are going to keep the remove - // entry to make the copies consistent. + // If we find one remove and one put entry on the + // same timestamp we are going to keep the remove + // entry to make the copies consistent. const api::GetBucketDiffCommand::Entry& deletedEntry( (a._flags & getDeleteFlag()) != 0 ? a : b); result.push_back(deletedEntry); @@ -1146,7 +1138,7 @@ MergeHandler::handleGetBucketDiff(api::GetBucketDiffCommand& cmd, MessageTracker LOG(error, "Diffing %s found suspect entries.", bucket.toString().c_str()); } - _env._metrics.mergeMetadataReadLatency.addValue( + _env._metrics.merge_handler_metrics.mergeMetadataReadLatency.addValue( startTime.getElapsedTimeAsDouble()); // If last node in merge chain, we can send reply straight away @@ -1243,7 +1235,6 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, LOG(warning, "Got GetBucketDiffReply for %s which we have no " "merge state for.", bucket.toString().c_str()); - DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId()); return; } @@ -1252,7 +1243,6 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, LOG(warning, "Got GetBucketDiffReply for %s which had message " "id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.", bucket.toString().c_str(), reply.getMsgId(), s.pendingId); - DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId()); return; } api::StorageReply::SP replyToSend; @@ -1280,7 +1270,7 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, // We have sent something on, and shouldn't reply now. clearState = false; } else { - _env._metrics.mergeLatencyTotal.addValue( + _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue( s.startTime.getElapsedTimeAsDouble()); } } @@ -1327,7 +1317,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra if (applyDiffNeedLocalData(cmd.getDiff(), index, !lastInChain)) { framework::MilliSecTimer startTime(_env._component.getClock()); fetchLocalData(bucket, cmd.getLoadType(), cmd.getDiff(), index, tracker->context()); - _env._metrics.mergeDataReadLatency.addValue(startTime.getElapsedTimeAsDouble()); + _env._metrics.merge_handler_metrics.mergeDataReadLatency.addValue(startTime.getElapsedTimeAsDouble()); } else { LOG(spam, "Merge(%s): Moving %zu entries, didn't need " "local data on node %u (%u).", @@ -1340,7 +1330,7 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra framework::MilliSecTimer startTime(_env._component.getClock()); api::BucketInfo info(applyDiffLocally(bucket, cmd.getLoadType(), cmd.getDiff(), index, tracker->context())); - _env._metrics.mergeDataWriteLatency.addValue( + _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue( startTime.getElapsedTimeAsDouble()); } else { LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u).", @@ -1409,7 +1399,6 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, LOG(warning, "Got ApplyBucketDiffReply for %s which we have no " "merge state for.", bucket.toString().c_str()); - DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId()); return; } @@ -1418,7 +1407,6 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, LOG(warning, "Got ApplyBucketDiffReply for %s which had message " "id %" PRIu64 " when we expected %" PRIu64 ". Ignoring reply.", bucket.toString().c_str(), reply.getMsgId(), s.pendingId); - DUMP_LOGGED_BUCKET_OPERATIONS(bucket.getBucketId()); return; } bool clearState = true; @@ -1436,7 +1424,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, framework::MilliSecTimer startTime(_env._component.getClock()); fetchLocalData(bucket, reply.getLoadType(), diff, index, s.context); - _env._metrics.mergeDataReadLatency.addValue( + _env._metrics.merge_handler_metrics.mergeDataReadLatency.addValue( startTime.getElapsedTimeAsDouble()); } if (applyDiffHasLocallyNeededData(diff, index)) { @@ -1444,7 +1432,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, api::BucketInfo info( applyDiffLocally(bucket, reply.getLoadType(), diff, index, s.context)); - _env._metrics.mergeDataWriteLatency.addValue( + _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue( startTime.getElapsedTimeAsDouble()); } else { LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u)", @@ -1491,7 +1479,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, // We have sent something on and shouldn't reply now. clearState = false; } else { - _env._metrics.mergeLatencyTotal.addValue( + _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue( s.startTime.getElapsedTimeAsDouble()); } } |