summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2021-05-11 16:17:45 +0200
committerGitHub <noreply@github.com>2021-05-11 16:17:45 +0200
commit839a6f9a7d1f66937f51db3766a2dfd3e7b90675 (patch)
treec44ef15ce5a0209bbf046a0eb91005829321a8a2
parent6308b75159d8c45ec77e48d431412d2aea9646be (diff)
parent57509991a95da37d119c0a1bc059898ea8d32b9c (diff)
Merge pull request #17819 from vespa-engine/vekterli/extend-tickable-stripe-interface
Extend TickableStripe interface to avoid direct access to DistributorStripe internals
-rw-r--r--storage/src/tests/distributor/distributor_stripe_pool_test.cpp35
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp89
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.h31
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_pool.h8
-rw-r--r--storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp36
-rw-r--r--storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h6
-rw-r--r--storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h2
-rw-r--r--storage/src/vespa/storage/distributor/tickable_stripe.h44
8 files changed, 224 insertions, 27 deletions
diff --git a/storage/src/tests/distributor/distributor_stripe_pool_test.cpp b/storage/src/tests/distributor/distributor_stripe_pool_test.cpp
index 408027eb894..00f6349b218 100644
--- a/storage/src/tests/distributor/distributor_stripe_pool_test.cpp
+++ b/storage/src/tests/distributor/distributor_stripe_pool_test.cpp
@@ -48,9 +48,42 @@ struct DistributorStripePoolThreadingTest : Test {
}
};
+struct MockTickableStripe : TickableStripe {
+ bool tick() override { abort(); }
+ void flush_and_close() override { abort(); }
+ void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration>) override { abort(); }
+ void update_distribution_config(const BucketSpaceDistributionConfigs&) override { abort(); }
+ void set_pending_cluster_state_bundle(const lib::ClusterStateBundle&) override { abort(); }
+ void clear_pending_cluster_state_bundle() override { abort(); }
+ void enable_cluster_state_bundle(const lib::ClusterStateBundle&) override { abort(); }
+ void notify_distribution_change_enabled() override { abort(); }
+ PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace, const lib::ClusterState&, bool) override {
+ abort();
+ }
+ void merge_entries_into_db(document::BucketSpace,
+ api::Timestamp,
+ const lib::Distribution&,
+ const lib::ClusterState&,
+ const char*,
+ const std::unordered_set<uint16_t>&,
+ const std::vector<dbtransition::Entry>&) override
+ {
+ abort();
+ }
+ void update_read_snapshot_before_db_pruning() override { abort(); }
+ void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle&) override { abort(); }
+ void update_read_snapshot_after_activation(const lib::ClusterStateBundle&) override { abort(); }
+ void clear_read_only_bucket_repo_databases() override { abort(); }
+
+ void report_bucket_db_status(document::BucketSpace, std::ostream&) const override { abort(); }
+ StripeAccessGuard::PendingOperationStats pending_operation_stats() const override { abort(); }
+ void report_single_bucket_requests(vespalib::xml::XmlOutputStream&) const override { abort(); }
+ void report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream&) const override { abort(); }
+};
+
// Optimistic invariant checker that cannot prove correctness, but will hopefully
// make tests scream if something is obviously incorrect.
-struct ParkingInvariantCheckingMockStripe : TickableStripe {
+struct ParkingInvariantCheckingMockStripe : MockTickableStripe {
std::atomic<bool>& _is_parked;
bool _to_return;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 668930e65ee..1373eb27249 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -961,4 +961,93 @@ DistributorStripe::pending_operation_stats() const
return {_operationOwner.size(), _maintenanceOperationOwner.size()};
}
+void
+DistributorStripe::set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state)
+{
+ getBucketSpaceRepo().set_pending_cluster_state_bundle(pending_state);
+}
+
+void
+DistributorStripe::clear_pending_cluster_state_bundle()
+{
+ getBucketSpaceRepo().clear_pending_cluster_state_bundle();
+}
+
+void
+DistributorStripe::enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state)
+{
+ // TODO STRIPE replace legacy func
+ enableClusterStateBundle(new_state);
+}
+
+void
+DistributorStripe::notify_distribution_change_enabled()
+{
+ // TODO STRIPE replace legacy func
+ notifyDistributionChangeEnabled();
+}
+
+PotentialDataLossReport
+DistributorStripe::remove_superfluous_buckets(document::BucketSpace bucket_space,
+ const lib::ClusterState& new_state,
+ bool is_distribution_change)
+{
+ return bucket_db_updater().remove_superfluous_buckets(bucket_space, new_state, is_distribution_change);
+}
+
+void
+DistributorStripe::merge_entries_into_db(document::BucketSpace bucket_space,
+ api::Timestamp gathered_at_timestamp,
+ const lib::Distribution& distribution,
+ const lib::ClusterState& new_state,
+ const char* storage_up_states,
+ const std::unordered_set<uint16_t>& outdated_nodes,
+ const std::vector<dbtransition::Entry>& entries)
+{
+ bucket_db_updater().merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution,
+ new_state, storage_up_states, outdated_nodes, entries);
+}
+
+void
+DistributorStripe::update_read_snapshot_before_db_pruning()
+{
+ bucket_db_updater().update_read_snapshot_before_db_pruning();
+}
+
+void
+DistributorStripe::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state)
+{
+ bucket_db_updater().update_read_snapshot_after_db_pruning(new_state);
+}
+
+void
+DistributorStripe::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state)
+{
+ bucket_db_updater().update_read_snapshot_after_activation(activated_state);
+}
+
+void
+DistributorStripe::clear_read_only_bucket_repo_databases()
+{
+ bucket_db_updater().clearReadOnlyBucketRepoDatabases();
+}
+
+void
+DistributorStripe::report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const
+{
+ ideal_state_manager().dump_bucket_space_db_status(bucket_space, out);
+}
+
+void
+DistributorStripe::report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const
+{
+ bucket_db_updater().report_single_bucket_requests(xos);
+}
+
+void
+DistributorStripe::report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const
+{
+ bucket_db_updater().report_delayed_single_bucket_requests(xos);
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index 475defbf105..b82b5483bd3 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -71,7 +71,7 @@ public:
const ClusterContext& cluster_context() const override {
return _component.cluster_context();
}
- void flush_and_close();
+ void flush_and_close() override;
bool handle_or_enqueue_message(const std::shared_ptr<api::StorageMessage>&);
void send_up_with_tracking(const std::shared_ptr<api::StorageMessage>&);
// Bypasses message tracker component. Thread safe.
@@ -119,7 +119,7 @@ public:
bool handleStatusRequest(const DelegatedStatusRequest& request) const override;
- StripeAccessGuard::PendingOperationStats pending_operation_stats() const;
+ StripeAccessGuard::PendingOperationStats pending_operation_stats() const override;
std::string getActiveIdealStateOperations() const;
std::string getActiveOperations() const;
@@ -255,8 +255,6 @@ private:
void enableNextDistribution(); // TODO STRIPE remove once legacy is gone
void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>); // TODO STRIPE remove once legacy is gone
void propagateClusterStates();
- void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs);
- void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config);
BucketSpacesStatsProvider::BucketSpacesStats make_invalid_stats_per_configured_space() const;
template <typename NodeFunctor>
@@ -265,6 +263,31 @@ private:
void send_updated_host_info_if_required();
void propagate_config_snapshot_to_internal_components();
+ // Additional implementations of TickableStripe:
+ void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) override;
+ void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) override;
+ void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) override;
+ void clear_pending_cluster_state_bundle() override;
+ void enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) override;
+ void notify_distribution_change_enabled() override;
+ PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace bucket_space,
+ const lib::ClusterState& new_state,
+ bool is_distribution_change) override;
+ void merge_entries_into_db(document::BucketSpace bucket_space,
+ api::Timestamp gathered_at_timestamp,
+ const lib::Distribution& distribution,
+ const lib::ClusterState& new_state,
+ const char* storage_up_states,
+ const std::unordered_set<uint16_t>& outdated_nodes,
+ const std::vector<dbtransition::Entry>& entries) override;
+ void update_read_snapshot_before_db_pruning() override;
+ void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) override;
+ void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) override;
+ void clear_read_only_bucket_repo_databases() override;
+ void report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const override;
+ void report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const override;
+ void report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const override;
+
lib::ClusterStateBundle _clusterStateBundle;
std::unique_ptr<DistributorBucketSpaceRepo> _bucketSpaceRepo;
// Read-only bucket space repo with DBs that only contain buckets transiently
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
index 4ac52b0ede8..72534de57ae 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
@@ -49,6 +49,8 @@ class DistributorStripePool {
friend class DistributorStripeThread;
public:
+ using const_iterator = StripeVector::const_iterator;
+
DistributorStripePool();
~DistributorStripePool();
@@ -59,6 +61,12 @@ public:
void start(const std::vector<TickableStripe*>& stripes);
void stop_and_join();
+ const_iterator begin() const noexcept { return _stripes.begin(); }
+ const_iterator end() const noexcept { return _stripes.end(); }
+
+ const_iterator cbegin() const noexcept { return _stripes.cbegin(); }
+ const_iterator cend() const noexcept { return _stripes.cend(); }
+
void park_all_threads() noexcept;
void unpark_all_threads() noexcept;
diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
index a4a59745e3e..0c35162f25c 100644
--- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
+++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
@@ -37,22 +37,22 @@ void MultiThreadedStripeAccessGuard::update_distribution_config(const BucketSpac
void MultiThreadedStripeAccessGuard::set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) {
// TODO STRIPE multiple stripes
- first_stripe().getBucketSpaceRepo().set_pending_cluster_state_bundle(pending_state);
+ first_stripe().set_pending_cluster_state_bundle(pending_state);
}
void MultiThreadedStripeAccessGuard::clear_pending_cluster_state_bundle() {
// TODO STRIPE multiple stripes
- first_stripe().getBucketSpaceRepo().clear_pending_cluster_state_bundle();
+ first_stripe().clear_pending_cluster_state_bundle();
}
void MultiThreadedStripeAccessGuard::enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) {
// TODO STRIPE multiple stripes
- first_stripe().enableClusterStateBundle(new_state);
+ first_stripe().enable_cluster_state_bundle(new_state);
}
void MultiThreadedStripeAccessGuard::notify_distribution_change_enabled() {
// TODO STRIPE multiple stripes
- first_stripe().notifyDistributionChangeEnabled();
+ first_stripe().notify_distribution_change_enabled();
}
PotentialDataLossReport
@@ -61,7 +61,7 @@ MultiThreadedStripeAccessGuard::remove_superfluous_buckets(document::BucketSpace
bool is_distribution_change)
{
// TODO STRIPE multiple stripes
- return first_stripe().bucket_db_updater().remove_superfluous_buckets(bucket_space, new_state, is_distribution_change);
+ return first_stripe().remove_superfluous_buckets(bucket_space, new_state, is_distribution_change);
}
void
@@ -74,33 +74,33 @@ MultiThreadedStripeAccessGuard::merge_entries_into_db(document::BucketSpace buck
const std::vector<dbtransition::Entry>& entries)
{
// TODO STRIPE multiple stripes
- first_stripe().bucket_db_updater().merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution,
- new_state, storage_up_states, outdated_nodes, entries);
+ first_stripe().merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution,
+ new_state, storage_up_states, outdated_nodes, entries);
}
void MultiThreadedStripeAccessGuard::update_read_snapshot_before_db_pruning() {
// TODO STRIPE multiple stripes
- first_stripe().bucket_db_updater().update_read_snapshot_before_db_pruning();
+ first_stripe().update_read_snapshot_before_db_pruning();
}
void MultiThreadedStripeAccessGuard::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) {
// TODO STRIPE multiple stripes
- first_stripe().bucket_db_updater().update_read_snapshot_after_db_pruning(new_state);
+ first_stripe().update_read_snapshot_after_db_pruning(new_state);
}
void MultiThreadedStripeAccessGuard::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) {
// TODO STRIPE multiple stripes
- first_stripe().bucket_db_updater().update_read_snapshot_after_activation(activated_state);
+ first_stripe().update_read_snapshot_after_activation(activated_state);
}
void MultiThreadedStripeAccessGuard::clear_read_only_bucket_repo_databases() {
// TODO STRIPE multiple stripes
- first_stripe().bucket_db_updater().clearReadOnlyBucketRepoDatabases();
+ first_stripe().clear_read_only_bucket_repo_databases();
}
void MultiThreadedStripeAccessGuard::report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const {
// TODO STRIPE multiple stripes
- first_stripe().ideal_state_manager().dump_bucket_space_db_status(bucket_space, out);
+ first_stripe().report_bucket_db_status(bucket_space, out);
}
StripeAccessGuard::PendingOperationStats
@@ -111,20 +111,20 @@ MultiThreadedStripeAccessGuard::pending_operation_stats() const {
void MultiThreadedStripeAccessGuard::report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const {
// TODO STRIPE multiple stripes
- first_stripe().bucket_db_updater().report_single_bucket_requests(xos);
+ first_stripe().report_single_bucket_requests(xos);
}
void MultiThreadedStripeAccessGuard::report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const {
// TODO STRIPE multiple stripes
- first_stripe().bucket_db_updater().report_delayed_single_bucket_requests(xos);
+ first_stripe().report_delayed_single_bucket_requests(xos);
}
-DistributorStripe& MultiThreadedStripeAccessGuard::first_stripe() noexcept {
- return dynamic_cast<DistributorStripe&>(_stripe_pool.stripe_thread(0).stripe());
+TickableStripe& MultiThreadedStripeAccessGuard::first_stripe() noexcept {
+ return _stripe_pool.stripe_thread(0).stripe();
}
-const DistributorStripe& MultiThreadedStripeAccessGuard::first_stripe() const noexcept {
- return dynamic_cast<const DistributorStripe&>(_stripe_pool.stripe_thread(0).stripe());
+const TickableStripe& MultiThreadedStripeAccessGuard::first_stripe() const noexcept {
+ return _stripe_pool.stripe_thread(0).stripe();
}
std::unique_ptr<StripeAccessGuard> MultiThreadedStripeAccessor::rendezvous_and_hold_all() {
diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
index a44f069d615..ef84a9ac79f 100644
--- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
+++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
@@ -7,7 +7,7 @@ namespace storage::distributor {
class MultiThreadedStripeAccessor;
class DistributorStripePool;
-class DistributorStripe;
+class TickableStripe;
/**
* StripeAccessGuard implementation which provides exclusive access to a set of stripes
@@ -60,8 +60,8 @@ public:
private:
// TODO STRIPE remove once multi threaded stripe support is implemented
- DistributorStripe& first_stripe() noexcept;
- const DistributorStripe& first_stripe() const noexcept;
+ TickableStripe& first_stripe() noexcept;
+ const TickableStripe& first_stripe() const noexcept;
};
/**
diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
index 5f843b8ff33..ad367cb95a0 100644
--- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
+++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
@@ -145,7 +145,7 @@ private:
friend class DistributorTestUtil;
// TODO refactor and rewire to avoid needing this direct meddling
- friend class MultiThreadedStripeAccessGuard;
+ friend class DistributorStripe;
// Only to be used by tests that want to ensure both the BucketDBUpdater _and_ the Distributor
// components agree on the currently active cluster state bundle.
diff --git a/storage/src/vespa/storage/distributor/tickable_stripe.h b/storage/src/vespa/storage/distributor/tickable_stripe.h
index 323db627d31..8d077455a48 100644
--- a/storage/src/vespa/storage/distributor/tickable_stripe.h
+++ b/storage/src/vespa/storage/distributor/tickable_stripe.h
@@ -1,6 +1,18 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "stripe_access_guard.h" // TODO STRIPE break up
+
+namespace storage::lib {
+class ClusterState;
+class ClusterStateBundle;
+class Distribution;
+}
+
+namespace storage { class DistributorConfiguration; }
+
+namespace vespalib::xml { class XmlOutputStream; }
+
namespace storage::distributor {
/**
@@ -19,6 +31,38 @@ public:
// Only used for multi-threaded striped setups.
// TODO return an enum indicating type of last processed event? E.g. external, maintenance, none, ...
virtual bool tick() = 0;
+
+ virtual void flush_and_close() = 0;
+
+ virtual void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) = 0;
+
+ virtual void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) = 0;
+ virtual void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) = 0;
+ virtual void clear_pending_cluster_state_bundle() = 0;
+ virtual void enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) = 0;
+ virtual void notify_distribution_change_enabled() = 0;
+
+ virtual PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace bucket_space,
+ const lib::ClusterState& new_state,
+ bool is_distribution_change) = 0;
+ virtual void merge_entries_into_db(document::BucketSpace bucket_space,
+ api::Timestamp gathered_at_timestamp,
+ const lib::Distribution& distribution,
+ const lib::ClusterState& new_state,
+ const char* storage_up_states,
+ const std::unordered_set<uint16_t>& outdated_nodes,
+ const std::vector<dbtransition::Entry>& entries) = 0;
+
+ virtual void update_read_snapshot_before_db_pruning() = 0;
+ virtual void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) = 0;
+ virtual void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) = 0;
+ virtual void clear_read_only_bucket_repo_databases() = 0;
+ // Functions used for state reporting
+ virtual void report_bucket_db_status(document::BucketSpace bucket_space, std::ostream& out) const = 0;
+ virtual StripeAccessGuard::PendingOperationStats pending_operation_stats() const = 0;
+ virtual void report_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const = 0;
+ virtual void report_delayed_single_bucket_requests(vespalib::xml::XmlOutputStream& xos) const = 0;
+
};
}