diff options
author | Tor Egge <Tor.Egge@online.no> | 2021-11-24 15:51:53 +0100 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2021-11-24 15:57:02 +0100 |
commit | 1f0935b682988a95c29f509d5fcd418dc4ed4b52 (patch) | |
tree | f1feef9749ff8a8affb4670d193236cebb6ce37b /storage | |
parent | 67ea18c1dca8e096f5a3c00f075c6835fc80624f (diff) |
Update operation metrics for delayed or chained merge handler replies.
Diffstat (limited to 'storage')
4 files changed, 123 insertions, 7 deletions
diff --git a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp index cdf7ac817fa..ec57d775f43 100644 --- a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp +++ b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp @@ -3,6 +3,7 @@ #include <vespa/storage/persistence/apply_bucket_diff_state.h> #include <vespa/storage/persistence/merge_bucket_info_syncer.h> #include <vespa/storage/persistence/filestorage/merge_handler_metrics.h> +#include <vespa/storageapi/message/bucket.h> #include <vespa/document/base/documentid.h> #include <vespa/document/bucket/bucketid.h> #include <vespa/document/bucket/bucketidfactory.h> @@ -10,6 +11,8 @@ #include <vespa/metrics/metricset.h> #include <vespa/persistence/spi/result.h> #include <vespa/storageframework/defaultimplementation/clock/fakeclock.h> +#include <tests/common/message_sender_stub.h> +#include <tests/persistence/persistencetestutils.h> #include <gtest/gtest.h> using document::DocumentId; @@ -73,22 +76,25 @@ void push_bad(ApplyBucketDiffState &state) } -class ApplyBucketDiffStateTestBase : public ::testing::Test +class ApplyBucketDiffStateTestBase : public PersistenceTestUtils { public: uint32_t sync_count; DummyMergeBucketInfoSyncer syncer; metrics::MetricSet merge_handler_metrics_owner; MergeHandlerMetrics merge_handler_metrics; + FileStorThreadMetrics::Op op_metrics; framework::defaultimplementation::FakeClock clock; + MessageSenderStub message_sender; MonitoredRefCount monitored_ref_count; ApplyBucketDiffStateTestBase() - : ::testing::Test(), + : PersistenceTestUtils(), sync_count(0u), syncer(sync_count), merge_handler_metrics_owner("owner", {}, "owner"), merge_handler_metrics(&merge_handler_metrics_owner), + op_metrics("op", "op", &merge_handler_metrics_owner), clock(), monitored_ref_count() { @@ -99,6 +105,13 @@ public: std::shared_ptr<ApplyBucketDiffState> make_state() { return ApplyBucketDiffState::create(syncer, merge_handler_metrics, clock, spi::Bucket(dummy_document_bucket), RetainGuard(monitored_ref_count)); } + + MessageTracker::UP + create_tracker(std::shared_ptr<api::StorageMessage> cmd, document::Bucket bucket) { + return MessageTracker::createForTesting(framework::MilliSecTimer(clock), getEnv(), + message_sender, NoBucketLock::make(bucket), std::move(cmd)); + } + }; ApplyBucketDiffStateTestBase::~ApplyBucketDiffStateTestBase() = default; @@ -128,8 +141,44 @@ public: check_failure("Failed put for id::test::1 in Bucket(0x0000000000000010): Result(5, write blocked)"); } + void test_delayed_reply(bool failed, bool async_failed, bool chained_reply); + }; +void +ApplyBucketDiffStateTest::test_delayed_reply(bool failed, bool async_failed, bool chained_reply) +{ + auto cmd = std::make_shared<api::MergeBucketCommand>(dummy_document_bucket, std::vector<api::MergeBucketCommand::Node>{}, 0); + std::shared_ptr<api::StorageReply> reply = cmd->makeReply(); + auto tracker = create_tracker(cmd, dummy_document_bucket); + if (failed) { + reply->setResult(api::ReturnCode::Result::INTERNAL_FAILURE); + } + tracker->setMetric(op_metrics); + tracker->setReply(reply); + if (chained_reply) { + state->set_delayed_reply(std::move(tracker), message_sender, &op_metrics, framework::MilliSecTimer(clock), std::move(reply)); + } else { + state->set_delayed_reply(std::move(tracker), std::move(reply)); + } + clock.addMilliSecondsToTime(16); + if (async_failed) { + push_bad(*state); + } + state.reset(); + if (failed || async_failed) { + EXPECT_EQ(0.0, op_metrics.latency.getLast()); + EXPECT_EQ(0, op_metrics.latency.getCount()); + EXPECT_EQ(1, op_metrics.failed.getValue()); + } else { + EXPECT_EQ(16.0, op_metrics.latency.getLast()); + EXPECT_EQ(1, op_metrics.latency.getCount()); + EXPECT_EQ(0, op_metrics.failed.getValue()); + } + ASSERT_EQ(1, message_sender.replies.size()); + EXPECT_NE(failed || async_failed, std::dynamic_pointer_cast<api::MergeBucketReply>(message_sender.replies.front())->getResult().success()); +} + TEST_F(ApplyBucketDiffStateTest, ok_results_can_be_checked) { push_ok(*state); @@ -203,4 +252,29 @@ TEST_F(ApplyBucketDiffStateTest, total_latency_is_updated) EXPECT_EQ(1, merge_handler_metrics.mergeLatencyTotal.getCount()); } +TEST_F(ApplyBucketDiffStateTest, delayed_ok_reply) +{ + test_delayed_reply(false, false, false); +} + +TEST_F(ApplyBucketDiffStateTest, delayed_failed_reply) +{ + test_delayed_reply(true, false, false); +} + +TEST_F(ApplyBucketDiffStateTest, delayed_ok_chained_reply) +{ + test_delayed_reply(false, false, true); +} + +TEST_F(ApplyBucketDiffStateTest, delayed_failed_chained_reply) +{ + test_delayed_reply(true, false, true); +} + +TEST_F(ApplyBucketDiffStateTest, delayed_async_failed_reply) +{ + test_delayed_reply(false, true, false); +} + } diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp index e976cefbec3..07823792062 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp @@ -34,7 +34,10 @@ ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bu _tracker(), _delayed_reply(), _sender(nullptr), - _retain_guard(std::move(retain_guard)) + _op_metrics(nullptr), + _op_start_time(), + _retain_guard(std::move(retain_guard)), + _merge_start_time() { } @@ -59,6 +62,15 @@ ApplyBucketDiffState::~ApplyBucketDiffState() _delayed_reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, _fail_message)); } if (_sender) { + if (_op_metrics != nullptr) { + if (_delayed_reply->getResult().success()) { + if (_op_start_time.has_value()) { + _op_metrics->latency.addValue(_op_start_time.value().getElapsedTimeAsDouble()); + } + } else { + _op_metrics->failed.inc(); + } + } _sender->sendReply(std::move(_delayed_reply)); } else { // _tracker->_reply and _delayed_reply points to the same reply. @@ -110,10 +122,12 @@ ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracke } void -ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply) +ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, FileStorThreadMetrics::Op* op_metrics, const framework::MilliSecTimer& op_start_time, std::shared_ptr<api::StorageReply>&& delayed_reply) { _tracker = std::move(tracker); _sender = &sender; + _op_metrics = op_metrics; + _op_start_time = op_start_time; _delayed_reply = std::move(delayed_reply); } diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h index 0433cd4e108..49625bbf8b5 100644 --- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h +++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h @@ -4,6 +4,7 @@ #include <vespa/persistence/spi/bucket.h> #include <vespa/storageframework/generic/clock/timer.h> +#include <vespa/storage/persistence/filestorage/filestormetrics.h> #include <vespa/vespalib/util/retain_guard.h> #include <future> #include <memory> @@ -20,7 +21,6 @@ class ApplyBucketDiffEntryResult; struct MessageSender; class MessageTracker; class MergeBucketInfoSyncer; -struct MergeHandlerMetrics; /* * State of all bucket diff entry spi operation (putAsync or removeAsync) @@ -39,6 +39,8 @@ class ApplyBucketDiffState { std::unique_ptr<MessageTracker> _tracker; std::shared_ptr<api::StorageReply> _delayed_reply; MessageSender* _sender; + FileStorThreadMetrics::Op* _op_metrics; + std::optional<framework::MilliSecTimer> _op_start_time; vespalib::RetainGuard _retain_guard; std::optional<framework::MilliSecTimer> _merge_start_time; @@ -53,7 +55,7 @@ public: void sync_bucket_info(); std::future<vespalib::string> get_future(); void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, std::shared_ptr<api::StorageReply>&& delayed_reply); - void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply); + void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, FileStorThreadMetrics::Op* op_metrics, const framework::MilliSecTimer& op_start_time, std::shared_ptr<api::StorageReply>&& delayed_reply); void set_tracker(std::unique_ptr<MessageTracker>&& tracker); void set_merge_start_time(const framework::MilliSecTimer& merge_start_time); const spi::Bucket& get_bucket() const noexcept { return _bucket; } diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index ee62da95f55..0c9cecdb6a1 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -104,6 +104,29 @@ void check_apply_diff_sync(std::shared_ptr<ApplyBucketDiffState> async_results) } } +FileStorThreadMetrics::Op *get_op_metrics(FileStorThreadMetrics& metrics, const api::StorageReply &reply) { + switch (reply.getType().getId()) { + case api::MessageType::MERGEBUCKET_REPLY_ID: + return &metrics.mergeBuckets; + case api::MessageType::APPLYBUCKETDIFF_REPLY_ID: + return &metrics.applyBucketDiff; + default: + ; + } + return nullptr; +} + +void update_op_metrics(FileStorThreadMetrics& metrics, const api::StorageReply &reply, const framework::MilliSecTimer& start_time) { + auto op_metrics = get_op_metrics(metrics, reply); + if (op_metrics) { + if (reply.getResult().success()) { + op_metrics->latency.addValue(start_time.getElapsedTimeAsDouble()); + } else { + op_metrics->failed.inc(); + } + } +} + } // anonymous namespace void @@ -1223,6 +1246,7 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSe } if (replyToSend.get()) { replyToSend->setResult(reply.getResult()); + update_op_metrics(_env._metrics, *replyToSend, s->startTime); sender.sendReply(replyToSend); } } @@ -1433,7 +1457,8 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, Messa if (async_results && replyToSend) { replyToSend->setResult(returnCode); - async_results->set_delayed_reply(std::move(tracker), sender, std::move(replyToSend)); + auto op_metrics = get_op_metrics(_env._metrics, *replyToSend); + async_results->set_delayed_reply(std::move(tracker), sender, op_metrics, s->startTime, std::move(replyToSend)); } if (clearState) { _env._fileStorHandler.clearMergeStatus(bucket.getBucket()); @@ -1441,6 +1466,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, Messa if (replyToSend.get()) { // Send on replyToSend->setResult(returnCode); + update_op_metrics(_env._metrics, *replyToSend, s->startTime); sender.sendReply(replyToSend); } } |