diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2021-05-05 08:44:48 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2021-05-05 08:52:39 +0000 |
commit | ca7086563beac4bbae4e3d83f72de04a2ccd292c (patch) | |
tree | 4f7a61764327c9b412e0915182d06ae9868654e5 | |
parent | 02a7400a60b05d4d8db5c77def32d681199e0fb8 (diff) |
Run single stripe in its own thread when not using legacy mode
The (currently single) stripe is now run as part of the distributor
stripe pool instead of being transitively invoked by the main thread.
Introduce an explicit message mutex per stripe that is used for
external messages and status requests when not using legacy mode.
Use per-stripe wakeup mechanisms instead of the framework-global
mutex used in the legacy code path.
Additional work remains to bring back a dedicated message run-queue
for the top-level distributor, so this is not yet thread-safe for
operations on the main `BucketDBUpdater`.
11 files changed, 187 insertions, 43 deletions
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 6975a2595ad..2f524f00e0e 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -7,9 +7,11 @@ #include "distributor_bucket_space.h" #include "distributor_status.h" #include "distributor_stripe.h" +#include "distributor_stripe_pool.h" +#include "distributor_stripe_thread.h" #include "distributormetricsset.h" #include "idealstatemetricsset.h" -#include "legacy_single_stripe_accessor.h" +#include "multi_threaded_stripe_access_guard.h" #include "operation_sequencer.h" #include "ownership_transfer_safe_time_point_calculator.h" #include "throttlingoperationstarter.h" @@ -57,7 +59,9 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _use_legacy_mode(num_distributor_stripes == 0), _stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool, doneInitHandler, *this, _use_legacy_mode)), - _stripe_accessor(std::make_unique<LegacySingleStripeAccessor>(*_stripe)), + _stripe_pool(), + _stripes(), + _stripe_accessor(), _component(*this, compReg, "distributor"), _total_config(_component.total_distributor_config_sp()), _bucket_db_updater(), @@ -74,10 +78,13 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0)); if (!_use_legacy_mode) { LOG(info, "Setting up distributor with %u stripes", num_distributor_stripes); // TODO STRIPE remove once legacy gone + _stripe_pool = std::make_unique<DistributorStripePool>(); + _stripe_accessor = std::make_unique<MultiThreadedStripeAccessor>(*_stripe_pool); _bucket_db_updater = std::make_unique<BucketDBUpdater>(_component, _component, *this, *this, _component.getDistribution(), *_stripe_accessor); + _stripes.emplace_back(std::move(_stripe)); } 
_hostInfoReporter.enableReporting(config().getEnableHostInfoReporting()); _distributorStatusDelegate.registerStatusPage(); @@ -91,6 +98,20 @@ Distributor::~Distributor() closeNextLink(); } +// TODO STRIPE remove +DistributorStripe& +Distributor::first_stripe() noexcept { + assert(_stripes.size() == 1); + return *_stripes[0]; +} + +// TODO STRIPE remove +const DistributorStripe& +Distributor::first_stripe() const noexcept { + assert(_stripes.size() == 1); + return *_stripes[0]; +} + // TODO STRIPE figure out how to handle inspection functions used by tests when legacy mode no longer exists. // All functions below that assert on _use_legacy_mode are only currently used by tests @@ -217,6 +238,10 @@ Distributor::onOpen() if (_component.getDistributorConfig().startDistributorThread) { _threadPool.addThread(*this); _threadPool.start(_component.getThreadPool()); + if (!_use_legacy_mode) { + std::vector<TickableStripe*> pool_stripes({_stripes[0].get()}); + _stripe_pool->start(pool_stripes); + } } else { LOG(warning, "Not starting distributor thread as it's configured to " "run. Unless you are just running a test tool, this is a " @@ -226,8 +251,17 @@ Distributor::onOpen() void Distributor::onClose() { LOG(debug, "Distributor::onClose invoked"); - _stripe->flush_and_close(); - if (_bucket_db_updater) { + if (_use_legacy_mode) { + _stripe->flush_and_close(); + } else { + { + auto guard = _stripe_accessor->rendezvous_and_hold_all(); + guard->flush_and_close(); + } + // TODO STRIPE must ensure no incoming requests can be posted on stripes between close + // and pool stop+join! 
+ _stripe_pool->stop_and_join(); + assert(_bucket_db_updater); _bucket_db_updater->flush(); } } @@ -273,14 +307,25 @@ bool should_be_handled_by_top_level_bucket_db_updater(const api::StorageMessage& bool Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg) { - // FIXME STRIPE this MUST be in a separate thread to enforce processing in a single thread - // regardless of what RPC thread (comm mgr, FRT...) this is called from! - if (!_use_legacy_mode && should_be_handled_by_top_level_bucket_db_updater(*msg)) { - return msg->callHandler(*_bucket_db_updater, msg); - } // TODO STRIPE can we route both requests and responses that are BucketCommand|Reply based on their bucket alone? // that covers most operations already... - return _stripe->handle_or_enqueue_message(msg); + if (_use_legacy_mode) { + return _stripe->handle_or_enqueue_message(msg); + } else { + // FIXME STRIPE this MUST be in a separate thread to enforce processing in a single thread + // regardless of what RPC thread (comm mgr, FRT...) this is called from! + if (should_be_handled_by_top_level_bucket_db_updater(*msg)) { + return msg->callHandler(*_bucket_db_updater, msg); + } + assert(_stripes.size() == 1); + assert(_stripe_pool->stripe_count() == 1); + // TODO STRIPE correct routing with multiple stripes + bool handled = first_stripe().handle_or_enqueue_message(msg); + if (handled) { + _stripe_pool->stripe_thread(0).notify_event_has_triggered(); + } + return handled; + } } bool @@ -289,6 +334,7 @@ Distributor::handleReply(const std::shared_ptr<api::StorageReply>& reply) if (!_use_legacy_mode && should_be_handled_by_top_level_bucket_db_updater(*reply)) { return reply->callHandler(*_bucket_db_updater, reply); } + assert(_use_legacy_mode); return _stripe->handleReply(reply); } @@ -375,33 +421,61 @@ Distributor::propagateDefaultDistribution( std::shared_ptr<const lib::Distribution> distribution) { // TODO STRIPE cannot directly access stripe when not in legacy mode! 
- _stripe->propagateDefaultDistribution(std::move(distribution)); + if (_use_legacy_mode) { + _stripe->propagateDefaultDistribution(std::move(distribution)); + } else { + // Should only be called at ctor time, at which point the pool is not yet running. + assert(_stripe_pool->stripe_count() == 0); + assert(_stripes.size() == 1); // TODO STRIPE all the stripes yes + auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution)); + for (auto& stripe : _stripes) { + stripe->update_distribution_config(new_configs); + } + } } std::unordered_map<uint16_t, uint32_t> Distributor::getMinReplica() const { // TODO STRIPE merged snapshot from all stripes - return _stripe->getMinReplica(); + if (_use_legacy_mode) { + return _stripe->getMinReplica(); + } else { + return first_stripe().getMinReplica(); + } } BucketSpacesStatsProvider::PerNodeBucketSpacesStats Distributor::getBucketSpacesStats() const { // TODO STRIPE merged snapshot from all stripes - return _stripe->getBucketSpacesStats(); + if (_use_legacy_mode) { + return _stripe->getBucketSpacesStats(); + } else { + return first_stripe().getBucketSpacesStats(); + } } SimpleMaintenanceScanner::PendingMaintenanceStats Distributor::pending_maintenance_stats() const { // TODO STRIPE merged snapshot from all stripes - return _stripe->pending_maintenance_stats(); + if (_use_legacy_mode) { + return _stripe->pending_maintenance_stats(); + } else { + return first_stripe().pending_maintenance_stats(); + } } void Distributor::propagateInternalScanMetricsToExternal() { - _stripe->propagateInternalScanMetricsToExternal(); + // TODO STRIPE propagate to all stripes + // TODO STRIPE reconsider metric wiring... 
+ if (_use_legacy_mode) { + _stripe->propagateInternalScanMetricsToExternal(); + } else { + first_stripe().propagateInternalScanMetricsToExternal(); + } } void @@ -420,21 +494,23 @@ Distributor::doCriticalTick(framework::ThreadIndex idx) } // Propagates any new configs down to stripe(s) enableNextConfig(); - // TODO STRIPE only do in legacy mode, use stripe pool ticking otherwise - _stripe->doCriticalTick(idx); - _tickResult.merge(_stripe->_tickResult); + if (_use_legacy_mode) { + _stripe->doCriticalTick(idx); + _tickResult.merge(_stripe->_tickResult); + } return _tickResult; } framework::ThreadWaitInfo Distributor::doNonCriticalTick(framework::ThreadIndex idx) { - if (!_use_legacy_mode) { + if (_use_legacy_mode) { + _stripe->doNonCriticalTick(idx); + _tickResult = _stripe->_tickResult; + } else { _bucket_db_updater->resend_delayed_messages(); + _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; } - // TODO STRIPE stripes need their own thread loops! - _stripe->doNonCriticalTick(idx); - _tickResult = _stripe->_tickResult; return _tickResult; } @@ -463,27 +539,53 @@ Distributor::enableNextConfig() // TODO STRIPE rename to enable_next_config_if_c vespalib::string Distributor::getReportContentType(const framework::HttpUrlPath& path) const { - return _stripe->getReportContentType(path); + // This is const thread safe + // TODO STRIPE we should probably do this in the top-level distributor + if (_use_legacy_mode) { + return _stripe->getReportContentType(path); + } else { + return first_stripe().getReportContentType(path); + } } std::string Distributor::getActiveIdealStateOperations() const { - return _stripe->getActiveIdealStateOperations(); + // TODO STRIPE need to aggregate status responses _across_ stripes..! 
+ if (_use_legacy_mode) { + return _stripe->getActiveIdealStateOperations(); + } else { + auto guard = _stripe_accessor->rendezvous_and_hold_all(); + return first_stripe().getActiveIdealStateOperations(); + } } bool Distributor::reportStatus(std::ostream& out, const framework::HttpUrlPath& path) const { - return _stripe->reportStatus(out, path); + // TODO STRIPE need to aggregate status responses _across_ stripes..! + if (_use_legacy_mode) { + return _stripe->reportStatus(out, path); + } else { + auto guard = _stripe_accessor->rendezvous_and_hold_all(); + return first_stripe().reportStatus(out, path); + } } bool Distributor::handleStatusRequest(const DelegatedStatusRequest& request) const { // TODO STRIPE need to aggregate status responses _across_ stripes..! - return _stripe->handleStatusRequest(request); + if (_use_legacy_mode) { + return _stripe->handleStatusRequest(request); + } else { + // Can't hold guard here or we'll deadlock by never allowing the thread to process the request + bool handled = first_stripe().handleStatusRequest(request); + // TODO STRIPE wake up stripe thread; handleStatusRequest waits for completion + // (not really needed since this will be removed) + return handled; + } } } diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index 0420f1b1f22..d9aada78c79 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -40,8 +40,9 @@ class BucketDBUpdater; class DistributorBucketSpaceRepo; class DistributorStatus; class DistributorStripe; +class DistributorStripePool; +class StripeAccessor; class OperationSequencer; -class LegacySingleStripeAccessor; class OwnershipTransferSafeTimePointCalculator; class SimpleMaintenanceScanner; class ThrottlingOperationStarter; @@ -118,6 +119,10 @@ private: friend class DistributorTestUtil; friend class MetricUpdateHook; + // TODO STRIPE remove + DistributorStripe& 
first_stripe() noexcept; + const DistributorStripe& first_stripe() const noexcept; + void setNodeStateUp(); bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg); @@ -171,8 +176,10 @@ private: ChainedMessageSender* _messageSender; const bool _use_legacy_mode; // TODO STRIPE multiple stripes...! This is for proof of concept of wiring. - std::unique_ptr<DistributorStripe> _stripe; - std::unique_ptr<LegacySingleStripeAccessor> _stripe_accessor; + std::unique_ptr<DistributorStripe> _stripe; + std::unique_ptr<DistributorStripePool> _stripe_pool; + std::vector<std::unique_ptr<DistributorStripe>> _stripes; + std::unique_ptr<StripeAccessor> _stripe_accessor; distributor::DistributorComponent _component; std::shared_ptr<const DistributorConfiguration> _total_config; std::unique_ptr<BucketDBUpdater> _bucket_db_updater; diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index 1f6a5b318fd..e9969b79bd4 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -60,6 +60,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg, _externalOperationHandler(_component, _component, getMetrics(), getMessageSender(), *_operation_sequencer, *this, _component, _idealStateManager, _operationOwner), + _external_message_mutex(), _threadPool(threadPool), _doneInitializeHandler(doneInitHandler), _doneInitializing(false), @@ -168,14 +169,19 @@ DistributorStripe::handle_or_enqueue_message(const std::shared_ptr<api::StorageM if (_externalOperationHandler.try_handle_message_outside_main_thread(msg)) { return true; } - // TODO STRIPE redesign how message queue guarding and wakeup is performed. - // Currently involves a _thread pool global_ lock transitively via tick guard! 
- framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks()); MBUS_TRACE(msg->getTrace(), 9, "Distributor: Added to message queue. Thread state: " + _threadPool.getStatus()); - _messageQueue.push_back(msg); - guard.broadcast(); + if (_use_legacy_mode) { + // TODO STRIPE remove + framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks()); + _messageQueue.push_back(msg); + guard.broadcast(); + } else { + std::lock_guard lock(_external_message_mutex); + _messageQueue.push_back(msg); + // Caller has the responsibility to wake up correct stripe + } return true; } @@ -727,6 +733,7 @@ DistributorStripe::scanNextBucket() void DistributorStripe::send_updated_host_info_if_required() { if (_must_send_updated_host_info) { + // TODO STRIPE how to handle with multiple stripes? _component.getStateUpdater().immediately_send_get_node_state_replies(); _must_send_updated_host_info = false; } @@ -745,10 +752,9 @@ framework::ThreadWaitInfo DistributorStripe::doCriticalTick(framework::ThreadIndex) { _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; - if (_use_legacy_mode) { - enableNextDistribution(); - enableNextConfig(); - } + assert(_use_legacy_mode); + enableNextDistribution(); + enableNextConfig(); fetchStatusRequests(); fetchExternalMessages(); return _tickResult; @@ -758,6 +764,11 @@ framework::ThreadWaitInfo DistributorStripe::doNonCriticalTick(framework::ThreadIndex) { _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; + if (!_use_legacy_mode) { + std::lock_guard lock(_external_message_mutex); + fetchStatusRequests(); + fetchExternalMessages(); + } handleStatusRequests(); startExternalOperations(); if (initializing()) { @@ -920,14 +931,19 @@ DistributorStripe::reportStatus(std::ostream& out, return true; } +// TODO STRIPE remove this; delegated to top-level Distributor only bool DistributorStripe::handleStatusRequest(const DelegatedStatusRequest& request) const { auto wrappedRequest = 
std::make_shared<DistributorStatus>(request); - { + if (_use_legacy_mode) { framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks()); _statusToDo.push_back(wrappedRequest); guard.broadcast(); + } else { + std::lock_guard lock(_external_message_mutex); + _statusToDo.push_back(wrappedRequest); + // FIXME won't be woken up explicitly, but will be processed after 1ms anyway. } wrappedRequest->waitForCompletion(); return true; diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h index 7b34367cecb..7b0cfc66a1b 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe.h @@ -22,6 +22,7 @@ #include <vespa/storageapi/message/state.h> #include <vespa/storageframework/generic/metric/metricupdatehook.h> #include <vespa/storageframework/generic/thread/tickingthread.h> +#include <mutex> #include <queue> #include <unordered_map> @@ -298,6 +299,7 @@ private: std::vector<std::shared_ptr<api::StorageMessage>>, IndirectHigherPriority >; + mutable std::mutex _external_message_mutex; MessageQueue _messageQueue; ClientRequestPriorityQueue _client_request_priority_queue; MessageQueue _fetchedMessages; diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h index 9149482cd5d..4ac52b0ede8 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h @@ -62,10 +62,10 @@ public: void park_all_threads() noexcept; void unpark_all_threads() noexcept; - [[nodiscard]] const DistributorStripeThread& stripe(size_t idx) const noexcept { + [[nodiscard]] const DistributorStripeThread& stripe_thread(size_t idx) const noexcept { return *_stripes[idx]; } - [[nodiscard]] DistributorStripeThread& stripe(size_t idx) noexcept { + [[nodiscard]] DistributorStripeThread& 
stripe_thread(size_t idx) noexcept { return *_stripes[idx]; } [[nodiscard]] size_t stripe_count() const noexcept { return _stripes.size(); } diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_thread.h b/storage/src/vespa/storage/distributor/distributor_stripe_thread.h index b02d733895e..60f10889afd 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_thread.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe_thread.h @@ -53,6 +53,9 @@ public: TickableStripe* operator->() noexcept { return &_stripe; } const TickableStripe* operator->() const noexcept { return &_stripe; } + + TickableStripe& stripe() noexcept { return _stripe; } + const TickableStripe& stripe() const noexcept { return _stripe; } private: [[nodiscard]] bool should_stop_thread_relaxed() const noexcept { return _should_stop.load(std::memory_order_relaxed); diff --git a/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp index 0c6c0206608..29b71436fe8 100644 --- a/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp +++ b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp @@ -14,6 +14,10 @@ LegacySingleStripeAccessGuard::~LegacySingleStripeAccessGuard() { _accessor.mark_guard_released(); } +void LegacySingleStripeAccessGuard::flush_and_close() { + _stripe.flush_and_close(); +} + void LegacySingleStripeAccessGuard::update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) { _stripe.update_total_distributor_config(std::move(config)); } diff --git a/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h index caf1e397e5b..f119462038e 100644 --- a/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h +++ b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h @@ -21,6 +21,8 @@ 
public: DistributorStripe& stripe); ~LegacySingleStripeAccessGuard() override; + void flush_and_close() override; + void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) override; void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) override; diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp index 6bc9c03158a..03e47d2cb67 100644 --- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp +++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp @@ -21,6 +21,10 @@ MultiThreadedStripeAccessGuard::~MultiThreadedStripeAccessGuard() { _accessor.mark_guard_released(); } +void MultiThreadedStripeAccessGuard::flush_and_close() { + first_stripe().flush_and_close(); +} + void MultiThreadedStripeAccessGuard::update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) { // TODO STRIPE multiple stripes first_stripe().update_total_distributor_config(std::move(config)); @@ -95,7 +99,7 @@ void MultiThreadedStripeAccessGuard::clear_read_only_bucket_repo_databases() { } DistributorStripe& MultiThreadedStripeAccessGuard::first_stripe() noexcept { - return dynamic_cast<DistributorStripe&>(_stripe_pool.stripe(0)); + return dynamic_cast<DistributorStripe&>(_stripe_pool.stripe_thread(0).stripe()); } std::unique_ptr<StripeAccessGuard> MultiThreadedStripeAccessor::rendezvous_and_hold_all() { diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h index 376eccd1c4a..03e36c29bba 100644 --- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h +++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h @@ -27,6 +27,8 @@ public: DistributorStripePool& stripe_pool); 
~MultiThreadedStripeAccessGuard() override; + void flush_and_close() override; + void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) override; void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) override; @@ -65,7 +67,7 @@ class MultiThreadedStripeAccessor : public StripeAccessor { friend class MultiThreadedStripeAccessGuard; public: - MultiThreadedStripeAccessor(DistributorStripePool& stripe_pool) + explicit MultiThreadedStripeAccessor(DistributorStripePool& stripe_pool) : _stripe_pool(stripe_pool), _guard_held(false) {} diff --git a/storage/src/vespa/storage/distributor/stripe_access_guard.h b/storage/src/vespa/storage/distributor/stripe_access_guard.h index 69aae755dec..1d570cbb3bc 100644 --- a/storage/src/vespa/storage/distributor/stripe_access_guard.h +++ b/storage/src/vespa/storage/distributor/stripe_access_guard.h @@ -28,6 +28,8 @@ class StripeAccessGuard { public: virtual ~StripeAccessGuard() = default; + virtual void flush_and_close() = 0; + virtual void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) = 0; virtual void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) = 0; |