summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2021-05-12 10:33:52 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2021-05-12 13:02:15 +0000
commit92b0379305df85fa3f3f0a1e42bb75956080c2ef (patch)
tree24ec39f4131ce01841010b77ff8996687f92c86f /storage
parent839a6f9a7d1f66937f51db3766a2dfd3e7b90675 (diff)
Add initial multi stripe support to access guard
Still missing functionality for: - Merging bucket entries across stripes - Aggregating pending operation stats across stripes
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/CMakeLists.txt1
-rw-r--r--storage/src/tests/distributor/distributor_stripe_pool_test.cpp35
-rw-r--r--storage/src/tests/distributor/mock_tickable_stripe.h42
-rw-r--r--storage/src/tests/distributor/multi_thread_stripe_access_guard_test.cpp59
-rw-r--r--storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp92
-rw-r--r--storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h6
-rw-r--r--storage/src/vespa/storage/distributor/potential_data_loss_report.h7
7 files changed, 178 insertions, 64 deletions
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt
index d808acc0d3a..f43280a5b44 100644
--- a/storage/src/tests/distributor/CMakeLists.txt
+++ b/storage/src/tests/distributor/CMakeLists.txt
@@ -25,6 +25,7 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST
maintenanceschedulertest.cpp
mergelimitertest.cpp
mergeoperationtest.cpp
+ multi_thread_stripe_access_guard_test.cpp
nodeinfotest.cpp
nodemaintenancestatstrackertest.cpp
operation_sequencer_test.cpp
diff --git a/storage/src/tests/distributor/distributor_stripe_pool_test.cpp b/storage/src/tests/distributor/distributor_stripe_pool_test.cpp
index 00f6349b218..fb7c446a781 100644
--- a/storage/src/tests/distributor/distributor_stripe_pool_test.cpp
+++ b/storage/src/tests/distributor/distributor_stripe_pool_test.cpp
@@ -1,6 +1,6 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "mock_tickable_stripe.h"
#include <vespa/storage/distributor/distributor_stripe_pool.h>
-#include <vespa/storage/distributor/tickable_stripe.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/util/time.h>
#include <atomic>
@@ -48,39 +48,6 @@ struct DistributorStripePoolThreadingTest : Test {
}
};
-struct MockTickableStripe : TickableStripe {
- bool tick() override { abort(); }
- void flush_and_close() override { abort(); }
- void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration>) override { abort(); }
- void update_distribution_config(const BucketSpaceDistributionConfigs&) override { abort(); }
- void set_pending_cluster_state_bundle(const lib::ClusterStateBundle&) override { abort(); }
- void clear_pending_cluster_state_bundle() override { abort(); }
- void enable_cluster_state_bundle(const lib::ClusterStateBundle&) override { abort(); }
- void notify_distribution_change_enabled() override { abort(); }
- PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace, const lib::ClusterState&, bool) override {
- abort();
- }
- void merge_entries_into_db(document::BucketSpace,
- api::Timestamp,
- const lib::Distribution&,
- const lib::ClusterState&,
- const char*,
- const std::unordered_set<uint16_t>&,
- const std::vector<dbtransition::Entry>&) override
- {
- abort();
- }
- void update_read_snapshot_before_db_pruning() override { abort(); }
- void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle&) override { abort(); }
- void update_read_snapshot_after_activation(const lib::ClusterStateBundle&) override { abort(); }
- void clear_read_only_bucket_repo_databases() override { abort(); }
-
- void report_bucket_db_status(document::BucketSpace, std::ostream&) const override { abort(); }
- StripeAccessGuard::PendingOperationStats pending_operation_stats() const override { abort(); }
- void report_single_bucket_requests(vespalib::xml::XmlOutputStream&) const override { abort(); }
- void report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream&) const override { abort(); }
-};
-
// Optimistic invariant checker that cannot prove correctness, but will hopefully
// make tests scream if something is obviously incorrect.
struct ParkingInvariantCheckingMockStripe : MockTickableStripe {
diff --git a/storage/src/tests/distributor/mock_tickable_stripe.h b/storage/src/tests/distributor/mock_tickable_stripe.h
new file mode 100644
index 00000000000..532bef5e50d
--- /dev/null
+++ b/storage/src/tests/distributor/mock_tickable_stripe.h
@@ -0,0 +1,42 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/storage/distributor/tickable_stripe.h>
+#include <cstdlib>
+
+namespace storage::distributor {
+
+struct MockTickableStripe : TickableStripe {
+ bool tick() override { abort(); }
+ void flush_and_close() override { abort(); }
+ void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration>) override { abort(); }
+ void update_distribution_config(const BucketSpaceDistributionConfigs&) override { abort(); }
+ void set_pending_cluster_state_bundle(const lib::ClusterStateBundle&) override { abort(); }
+ void clear_pending_cluster_state_bundle() override { abort(); }
+ void enable_cluster_state_bundle(const lib::ClusterStateBundle&) override { abort(); }
+ void notify_distribution_change_enabled() override { abort(); }
+ PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace, const lib::ClusterState&, bool) override {
+ abort();
+ }
+ void merge_entries_into_db(document::BucketSpace,
+ api::Timestamp,
+ const lib::Distribution&,
+ const lib::ClusterState&,
+ const char*,
+ const std::unordered_set<uint16_t>&,
+ const std::vector<dbtransition::Entry>&) override
+ {
+ abort();
+ }
+ void update_read_snapshot_before_db_pruning() override { abort(); }
+ void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle&) override { abort(); }
+ void update_read_snapshot_after_activation(const lib::ClusterStateBundle&) override { abort(); }
+ void clear_read_only_bucket_repo_databases() override { abort(); }
+
+ void report_bucket_db_status(document::BucketSpace, std::ostream&) const override { abort(); }
+ StripeAccessGuard::PendingOperationStats pending_operation_stats() const override { abort(); }
+ void report_single_bucket_requests(vespalib::xml::XmlOutputStream&) const override { abort(); }
+ void report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream&) const override { abort(); }
+};
+
+}
diff --git a/storage/src/tests/distributor/multi_thread_stripe_access_guard_test.cpp b/storage/src/tests/distributor/multi_thread_stripe_access_guard_test.cpp
new file mode 100644
index 00000000000..8513186d1e1
--- /dev/null
+++ b/storage/src/tests/distributor/multi_thread_stripe_access_guard_test.cpp
@@ -0,0 +1,59 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "mock_tickable_stripe.h"
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/storage/distributor/distributor_stripe_pool.h>
+#include <vespa/storage/distributor/multi_threaded_stripe_access_guard.h>
+#include <vespa/vdslib/state/clusterstate.h>
+#include <vespa/vespalib/gtest/gtest.h>
+
+using namespace ::testing;
+
+namespace storage::distributor {
+
+struct AggregationTestingMockTickableStripe : MockTickableStripe {
+ PotentialDataLossReport report;
+
+ PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace, const lib::ClusterState&, bool) override {
+ return report;
+ }
+
+ bool tick() override {
+ return false;
+ }
+};
+
+struct MultiThreadedStripeAccessGuardTest : Test {
+ DistributorStripePool _pool;
+ MultiThreadedStripeAccessor _accessor;
+ AggregationTestingMockTickableStripe _stripe1;
+ AggregationTestingMockTickableStripe _stripe2;
+ AggregationTestingMockTickableStripe _stripe3;
+
+ MultiThreadedStripeAccessGuardTest()
+ : _pool(),
+ _accessor(_pool)
+ {}
+
+ ~MultiThreadedStripeAccessGuardTest() {
+ _pool.stop_and_join();
+ }
+
+ void start_pool_with_stripes() {
+ _pool.start({{&_stripe1, &_stripe2, &_stripe3}});
+ }
+};
+
+TEST_F(MultiThreadedStripeAccessGuardTest, remove_superfluous_buckets_aggregates_reports_across_stripes) {
+ _stripe1.report = PotentialDataLossReport(20, 100);
+ _stripe2.report = PotentialDataLossReport(5, 200);
+ _stripe3.report = PotentialDataLossReport(7, 350);
+ start_pool_with_stripes();
+
+ auto guard = _accessor.rendezvous_and_hold_all();
+ auto report = guard->remove_superfluous_buckets(document::FixedBucketSpaces::default_space(),
+ lib::ClusterState(), false);
+ EXPECT_EQ(report.buckets, 32);
+ EXPECT_EQ(report.documents, 650);
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
index 0c35162f25c..a5adf732824 100644
--- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
+++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
@@ -12,7 +12,7 @@ MultiThreadedStripeAccessGuard::MultiThreadedStripeAccessGuard(
: _accessor(accessor),
_stripe_pool(stripe_pool)
{
- assert(_stripe_pool.stripe_count() == 1); // TODO STRIPE many more yes yes
+ assert(_stripe_pool.stripe_count() > 0);
_stripe_pool.park_all_threads();
}
@@ -22,37 +22,45 @@ MultiThreadedStripeAccessGuard::~MultiThreadedStripeAccessGuard() {
}
void MultiThreadedStripeAccessGuard::flush_and_close() {
- first_stripe().flush_and_close();
+ for_each_stripe([](TickableStripe& stripe) {
+ stripe.flush_and_close();
+ });
}
void MultiThreadedStripeAccessGuard::update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) {
- // TODO STRIPE multiple stripes
- first_stripe().update_total_distributor_config(std::move(config));
+ for_each_stripe([&](TickableStripe& stripe) {
+ stripe.update_total_distributor_config(config);
+ });
}
void MultiThreadedStripeAccessGuard::update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) {
- // TODO STRIPE multiple stripes
- first_stripe().update_distribution_config(new_configs);
+ for_each_stripe([&](TickableStripe& stripe) {
+ stripe.update_distribution_config(new_configs);
+ });
}
void MultiThreadedStripeAccessGuard::set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) {
- // TODO STRIPE multiple stripes
- first_stripe().set_pending_cluster_state_bundle(pending_state);
+ for_each_stripe([&](TickableStripe& stripe) {
+ stripe.set_pending_cluster_state_bundle(pending_state);
+ });
}
void MultiThreadedStripeAccessGuard::clear_pending_cluster_state_bundle() {
- // TODO STRIPE multiple stripes
- first_stripe().clear_pending_cluster_state_bundle();
+ for_each_stripe([](TickableStripe& stripe) {
+ stripe.clear_pending_cluster_state_bundle();
+ });
}
void MultiThreadedStripeAccessGuard::enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) {
- // TODO STRIPE multiple stripes
- first_stripe().enable_cluster_state_bundle(new_state);
+ for_each_stripe([&](TickableStripe& stripe) {
+ stripe.enable_cluster_state_bundle(new_state);
+ });
}
void MultiThreadedStripeAccessGuard::notify_distribution_change_enabled() {
- // TODO STRIPE multiple stripes
- first_stripe().notify_distribution_change_enabled();
+ for_each_stripe([](TickableStripe& stripe) {
+ stripe.notify_distribution_change_enabled();
+ });
}
PotentialDataLossReport
@@ -60,8 +68,11 @@ MultiThreadedStripeAccessGuard::remove_superfluous_buckets(document::BucketSpace
const lib::ClusterState& new_state,
bool is_distribution_change)
{
- // TODO STRIPE multiple stripes
- return first_stripe().remove_superfluous_buckets(bucket_space, new_state, is_distribution_change);
+ PotentialDataLossReport report;
+ for_each_stripe([&](TickableStripe& stripe) {
+ report.merge(stripe.remove_superfluous_buckets(bucket_space, new_state, is_distribution_change));
+ });
+ return report;
}
void
@@ -79,28 +90,33 @@ MultiThreadedStripeAccessGuard::merge_entries_into_db(document::BucketSpace buck
}
void MultiThreadedStripeAccessGuard::update_read_snapshot_before_db_pruning() {
- // TODO STRIPE multiple stripes
- first_stripe().update_read_snapshot_before_db_pruning();
+ for_each_stripe([](TickableStripe& stripe) {
+ stripe.update_read_snapshot_before_db_pruning();
+ });
}
void MultiThreadedStripeAccessGuard::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) {
- // TODO STRIPE multiple stripes
- first_stripe().update_read_snapshot_after_db_pruning(new_state);
+ for_each_stripe([&](TickableStripe& stripe) {
+ stripe.update_read_snapshot_after_db_pruning(new_state);
+ });
}
void MultiThreadedStripeAccessGuard::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) {
- // TODO STRIPE multiple stripes
- first_stripe().update_read_snapshot_after_activation(activated_state);
+ for_each_stripe([&](TickableStripe& stripe) {
+ stripe.update_read_snapshot_after_activation(activated_state);
+ });
}
void MultiThreadedStripeAccessGuard::clear_read_only_bucket_repo_databases() {
- // TODO STRIPE multiple stripes
- first_stripe().clear_read_only_bucket_repo_databases();
+ for_each_stripe([](TickableStripe& stripe) {
+ stripe.clear_read_only_bucket_repo_databases();
+ });
}
void MultiThreadedStripeAccessGuard::report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const {
- // TODO STRIPE multiple stripes
- first_stripe().report_bucket_db_status(bucket_space, out);
+ for_each_stripe([&](TickableStripe& stripe) {
+ stripe.report_bucket_db_status(bucket_space, out);
+ });
}
StripeAccessGuard::PendingOperationStats
@@ -110,13 +126,15 @@ MultiThreadedStripeAccessGuard::pending_operation_stats() const {
}
void MultiThreadedStripeAccessGuard::report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const {
- // TODO STRIPE multiple stripes
- first_stripe().report_single_bucket_requests(xos);
+ for_each_stripe([&](TickableStripe& stripe) {
+ stripe.report_single_bucket_requests(xos);
+ });
}
void MultiThreadedStripeAccessGuard::report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const {
- // TODO STRIPE multiple stripes
- first_stripe().report_delayed_single_bucket_requests(xos);
+ for_each_stripe([&](TickableStripe& stripe) {
+ stripe.report_delayed_single_bucket_requests(xos);
+ });
}
TickableStripe& MultiThreadedStripeAccessGuard::first_stripe() noexcept {
@@ -127,6 +145,20 @@ const TickableStripe& MultiThreadedStripeAccessGuard::first_stripe() const noexc
return _stripe_pool.stripe_thread(0).stripe();
}
+template <typename Func>
+void MultiThreadedStripeAccessGuard::for_each_stripe(Func&& f) {
+ for (auto& stripe_thread : _stripe_pool) {
+ f(stripe_thread->stripe());
+ }
+}
+
+template <typename Func>
+void MultiThreadedStripeAccessGuard::for_each_stripe(Func&& f) const {
+ for (const auto& stripe_thread : _stripe_pool) {
+ f(stripe_thread->stripe());
+ }
+}
+
std::unique_ptr<StripeAccessGuard> MultiThreadedStripeAccessor::rendezvous_and_hold_all() {
// For sanity checking of invariant of only one guard being allowed at any given time.
assert(!_guard_held);
diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
index ef84a9ac79f..da5fd8e5f37 100644
--- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
+++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
@@ -62,6 +62,12 @@ private:
// TODO STRIPE remove once multi threaded stripe support is implemented
TickableStripe& first_stripe() noexcept;
const TickableStripe& first_stripe() const noexcept;
+
+ template <typename Func>
+ void for_each_stripe(Func&& f);
+
+ template <typename Func>
+ void for_each_stripe(Func&& f) const;
};
/**
diff --git a/storage/src/vespa/storage/distributor/potential_data_loss_report.h b/storage/src/vespa/storage/distributor/potential_data_loss_report.h
index 96abd787649..a5fb0adae62 100644
--- a/storage/src/vespa/storage/distributor/potential_data_loss_report.h
+++ b/storage/src/vespa/storage/distributor/potential_data_loss_report.h
@@ -13,6 +13,13 @@ struct PotentialDataLossReport {
size_t buckets = 0;
size_t documents = 0;
+ constexpr PotentialDataLossReport() noexcept = default;
+
+ constexpr PotentialDataLossReport(size_t buckets_, size_t documents_) noexcept
+ : buckets(buckets_),
+ documents(documents_)
+ {}
+
void merge(const PotentialDataLossReport& other) noexcept {
buckets += other.buckets;
documents += other.documents;