about summary refs log tree commit diff stats
path: root/storage
diff options
context:
space:
mode:
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp 82
-rw-r--r-- storage/src/tests/persistence/CMakeLists.txt 1
-rw-r--r-- storage/src/tests/persistence/active_operations_stats_test.cpp 150
-rw-r--r-- storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp 21
-rw-r--r-- storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h 2
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt 2
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/active_operations_metrics.cpp 16
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/active_operations_metrics.h 22
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/active_operations_stats.cpp 133
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/active_operations_stats.h 45
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestorhandler.h 2
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp 64
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h 10
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp 3
-rw-r--r-- storage/src/vespa/storage/persistence/filestorage/filestormetrics.h 4
15 files changed, 508 insertions, 49 deletions
diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
index 3ed5e9f4a8d..1632867b627 100644
--- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
+++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp
@@ -2548,6 +2548,55 @@ TEST_F(TopLevelBucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_onl
}
}
+TEST_F(TopLevelBucketDBUpdaterTest, node_feature_sets_are_aggregated_from_nodes_and_propagated_to_stripes) {
+    lib::ClusterState state("distributor:1 storage:3");
+    set_cluster_state(state);
+    uint32_t expected_msgs = message_count(3);
+    uint32_t dummy_buckets_to_return = 1;
+
+    // Before any bucket info reply has arrived, no stripe knows of any node supporting the feature.
+    auto stripes = distributor_stripes();
+    for (auto* stripe : stripes) {
+        const auto& repo = stripe->node_supported_features_repo();
+        for (uint16_t node : {0, 1, 2}) {
+            EXPECT_FALSE(repo.node_supported_features(node).unordered_merge_chaining);
+        }
+    }
+
+    ASSERT_EQ(expected_msgs, _sender.commands().size());
+    for (uint32_t i = 0; i < _sender.commands().size(); i++) {
+        // Replies to every command but the first advertise unordered merge chaining support;
+        // the expectations below show these map to nodes 1 and 2, leaving node 0 without it.
+        const bool supports_chaining = (i > 0);
+        ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(state, *_sender.command(i),
+                dummy_buckets_to_return, [supports_chaining](auto& reply) noexcept {
+                    if (supports_chaining) {
+                        reply.supported_node_features().unordered_merge_chaining = true;
+                    }
+                }));
+    }
+
+    // Once all replies are processed, every stripe must see the aggregated feature sets.
+    for (auto* stripe : stripes) {
+        const auto& repo = stripe->node_supported_features_repo();
+        EXPECT_FALSE(repo.node_supported_features(0).unordered_merge_chaining);
+        EXPECT_TRUE(repo.node_supported_features(1).unordered_merge_chaining);
+        EXPECT_TRUE(repo.node_supported_features(2).unordered_merge_chaining);
+    }
+}
+
+TEST_F(TopLevelBucketDBUpdaterTest, outdated_bucket_info_reply_is_ignored) {
+    set_cluster_state("version:1 distributor:1 storage:1");
+    ASSERT_EQ(message_count(1), _sender.commands().size());
+    auto req = std::dynamic_pointer_cast<api::RequestBucketInfoCommand>(_sender.commands().front());
+    // Fail the test cleanly (instead of dereferencing null below) if the first
+    // command is not a RequestBucketInfoCommand or carries no address.
+    ASSERT_TRUE(req);
+    ASSERT_TRUE(req->getAddress() != nullptr);
+    _sender.clear();
+    // Force a new pending cluster state which overwrites the pending one.
+    lib::ClusterState new_state("version:2 distributor:1 storage:2");
+    set_cluster_state(new_state);
+
+    // Reply to the request that was sent on behalf of the preempted (version 1) state.
+    const api::StorageMessageAddress& address(*req->getAddress());
+    bool handled = bucket_db_updater().onRequestBucketInfoReply(
+            make_fake_bucket_reply(new_state, *req, address.getIndex(), 0, 0));
+    EXPECT_TRUE(handled); // Should be returned as handled even though it's technically ignored.
+}
+
+
struct BucketDBUpdaterSnapshotTest : TopLevelBucketDBUpdaterTest {
lib::ClusterState empty_state;
std::shared_ptr<lib::ClusterState> initial_baseline;
@@ -2678,37 +2727,4 @@ TEST_F(BucketDBUpdaterSnapshotTest, snapshot_is_unroutable_if_stale_reads_disabl
EXPECT_FALSE(def_rs.is_routable());
}
-TEST_F(BucketDBUpdaterSnapshotTest, node_feature_sets_are_aggregated_from_nodes_and_propagated_to_stripes) {
- lib::ClusterState state("distributor:1 storage:3");
- set_cluster_state(state);
- uint32_t expected_msgs = message_count(3), dummy_buckets_to_return = 1;
-
- // Known feature sets are initially empty.
- auto stripes = distributor_stripes();
- for (auto* s : stripes) {
- for (uint16_t i : {0, 1, 2}) {
- EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(i).unordered_merge_chaining);
- }
- }
-
- ASSERT_EQ(expected_msgs, _sender.commands().size());
- for (uint32_t i = 0; i < _sender.commands().size(); i++) {
- ASSERT_NO_FATAL_FAILURE(fake_bucket_reply(state, *_sender.command(i),
- dummy_buckets_to_return, [i](auto& reply) noexcept {
- // Pretend nodes 1 and 2 are on a shiny version with unordered merge chaining supported.
- // Node 0 does not support the fanciness.
- if (i > 0) {
- reply.supported_node_features().unordered_merge_chaining = true;
- }
- }));
- }
-
- // Node features should be propagated to all stripes
- for (auto* s : stripes) {
- EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(0).unordered_merge_chaining);
- EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(1).unordered_merge_chaining);
- EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(2).unordered_merge_chaining);
- }
-}
-
}
diff --git a/storage/src/tests/persistence/CMakeLists.txt b/storage/src/tests/persistence/CMakeLists.txt
index f0deec90aae..7b165e11b66 100644
--- a/storage/src/tests/persistence/CMakeLists.txt
+++ b/storage/src/tests/persistence/CMakeLists.txt
@@ -2,6 +2,7 @@
vespa_add_executable(storage_persistence_gtest_runner_app TEST
SOURCES
+ active_operations_stats_test.cpp
apply_bucket_diff_state_test.cpp
bucketownershipnotifiertest.cpp
has_mask_remapper_test.cpp
diff --git a/storage/src/tests/persistence/active_operations_stats_test.cpp b/storage/src/tests/persistence/active_operations_stats_test.cpp
new file mode 100644
index 00000000000..a5dd3d929db
--- /dev/null
+++ b/storage/src/tests/persistence/active_operations_stats_test.cpp
@@ -0,0 +1,150 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/persistence/dummyimpl/dummypersistence.h>
+#include <tests/persistence/common/filestortestfixture.h>
+#include <tests/persistence/filestorage/forwardingmessagesender.h>
+#include <vespa/storage/persistence/filestorage/filestormetrics.h>
+#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/document/fieldset/fieldsets.h>
+#include <vespa/metrics/updatehook.h>
+
+using document::test::makeDocumentBucket;
+
+namespace storage {
+
+class ActiveOperationsStatsTest : public FileStorTestFixture // fixture for the active-operations stats exposed by a FileStorHandler
+{
+protected:
+ DummyStorageLink top; // the constructor pushes dummyManager onto this link and opens it
+ std::unique_ptr<DummyStorageLink> dummyManager; // moved into `top` by the constructor, so empty afterwards
+ ForwardingMessageSender messageSender; // constructed from *dummyManager before that pointer is moved away
+ FileStorMetrics metrics; // metrics object handed to the handler; inspected via metrics.disk->active_operations
+ std::unique_ptr<FileStorHandler> filestorHandler; // handler under test (a FileStorHandlerImpl)
+ uint32_t stripeId; // stripe passed to getNextMessage(); initialised to 0
+
+public:
+ ActiveOperationsStatsTest();
+ ~ActiveOperationsStatsTest() override;
+ std::shared_ptr<api::StorageMessage> createPut(uint64_t bucket, uint64_t docIdx); // PutCommand for a generated document in the given bucket
+ std::shared_ptr<api::StorageMessage> createGet(uint64_t bucket) const; // GetCommand for document ":0" in the given bucket
+
+ void assert_active_operations_stats(const ActiveOperationsStats &stats, uint32_t exp_active_size, uint32_t exp_size_samples, uint32_t exp_latency_samples); // EXPECTs the three counters
+ void update_metrics(); // invokes the handler's metric update hook directly
+ void test_active_operations_stats(); // shared scenario; callers must schedule the operations first
+};
+
+ActiveOperationsStatsTest::ActiveOperationsStatsTest() // builds a FileStorHandlerImpl wired to a dummy storage link
+ : FileStorTestFixture(),
+ top(),
+ dummyManager(std::make_unique<DummyStorageLink>()),
+ messageSender(*dummyManager), // must be initialised while dummyManager still owns the link
+ metrics(),
+ stripeId(0)
+{
+ setupPersistenceThreads(1);
+ _node->setPersistenceProvider(std::make_unique<spi::dummy::DummyPersistence>(_node->getTypeRepo()));
+ top.push_back(std::move(dummyManager)); // ownership moves to `top`; messageSender keeps referring to the same link object
+ top.open();
+ metrics.initDiskMetrics(1, 1); // NOTE(review): argument meaning not visible here - presumably stripe/thread counts, confirm
+ filestorHandler = std::make_unique<FileStorHandlerImpl>(messageSender, metrics,
+ _node->getComponentRegister());
+ filestorHandler->setGetNextMessageTimeout(20ms); // presumably keeps getNextMessage() calls that find nothing runnable short - confirm
+}
+
+ActiveOperationsStatsTest::~ActiveOperationsStatsTest() = default;
+
+std::shared_ptr<api::StorageMessage>
+ActiveOperationsStatsTest::createPut(uint64_t bucket, uint64_t docIdx)
+{
+    // The generated document id embeds both the bucket number and docIdx.
+    auto doc_id = vespalib::make_string("id:foo:testdoctype1:n=%" PRIu64 ":%" PRIu64, bucket, docIdx);
+    auto document = _node->getTestDocMan().createDocument("foobar", doc_id);
+    auto target_bucket = makeDocumentBucket(document::BucketId(16, bucket));
+    auto put = std::make_shared<api::PutCommand>(target_bucket, std::move(document), 1234);
+    put->setAddress(makeSelfAddress());
+    return put;
+}
+
+std::shared_ptr<api::StorageMessage>
+ActiveOperationsStatsTest::createGet(uint64_t bucket) const
+{
+    // Always asks for document ":0" of the given bucket number, with all fields.
+    document::DocumentId doc_id(vespalib::make_string("id:foo:testdoctype1:n=%" PRIu64 ":0", bucket));
+    auto target_bucket = makeDocumentBucket(document::BucketId(16, bucket));
+    auto get = std::make_shared<api::GetCommand>(target_bucket, doc_id, document::AllFields::NAME);
+    get->setAddress(makeSelfAddress());
+    return get;
+}
+
+void
+ActiveOperationsStatsTest::assert_active_operations_stats(const ActiveOperationsStats &stats,
+                                                          uint32_t exp_active_size,
+                                                          uint32_t exp_size_samples,
+                                                          uint32_t exp_latency_samples)
+{
+    // Non-fatal checks: all three counters are reported even if an earlier one mismatches.
+    const auto active_size = stats.get_active_size();
+    const auto size_samples = stats.get_size_samples();
+    const auto latency_samples = stats.get_latency_samples();
+    EXPECT_EQ(exp_active_size, active_size);
+    EXPECT_EQ(exp_size_samples, size_samples);
+    EXPECT_EQ(exp_latency_samples, latency_samples);
+}
+
+void
+ActiveOperationsStatsTest::update_metrics()
+{
+    // The hook is only reachable through the concrete implementation type.
+    auto& handler_impl = dynamic_cast<FileStorHandlerImpl&>(*filestorHandler);
+    // The hook demands a metric lock guard; a throw-away local mutex satisfies it.
+    std::mutex dummy_lock;
+    metrics::MetricLockGuard guard(dummy_lock);
+    handler_impl.get_metric_update_hook_for_testing().updateMetrics(guard);
+}
+
+void
+ActiveOperationsStatsTest::test_active_operations_stats()
+{
+    // The caller has scheduled operations such that exactly two can be active at once.
+    auto first = filestorHandler->getNextMessage(stripeId);
+    auto second = filestorHandler->getNextMessage(stripeId);
+    auto third = filestorHandler->getNextMessage(stripeId);
+    ASSERT_TRUE(first.first);
+    ASSERT_TRUE(second.first);
+    ASSERT_FALSE(third.first);
+    auto stats = filestorHandler->get_active_operations_stats(false);
+    {
+        SCOPED_TRACE("during");
+        assert_active_operations_stats(stats, 2, 2, 0);
+    }
+    // Two starts sampled active sizes 1 and 2.
+    EXPECT_EQ(3, stats.get_total_size());
+    // Dropping the locks completes both operations.
+    first.first.reset();
+    second.first.reset();
+    stats = filestorHandler->get_active_operations_stats(false);
+    {
+        SCOPED_TRACE("after");
+        assert_active_operations_stats(stats, 0, 4, 2);
+    }
+    // Two completions added samples of active sizes 1 and 0.
+    EXPECT_EQ(4, stats.get_total_size());
+    EXPECT_LT(0.0, stats.get_total_latency());
+    // Push the collected stats into the metric set and verify what ended up there.
+    update_metrics();
+    auto &ao_metrics = metrics.disk->active_operations;
+    EXPECT_DOUBLE_EQ(1.0, ao_metrics.size.getAverage());
+    EXPECT_DOUBLE_EQ(0.0, ao_metrics.size.getMinimum());
+    EXPECT_DOUBLE_EQ(2.0, ao_metrics.size.getMaximum());
+    EXPECT_DOUBLE_EQ(4.0, ao_metrics.size.getCount());
+    EXPECT_LT(0.0, ao_metrics.latency.getAverage());
+    EXPECT_LT(0.0, ao_metrics.latency.getMinimum());
+    EXPECT_LT(0.0, ao_metrics.latency.getMaximum());
+    EXPECT_DOUBLE_EQ(2.0, ao_metrics.latency.getCount());
+}
+
+TEST_F(ActiveOperationsStatsTest, empty_stats)
+{
+    // Nothing has been scheduled: no active operations and no samples of either kind.
+    const auto snapshot = filestorHandler->get_active_operations_stats(false);
+    assert_active_operations_stats(snapshot, 0, 0, 0);
+}
+
+TEST_F(ActiveOperationsStatsTest, exclusive_lock_active_operations_stats)
+{
+    // Two puts target bucket 1234 and one targets bucket 5432; the shared scenario
+    // expects only two of the three to become active at the same time.
+    auto first_put = createPut(1234, 0);
+    auto second_put = createPut(1234, 1);
+    auto other_bucket_put = createPut(5432, 0);
+    filestorHandler->schedule(first_put);
+    filestorHandler->schedule(second_put);
+    filestorHandler->schedule(other_bucket_put);
+    test_active_operations_stats();
+}
+
+TEST_F(ActiveOperationsStatsTest, shared_lock_active_operations_stats)
+{
+    // Two gets against the same bucket; the shared scenario expects both to be active together.
+    for (int scheduled = 0; scheduled < 2; ++scheduled) {
+        filestorHandler->schedule(createGet(1234));
+    }
+    test_active_operations_stats();
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp
index 613f0f6ce09..16be7733c1a 100644
--- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp
+++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.cpp
@@ -311,14 +311,12 @@ bool
TopLevelBucketDBUpdater::onRequestBucketInfoReply(
const std::shared_ptr<api::RequestBucketInfoReply>& repl)
{
- if (pending_cluster_state_accepted(repl)) {
- return true;
- }
- return false;
+ attempt_accept_reply_by_current_pending_state(repl);
+ return true;
}
-bool
-TopLevelBucketDBUpdater::pending_cluster_state_accepted(
+void
+TopLevelBucketDBUpdater::attempt_accept_reply_by_current_pending_state(
const std::shared_ptr<api::RequestBucketInfoReply>& repl)
{
if (_pending_cluster_state.get()
@@ -328,11 +326,14 @@ TopLevelBucketDBUpdater::pending_cluster_state_accepted(
auto guard = _stripe_accessor.rendezvous_and_hold_all();
process_completed_pending_cluster_state(*guard);
}
- return true;
+ } else {
+ // Reply is not recognized, so its corresponding command must have been
+ // sent by a previous, preempted cluster state. We must still swallow the
+ // reply to prevent it from being passed further down a storage chain that
+ // does not expect it.
+ LOG(spam, "Reply %s was not accepted by pending cluster state",
+ repl->toString().c_str());
}
- LOG(spam, "Reply %s was not accepted by pending cluster state",
- repl->toString().c_str());
- return false;
}
void
diff --git a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h
index b1065e708a4..d8e49d5c383 100644
--- a/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h
+++ b/storage/src/vespa/storage/distributor/top_level_bucket_db_updater.h
@@ -85,7 +85,7 @@ private:
bool should_defer_state_enabling() const noexcept;
bool has_pending_cluster_state() const;
- bool pending_cluster_state_accepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
+ void attempt_accept_reply_by_current_pending_state(const std::shared_ptr<api::RequestBucketInfoReply>& repl);
bool is_pending_cluster_state_completed() const;
void process_completed_pending_cluster_state(StripeAccessGuard& guard);
void activate_pending_cluster_state(StripeAccessGuard& guard);
diff --git a/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt b/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt
index b23ec142448..62d1a80501a 100644
--- a/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt
+++ b/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt
@@ -1,6 +1,8 @@
# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_add_library(storage_filestorpersistence OBJECT
SOURCES
+ active_operations_metrics.cpp
+ active_operations_stats.cpp
filestorhandlerimpl.cpp
filestormanager.cpp
filestormetrics.cpp
diff --git a/storage/src/vespa/storage/persistence/filestorage/active_operations_metrics.cpp b/storage/src/vespa/storage/persistence/filestorage/active_operations_metrics.cpp
new file mode 100644
index 00000000000..b48ef5bf463
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/filestorage/active_operations_metrics.cpp
@@ -0,0 +1,16 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "active_operations_metrics.h"
+
+namespace storage {
+
+ActiveOperationsMetrics::ActiveOperationsMetrics(metrics::MetricSet* parent) // parent is forwarded to the MetricSet base
+ : MetricSet("active_operations", {}, "metrics for active operations at service layer", parent),
+ size("size", {}, "Number of concurrent active operations", this), // owned by this set
+ latency("latency", {}, "Latency (in ms) for active operations", this) // owned by this set
+{
+}
+
+ActiveOperationsMetrics::~ActiveOperationsMetrics() = default;
+
+}
diff --git a/storage/src/vespa/storage/persistence/filestorage/active_operations_metrics.h b/storage/src/vespa/storage/persistence/filestorage/active_operations_metrics.h
new file mode 100644
index 00000000000..94856d70f9e
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/filestorage/active_operations_metrics.h
@@ -0,0 +1,22 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/metrics/metricset.h>
+#include <vespa/metrics/valuemetric.h>
+
+namespace storage {
+
+/*
+ * Metrics for active operations with bucket lock at service layer.
+ */
+struct ActiveOperationsMetrics : public metrics::MetricSet
+{
+    // Number of concurrent active operations.
+    metrics::DoubleAverageMetric size;
+    // Latency (in ms) for active operations.
+    metrics::DoubleAverageMetric latency;
+
+    // explicit: a bare MetricSet pointer must not implicitly convert into a whole metric set.
+    // parent is handed on to the MetricSet base class.
+    explicit ActiveOperationsMetrics(metrics::MetricSet* parent);
+    ~ActiveOperationsMetrics() override;
+};
+
+}
diff --git a/storage/src/vespa/storage/persistence/filestorage/active_operations_stats.cpp b/storage/src/vespa/storage/persistence/filestorage/active_operations_stats.cpp
new file mode 100644
index 00000000000..bd7468971d4
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/filestorage/active_operations_stats.cpp
@@ -0,0 +1,133 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "active_operations_stats.h"
+
+namespace storage {
+
+namespace {
+
+// Folds a single observation into a running (min, max) pair; an empty bound adopts the value.
+template <typename T>
+void update_min_max(T value, std::optional<T>& min, std::optional<T>& max)
+{
+    const bool lowers_min = !min || (value < *min);
+    if (lowers_min) {
+        min = value;
+    }
+    const bool raises_max = !max || (value > *max);
+    if (raises_max) {
+        max = value;
+    }
+}
+
+// Lowers min to rhs_min when rhs_min is set and min is either unset or larger.
+template <typename T>
+void merge_min(std::optional<T>& min, const std::optional<T>& rhs_min)
+{
+    if (rhs_min.has_value() && (!min.has_value() || (rhs_min.value() < min.value()))) {
+        min = rhs_min;
+    }
+}
+
+// Raises max to rhs_max when rhs_max is set and max is either unset or smaller.
+template <typename T>
+void merge_max(std::optional<T>& max, const std::optional<T>& rhs_max)
+{
+    if (rhs_max.has_value() && (!max.has_value() || (rhs_max.value() > max.value()))) {
+        max = rhs_max;
+    }
+}
+
+// Adds rhs onto lhs; an unset rhs leaves lhs untouched and an unset lhs simply adopts rhs.
+template <typename T>
+void merge_min_max_sum(std::optional<T>& lhs, const std::optional<T>& rhs)
+{
+    if (rhs.has_value()) {
+        if (!lhs.has_value()) {
+            lhs = rhs;
+        } else {
+            lhs = lhs.value() + rhs.value();
+        }
+    }
+}
+
+}
+
+ActiveOperationsStats::ActiveOperationsStats() noexcept
+ : _size_samples(0u),
+ _total_size(0u),
+ _active_size(0u),
+ _min_size(),
+ _max_size(),
+ _latency_samples(0u),
+ _total_latency(0.0),
+ _min_latency(),
+ _max_latency()
+{
+}
+
+ActiveOperationsStats::~ActiveOperationsStats() = default;
+
+
+// Records the current number of active operations as one more size sample.
+void
+ActiveOperationsStats::update_size() noexcept
+{
+    update_min_max(_active_size, _min_size, _max_size);
+    _total_size += _active_size;
+    _size_samples += 1;
+}
+
+// Subtracts the accumulated sample counts and totals of rhs.
+// The active size and the min/max values are left as they are.
+ActiveOperationsStats&
+ActiveOperationsStats::operator-=(const ActiveOperationsStats& rhs) noexcept
+{
+    // Size samples.
+    _total_size -= rhs._total_size;
+    _size_samples -= rhs._size_samples;
+    // Latency samples.
+    _total_latency -= rhs._total_latency;
+    _latency_samples -= rhs._latency_samples;
+    return *this;
+}
+
+// Combines rhs into this object. Counters and totals are added. Min/max sizes are
+// added as well (presumably because rhs describes a disjoint set of operations whose
+// active counts add up - confirm), whereas latency bounds use ordinary min/max.
+void
+ActiveOperationsStats::merge(const ActiveOperationsStats& rhs) noexcept
+{
+    // Size side.
+    _active_size += rhs._active_size;
+    _total_size += rhs._total_size;
+    _size_samples += rhs._size_samples;
+    merge_min_max_sum(_min_size, rhs._min_size);
+    merge_min_max_sum(_max_size, rhs._max_size);
+    // Latency side.
+    _total_latency += rhs._total_latency;
+    _latency_samples += rhs._latency_samples;
+    merge_min(_min_latency, rhs._min_latency);
+    merge_max(_max_latency, rhs._max_latency);
+}
+
+// One more operation is active; the new active count is sampled right away.
+void
+ActiveOperationsStats::operation_started() noexcept
+{
+    _active_size += 1;
+    update_size();
+}
+
+// One operation finished: records its latency and samples the reduced active count.
+void
+ActiveOperationsStats::operation_done(double latency) noexcept
+{
+    // Latency bookkeeping.
+    update_min_max(latency, _min_latency, _max_latency);
+    _total_latency += latency;
+    _latency_samples += 1;
+    // Size bookkeeping; the count must drop before it is sampled.
+    _active_size -= 1;
+    update_size();
+}
+
+// Forgets all observed extremes; counters and totals are kept.
+void
+ActiveOperationsStats::reset_min_max() noexcept
+{
+    _min_size = std::nullopt;
+    _max_size = std::nullopt;
+    _min_latency = std::nullopt;
+    _max_latency = std::nullopt;
+}
+
+}
diff --git a/storage/src/vespa/storage/persistence/filestorage/active_operations_stats.h b/storage/src/vespa/storage/persistence/filestorage/active_operations_stats.h
new file mode 100644
index 00000000000..bdf4e87b1f5
--- /dev/null
+++ b/storage/src/vespa/storage/persistence/filestorage/active_operations_stats.h
@@ -0,0 +1,45 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <cstdint>
+#include <optional>
+
+namespace storage {
+
+/*
+ * Stats for active operations at service layer
+ */
+class ActiveOperationsStats
+{
+ uint64_t _size_samples; // number of times the active count has been sampled (once per start and per completion)
+ uint64_t _total_size; // sum of the active count over all size samples
+ uint32_t _active_size; // operations currently active
+ std::optional<uint32_t> _min_size; // smallest sampled active count since the last reset_min_max(); unset if none
+ std::optional<uint32_t> _max_size; // largest sampled active count since the last reset_min_max(); unset if none
+ uint64_t _latency_samples; // number of completed operations
+ double _total_latency; // sum of the latencies passed to operation_done()
+ std::optional<double> _min_latency; // smallest latency since the last reset_min_max(); unset if none
+ std::optional<double> _max_latency; // largest latency since the last reset_min_max(); unset if none
+
+ void update_size() noexcept; // adds one size sample for the current _active_size
+public:
+ ActiveOperationsStats() noexcept;
+ ~ActiveOperationsStats();
+ ActiveOperationsStats& operator-=(const ActiveOperationsStats& rhs) noexcept; // subtracts sample counts and totals only
+ void merge(const ActiveOperationsStats& rhs) noexcept; // adds counters, totals and min/max sizes; min/max latency are combined as min/max
+ void operation_started() noexcept; // increments the active count, then samples it
+ void operation_done(double latency) noexcept; // decrements the active count, samples it and records the latency
+ void reset_min_max() noexcept; // clears the four min/max values
+ uint64_t get_size_samples() const noexcept { return _size_samples; }
+ uint64_t get_latency_samples() const noexcept { return _latency_samples; }
+ uint64_t get_total_size() const noexcept { return _total_size; }
+ uint32_t get_active_size() const noexcept { return _active_size; }
+ double get_total_latency() const noexcept { return _total_latency; }
+ const std::optional<uint32_t>& get_min_size() const noexcept { return _min_size; }
+ const std::optional<uint32_t>& get_max_size() const noexcept { return _max_size; }
+ const std::optional<double>& get_min_latency() const noexcept { return _min_latency; }
+ const std::optional<double>& get_max_latency() const noexcept { return _max_latency; }
+};
+
+}
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index 70ed9845cb0..0d05fd21ce2 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -29,6 +29,7 @@ namespace framework {
class HttpUrlPath;
}
+class ActiveOperationsStats;
class FileStorHandlerImpl;
struct FileStorMetrics;
struct MessageSender;
@@ -233,6 +234,7 @@ public:
virtual std::string dumpQueue() const = 0;
+ virtual ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const = 0;
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index e395a7df9e0..f76d2693309 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -55,7 +55,8 @@ FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripe
_bucketIdFactory(_component.getBucketIdFactory()),
_getNextMessageTimeout(100ms),
_max_active_merges_per_stripe(per_stripe_merge_limit(numThreads, numStripes)),
- _paused(false)
+ _paused(false),
+ _last_active_operations_stats()
{
assert(numStripes > 0);
_stripes.reserve(numStripes);
@@ -297,6 +298,33 @@ FileStorHandlerImpl::abortQueuedOperations(const AbortBucketOperationsCommand& c
}
void
+FileStorHandlerImpl::update_active_operations_metrics()
+{
+    // Feeds the active-operations metrics with what has been collected since the previous
+    // call: sample counts and totals are the delta against the last snapshot, while min/max
+    // come from the stripes, whose min/max are reset by this call.
+    //
+    // The stats keep 64-bit sample counts. The averages are computed from the full counts;
+    // addValueBatch() is still handed a 32-bit count as before, saturated instead of
+    // silently wrapped.
+    auto to_batch_count = [](uint64_t samples) noexcept {
+        return static_cast<uint32_t>((samples > UINT32_MAX) ? UINT32_MAX : samples);
+    };
+    auto& metrics = _metrics->active_operations;
+    auto stats = get_active_operations_stats(true);
+    auto& last_stats = _last_active_operations_stats;
+    auto delta_stats = stats;
+    if (last_stats.has_value()) {
+        delta_stats -= last_stats.value();
+    }
+    last_stats = stats;
+    const uint64_t size_samples = delta_stats.get_size_samples();
+    if (size_samples != 0) {
+        double min_size = delta_stats.get_min_size().value_or(0);
+        double max_size = delta_stats.get_max_size().value_or(0);
+        double avg_size = static_cast<double>(delta_stats.get_total_size()) / static_cast<double>(size_samples);
+        metrics.size.addValueBatch(avg_size, to_batch_count(size_samples), min_size, max_size);
+    }
+    const uint64_t latency_samples = delta_stats.get_latency_samples();
+    if (latency_samples != 0) {
+        double min_latency = delta_stats.get_min_latency().value_or(0.0);
+        double max_latency = delta_stats.get_max_latency().value_or(0.0);
+        double avg_latency = delta_stats.get_total_latency() / static_cast<double>(latency_samples);
+        metrics.latency.addValueBatch(avg_latency, to_batch_count(latency_samples), min_latency, max_latency);
+    }
+}
+
+void
FileStorHandlerImpl::updateMetrics(const MetricLockGuard &)
{
std::lock_guard lockGuard(_mergeStatesLock);
@@ -307,6 +335,7 @@ FileStorHandlerImpl::updateMetrics(const MetricLockGuard &)
const auto & m = stripe->averageQueueWaitingTime;
_metrics->averageQueueWaitingTime.addTotalValueWithCount(m.getTotal(), m.getCount());
}
+ update_active_operations_metrics();
}
bool
@@ -852,7 +881,8 @@ FileStorHandlerImpl::Stripe::Stripe(const FileStorHandlerImpl & owner, MessageSe
_cond(std::make_unique<std::condition_variable>()),
_queue(std::make_unique<PriorityQueue>()),
_lockedBuckets(),
- _active_merges(0)
+ _active_merges(0),
+ _active_operations_stats()
{}
FileStorHandler::LockedMessage
@@ -1030,6 +1060,7 @@ FileStorHandlerImpl::Stripe::release(const document::Bucket & bucket,
auto iter = _lockedBuckets.find(bucket);
assert(iter != _lockedBuckets.end());
auto& entry = iter->second;
+ Clock::time_point start_time;
if (reqOfReleasedLock == api::LockingRequirements::Exclusive) {
assert(entry._exclusiveLock);
@@ -1038,14 +1069,18 @@ FileStorHandlerImpl::Stripe::release(const document::Bucket & bucket,
assert(_active_merges > 0);
--_active_merges;
}
+ start_time = entry._exclusiveLock.value().timestamp;
entry._exclusiveLock.reset();
} else {
assert(!entry._exclusiveLock);
auto shared_iter = entry._sharedLocks.find(lockMsgId);
assert(shared_iter != entry._sharedLocks.end());
+ start_time = shared_iter->second.timestamp;
entry._sharedLocks.erase(shared_iter);
}
-
+ Clock::time_point now_ts = Clock::now();
+ double latency = std::chrono::duration<double, std::milli>(now_ts - start_time).count();
+ _active_operations_stats.operation_done(latency);
if (!entry._exclusiveLock && entry._sharedLocks.empty()) {
_lockedBuckets.erase(iter); // No more locks held
}
@@ -1070,6 +1105,7 @@ FileStorHandlerImpl::Stripe::lock(const monitor_guard &, const document::Bucket
(void) inserted;
assert(inserted.second);
}
+ _active_operations_stats.operation_started();
}
bool
@@ -1104,6 +1140,17 @@ FileStorHandlerImpl::Stripe::operationIsInhibited(const monitor_guard & guard, c
return isLocked(guard, bucket, msg.lockingRequirements());
}
+// Returns a copy of this stripe's stats taken while holding the stripe lock.
+// With reset_min_max set, the stripe's min/max values are cleared after the copy is made.
+ActiveOperationsStats
+FileStorHandlerImpl::Stripe::get_active_operations_stats(bool reset_min_max) const
+{
+    std::lock_guard guard(*_lock);
+    if (!reset_min_max) {
+        return _active_operations_stats;
+    }
+    ActiveOperationsStats snapshot(_active_operations_stats);
+    _active_operations_stats.reset_min_max();
+    return snapshot;
+}
+
FileStorHandlerImpl::BucketLock::BucketLock(const monitor_guard & guard, Stripe& stripe,
const document::Bucket &bucket, uint8_t priority,
api::MessageType::Id msgType, api::StorageMessage::Id msgId,
@@ -1243,6 +1290,17 @@ FileStorHandlerImpl::getStatus(std::ostream& out, const framework::HttpUrlPath&
}
}
+// Merges the stats of every stripe into one result; reset_min_max is passed on to each stripe.
+ActiveOperationsStats
+FileStorHandlerImpl::get_active_operations_stats(bool reset_min_max) const
+{
+    ActiveOperationsStats merged;
+    for (const auto& stripe : _stripes) {
+        merged.merge(stripe.get_active_operations_stats(reset_min_max));
+    }
+    return merged;
+}
+
void
FileStorHandlerImpl::waitUntilNoLocks()
{
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index 5f212b18a7f..3a89ff74f07 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -16,6 +16,7 @@
#pragma once
#include "filestorhandler.h"
+#include "active_operations_stats.h"
#include <vespa/document/bucket/bucketid.h>
#include <vespa/metrics/metrictimer.h>
#include <vespa/storage/common/servicelayercomponent.h>
@@ -136,6 +137,7 @@ public:
PriorityQueue & exposeQueue() { return *_queue; }
BucketIdx & exposeBucketIdx() { return bmi::get<2>(*_queue); }
void setMetrics(FileStorStripeMetrics * metrics) { _metrics = metrics; }
+ ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const;
private:
bool hasActive(monitor_guard & monitor, const AbortBucketOperationsCommand& cmd) const;
FileStorHandler::LockedMessage get_next_async_message(monitor_guard& guard);
@@ -153,6 +155,7 @@ public:
std::unique_ptr<PriorityQueue> _queue;
LockedBuckets _lockedBuckets;
uint32_t _active_merges;
+ mutable ActiveOperationsStats _active_operations_stats;
};
class BucketLock : public FileStorHandler::BucketLockInterface {
@@ -232,6 +235,9 @@ public:
// Implements ResumeGuard::Callback
void resume() override;
+ // Use only for testing
+ framework::MetricUpdateHook& get_metric_update_hook_for_testing() { return *this; }
+
private:
ServiceLayerComponent _component;
std::atomic<DiskState> _state;
@@ -246,6 +252,7 @@ private:
mutable std::mutex _pauseMonitor;
mutable std::condition_variable _pauseCond;
std::atomic<bool> _paused;
+ std::optional<ActiveOperationsStats> _last_active_operations_stats;
// Returns the index in the targets array we are sending to, or -1 if none of them match.
int calculateTargetBasedOnDocId(const api::StorageMessage& msg, std::vector<RemapInfo*>& targets);
@@ -277,6 +284,8 @@ private:
static std::unique_ptr<api::StorageReply> makeQueueTimeoutReply(api::StorageMessage& msg);
static bool messageMayBeAborted(const api::StorageMessage& msg);
+ void update_active_operations_metrics();
+
// Implements framework::MetricUpdateHook
void updateMetrics(const MetricLockGuard &) override;
@@ -322,6 +331,7 @@ private:
return _stripes[stripeId].getNextMessage(timeout);
}
+ ActiveOperationsStats get_active_operations_stats(bool reset_min_max) const override;
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp
index cb7e141f5db..c119fdc4f69 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.cpp
@@ -206,7 +206,8 @@ FileStorDiskMetrics::FileStorDiskMetrics(const std::string& name, const std::str
waitingForLockHitRate("waitingforlockrate", {},
"Amount of times a filestor thread has needed to wait for "
"lock to take next message in queue.", this),
- lockWaitTime("lockwaittime", {}, "Amount of time waiting used waiting for lock.", this)
+ lockWaitTime("lockwaittime", {}, "Amount of time waiting used waiting for lock.", this),
+ active_operations(this)
{
pendingMerges.unsetOnZeroValue();
waitingForLockHitRate.unsetOnZeroValue();
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h
index 8f7f79add00..7543e6e0771 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormetrics.h
@@ -11,6 +11,7 @@
#pragma once
#include "merge_handler_metrics.h"
+#include "active_operations_metrics.h"
#include <vespa/metrics/metricset.h>
#include <vespa/metrics/summetric.h>
@@ -147,7 +148,8 @@ public:
metrics::LongAverageMetric queueSize;
metrics::LongAverageMetric pendingMerges;
metrics::DoubleAverageMetric waitingForLockHitRate;
- metrics::DoubleAverageMetric lockWaitTime;
+ metrics::DoubleAverageMetric lockWaitTime; // unused
+ ActiveOperationsMetrics active_operations;
FileStorDiskMetrics(const std::string& name, const std::string& description, MetricSet* owner);
~FileStorDiskMetrics() override;