diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2021-05-12 15:58:02 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-12 15:58:02 +0200 |
commit | f291875df7ba4052a5d403fe62f73607233f71da (patch) | |
tree | 2a5ac626e60f8daf102f50a4a358a58dd346afb5 /storage/src/vespa | |
parent | e8a424601463d91fc4e4f7dce4bd9ff57b87dcb8 (diff) | |
parent | 92b0379305df85fa3f3f0a1e42bb75956080c2ef (diff) |
Merge pull request #17830 from vespa-engine/vekterli/add-initial-multi-stripe-support-to-access-guard
Add initial multi stripe support to access guard [run-systemtest]
Diffstat (limited to 'storage/src/vespa')
3 files changed, 75 insertions, 30 deletions
diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp index 0c35162f25c..a5adf732824 100644 --- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp +++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp @@ -12,7 +12,7 @@ MultiThreadedStripeAccessGuard::MultiThreadedStripeAccessGuard( : _accessor(accessor), _stripe_pool(stripe_pool) { - assert(_stripe_pool.stripe_count() == 1); // TODO STRIPE many more yes yes + assert(_stripe_pool.stripe_count() > 0); _stripe_pool.park_all_threads(); } @@ -22,37 +22,45 @@ MultiThreadedStripeAccessGuard::~MultiThreadedStripeAccessGuard() { } void MultiThreadedStripeAccessGuard::flush_and_close() { - first_stripe().flush_and_close(); + for_each_stripe([](TickableStripe& stripe) { + stripe.flush_and_close(); + }); } void MultiThreadedStripeAccessGuard::update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) { - // TODO STRIPE multiple stripes - first_stripe().update_total_distributor_config(std::move(config)); + for_each_stripe([&](TickableStripe& stripe) { + stripe.update_total_distributor_config(config); + }); } void MultiThreadedStripeAccessGuard::update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) { - // TODO STRIPE multiple stripes - first_stripe().update_distribution_config(new_configs); + for_each_stripe([&](TickableStripe& stripe) { + stripe.update_distribution_config(new_configs); + }); } void MultiThreadedStripeAccessGuard::set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) { - // TODO STRIPE multiple stripes - first_stripe().set_pending_cluster_state_bundle(pending_state); + for_each_stripe([&](TickableStripe& stripe) { + stripe.set_pending_cluster_state_bundle(pending_state); + }); } void MultiThreadedStripeAccessGuard::clear_pending_cluster_state_bundle() { - // TODO STRIPE multiple stripes - first_stripe().clear_pending_cluster_state_bundle(); + for_each_stripe([](TickableStripe& stripe) { + stripe.clear_pending_cluster_state_bundle(); + }); } void MultiThreadedStripeAccessGuard::enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) { - // TODO STRIPE multiple stripes - first_stripe().enable_cluster_state_bundle(new_state); + for_each_stripe([&](TickableStripe& stripe) { + stripe.enable_cluster_state_bundle(new_state); + }); } void MultiThreadedStripeAccessGuard::notify_distribution_change_enabled() { - // TODO STRIPE multiple stripes - first_stripe().notify_distribution_change_enabled(); + for_each_stripe([](TickableStripe& stripe) { + stripe.notify_distribution_change_enabled(); + }); } PotentialDataLossReport @@ -60,8 +68,11 @@ MultiThreadedStripeAccessGuard::remove_superfluous_buckets(document::BucketSpace const lib::ClusterState& new_state, bool is_distribution_change) { - // TODO STRIPE multiple stripes - return first_stripe().remove_superfluous_buckets(bucket_space, new_state, is_distribution_change); + PotentialDataLossReport report; + for_each_stripe([&](TickableStripe& stripe) { + report.merge(stripe.remove_superfluous_buckets(bucket_space, new_state, is_distribution_change)); + }); + return report; } void @@ -79,28 +90,33 @@ MultiThreadedStripeAccessGuard::merge_entries_into_db(document::BucketSpace buck } void MultiThreadedStripeAccessGuard::update_read_snapshot_before_db_pruning() { - // TODO STRIPE multiple stripes - first_stripe().update_read_snapshot_before_db_pruning(); + for_each_stripe([](TickableStripe& stripe) { + stripe.update_read_snapshot_before_db_pruning(); + }); } void MultiThreadedStripeAccessGuard::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) { - // TODO STRIPE multiple stripes - first_stripe().update_read_snapshot_after_db_pruning(new_state); + for_each_stripe([&](TickableStripe& stripe) { + stripe.update_read_snapshot_after_db_pruning(new_state); + }); } void MultiThreadedStripeAccessGuard::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) { - // TODO STRIPE multiple stripes - first_stripe().update_read_snapshot_after_activation(activated_state); + for_each_stripe([&](TickableStripe& stripe) { + stripe.update_read_snapshot_after_activation(activated_state); + }); } void MultiThreadedStripeAccessGuard::clear_read_only_bucket_repo_databases() { - // TODO STRIPE multiple stripes - first_stripe().clear_read_only_bucket_repo_databases(); + for_each_stripe([](TickableStripe& stripe) { + stripe.clear_read_only_bucket_repo_databases(); + }); } void MultiThreadedStripeAccessGuard::report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const { - // TODO STRIPE multiple stripes - first_stripe().report_bucket_db_status(bucket_space, out); + for_each_stripe([&](TickableStripe& stripe) { + stripe.report_bucket_db_status(bucket_space, out); + }); } StripeAccessGuard::PendingOperationStats @@ -110,13 +126,15 @@ MultiThreadedStripeAccessGuard::pending_operation_stats() const { } void MultiThreadedStripeAccessGuard::report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const { - // TODO STRIPE multiple stripes - first_stripe().report_single_bucket_requests(xos); + for_each_stripe([&](TickableStripe& stripe) { + stripe.report_single_bucket_requests(xos); + }); } void MultiThreadedStripeAccessGuard::report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const { - // TODO STRIPE multiple stripes - first_stripe().report_delayed_single_bucket_requests(xos); + for_each_stripe([&](TickableStripe& stripe) { + stripe.report_delayed_single_bucket_requests(xos); + }); } TickableStripe& MultiThreadedStripeAccessGuard::first_stripe() noexcept { @@ -127,6 +145,20 @@ const TickableStripe& MultiThreadedStripeAccessGuard::first_stripe() const noexc return _stripe_pool.stripe_thread(0).stripe(); } +template <typename Func> +void MultiThreadedStripeAccessGuard::for_each_stripe(Func&& f) { + for (auto& stripe_thread : _stripe_pool) { + f(stripe_thread->stripe()); + } +} + +template <typename Func> +void MultiThreadedStripeAccessGuard::for_each_stripe(Func&& f) const { + for (const auto& stripe_thread : _stripe_pool) { + f(stripe_thread->stripe()); + } +} + std::unique_ptr<StripeAccessGuard> MultiThreadedStripeAccessor::rendezvous_and_hold_all() { // For sanity checking of invariant of only one guard being allowed at any given time. assert(!_guard_held); diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h index ef84a9ac79f..da5fd8e5f37 100644 --- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h +++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h @@ -62,6 +62,12 @@ private: // TODO STRIPE remove once multi threaded stripe support is implemented TickableStripe& first_stripe() noexcept; const TickableStripe& first_stripe() const noexcept; + + template <typename Func> + void for_each_stripe(Func&& f); + + template <typename Func> + void for_each_stripe(Func&& f) const; }; /** diff --git a/storage/src/vespa/storage/distributor/potential_data_loss_report.h b/storage/src/vespa/storage/distributor/potential_data_loss_report.h index 96abd787649..a5fb0adae62 100644 --- a/storage/src/vespa/storage/distributor/potential_data_loss_report.h +++ b/storage/src/vespa/storage/distributor/potential_data_loss_report.h @@ -13,6 +13,13 @@ struct PotentialDataLossReport { size_t buckets = 0; size_t documents = 0; + constexpr PotentialDataLossReport() noexcept = default; + + constexpr PotentialDataLossReport(size_t buckets_, size_t documents_) noexcept + : buckets(buckets_), + documents(documents_) + {} + void merge(const PotentialDataLossReport& other) noexcept { buckets += other.buckets; documents += other.documents; |