summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2021-11-23 23:14:49 +0100
committerTor Egge <Tor.Egge@online.no>2021-11-23 23:14:49 +0100
commit3af9cb3c0bc51db7baa434c1a6d8354e79af94a2 (patch)
tree97d1e0f34c360fed7581257e0f7290267f8d6b58 /storage
parentd8b60f5658e9305ae29b2ed8442b5fc74cfff29c (diff)
Update merge latency metrics after async writes have completed.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/apply_bucket_diff_state_test.cpp39
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp22
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_state.h10
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp15
4 files changed, 70 insertions, 16 deletions
diff --git a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
index 701e8a80d3a..cdf7ac817fa 100644
--- a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
+++ b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
@@ -2,11 +2,14 @@
#include <vespa/storage/persistence/apply_bucket_diff_state.h>
#include <vespa/storage/persistence/merge_bucket_info_syncer.h>
+#include <vespa/storage/persistence/filestorage/merge_handler_metrics.h>
#include <vespa/document/base/documentid.h>
#include <vespa/document/bucket/bucketid.h>
#include <vespa/document/bucket/bucketidfactory.h>
#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/metrics/metricset.h>
#include <vespa/persistence/spi/result.h>
+#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h>
#include <gtest/gtest.h>
using document::DocumentId;
@@ -75,19 +78,26 @@ class ApplyBucketDiffStateTestBase : public ::testing::Test
public:
uint32_t sync_count;
DummyMergeBucketInfoSyncer syncer;
+ metrics::MetricSet merge_handler_metrics_owner;
+ MergeHandlerMetrics merge_handler_metrics;
+ framework::defaultimplementation::FakeClock clock;
MonitoredRefCount monitored_ref_count;
ApplyBucketDiffStateTestBase()
: ::testing::Test(),
sync_count(0u),
- syncer(sync_count)
+ syncer(sync_count),
+ merge_handler_metrics_owner("owner", {}, "owner"),
+ merge_handler_metrics(&merge_handler_metrics_owner),
+ clock(),
+ monitored_ref_count()
{
}
~ApplyBucketDiffStateTestBase();
std::shared_ptr<ApplyBucketDiffState> make_state() {
- return ApplyBucketDiffState::create(syncer, spi::Bucket(dummy_document_bucket), RetainGuard(monitored_ref_count));
+ return ApplyBucketDiffState::create(syncer, merge_handler_metrics, clock, spi::Bucket(dummy_document_bucket), RetainGuard(monitored_ref_count));
}
};
@@ -168,4 +178,29 @@ TEST_F(ApplyBucketDiffStateTest, failed_sync_bucket_info_is_detected)
check_failure(fail);
}
+TEST_F(ApplyBucketDiffStateTest, data_write_latency_is_updated)
+{
+ clock.addMilliSecondsToTime(10);
+ state.reset();
+ EXPECT_EQ(10.0, merge_handler_metrics.mergeDataWriteLatency.getLast());
+ EXPECT_EQ(1, merge_handler_metrics.mergeDataWriteLatency.getCount());
+}
+
+TEST_F(ApplyBucketDiffStateTest, total_latency_is_not_updated)
+{
+ clock.addMilliSecondsToTime(14);
+ state.reset();
+ EXPECT_EQ(0.0, merge_handler_metrics.mergeLatencyTotal.getLast());
+ EXPECT_EQ(0, merge_handler_metrics.mergeLatencyTotal.getCount());
+}
+
+TEST_F(ApplyBucketDiffStateTest, total_latency_is_updated)
+{
+ state->set_merge_start_time(framework::MilliSecTimer(clock));
+ clock.addMilliSecondsToTime(14);
+ state.reset();
+ EXPECT_EQ(14.0, merge_handler_metrics.mergeLatencyTotal.getLast());
+ EXPECT_EQ(1, merge_handler_metrics.mergeLatencyTotal.getCount());
+}
+
}
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
index 97aba76dfac..e976cefbec3 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
@@ -1,10 +1,12 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "apply_bucket_diff_state.h"
-#include "mergehandler.h"
+#include "merge_bucket_info_syncer.h"
#include "persistenceutil.h"
#include <vespa/document/base/documentid.h>
#include <vespa/persistence/spi/result.h>
+#include <vespa/storageapi/message/bucket.h>
+#include <vespa/storage/common/messagesender.h>
#include <vespa/vespalib/stllike/asciistream.h>
using storage::spi::Result;
@@ -20,8 +22,10 @@ public:
}
};
-ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard)
+ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bucket_info_syncer, MergeHandlerMetrics& merge_handler_metrics, const framework::Clock& clock, const spi::Bucket& bucket, RetainGuard&& retain_guard)
: _merge_bucket_info_syncer(merge_bucket_info_syncer),
+ _merge_handler_metrics(merge_handler_metrics),
+ _start_time(clock),
_bucket(bucket),
_fail_message(),
_failed_flag(),
@@ -43,6 +47,10 @@ ApplyBucketDiffState::~ApplyBucketDiffState()
_fail_message = e.what();
}
}
+ _merge_handler_metrics.mergeDataWriteLatency.addValue(_start_time.getElapsedTimeAsDouble());
+ if (_merge_start_time.has_value()) {
+ _merge_handler_metrics.mergeLatencyTotal.addValue(_merge_start_time.value().getElapsedTimeAsDouble());
+ }
if (_promise.has_value()) {
_promise.value().set_value(_fail_message);
}
@@ -115,10 +123,16 @@ ApplyBucketDiffState::set_tracker(std::unique_ptr<MessageTracker>&& tracker)
_tracker = std::move(tracker);
}
+void
+ApplyBucketDiffState::set_merge_start_time(const framework::MilliSecTimer& merge_start_time)
+{
+ _merge_start_time = merge_start_time;
+}
+
std::shared_ptr<ApplyBucketDiffState>
-ApplyBucketDiffState::create(const MergeBucketInfoSyncer& merge_bucket_info_syncer, const spi::Bucket& bucket, RetainGuard&& retain_guard)
+ApplyBucketDiffState::create(const MergeBucketInfoSyncer& merge_bucket_info_syncer, MergeHandlerMetrics& merge_handler_metrics, const framework::Clock& clock, const spi::Bucket& bucket, RetainGuard&& retain_guard)
{
- std::unique_ptr<ApplyBucketDiffState> state(new ApplyBucketDiffState(merge_bucket_info_syncer, bucket, std::move(retain_guard)));
+ std::unique_ptr<ApplyBucketDiffState> state(new ApplyBucketDiffState(merge_bucket_info_syncer, merge_handler_metrics, clock, bucket, std::move(retain_guard)));
return std::shared_ptr<ApplyBucketDiffState>(state.release(), Deleter());
}
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
index 20e380e85a7..0433cd4e108 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
@@ -3,6 +3,7 @@
#pragma once
#include <vespa/persistence/spi/bucket.h>
+#include <vespa/storageframework/generic/clock/timer.h>
#include <vespa/vespalib/util/retain_guard.h>
#include <future>
#include <memory>
@@ -19,6 +20,7 @@ class ApplyBucketDiffEntryResult;
struct MessageSender;
class MessageTracker;
class MergeBucketInfoSyncer;
+struct MergeHandlerMetrics;
/*
* State of all bucket diff entry spi operation (putAsync or removeAsync)
@@ -27,6 +29,8 @@ class MergeBucketInfoSyncer;
class ApplyBucketDiffState {
class Deleter;
const MergeBucketInfoSyncer& _merge_bucket_info_syncer;
+ MergeHandlerMetrics& _merge_handler_metrics;
+ framework::MilliSecTimer _start_time;
spi::Bucket _bucket;
vespalib::string _fail_message;
std::atomic_flag _failed_flag;
@@ -36,10 +40,11 @@ class ApplyBucketDiffState {
std::shared_ptr<api::StorageReply> _delayed_reply;
MessageSender* _sender;
vespalib::RetainGuard _retain_guard;
+ std::optional<framework::MilliSecTimer> _merge_start_time;
- ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard);
+ ApplyBucketDiffState(const MergeBucketInfoSyncer &merge_bucket_info_syncer, MergeHandlerMetrics& merge_handler_metrics, const framework::Clock& clock, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard);
public:
- static std::shared_ptr<ApplyBucketDiffState> create(const MergeBucketInfoSyncer &merge_bucket_info_syncer, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard);
+ static std::shared_ptr<ApplyBucketDiffState> create(const MergeBucketInfoSyncer &merge_bucket_info_syncer, MergeHandlerMetrics& merge_handler_metrics, const framework::Clock& clock, const spi::Bucket& bucket, vespalib::RetainGuard&& retain_guard);
~ApplyBucketDiffState();
void on_entry_complete(std::unique_ptr<storage::spi::Result> result, const document::DocumentId &doc_id, const char *op);
void wait();
@@ -50,6 +55,7 @@ public:
void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, std::shared_ptr<api::StorageReply>&& delayed_reply);
void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply);
void set_tracker(std::unique_ptr<MessageTracker>&& tracker);
+ void set_merge_start_time(const framework::MilliSecTimer& merge_start_time);
const spi::Bucket& get_bucket() const noexcept { return _bucket; }
};
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 254a26aa454..ee62da95f55 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -1255,14 +1255,11 @@ MergeHandler::handleApplyBucketDiff(api::ApplyBucketDiffCommand& cmd, MessageTra
_env._nodeIndex, index);
}
if (applyDiffHasLocallyNeededData(cmd.getDiff(), index)) {
- framework::MilliSecTimer startTime(_clock);
- async_results = ApplyBucketDiffState::create(*this, bucket, RetainGuard(*_monitored_ref_count));
+ async_results = ApplyBucketDiffState::create(*this, _env._metrics.merge_handler_metrics, _clock, bucket, RetainGuard(*_monitored_ref_count));
applyDiffLocally(bucket, cmd.getDiff(), index, tracker->context(), async_results);
if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) {
check_apply_diff_sync(std::move(async_results));
}
- _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(
- startTime.getElapsedTimeAsDouble());
} else {
LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u).",
bucket.toString().c_str(), _env._nodeIndex, index);
@@ -1365,13 +1362,11 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, Messa
_env._metrics.merge_handler_metrics.mergeDataReadLatency.addValue(startTime.getElapsedTimeAsDouble());
}
if (applyDiffHasLocallyNeededData(diff, index)) {
- framework::MilliSecTimer startTime(_clock);
- async_results = ApplyBucketDiffState::create(*this, bucket, RetainGuard(*_monitored_ref_count));
+ async_results = ApplyBucketDiffState::create(*this, _env._metrics.merge_handler_metrics, _clock, bucket, RetainGuard(*_monitored_ref_count));
applyDiffLocally(bucket, diff, index, s->context, async_results);
if (!_async_apply_bucket_diff.load(std::memory_order_relaxed)) {
check_apply_diff_sync(std::move(async_results));
}
- _env._metrics.merge_handler_metrics.mergeDataWriteLatency.addValue(startTime.getElapsedTimeAsDouble());
} else {
LOG(spam, "Merge(%s): Didn't need fetched data on node %u (%u)",
bucket.toString().c_str(),
@@ -1416,7 +1411,11 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, Messa
// We have sent something on and shouldn't reply now.
clearState = false;
} else {
- _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue(s->startTime.getElapsedTimeAsDouble());
+ if (async_results) {
+ async_results->set_merge_start_time(s->startTime);
+ } else {
+ _env._metrics.merge_handler_metrics.mergeLatencyTotal.addValue(s->startTime.getElapsedTimeAsDouble());
+ }
}
}
} else {