From 9cd956e0ce791ad4947ff0d3a8e21a91191a4ea1 Mon Sep 17 00:00:00 2001 From: Geir Storli Date: Fri, 23 Apr 2021 09:08:47 +0000 Subject: Make DistributorStripe aware of whether it uses legacy mode or not and add asserts. --- storage/src/vespa/storage/distributor/distributor.cpp | 15 +++++++++------ .../src/vespa/storage/distributor/distributor_stripe.cpp | 15 +++++++++++---- .../src/vespa/storage/distributor/distributor_stripe.h | 4 +++- .../storage/distributor/stripe_bucket_db_updater.cpp | 11 +++++++++-- .../vespa/storage/distributor/stripe_bucket_db_updater.h | 4 +++- 5 files changed, 35 insertions(+), 14 deletions(-) diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index eefcf3551d6..ff1fe270750 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -53,7 +53,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _comp_reg(compReg), _metrics(std::make_shared()), _messageSender(messageSender), - _stripe(std::make_unique(compReg, *_metrics, node_identity, threadPool, doneInitHandler, *this)), + _stripe(std::make_unique(compReg, *_metrics, node_identity, threadPool, + doneInitHandler, *this, (num_distributor_stripes == 0))), _stripe_accessor(std::make_unique(*_stripe)), _component(compReg, "distributor"), _bucket_db_updater(), @@ -306,11 +307,13 @@ Distributor::storageDistributionChanged() void Distributor::enableNextDistribution() { - if (_next_distribution && _bucket_db_updater) { - _distribution = _next_distribution; - _next_distribution = std::shared_ptr(); - auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(_distribution); - _bucket_db_updater->storage_distribution_changed(new_configs); + if (_bucket_db_updater) { + if (_next_distribution) { + _distribution = _next_distribution; + _next_distribution = std::shared_ptr(); + auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(_distribution); + _bucket_db_updater->storage_distribution_changed(new_configs); + } } else { _stripe->enableNextDistribution(); } diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index b9d141b05a6..57aab1637de 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -38,7 +38,8 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg, const NodeIdentity& node_identity, framework::TickingThreadPool& threadPool, DoneInitializeHandler& doneInitHandler, - ChainedMessageSender& messageSender) + ChainedMessageSender& messageSender, + bool use_legacy_mode) : DistributorStripeInterface(), framework::StatusReporter("distributor", "Distributor"), _clusterStateBundle(lib::ClusterState()), @@ -50,7 +51,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg, _maintenanceOperationOwner(*this, _component.getClock()), _operation_sequencer(std::make_unique()), _pendingMessageTracker(compReg), - _bucketDBUpdater(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, *this, compReg), + _bucketDBUpdater(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, *this, compReg, use_legacy_mode), _distributorStatusDelegate(compReg, *this, *this), _bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater), _idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg), @@ -79,7 +80,8 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg, _db_memory_sample_interval(30s), _last_db_memory_sample_time_point(), _inhibited_maintenance_tick_count(0), - _must_send_updated_host_info(false) + _must_send_updated_host_info(false), + _use_legacy_mode(use_legacy_mode) { _bucketDBStatusDelegate.registerStatusPage(); propagateDefaultDistribution(_component.getDistribution()); @@ -387,6 +389,7 @@ void DistributorStripe::invalidate_bucket_spaces_stats() { void DistributorStripe::storage_distribution_changed() { + assert(_use_legacy_mode); if (!_distribution.get() || *_component.getDistribution() != *_distribution) { @@ -468,6 +471,7 @@ DistributorStripe::checkBucketForSplit(document::BucketSpace bucketSpace, void DistributorStripe::enableNextDistribution() { + assert(_use_legacy_mode); if (_nextDistribution.get()) { _distribution = _nextDistribution; propagateDefaultDistribution(_distribution); @@ -492,6 +496,7 @@ DistributorStripe::propagateDefaultDistribution( // Only called when stripe is in rendezvous freeze void DistributorStripe::update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) { + assert(!_use_legacy_mode); auto default_distr = new_configs.get_or_nullptr(document::FixedBucketSpaces::default_space()); auto global_distr = new_configs.get_or_nullptr(document::FixedBucketSpaces::global_space()); assert(default_distr && global_distr); @@ -736,7 +741,9 @@ framework::ThreadWaitInfo DistributorStripe::doCriticalTick(framework::ThreadIndex) { _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; - enableNextDistribution(); + if (_use_legacy_mode) { + enableNextDistribution(); + } enableNextConfig(); fetchStatusRequests(); fetchExternalMessages(); diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h index 77ce510ea54..f0b49659286 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe.h @@ -59,7 +59,8 @@ public: const NodeIdentity& node_identity, framework::TickingThreadPool&, DoneInitializeHandler&, - ChainedMessageSender& messageSender); + ChainedMessageSender& messageSender, + bool use_legacy_mode); ~DistributorStripe() override; @@ -333,6 +334,7 @@ private: std::chrono::steady_clock::time_point _last_db_memory_sample_time_point; size_t _inhibited_maintenance_tick_count; bool _must_send_updated_host_info; + bool _use_legacy_mode; }; } diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp index cf1781e0ae8..61ff11d5ac3 100644 --- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp +++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp @@ -31,7 +31,8 @@ StripeBucketDBUpdater::StripeBucketDBUpdater(DistributorStripeInterface& owner, DistributorBucketSpaceRepo& bucketSpaceRepo, DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, DistributorMessageSender& sender, - DistributorComponentRegister& compReg) + DistributorComponentRegister& compReg, + bool use_legacy_mode) : framework::StatusReporter("bucketdb", "Bucket DB Updater"), _distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Bucket DB Updater"), _node_ctx(_distributorComponent), @@ -48,7 +49,8 @@ StripeBucketDBUpdater::StripeBucketDBUpdater(DistributorStripeInterface& owner, _stale_reads_enabled(false), _active_distribution_contexts(), _explicit_transition_read_guard(), - _distribution_context_mutex() + _distribution_context_mutex(), + _use_legacy_mode(use_legacy_mode) { for (auto& elem : _op_ctx.bucket_space_repo()) { _active_distribution_contexts.emplace( @@ -221,6 +223,7 @@ StripeBucketDBUpdater::removeSuperfluousBuckets( const lib::ClusterStateBundle& newState, bool is_distribution_config_change) { + assert(_use_legacy_mode); const bool move_to_read_only_db = shouldDeferStateEnabling(); const char* up_states = _op_ctx.storage_node_up_states(); for (auto& elem : _op_ctx.bucket_space_repo()) { @@ -267,6 +270,7 @@ StripeBucketDBUpdater::remove_superfluous_buckets( const lib::ClusterState& new_state, bool is_distribution_change) { + assert(!_use_legacy_mode); (void)is_distribution_change; // TODO remove if not needed const bool move_to_read_only_db = shouldDeferStateEnabling(); const char* up_states = _op_ctx.storage_node_up_states(); @@ -308,6 +312,7 @@ StripeBucketDBUpdater::merge_entries_into_db(document::BucketSpace bucket_space, const std::unordered_set& outdated_nodes, const std::vector& entries) { + assert(!_use_legacy_mode); auto& s = _op_ctx.bucket_space_repo().get(bucket_space); auto& bucket_db = s.getBucketDatabase(); @@ -461,6 +466,7 @@ bool StripeBucketDBUpdater::onSetSystemState( const std::shared_ptr& cmd) { + assert(_use_legacy_mode); LOG(debug, "Received new cluster state %s", cmd->getSystemState().toString().c_str()); @@ -507,6 +513,7 @@ StripeBucketDBUpdater::onSetSystemState( bool StripeBucketDBUpdater::onActivateClusterStateVersion(const std::shared_ptr& cmd) { + assert(_use_legacy_mode); if (hasPendingClusterState() && _pendingClusterState->isVersionedTransition()) { const auto pending_version = _pendingClusterState->clusterStateVersion(); if (pending_version == cmd->version()) { diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h index e34d28a69bc..decaa964a59 100644 --- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h +++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h @@ -40,7 +40,8 @@ public: DistributorBucketSpaceRepo& bucketSpaceRepo, DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, DistributorMessageSender& sender, - DistributorComponentRegister& compReg); + DistributorComponentRegister& compReg, + bool use_legacy_mode); ~StripeBucketDBUpdater() override; void flush(); @@ -279,6 +280,7 @@ private: document::BucketSpace::hash>; DbGuards _explicit_transition_read_guard; mutable std::mutex _distribution_context_mutex; + bool _use_legacy_mode; }; } -- cgit v1.2.3