diff options
author | Tor Egge <Tor.Egge@online.no> | 2021-11-23 23:14:49 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2021-11-23 23:14:49 +0100 |
commit | 3af9cb3c0bc51db7baa434c1a6d8354e79af94a2 (patch) | |
tree | 97d1e0f34c360fed7581257e0f7290267f8d6b58 /storage/src | |
parent | d8b60f5658e9305ae29b2ed8442b5fc74cfff29c (diff) |
Update merge latency metrics after async writes have completed.
Diffstat (limited to 'storage/src')
4 files changed, 70 insertions, 16 deletions
diff --git a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp index 701e8a80d3a..cdf7ac817fa 100644 --- a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp +++ b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp @@ -2,11 +2,14 @@ #include <vespa/storage/persistence/apply_bucket_diff_state.h> #include <vespa/storage/persistence/merge_bucket_info_syncer.h> +#include <vespa/storage/persistence/filestorage/merge_handler_metrics.h> #include <vespa/document/base/documentid.h> #include <vespa/document/bucket/bucketid.h> #include <vespa/document/bucket/bucketidfactory.h> #include <vespa/document/test/make_document_bucket.h> +#include <vespa/metrics/metricset.h> #include <vespa/persistence/spi/result.h> +#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h> #include <gtest/gtest.h> using document::DocumentId; @@ -75,19 +78,26 @@ class ApplyBucketDiffStateTestBase : public ::testing::Test public: uint32_t sync_count; DummyMergeBucketInfoSyncer syncer; + metrics::MetricSet merge_handler_metrics_owner; + MergeHandlerMetrics merge_handler_metrics; + framework::defaultimplementation::FakeClock clock; MonitoredRefCount monitored_ref_count; ApplyBucketDiffStateTestBase() : ::testing::Test(), sync_count(0u), - syncer(sync_count) + syncer(sync_count), + merge_handler_metrics_owner("owner", {}, "owner"), + merge_handler_metrics(&merge_handler_metrics_owner), + clock(), + monitored_ref_count() { } ~ApplyBucketDiffStateTestBase(); std::shared_ptr<ApplyBucketDiffState> make_state() { - return ApplyBucketDiffState::create(syncer, spi::Bucket(dummy_document_bucket), RetainGuard(monitored_ref_count)); + return ApplyBucketDiffState::create(syncer, merge_handler_metrics, clock, spi::Bucket(dummy_document_bucket), RetainGuard(monitored_ref_count)); } }; @@ -168,4 +178,29 @@ TEST_F(ApplyBucketDiffStateTest, failed_sync_bucket_info_is_detected) check_failure(fail); } +TEST_F(ApplyBucketDiffStateTest, data_write_latency_is_updated) +{ + clock.addMilliSecondsToTime(10); + state.reset(); + EXPECT_EQ(10.0, merge_handler_metrics.mergeDataWriteLatency.getLast()); + EXPECT_EQ(1, merge_handler_metrics.mergeDataWriteLatency.getCount()); +} + +TEST_F(ApplyBucketDiffStateTest, total_latency_is_not_updated) +{ + clock.addMilliSecondsToTime(14); + state.reset(); + EXPECT_EQ(0.0, merge_handler_metrics.mergeLatencyTotal.getLast()); + EXPECT_EQ(0, merge_handler_metrics.mergeLatencyTotal.getCount()); +} + +TEST_F(ApplyBucketDiffStateTest, total_latency_is_updated) +{ + state->set_merge_start_time(framework::MilliSecTimer(clock)); + clock.addMilliSecondsToTime(14); + state.reset(); + EXPECT_EQ(14.0, merge_handler_metrics.mergeLatencyTotal.getLast()); + EXPECT_EQ(1, merge_handler_metrics.mergeLatencyTotal.getCount()); +} + } diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp index 97aba76dfac..e976cefbec3 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp @@ -1,10 +1,12 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "apply_bucket_diff_state.h" -#include "mergehandler.h" +#include "merge_bucket_info_syncer.h" #include "persistenceutil.h" #include <vespa/document/base/documentid.h> #include <vespa/persistence/spi/result.h> +#include <vespa/storageapi/message/bucket.h> +#include <vespa/storage/common/messagesender.h> #include <vespa/vespalib/stllike/asciistream.h> using storage::spi::Result; @@ -20,8 +22,10 @@ public: } }; -ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard) +ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, MergeHandlerMetrics& merge_handler_metrics, const framework::Clock& clock, const spi::Bucket& bucket, RetainGuard&& retain_guard) : _merge_bucket_info_syncer(merge_bucket_info_syncer), + _merge_handler_metrics(merge_handler_metrics), + _start_time(clock), _bucket(bucket), _fail_message(), _failed_flag(), @@ -43,6 +47,10 @@ ApplyBucketDiffState::~ApplyBucketDiffState() _fail_message = e.what(); } } + _merge_handler_metrics.mergeDataWriteLatency.addValue(_start_time.getElapsedTimeAsDouble()); + if (_merge_start_time.has_value()) { + _merge_handler_metrics.mergeLatencyTotal.addValue(_merge_start_time.value().getElapsedTimeAsDouble()); + } if (_promise.has_value()) { _promise.value().set_value(_fail_message); } @@ -115,10 +123,16 @@ ApplyBucketDiffState::set_tracker(std::unique_ptr<MessageTracker>&& tracker) _tracker = std::move(tracker); } +void +ApplyBucketDiffState::set_merge_start_time(const framework::MilliSecTimer& merge_start_time) +{ + _merge_start_time = merge_start_time; +} + std::shared_ptr<ApplyBucketDiffState> -ApplyBucketDiffState::create(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard) +ApplyBucketDiffState::create(const MergeBucketInfoSyncer& merge_bucket_info_syncer, MergeHandlerMetrics& merge_handler_metrics, const framework::Clock& clock, const spi::Bucket& bucket, RetainGuard&& retain_guard) { - std::unique_ptr<ApplyBucketDiffState> state(new ApplyBucketDiffState(merge_bucket_info_syncer, bucket, std::move(retain_guard))); + std::unique_ptr<ApplyBucketDiffState> state(new ApplyBucketDiffState(merge_bucket_info_syncer, merge_handler_metrics, clock, bucket, std::move(retain_guard))); return std::shared_ptr<ApplyBucketDiffState>(state.release(), Deleter()); } diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h index 20e380e85a7..0433cd4e108 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h @@ -3,6 +3,7 @@ #pragma once #include <vespa/persistence/spi/bucket.h> +#include <vespa/storageframework/generic/clock/timer.h> #include <vespa/vespalib/util/retain_guard.h> #include <future> #include <memory> @@ -19,6 +20,7 @@ class ApplyBucketDiffEntryResult; struct MessageSender; class MessageTracker; class MergeBucketInfoSyncer; +struct MergeHandlerMetrics; /* * State of all bucket diff entry spi operation (putAsync or removeAsync) @@ -27,6 +29,8 @@ class MergeBucketInfoSyncer; class ApplyBucketDiffState { class Deleter; const MergeBucketInfoSyncer& _merge_bucket_info_syncer; + MergeHandlerMetrics& _merge_handler_metrics; + framework::MilliSecTimer _start_time; spi::Bucket _bucket; vespalib::string _fail_message; std::atomic_flag _failed_flag; @@ -36,10 +40,11 @@ class ApplyBucketDiffState { std::shared_ptr<api::StorageReply> _delayed_reply; MessageSender* _sender; vespalib::RetainGuard _retain_guard; + std::optional<framework::MilliSecTimer> _merge_start_time; - ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard); + ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, MergeHandlerMetrics& merge_handler_metrics, const framework::Clock& clock, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard); public: - static std::shared_ptr<ApplyBucketDiffState> create(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard); + static std::shared_ptr<ApplyBucketDiffState> create(const MergeBucketInfoSyncer &merge_bucket_info_syncer, MergeHandlerMetrics& merge_handler_metrics, const framework::Clock& clock, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard); ~ApplyBucketDiffState(); void on_entry_complete(std::unique_ptr<storage::spi::Result> result, const document::DocumentId &doc_id, const char *op); void wait(); @@ -50,6 +55,7 @@ public: void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, std::shared_ptr<api::StorageReply>&& delayed_reply); void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply); void set_tracker(std::unique_ptr<MessageTracker>&& tracker); + void set_merge_start_time(const framework::MilliSecTimer& merge_start_time); const spi::Bucket& get_bucket() const noexcept { return _bucket; } }; diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 254a26aa454..ee62da95f55 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -1255,14 +1255,11 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra _env._nodeIndex, index); } if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) { - framework::MilliSecTimer startTime(_clock); - async_results = ApplyBucketDiffState::create(*this, bucket, RetainGuard(*_monitored_ref_count)); + async_results = ApplyBucketDiffState::create(*this, _env._metrics.merge_handler_metrics, _clock, bucket, RetainGuard(*_monitored_ref_count)); applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context(), async_results); if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) { check_apply_diff_sync(std::move(async_results)); } - _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue( - startTime.getElapsedTimeAsDouble()); } else { LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u).", bucket.toString().c_str(), _env._nodeIndex, index); @@ -1365,13 +1362,11 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, Messa _env._metrics.merge_handler_metrics.mergeDataReadLatency.addValue(startTime.getElapsedTimeAsDouble()); } if (applyDiffHasLocallyNeededData(diff, index)) { - framework::MilliSecTimer startTime(_clock); - async_results = ApplyBucketDiffState::create(*this, bucket, RetainGuard(*_monitored_ref_count)); + async_results = ApplyBucketDiffState::create(*this, _env._metrics.merge_handler_metrics, _clock, bucket, RetainGuard(*_monitored_ref_count)); applyDiffLocally(bucket, diff, index, s->context, async_results); if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) { check_apply_diff_sync(std::move(async_results)); } - _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(startTime.getElapsedTimeAsDouble()); } else { LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u)", bucket.toString().c_str(), @@ -1416,7 +1411,11 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, Messa // We have sent something on and shouldn't reply now. clearState = false; } else { - _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue(s->startTime.getElapsedTimeAsDouble()); + if (async_results) { + async_results->set_merge_start_time(s->startTime); + } else { + _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue(s->startTime.getElapsedTimeAsDouble()); + } } } } else { |