aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2021-11-24 15:51:53 +0100
committerTor Egge <Tor.Egge@online.no>2021-11-24 15:57:02 +0100
commit1f0935b682988a95c29f509d5fcd418dc4ed4b52 (patch)
treef1feef9749ff8a8affb4670d193236cebb6ce37b /storage
parent67ea18c1dca8e096f5a3c00f075c6835fc80624f (diff)
Update operation metrics for delayed or chained merge handler replies.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/apply_bucket_diff_state_test.cpp78
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp18
-rw-r--r--storage/src/vespa/storage/persistence/apply_bucket_diff_state.h6
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp28
4 files changed, 123 insertions, 7 deletions
diff --git a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
index cdf7ac817fa..ec57d775f43 100644
--- a/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
+++ b/storage/src/tests/persistence/apply_bucket_diff_state_test.cpp
@@ -3,6 +3,7 @@
#include <vespa/storage/persistence/apply_bucket_diff_state.h>
#include <vespa/storage/persistence/merge_bucket_info_syncer.h>
#include <vespa/storage/persistence/filestorage/merge_handler_metrics.h>
+#include <vespa/storageapi/message/bucket.h>
#include <vespa/document/base/documentid.h>
#include <vespa/document/bucket/bucketid.h>
#include <vespa/document/bucket/bucketidfactory.h>
@@ -10,6 +11,8 @@
#include <vespa/metrics/metricset.h>
#include <vespa/persistence/spi/result.h>
#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h>
+#include <tests/common/message_sender_stub.h>
+#include <tests/persistence/persistencetestutils.h>
#include <gtest/gtest.h>
using document::DocumentId;
@@ -73,22 +76,25 @@ void push_bad(ApplyBucketDiffState &state)
}
-class ApplyBucketDiffStateTestBase : public ::testing::Test
+class ApplyBucketDiffStateTestBase : public PersistenceTestUtils
{
public:
uint32_t sync_count;
DummyMergeBucketInfoSyncer syncer;
metrics::MetricSet merge_handler_metrics_owner;
MergeHandlerMetrics merge_handler_metrics;
+ FileStorThreadMetrics::Op op_metrics;
framework::defaultimplementation::FakeClock clock;
+ MessageSenderStub message_sender;
MonitoredRefCount monitored_ref_count;
ApplyBucketDiffStateTestBase()
- : ::testing::Test(),
+ : PersistenceTestUtils(),
sync_count(0u),
syncer(sync_count),
merge_handler_metrics_owner("owner", {}, "owner"),
merge_handler_metrics(&merge_handler_metrics_owner),
+ op_metrics("op", "op", &merge_handler_metrics_owner),
clock(),
monitored_ref_count()
{
@@ -99,6 +105,13 @@ public:
std::shared_ptr<ApplyBucketDiffState> make_state() {
return ApplyBucketDiffState::create(syncer, merge_handler_metrics, clock, spi::Bucket(dummy_document_bucket), RetainGuard(monitored_ref_count));
}
+
+ MessageTracker::UP
+ create_tracker(std::shared_ptr<api::StorageMessage> cmd, document::Bucket bucket) {
+ return MessageTracker::createForTesting(framework::MilliSecTimer(clock), getEnv(),
+ message_sender, NoBucketLock::make(bucket), std::move(cmd));
+ }
+
};
ApplyBucketDiffStateTestBase::~ApplyBucketDiffStateTestBase() = default;
@@ -128,8 +141,44 @@ public:
check_failure("Failed put for id::test::1 in Bucket(0x0000000000000010): Result(5, write blocked)");
}
+ void test_delayed_reply(bool failed, bool async_failed, bool chained_reply);
+
};
+void
+ApplyBucketDiffStateTest::test_delayed_reply(bool failed, bool async_failed, bool chained_reply)
+{
+ auto cmd = std::make_shared<api::MergeBucketCommand>(dummy_document_bucket, std::vector<api::MergeBucketCommand::Node>{}, 0);
+ std::shared_ptr<api::StorageReply> reply = cmd->makeReply();
+ auto tracker = create_tracker(cmd, dummy_document_bucket);
+ if (failed) {
+ reply->setResult(api::ReturnCode::Result::INTERNAL_FAILURE);
+ }
+ tracker->setMetric(op_metrics);
+ tracker->setReply(reply);
+ if (chained_reply) {
+ state->set_delayed_reply(std::move(tracker), message_sender, &op_metrics, framework::MilliSecTimer(clock), std::move(reply));
+ } else {
+ state->set_delayed_reply(std::move(tracker), std::move(reply));
+ }
+ clock.addMilliSecondsToTime(16);
+ if (async_failed) {
+ push_bad(*state);
+ }
+ state.reset();
+ if (failed || async_failed) {
+ EXPECT_EQ(0.0, op_metrics.latency.getLast());
+ EXPECT_EQ(0, op_metrics.latency.getCount());
+ EXPECT_EQ(1, op_metrics.failed.getValue());
+ } else {
+ EXPECT_EQ(16.0, op_metrics.latency.getLast());
+ EXPECT_EQ(1, op_metrics.latency.getCount());
+ EXPECT_EQ(0, op_metrics.failed.getValue());
+ }
+ ASSERT_EQ(1, message_sender.replies.size());
+ EXPECT_NE(failed || async_failed, std::dynamic_pointer_cast<api::MergeBucketReply>(message_sender.replies.front())->getResult().success());
+}
+
TEST_F(ApplyBucketDiffStateTest, ok_results_can_be_checked)
{
push_ok(*state);
@@ -203,4 +252,29 @@ TEST_F(ApplyBucketDiffStateTest, total_latency_is_updated)
EXPECT_EQ(1, merge_handler_metrics.mergeLatencyTotal.getCount());
}
+TEST_F(ApplyBucketDiffStateTest, delayed_ok_reply)
+{
+ test_delayed_reply(false, false, false);
+}
+
+TEST_F(ApplyBucketDiffStateTest, delayed_failed_reply)
+{
+ test_delayed_reply(true, false, false);
+}
+
+TEST_F(ApplyBucketDiffStateTest, delayed_ok_chained_reply)
+{
+ test_delayed_reply(false, false, true);
+}
+
+TEST_F(ApplyBucketDiffStateTest, delayed_failed_chained_reply)
+{
+ test_delayed_reply(true, false, true);
+}
+
+TEST_F(ApplyBucketDiffStateTest, delayed_async_failed_reply)
+{
+ test_delayed_reply(false, true, false);
+}
+
}
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
index e976cefbec3..07823792062 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.cpp
@@ -34,7 +34,10 @@ ApplyBucketDiffState::ApplyBucketDiffState(const MergeBucketInfoSyncer& merge_bu
_tracker(),
_delayed_reply(),
_sender(nullptr),
- _retain_guard(std::move(retain_guard))
+ _op_metrics(nullptr),
+ _op_start_time(),
+ _retain_guard(std::move(retain_guard)),
+ _merge_start_time()
{
}
@@ -59,6 +62,15 @@ ApplyBucketDiffState::~ApplyBucketDiffState()
_delayed_reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, _fail_message));
}
if (_sender) {
+ if (_op_metrics != nullptr) {
+ if (_delayed_reply->getResult().success()) {
+ if (_op_start_time.has_value()) {
+ _op_metrics->latency.addValue(_op_start_time.value().getElapsedTimeAsDouble());
+ }
+ } else {
+ _op_metrics->failed.inc();
+ }
+ }
_sender->sendReply(std::move(_delayed_reply));
} else {
// _tracker->_reply and _delayed_reply points to the same reply.
@@ -110,10 +122,12 @@ ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracke
}
void
-ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply)
+ApplyBucketDiffState::set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, FileStorThreadMetrics::Op* op_metrics, const framework::MilliSecTimer& op_start_time, std::shared_ptr<api::StorageReply>&& delayed_reply)
{
_tracker = std::move(tracker);
_sender = &sender;
+ _op_metrics = op_metrics;
+ _op_start_time = op_start_time;
_delayed_reply = std::move(delayed_reply);
}
diff --git a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
index 0433cd4e108..49625bbf8b5 100644
--- a/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
+++ b/storage/src/vespa/storage/persistence/apply_bucket_diff_state.h
@@ -4,6 +4,7 @@
#include <vespa/persistence/spi/bucket.h>
#include <vespa/storageframework/generic/clock/timer.h>
+#include <vespa/storage/persistence/filestorage/filestormetrics.h>
#include <vespa/vespalib/util/retain_guard.h>
#include <future>
#include <memory>
@@ -20,7 +21,6 @@ class ApplyBucketDiffEntryResult;
struct MessageSender;
class MessageTracker;
class MergeBucketInfoSyncer;
-struct MergeHandlerMetrics;
/*
* State of all bucket diff entry spi operation (putAsync or removeAsync)
@@ -39,6 +39,8 @@ class ApplyBucketDiffState {
std::unique_ptr<MessageTracker> _tracker;
std::shared_ptr<api::StorageReply> _delayed_reply;
MessageSender* _sender;
+ FileStorThreadMetrics::Op* _op_metrics;
+ std::optional<framework::MilliSecTimer> _op_start_time;
vespalib::RetainGuard _retain_guard;
std::optional<framework::MilliSecTimer> _merge_start_time;
@@ -53,7 +55,7 @@ public:
void sync_bucket_info();
std::future<vespalib::string> get_future();
void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, std::shared_ptr<api::StorageReply>&& delayed_reply);
- void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, std::shared_ptr<api::StorageReply>&& delayed_reply);
+ void set_delayed_reply(std::unique_ptr<MessageTracker>&& tracker, MessageSender& sender, FileStorThreadMetrics::Op* op_metrics, const framework::MilliSecTimer& op_start_time, std::shared_ptr<api::StorageReply>&& delayed_reply);
void set_tracker(std::unique_ptr<MessageTracker>&& tracker);
void set_merge_start_time(const framework::MilliSecTimer& merge_start_time);
const spi::Bucket& get_bucket() const noexcept { return _bucket; }
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index ee62da95f55..0c9cecdb6a1 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -104,6 +104,29 @@ void check_apply_diff_sync(std::shared_ptr<ApplyBucketDiffState> async_results)
}
}
+FileStorThreadMetrics::Op *get_op_metrics(FileStorThreadMetrics& metrics, const api::StorageReply &reply) {
+ switch (reply.getType().getId()) {
+ case api::MessageType::MERGEBUCKET_REPLY_ID:
+ return &metrics.mergeBuckets;
+ case api::MessageType::APPLYBUCKETDIFF_REPLY_ID:
+ return &metrics.applyBucketDiff;
+ default:
+ ;
+ }
+ return nullptr;
+}
+
+void update_op_metrics(FileStorThreadMetrics& metrics, const api::StorageReply &reply, const framework::MilliSecTimer& start_time) {
+ auto op_metrics = get_op_metrics(metrics, reply);
+ if (op_metrics) {
+ if (reply.getResult().success()) {
+ op_metrics->latency.addValue(start_time.getElapsedTimeAsDouble());
+ } else {
+ op_metrics->failed.inc();
+ }
+ }
+}
+
} // anonymous namespace
void
@@ -1223,6 +1246,7 @@ MergeHandler::handleGetBucketDiffReply(api::GetBucketDiffReply& reply, MessageSe
}
if (replyToSend.get()) {
replyToSend->setResult(reply.getResult());
+ update_op_metrics(_env._metrics, *replyToSend, s->startTime);
sender.sendReply(replyToSend);
}
}
@@ -1433,7 +1457,8 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, Messa
if (async_results && replyToSend) {
replyToSend->setResult(returnCode);
- async_results->set_delayed_reply(std::move(tracker), sender, std::move(replyToSend));
+ auto op_metrics = get_op_metrics(_env._metrics, *replyToSend);
+ async_results->set_delayed_reply(std::move(tracker), sender, op_metrics, s->startTime, std::move(replyToSend));
}
if (clearState) {
_env._fileStorHandler.clearMergeStatus(bucket.getBucket());
@@ -1441,6 +1466,7 @@ MergeHandler::handleApplyBucketDiffReply(api::ApplyBucketDiffReply& reply, Messa
if (replyToSend.get()) {
// Send on
replyToSend->setResult(returnCode);
+ update_op_metrics(_env._metrics, *replyToSend, s->startTime);
sender.sendReply(replyToSend);
}
}