summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2021-05-11 12:04:44 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2021-05-11 12:04:44 +0000
commit57509991a95da37d119c0a1bc059898ea8d32b9c (patch)
tree7e2bf4e4c61e115b1e19317e26c335c664246cb6
parent4f0fc6d74b24fbc7af2606afc1306cac95bc3704 (diff)
Extend TickableStripe interface to avoid direct access to DistributorStripe internals
Also lets us test guard functionality much more easily since its target is now fully mockable.
-rw-r--r--storage/src/tests/distributor/distributor_stripe_pool_test.cpp35
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp89
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.h31
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_pool.h8
-rw-r--r--storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp36
-rw-r--r--storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h6
-rw-r--r--storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h2
-rw-r--r--storage/src/vespa/storage/distributor/tickable_stripe.h44
8 files changed, 224 insertions, 27 deletions
diff --git a/storage/src/tests/distributor/distributor_stripe_pool_test.cpp b/storage/src/tests/distributor/distributor_stripe_pool_test.cpp
index 408027eb894..00f6349b218 100644
--- a/storage/src/tests/distributor/distributor_stripe_pool_test.cpp
+++ b/storage/src/tests/distributor/distributor_stripe_pool_test.cpp
@@ -48,9 +48,42 @@ struct DistributorStripePoolThreadingTest : Test {
}
};
+struct MockTickableStripe : TickableStripe {
+ bool tick() override { abort(); }
+ void flush_and_close() override { abort(); }
+ void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration>) override { abort(); }
+ void update_distribution_config(const BucketSpaceDistributionConfigs&) override { abort(); }
+ void set_pending_cluster_state_bundle(const lib::ClusterStateBundle&) override { abort(); }
+ void clear_pending_cluster_state_bundle() override { abort(); }
+ void enable_cluster_state_bundle(const lib::ClusterStateBundle&) override { abort(); }
+ void notify_distribution_change_enabled() override { abort(); }
+ PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace, const lib::ClusterState&, bool) override {
+ abort();
+ }
+ void merge_entries_into_db(document::BucketSpace,
+ api::Timestamp,
+ const lib::Distribution&,
+ const lib::ClusterState&,
+ const char*,
+ const std::unordered_set<uint16_t>&,
+ const std::vector<dbtransition::Entry>&) override
+ {
+ abort();
+ }
+ void update_read_snapshot_before_db_pruning() override { abort(); }
+ void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle&) override { abort(); }
+ void update_read_snapshot_after_activation(const lib::ClusterStateBundle&) override { abort(); }
+ void clear_read_only_bucket_repo_databases() override { abort(); }
+
+ void report_bucket_db_status(document::BucketSpace, std::ostream&) const override { abort(); }
+ StripeAccessGuard::PendingOperationStats pending_operation_stats() const override { abort(); }
+ void report_single_bucket_requests(vespalib::xml::XmlOutputStream&) const override { abort(); }
+ void report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream&) const override { abort(); }
+};
+
// Optimistic invariant checker that cannot prove correctness, but will hopefully
// make tests scream if something is obviously incorrect.
-struct ParkingInvariantCheckingMockStripe : TickableStripe {
+struct ParkingInvariantCheckingMockStripe : MockTickableStripe {
std::atomic<bool>& _is_parked;
bool _to_return;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 668930e65ee..1373eb27249 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -961,4 +961,93 @@ DistributorStripe::pending_operation_stats() const
return {_operationOwner.size(), _maintenanceOperationOwner.size()};
}
+void
+DistributorStripe::set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state)
+{
+ getBucketSpaceRepo().set_pending_cluster_state_bundle(pending_state);
+}
+
+void
+DistributorStripe::clear_pending_cluster_state_bundle()
+{
+ getBucketSpaceRepo().clear_pending_cluster_state_bundle();
+}
+
+void
+DistributorStripe::enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state)
+{
+ // TODO STRIPE replace legacy func
+ enableClusterStateBundle(new_state);
+}
+
+void
+DistributorStripe::notify_distribution_change_enabled()
+{
+ // TODO STRIPE replace legacy func
+ notifyDistributionChangeEnabled();
+}
+
+PotentialDataLossReport
+DistributorStripe::remove_superfluous_buckets(document::BucketSpace bucket_space,
+ const lib::ClusterState& new_state,
+ bool is_distribution_change)
+{
+ return bucket_db_updater().remove_superfluous_buckets(bucket_space, new_state, is_distribution_change);
+}
+
+void
+DistributorStripe::merge_entries_into_db(document::BucketSpace bucket_space,
+ api::Timestamp gathered_at_timestamp,
+ const lib::Distribution& distribution,
+ const lib::ClusterState& new_state,
+ const char* storage_up_states,
+ const std::unordered_set<uint16_t>& outdated_nodes,
+ const std::vector<dbtransition::Entry>& entries)
+{
+ bucket_db_updater().merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution,
+ new_state, storage_up_states, outdated_nodes, entries);
+}
+
+void
+DistributorStripe::update_read_snapshot_before_db_pruning()
+{
+ bucket_db_updater().update_read_snapshot_before_db_pruning();
+}
+
+void
+DistributorStripe::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state)
+{
+ bucket_db_updater().update_read_snapshot_after_db_pruning(new_state);
+}
+
+void
+DistributorStripe::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state)
+{
+ bucket_db_updater().update_read_snapshot_after_activation(activated_state);
+}
+
+void
+DistributorStripe::clear_read_only_bucket_repo_databases()
+{
+ bucket_db_updater().clearReadOnlyBucketRepoDatabases();
+}
+
+void
+DistributorStripe::report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const
+{
+ ideal_state_manager().dump_bucket_space_db_status(bucket_space, out);
+}
+
+void
+DistributorStripe::report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const
+{
+ bucket_db_updater().report_single_bucket_requests(xos);
+}
+
+void
+DistributorStripe::report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const
+{
+ bucket_db_updater().report_delayed_single_bucket_requests(xos);
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index 475defbf105..b82b5483bd3 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -71,7 +71,7 @@ public:
const ClusterContext& cluster_context() const override {
return _component.cluster_context();
}
- void flush_and_close();
+ void flush_and_close() override;
bool handle_or_enqueue_message(const std::shared_ptr<api::StorageMessage>&);
void send_up_with_tracking(const std::shared_ptr<api::StorageMessage>&);
// Bypasses message tracker component. Thread safe.
@@ -119,7 +119,7 @@ public:
bool handleStatusRequest(const DelegatedStatusRequest& request) const override;
- StripeAccessGuard::PendingOperationStats pending_operation_stats() const;
+ StripeAccessGuard::PendingOperationStats pending_operation_stats() const override;
std::string getActiveIdealStateOperations() const;
std::string getActiveOperations() const;
@@ -255,8 +255,6 @@ private:
void enableNextDistribution(); // TODO STRIPE remove once legacy is gone
void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>); // TODO STRIPE remove once legacy is gone
void propagateClusterStates();
- void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs);
- void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config);
BucketSpacesStatsProvider::BucketSpacesStats make_invalid_stats_per_configured_space() const;
template <typename NodeFunctor>
@@ -265,6 +263,31 @@ private:
void send_updated_host_info_if_required();
void propagate_config_snapshot_to_internal_components();
+ // Additional implementations of TickableStripe:
+ void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) override;
+ void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) override;
+ void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) override;
+ void clear_pending_cluster_state_bundle() override;
+ void enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) override;
+ void notify_distribution_change_enabled() override;
+ PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace bucket_space,
+ const lib::ClusterState& new_state,
+ bool is_distribution_change) override;
+ void merge_entries_into_db(document::BucketSpace bucket_space,
+ api::Timestamp gathered_at_timestamp,
+ const lib::Distribution& distribution,
+ const lib::ClusterState& new_state,
+ const char* storage_up_states,
+ const std::unordered_set<uint16_t>& outdated_nodes,
+ const std::vector<dbtransition::Entry>& entries) override;
+ void update_read_snapshot_before_db_pruning() override;
+ void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) override;
+ void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) override;
+ void clear_read_only_bucket_repo_databases() override;
+ void report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const override;
+ void report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const override;
+ void report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const override;
+
lib::ClusterStateBundle _clusterStateBundle;
std::unique_ptr<DistributorBucketSpaceRepo> _bucketSpaceRepo;
// Read-only bucket space repo with DBs that only contain buckets transiently
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
index 4ac52b0ede8..72534de57ae 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
@@ -49,6 +49,8 @@ class DistributorStripePool {
friend class DistributorStripeThread;
public:
+ using const_iterator = StripeVector::const_iterator;
+
DistributorStripePool();
~DistributorStripePool();
@@ -59,6 +61,12 @@ public:
void start(const std::vector<TickableStripe*>& stripes);
void stop_and_join();
+ const_iterator begin() const noexcept { return _stripes.begin(); }
+ const_iterator end() const noexcept { return _stripes.end(); }
+
+ const_iterator cbegin() const noexcept { return _stripes.cbegin(); }
+ const_iterator cend() const noexcept { return _stripes.cend(); }
+
void park_all_threads() noexcept;
void unpark_all_threads() noexcept;
diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
index a4a59745e3e..0c35162f25c 100644
--- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
+++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
@@ -37,22 +37,22 @@ void MultiThreadedStripeAccessGuard::update_distribution_config(const BucketSpac
void MultiThreadedStripeAccessGuard::set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) {
// TODO STRIPE multiple stripes
- first_stripe().getBucketSpaceRepo().set_pending_cluster_state_bundle(pending_state);
+ first_stripe().set_pending_cluster_state_bundle(pending_state);
}
void MultiThreadedStripeAccessGuard::clear_pending_cluster_state_bundle() {
// TODO STRIPE multiple stripes
- first_stripe().getBucketSpaceRepo().clear_pending_cluster_state_bundle();
+ first_stripe().clear_pending_cluster_state_bundle();
}
void MultiThreadedStripeAccessGuard::enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) {
// TODO STRIPE multiple stripes
- first_stripe().enableClusterStateBundle(new_state);
+ first_stripe().enable_cluster_state_bundle(new_state);
}
void MultiThreadedStripeAccessGuard::notify_distribution_change_enabled() {
// TODO STRIPE multiple stripes
- first_stripe().notifyDistributionChangeEnabled();
+ first_stripe().notify_distribution_change_enabled();
}
PotentialDataLossReport
@@ -61,7 +61,7 @@ MultiThreadedStripeAccessGuard::remove_superfluous_buckets(document::BucketSpace
bool is_distribution_change)
{
// TODO STRIPE multiple stripes
- return first_stripe().bucket_db_updater().remove_superfluous_buckets(bucket_space, new_state, is_distribution_change);
+ return first_stripe().remove_superfluous_buckets(bucket_space, new_state, is_distribution_change);
}
void
@@ -74,33 +74,33 @@ MultiThreadedStripeAccessGuard::merge_entries_into_db(document::BucketSpace buck
const std::vector<dbtransition::Entry>& entries)
{
// TODO STRIPE multiple stripes
- first_stripe().bucket_db_updater().merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution,
- new_state, storage_up_states, outdated_nodes, entries);
+ first_stripe().merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution,
+ new_state, storage_up_states, outdated_nodes, entries);
}
void MultiThreadedStripeAccessGuard::update_read_snapshot_before_db_pruning() {
// TODO STRIPE multiple stripes
- first_stripe().bucket_db_updater().update_read_snapshot_before_db_pruning();
+ first_stripe().update_read_snapshot_before_db_pruning();
}
void MultiThreadedStripeAccessGuard::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) {
// TODO STRIPE multiple stripes
- first_stripe().bucket_db_updater().update_read_snapshot_after_db_pruning(new_state);
+ first_stripe().update_read_snapshot_after_db_pruning(new_state);
}
void MultiThreadedStripeAccessGuard::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) {
// TODO STRIPE multiple stripes
- first_stripe().bucket_db_updater().update_read_snapshot_after_activation(activated_state);
+ first_stripe().update_read_snapshot_after_activation(activated_state);
}
void MultiThreadedStripeAccessGuard::clear_read_only_bucket_repo_databases() {
// TODO STRIPE multiple stripes
- first_stripe().bucket_db_updater().clearReadOnlyBucketRepoDatabases();
+ first_stripe().clear_read_only_bucket_repo_databases();
}
void MultiThreadedStripeAccessGuard::report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const {
// TODO STRIPE multiple stripes
- first_stripe().ideal_state_manager().dump_bucket_space_db_status(bucket_space, out);
+ first_stripe().report_bucket_db_status(bucket_space, out);
}
StripeAccessGuard::PendingOperationStats
@@ -111,20 +111,20 @@ MultiThreadedStripeAccessGuard::pending_operation_stats() const {
void MultiThreadedStripeAccessGuard::report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const {
// TODO STRIPE multiple stripes
- first_stripe().bucket_db_updater().report_single_bucket_requests(xos);
+ first_stripe().report_single_bucket_requests(xos);
}
void MultiThreadedStripeAccessGuard::report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const {
// TODO STRIPE multiple stripes
- first_stripe().bucket_db_updater().report_delayed_single_bucket_requests(xos);
+ first_stripe().report_delayed_single_bucket_requests(xos);
}
-DistributorStripe& MultiThreadedStripeAccessGuard::first_stripe() noexcept {
- return dynamic_cast<DistributorStripe&>(_stripe_pool.stripe_thread(0).stripe());
+TickableStripe& MultiThreadedStripeAccessGuard::first_stripe() noexcept {
+ return _stripe_pool.stripe_thread(0).stripe();
}
-const DistributorStripe& MultiThreadedStripeAccessGuard::first_stripe() const noexcept {
- return dynamic_cast<const DistributorStripe&>(_stripe_pool.stripe_thread(0).stripe());
+const TickableStripe& MultiThreadedStripeAccessGuard::first_stripe() const noexcept {
+ return _stripe_pool.stripe_thread(0).stripe();
}
std::unique_ptr<StripeAccessGuard> MultiThreadedStripeAccessor::rendezvous_and_hold_all() {
diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
index a44f069d615..ef84a9ac79f 100644
--- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
+++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
@@ -7,7 +7,7 @@ namespace storage::distributor {
class MultiThreadedStripeAccessor;
class DistributorStripePool;
-class DistributorStripe;
+class TickableStripe;
/**
* StripeAccessGuard implementation which provides exclusive access to a set of stripes
@@ -60,8 +60,8 @@ public:
private:
// TODO STRIPE remove once multi threaded stripe support is implemented
- DistributorStripe& first_stripe() noexcept;
- const DistributorStripe& first_stripe() const noexcept;
+ TickableStripe& first_stripe() noexcept;
+ const TickableStripe& first_stripe() const noexcept;
};
/**
diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
index 5f843b8ff33..ad367cb95a0 100644
--- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
+++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
@@ -145,7 +145,7 @@ private:
friend class DistributorTestUtil;
// TODO refactor and rewire to avoid needing this direct meddling
- friend class MultiThreadedStripeAccessGuard;
+ friend class DistributorStripe;
// Only to be used by tests that want to ensure both the BucketDBUpdater _and_ the Distributor
// components agree on the currently active cluster state bundle.
diff --git a/storage/src/vespa/storage/distributor/tickable_stripe.h b/storage/src/vespa/storage/distributor/tickable_stripe.h
index 323db627d31..8d077455a48 100644
--- a/storage/src/vespa/storage/distributor/tickable_stripe.h
+++ b/storage/src/vespa/storage/distributor/tickable_stripe.h
@@ -1,6 +1,18 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "stripe_access_guard.h" // TODO STRIPE break up
+
+namespace storage::lib {
+class ClusterState;
+class ClusterStateBundle;
+class Distribution;
+}
+
+namespace storage { class DistributorConfiguration; }
+
+namespace vespalib::xml { class XmlOutputStream; }
+
namespace storage::distributor {
/**
@@ -19,6 +31,38 @@ public:
// Only used for multi-threaded striped setups.
// TODO return an enum indicating type of last processed event? E.g. external, maintenance, none, ...
virtual bool tick() = 0;
+
+ virtual void flush_and_close() = 0;
+
+ virtual void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) = 0;
+
+ virtual void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) = 0;
+ virtual void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) = 0;
+ virtual void clear_pending_cluster_state_bundle() = 0;
+ virtual void enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) = 0;
+ virtual void notify_distribution_change_enabled() = 0;
+
+ virtual PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace bucket_space,
+ const lib::ClusterState& new_state,
+ bool is_distribution_change) = 0;
+ virtual void merge_entries_into_db(document::BucketSpace bucket_space,
+ api::Timestamp gathered_at_timestamp,
+ const lib::Distribution& distribution,
+ const lib::ClusterState& new_state,
+ const char* storage_up_states,
+ const std::unordered_set<uint16_t>& outdated_nodes,
+ const std::vector<dbtransition::Entry>& entries) = 0;
+
+ virtual void update_read_snapshot_before_db_pruning() = 0;
+ virtual void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) = 0;
+ virtual void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) = 0;
+ virtual void clear_read_only_bucket_repo_databases() = 0;
+ // Functions used for state reporting
+ virtual void report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const = 0;
+ virtual StripeAccessGuard::PendingOperationStats pending_operation_stats() const = 0;
+ virtual void report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const = 0;
+ virtual void report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const = 0;
+
};
}