diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2021-05-11 16:17:45 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-11 16:17:45 +0200 |
commit | 839a6f9a7d1f66937f51db3766a2dfd3e7b90675 (patch) | |
tree | c44ef15ce5a0209bbf046a0eb91005829321a8a2 | |
parent | 6308b75159d8c45ec77e48d431412d2aea9646be (diff) | |
parent | 57509991a95da37d119c0a1bc059898ea8d32b9c (diff) |
Merge pull request #17819 from vespa-engine/vekterli/extend-tickable-stripe-interface
Extend TickableStripe interface to avoid direct access to DistributorStripe internals
8 files changed, 224 insertions, 27 deletions
diff --git a/storage/src/tests/distributor/distributor_stripe_pool_test.cpp b/storage/src/tests/distributor/distributor_stripe_pool_test.cpp index 408027eb894..00f6349b218 100644 --- a/storage/src/tests/distributor/distributor_stripe_pool_test.cpp +++ b/storage/src/tests/distributor/distributor_stripe_pool_test.cpp @@ -48,9 +48,42 @@ struct DistributorStripePoolThreadingTest : Test { } }; +struct MockTickableStripe : TickableStripe { + bool tick() override { abort(); } + void flush_and_close() override { abort(); } + void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration>) override { abort(); } + void update_distribution_config(const BucketSpaceDistributionConfigs&) override { abort(); } + void set_pending_cluster_state_bundle(const lib::ClusterStateBundle&) override { abort(); } + void clear_pending_cluster_state_bundle() override { abort(); } + void enable_cluster_state_bundle(const lib::ClusterStateBundle&) override { abort(); } + void notify_distribution_change_enabled() override { abort(); } + PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace, const lib::ClusterState&, bool) override { + abort(); + } + void merge_entries_into_db(document::BucketSpace, + api::Timestamp, + const lib::Distribution&, + const lib::ClusterState&, + const char*, + const std::unordered_set<uint16_t>&, + const std::vector<dbtransition::Entry>&) override + { + abort(); + } + void update_read_snapshot_before_db_pruning() override { abort(); } + void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle&) override { abort(); } + void update_read_snapshot_after_activation(const lib::ClusterStateBundle&) override { abort(); } + void clear_read_only_bucket_repo_databases() override { abort(); } + + void report_bucket_db_status(document::BucketSpace, std::ostream&) const override { abort(); } + StripeAccessGuard::PendingOperationStats pending_operation_stats() const override { abort(); } + void report_single_bucket_requests(vespalib::xml::XmlOutputStream&) const override { abort(); } + void report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream&) const override { abort(); } +}; + // Optimistic invariant checker that cannot prove correctness, but will hopefully // make tests scream if something is obviously incorrect. -struct ParkingInvariantCheckingMockStripe : TickableStripe { +struct ParkingInvariantCheckingMockStripe : MockTickableStripe { std::atomic<bool>& _is_parked; bool _to_return; diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index 668930e65ee..1373eb27249 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -961,4 +961,93 @@ DistributorStripe::pending_operation_stats() const return {_operationOwner.size(), _maintenanceOperationOwner.size()}; } +void +DistributorStripe::set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) +{ + getBucketSpaceRepo().set_pending_cluster_state_bundle(pending_state); +} + +void +DistributorStripe::clear_pending_cluster_state_bundle() +{ + getBucketSpaceRepo().clear_pending_cluster_state_bundle(); +} + +void +DistributorStripe::enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) +{ + // TODO STRIPE replace legacy func + enableClusterStateBundle(new_state); +} + +void +DistributorStripe::notify_distribution_change_enabled() +{ + // TODO STRIPE replace legacy func + notifyDistributionChangeEnabled(); +} + +PotentialDataLossReport +DistributorStripe::remove_superfluous_buckets(document::BucketSpace bucket_space, + const lib::ClusterState& new_state, + bool is_distribution_change) +{ + return bucket_db_updater().remove_superfluous_buckets(bucket_space, new_state, is_distribution_change); +} + +void +DistributorStripe::merge_entries_into_db(document::BucketSpace bucket_space, + api::Timestamp gathered_at_timestamp, + const lib::Distribution& distribution, + const lib::ClusterState& new_state, + const char* storage_up_states, + const std::unordered_set<uint16_t>& outdated_nodes, + const std::vector<dbtransition::Entry>& entries) +{ + bucket_db_updater().merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution, + new_state, storage_up_states, outdated_nodes, entries); +} + +void +DistributorStripe::update_read_snapshot_before_db_pruning() +{ + bucket_db_updater().update_read_snapshot_before_db_pruning(); +} + +void +DistributorStripe::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) +{ + bucket_db_updater().update_read_snapshot_after_db_pruning(new_state); +} + +void +DistributorStripe::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) +{ + bucket_db_updater().update_read_snapshot_after_activation(activated_state); +} + +void +DistributorStripe::clear_read_only_bucket_repo_databases() +{ + bucket_db_updater().clearReadOnlyBucketRepoDatabases(); +} + +void +DistributorStripe::report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const +{ + ideal_state_manager().dump_bucket_space_db_status(bucket_space, out); +} + +void +DistributorStripe::report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const +{ + bucket_db_updater().report_single_bucket_requests(xos); +} + +void +DistributorStripe::report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const +{ + bucket_db_updater().report_delayed_single_bucket_requests(xos); +} + } diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h index 475defbf105..b82b5483bd3 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe.h @@ -71,7 +71,7 @@ public: const ClusterContext& cluster_context() const override { return _component.cluster_context(); } - void flush_and_close(); + void flush_and_close() override; bool handle_or_enqueue_message(const std::shared_ptr<api::StorageMessage>&); void send_up_with_tracking(const std::shared_ptr<api::StorageMessage>&); // Bypasses message tracker component. Thread safe. @@ -119,7 +119,7 @@ public: bool handleStatusRequest(const DelegatedStatusRequest& request) const override; - StripeAccessGuard::PendingOperationStats pending_operation_stats() const; + StripeAccessGuard::PendingOperationStats pending_operation_stats() const override; std::string getActiveIdealStateOperations() const; std::string getActiveOperations() const; @@ -255,8 +255,6 @@ private: void enableNextDistribution(); // TODO STRIPE remove once legacy is gone void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>); // TODO STRIPE remove once legacy is gone void propagateClusterStates(); - void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs); - void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config); BucketSpacesStatsProvider::BucketSpacesStats make_invalid_stats_per_configured_space() const; template <typename NodeFunctor> @@ -265,6 +263,31 @@ private: void send_updated_host_info_if_required(); void propagate_config_snapshot_to_internal_components(); + // Additional implementations of TickableStripe: + void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) override; + void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) override; + void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) override; + void clear_pending_cluster_state_bundle() override; + void enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) override; + void notify_distribution_change_enabled() override; + PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace bucket_space, + const lib::ClusterState& new_state, + bool is_distribution_change) override; + void merge_entries_into_db(document::BucketSpace bucket_space, + api::Timestamp gathered_at_timestamp, + const lib::Distribution& distribution, + const lib::ClusterState& new_state, + const char* storage_up_states, + const std::unordered_set<uint16_t>& outdated_nodes, + const std::vector<dbtransition::Entry>& entries) override; + void update_read_snapshot_before_db_pruning() override; + void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) override; + void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) override; + void clear_read_only_bucket_repo_databases() override; + void report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const override; + void report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const override; + void report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const override; + lib::ClusterStateBundle _clusterStateBundle; std::unique_ptr<DistributorBucketSpaceRepo> _bucketSpaceRepo; // Read-only bucket space repo with DBs that only contain buckets transiently diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h index 4ac52b0ede8..72534de57ae 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h @@ -49,6 +49,8 @@ class DistributorStripePool { friend class DistributorStripeThread; public: + using const_iterator = StripeVector::const_iterator; + DistributorStripePool(); ~DistributorStripePool(); @@ -59,6 +61,12 @@ public: void start(const std::vector<TickableStripe*>& stripes); void stop_and_join(); + const_iterator begin() const noexcept { return _stripes.begin(); } + const_iterator end() const noexcept { return _stripes.end(); } + + const_iterator cbegin() const noexcept { return _stripes.cbegin(); } + const_iterator cend() const noexcept { return _stripes.cend(); } + void park_all_threads() noexcept; void unpark_all_threads() noexcept; diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp index a4a59745e3e..0c35162f25c 100644 --- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp +++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp @@ -37,22 +37,22 @@ void MultiThreadedStripeAccessGuard::update_distribution_config(const BucketSpac void MultiThreadedStripeAccessGuard::set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) { // TODO STRIPE multiple stripes - first_stripe().getBucketSpaceRepo().set_pending_cluster_state_bundle(pending_state); + first_stripe().set_pending_cluster_state_bundle(pending_state); } void MultiThreadedStripeAccessGuard::clear_pending_cluster_state_bundle() { // TODO STRIPE multiple stripes - first_stripe().getBucketSpaceRepo().clear_pending_cluster_state_bundle(); + first_stripe().clear_pending_cluster_state_bundle(); } void MultiThreadedStripeAccessGuard::enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) { // TODO STRIPE multiple stripes - first_stripe().enableClusterStateBundle(new_state); + first_stripe().enable_cluster_state_bundle(new_state); } void MultiThreadedStripeAccessGuard::notify_distribution_change_enabled() { // TODO STRIPE multiple stripes - first_stripe().notifyDistributionChangeEnabled(); + first_stripe().notify_distribution_change_enabled(); } PotentialDataLossReport @@ -61,7 +61,7 @@ MultiThreadedStripeAccessGuard::remove_superfluous_buckets(document::BucketSpace bool is_distribution_change) { // TODO STRIPE multiple stripes - return first_stripe().bucket_db_updater().remove_superfluous_buckets(bucket_space, new_state, is_distribution_change); + return first_stripe().remove_superfluous_buckets(bucket_space, new_state, is_distribution_change); } void @@ -74,33 +74,33 @@ MultiThreadedStripeAccessGuard::merge_entries_into_db(document::BucketSpace buck const std::vector<dbtransition::Entry>& entries) { // TODO STRIPE multiple stripes - first_stripe().bucket_db_updater().merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution, - new_state, storage_up_states, outdated_nodes, entries); + first_stripe().merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution, + new_state, storage_up_states, outdated_nodes, entries); } void MultiThreadedStripeAccessGuard::update_read_snapshot_before_db_pruning() { // TODO STRIPE multiple stripes - first_stripe().bucket_db_updater().update_read_snapshot_before_db_pruning(); + first_stripe().update_read_snapshot_before_db_pruning(); } void MultiThreadedStripeAccessGuard::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) { // TODO STRIPE multiple stripes - first_stripe().bucket_db_updater().update_read_snapshot_after_db_pruning(new_state); + first_stripe().update_read_snapshot_after_db_pruning(new_state); } void MultiThreadedStripeAccessGuard::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) { // TODO STRIPE multiple stripes - first_stripe().bucket_db_updater().update_read_snapshot_after_activation(activated_state); + first_stripe().update_read_snapshot_after_activation(activated_state); } void MultiThreadedStripeAccessGuard::clear_read_only_bucket_repo_databases() { // TODO STRIPE multiple stripes - first_stripe().bucket_db_updater().clearReadOnlyBucketRepoDatabases(); + first_stripe().clear_read_only_bucket_repo_databases(); } void MultiThreadedStripeAccessGuard::report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const { // TODO STRIPE multiple stripes - first_stripe().ideal_state_manager().dump_bucket_space_db_status(bucket_space, out); + first_stripe().report_bucket_db_status(bucket_space, out); } StripeAccessGuard::PendingOperationStats @@ -111,20 +111,20 @@ MultiThreadedStripeAccessGuard::pending_operation_stats() const { void MultiThreadedStripeAccessGuard::report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const { // TODO STRIPE multiple stripes - first_stripe().bucket_db_updater().report_single_bucket_requests(xos); + first_stripe().report_single_bucket_requests(xos); } void MultiThreadedStripeAccessGuard::report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const { // TODO STRIPE multiple stripes - first_stripe().bucket_db_updater().report_delayed_single_bucket_requests(xos); + first_stripe().report_delayed_single_bucket_requests(xos); } -DistributorStripe& MultiThreadedStripeAccessGuard::first_stripe() noexcept { - return dynamic_cast<DistributorStripe&>(_stripe_pool.stripe_thread(0).stripe()); +TickableStripe& MultiThreadedStripeAccessGuard::first_stripe() noexcept { + return _stripe_pool.stripe_thread(0).stripe(); } -const DistributorStripe& MultiThreadedStripeAccessGuard::first_stripe() const noexcept { - return dynamic_cast<const DistributorStripe&>(_stripe_pool.stripe_thread(0).stripe()); +const TickableStripe& MultiThreadedStripeAccessGuard::first_stripe() const noexcept { + return _stripe_pool.stripe_thread(0).stripe(); } std::unique_ptr<StripeAccessGuard> MultiThreadedStripeAccessor::rendezvous_and_hold_all() { diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h index a44f069d615..ef84a9ac79f 100644 --- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h +++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h @@ -7,7 +7,7 @@ namespace storage::distributor { class MultiThreadedStripeAccessor; class DistributorStripePool; -class DistributorStripe; +class TickableStripe; /** * StripeAccessGuard implementation which provides exclusive access to a set of stripes @@ -60,8 +60,8 @@ public: private: // TODO STRIPE remove once multi threaded stripe support is implemented - DistributorStripe& first_stripe() noexcept; - const DistributorStripe& first_stripe() const noexcept; + TickableStripe& first_stripe() noexcept; + const TickableStripe& first_stripe() const noexcept; }; /** diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h index 5f843b8ff33..ad367cb95a0 100644 --- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h +++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h @@ -145,7 +145,7 @@ private: friend class DistributorTestUtil; // TODO refactor and rewire to avoid needing this direct meddling - friend class MultiThreadedStripeAccessGuard; + friend class DistributorStripe; // Only to be used by tests that want to ensure both the BucketDBUpdater _and_ the Distributor // components agree on the currently active cluster state bundle. diff --git a/storage/src/vespa/storage/distributor/tickable_stripe.h b/storage/src/vespa/storage/distributor/tickable_stripe.h index 323db627d31..8d077455a48 100644 --- a/storage/src/vespa/storage/distributor/tickable_stripe.h +++ b/storage/src/vespa/storage/distributor/tickable_stripe.h @@ -1,6 +1,18 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include "stripe_access_guard.h" // TODO STRIPE break up + +namespace storage::lib { +class ClusterState; +class ClusterStateBundle; +class Distribution; +} + +namespace storage { class DistributorConfiguration; } + +namespace vespalib::xml { class XmlOutputStream; } + namespace storage::distributor { /** @@ -19,6 +31,38 @@ public: // Only used for multi-threaded striped setups. // TODO return an enum indicating type of last processed event? E.g. external, maintenance, none, ... virtual bool tick() = 0; + + virtual void flush_and_close() = 0; + + virtual void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) = 0; + + virtual void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) = 0; + virtual void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) = 0; + virtual void clear_pending_cluster_state_bundle() = 0; + virtual void enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) = 0; + virtual void notify_distribution_change_enabled() = 0; + + virtual PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace bucket_space, + const lib::ClusterState& new_state, + bool is_distribution_change) = 0; + virtual void merge_entries_into_db(document::BucketSpace bucket_space, + api::Timestamp gathered_at_timestamp, + const lib::Distribution& distribution, + const lib::ClusterState& new_state, + const char* storage_up_states, + const std::unordered_set<uint16_t>& outdated_nodes, + const std::vector<dbtransition::Entry>& entries) = 0; + + virtual void update_read_snapshot_before_db_pruning() = 0; + virtual void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) = 0; + virtual void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) = 0; + virtual void clear_read_only_bucket_repo_databases() = 0; + // Functions used for state reporting + virtual void report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const = 0; + virtual StripeAccessGuard::PendingOperationStats pending_operation_stats() const = 0; + virtual void report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const = 0; + virtual void report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const = 0; + }; } |