diff options
Diffstat (limited to 'storage')
11 files changed, 187 insertions, 43 deletions
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 6975a2595ad..2f524f00e0e 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -7,9 +7,11 @@ #include "distributor_bucket_space.h" #include "distributor_status.h" #include "distributor_stripe.h" +#include "distributor_stripe_pool.h" +#include "distributor_stripe_thread.h" #include "distributormetricsset.h" #include "idealstatemetricsset.h" -#include "legacy_single_stripe_accessor.h" +#include "multi_threaded_stripe_access_guard.h" #include "operation_sequencer.h" #include "ownership_transfer_safe_time_point_calculator.h" #include "throttlingoperationstarter.h" @@ -57,7 +59,9 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _use_legacy_mode(num_distributor_stripes == 0), _stripe(std::make_unique<DistributorStripe>(compReg, *_metrics, node_identity, threadPool, doneInitHandler, *this, _use_legacy_mode)), - _stripe_accessor(std::make_unique<LegacySingleStripeAccessor>(*_stripe)), + _stripe_pool(), + _stripes(), + _stripe_accessor(), _component(*this, compReg, "distributor"), _total_config(_component.total_distributor_config_sp()), _bucket_db_updater(), @@ -74,10 +78,13 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _component.registerMetricUpdateHook(_metricUpdateHook, framework::SecondTime(0)); if (!_use_legacy_mode) { LOG(info, "Setting up distributor with %u stripes", num_distributor_stripes); // TODO STRIPE remove once legacy gone + _stripe_pool = std::make_unique<DistributorStripePool>(); + _stripe_accessor = std::make_unique<MultiThreadedStripeAccessor>(*_stripe_pool); _bucket_db_updater = std::make_unique<BucketDBUpdater>(_component, _component, *this, *this, _component.getDistribution(), *_stripe_accessor); + _stripes.emplace_back(std::move(_stripe)); } _hostInfoReporter.enableReporting(config().getEnableHostInfoReporting()); _distributorStatusDelegate.registerStatusPage(); @@ -91,6 +98,20 @@ Distributor::~Distributor() closeNextLink(); } +// TODO STRIPE remove +DistributorStripe& +Distributor::first_stripe() noexcept { + assert(_stripes.size() == 1); + return *_stripes[0]; +} + +// TODO STRIPE remove +const DistributorStripe& +Distributor::first_stripe() const noexcept { + assert(_stripes.size() == 1); + return *_stripes[0]; +} + // TODO STRIPE figure out how to handle inspection functions used by tests when legacy mode no longer exists. // All functions below that assert on _use_legacy_mode are only currently used by tests @@ -217,6 +238,10 @@ Distributor::onOpen() if (_component.getDistributorConfig().startDistributorThread) { _threadPool.addThread(*this); _threadPool.start(_component.getThreadPool()); + if (!_use_legacy_mode) { + std::vector<TickableStripe*> pool_stripes({_stripes[0].get()}); + _stripe_pool->start(pool_stripes); + } } else { LOG(warning, "Not starting distributor thread as it's configured to " "run. Unless you are just running a test tool, this is a " @@ -226,8 +251,17 @@ Distributor::onOpen() void Distributor::onClose() { LOG(debug, "Distributor::onClose invoked"); - _stripe->flush_and_close(); - if (_bucket_db_updater) { + if (_use_legacy_mode) { + _stripe->flush_and_close(); + } else { + { + auto guard = _stripe_accessor->rendezvous_and_hold_all(); + guard->flush_and_close(); + } + // TODO STRIPE must ensure no incoming requests can be posted on stripes between close + // and pool stop+join! + _stripe_pool->stop_and_join(); + assert(_bucket_db_updater); _bucket_db_updater->flush(); } } @@ -273,14 +307,25 @@ bool should_be_handled_by_top_level_bucket_db_updater(const api::StorageMessage& bool Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg) { - // FIXME STRIPE this MUST be in a separate thread to enforce processing in a single thread - // regardless of what RPC thread (comm mgr, FRT...) this is called from! - if (!_use_legacy_mode && should_be_handled_by_top_level_bucket_db_updater(*msg)) { - return msg->callHandler(*_bucket_db_updater, msg); - } // TODO STRIPE can we route both requests and responses that are BucketCommand|Reply based on their bucket alone? // that covers most operations already... - return _stripe->handle_or_enqueue_message(msg); + if (_use_legacy_mode) { + return _stripe->handle_or_enqueue_message(msg); + } else { + // FIXME STRIPE this MUST be in a separate thread to enforce processing in a single thread + // regardless of what RPC thread (comm mgr, FRT...) this is called from! + if (should_be_handled_by_top_level_bucket_db_updater(*msg)) { + return msg->callHandler(*_bucket_db_updater, msg); + } + assert(_stripes.size() == 1); + assert(_stripe_pool->stripe_count() == 1); + // TODO STRIPE correct routing with multiple stripes + bool handled = first_stripe().handle_or_enqueue_message(msg); + if (handled) { + _stripe_pool->stripe_thread(0).notify_event_has_triggered(); + } + return handled; + } } bool @@ -289,6 +334,7 @@ Distributor::handleReply(const std::shared_ptr<api::StorageReply>& reply) if (!_use_legacy_mode && should_be_handled_by_top_level_bucket_db_updater(*reply)) { return reply->callHandler(*_bucket_db_updater, reply); } + assert(_use_legacy_mode); return _stripe->handleReply(reply); } @@ -375,33 +421,61 @@ Distributor::propagateDefaultDistribution( std::shared_ptr<const lib::Distribution> distribution) { // TODO STRIPE cannot directly access stripe when not in legacy mode! - _stripe->propagateDefaultDistribution(std::move(distribution)); + if (_use_legacy_mode) { + _stripe->propagateDefaultDistribution(std::move(distribution)); + } else { + // Should only be called at ctor time, at which point the pool is not yet running. + assert(_stripe_pool->stripe_count() == 0); + assert(_stripes.size() == 1); // TODO STRIPE all the stripes yes + auto new_configs = BucketSpaceDistributionConfigs::from_default_distribution(std::move(distribution)); + for (auto& stripe : _stripes) { + stripe->update_distribution_config(new_configs); + } + } } std::unordered_map<uint16_t, uint32_t> Distributor::getMinReplica() const { // TODO STRIPE merged snapshot from all stripes - return _stripe->getMinReplica(); + if (_use_legacy_mode) { + return _stripe->getMinReplica(); + } else { + return first_stripe().getMinReplica(); + } } BucketSpacesStatsProvider::PerNodeBucketSpacesStats Distributor::getBucketSpacesStats() const { // TODO STRIPE merged snapshot from all stripes - return _stripe->getBucketSpacesStats(); + if (_use_legacy_mode) { + return _stripe->getBucketSpacesStats(); + } else { + return first_stripe().getBucketSpacesStats(); + } } SimpleMaintenanceScanner::PendingMaintenanceStats Distributor::pending_maintenance_stats() const { // TODO STRIPE merged snapshot from all stripes - return _stripe->pending_maintenance_stats(); + if (_use_legacy_mode) { + return _stripe->pending_maintenance_stats(); + } else { + return first_stripe().pending_maintenance_stats(); + } } void Distributor::propagateInternalScanMetricsToExternal() { - _stripe->propagateInternalScanMetricsToExternal(); + // TODO STRIPE propagate to all stripes + // TODO STRIPE reconsider metric wiring... + if (_use_legacy_mode) { + _stripe->propagateInternalScanMetricsToExternal(); + } else { + first_stripe().propagateInternalScanMetricsToExternal(); + } } void @@ -420,21 +494,23 @@ Distributor::doCriticalTick(framework::ThreadIndex idx) } // Propagates any new configs down to stripe(s) enableNextConfig(); - // TODO STRIPE only do in legacy mode, use stripe pool ticking otherwise - _stripe->doCriticalTick(idx); - _tickResult.merge(_stripe->_tickResult); + if (_use_legacy_mode) { + _stripe->doCriticalTick(idx); + _tickResult.merge(_stripe->_tickResult); + } return _tickResult; } framework::ThreadWaitInfo Distributor::doNonCriticalTick(framework::ThreadIndex idx) { - if (!_use_legacy_mode) { + if (_use_legacy_mode) { + _stripe->doNonCriticalTick(idx); + _tickResult = _stripe->_tickResult; + } else { _bucket_db_updater->resend_delayed_messages(); + _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; } - // TODO STRIPE stripes need their own thread loops! - _stripe->doNonCriticalTick(idx); - _tickResult = _stripe->_tickResult; return _tickResult; } @@ -463,27 +539,53 @@ Distributor::enableNextConfig() // TODO STRIPE rename to enable_next_config_if_c vespalib::string Distributor::getReportContentType(const framework::HttpUrlPath& path) const { - return _stripe->getReportContentType(path); + // This is const thread safe + // TODO STRIPE we should probably do this in the top-level distributor + if (_use_legacy_mode) { + return _stripe->getReportContentType(path); + } else { + return first_stripe().getReportContentType(path); + } } std::string Distributor::getActiveIdealStateOperations() const { - return _stripe->getActiveIdealStateOperations(); + // TODO STRIPE need to aggregate status responses _across_ stripes..! + if (_use_legacy_mode) { + return _stripe->getActiveIdealStateOperations(); + } else { + auto guard = _stripe_accessor->rendezvous_and_hold_all(); + return first_stripe().getActiveIdealStateOperations(); + } } bool Distributor::reportStatus(std::ostream& out, const framework::HttpUrlPath& path) const { - return _stripe->reportStatus(out, path); + // TODO STRIPE need to aggregate status responses _across_ stripes..! + if (_use_legacy_mode) { + return _stripe->reportStatus(out, path); + } else { + auto guard = _stripe_accessor->rendezvous_and_hold_all(); + return first_stripe().reportStatus(out, path); + } } bool Distributor::handleStatusRequest(const DelegatedStatusRequest& request) const { // TODO STRIPE need to aggregate status responses _across_ stripes..! - return _stripe->handleStatusRequest(request); + if (_use_legacy_mode) { + return _stripe->handleStatusRequest(request); + } else { + // Can't hold guard here or we'll deadlock by never allowing the thread to process the request + bool handled = first_stripe().handleStatusRequest(request); + // TODO STRIPE wake up stripe thread; handleStatusRequest waits for completion + // (not really needed since this will be removed) + return handled; + } } } diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index 0420f1b1f22..d9aada78c79 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -40,8 +40,9 @@ class BucketDBUpdater; class DistributorBucketSpaceRepo; class DistributorStatus; class DistributorStripe; +class DistributorStripePool; +class StripeAccessor; class OperationSequencer; -class LegacySingleStripeAccessor; class OwnershipTransferSafeTimePointCalculator; class SimpleMaintenanceScanner; class ThrottlingOperationStarter; @@ -118,6 +119,10 @@ private: friend class DistributorTestUtil; friend class MetricUpdateHook; + // TODO STRIPE remove + DistributorStripe& first_stripe() noexcept; + const DistributorStripe& first_stripe() const noexcept; + void setNodeStateUp(); bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg); @@ -171,8 +176,10 @@ private: ChainedMessageSender* _messageSender; const bool _use_legacy_mode; // TODO STRIPE multiple stripes...! This is for proof of concept of wiring. - std::unique_ptr<DistributorStripe> _stripe; - std::unique_ptr<LegacySingleStripeAccessor> _stripe_accessor; + std::unique_ptr<DistributorStripe> _stripe; + std::unique_ptr<DistributorStripePool> _stripe_pool; + std::vector<std::unique_ptr<DistributorStripe>> _stripes; + std::unique_ptr<StripeAccessor> _stripe_accessor; distributor::DistributorComponent _component; std::shared_ptr<const DistributorConfiguration> _total_config; std::unique_ptr<BucketDBUpdater> _bucket_db_updater; diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index 1f6a5b318fd..e9969b79bd4 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -60,6 +60,7 @@ DistributorStripe::DistributorStripe(DistributorComponentRegister& compReg, _externalOperationHandler(_component, _component, getMetrics(), getMessageSender(), *_operation_sequencer, *this, _component, _idealStateManager, _operationOwner), + _external_message_mutex(), _threadPool(threadPool), _doneInitializeHandler(doneInitHandler), _doneInitializing(false), @@ -168,14 +169,19 @@ DistributorStripe::handle_or_enqueue_message(const std::shared_ptr<api::StorageM if (_externalOperationHandler.try_handle_message_outside_main_thread(msg)) { return true; } - // TODO STRIPE redesign how message queue guarding and wakeup is performed. - // Currently involves a _thread pool global_ lock transitively via tick guard! - framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks()); MBUS_TRACE(msg->getTrace(), 9, "Distributor: Added to message queue. Thread state: " + _threadPool.getStatus()); - _messageQueue.push_back(msg); - guard.broadcast(); + if (_use_legacy_mode) { + // TODO STRIPE remove + framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks()); + _messageQueue.push_back(msg); + guard.broadcast(); + } else { + std::lock_guard lock(_external_message_mutex); + _messageQueue.push_back(msg); + // Caller has the responsibility to wake up correct stripe + } return true; } @@ -727,6 +733,7 @@ DistributorStripe::scanNextBucket() void DistributorStripe::send_updated_host_info_if_required() { if (_must_send_updated_host_info) { + // TODO STRIPE how to handle with multiple stripes? _component.getStateUpdater().immediately_send_get_node_state_replies(); _must_send_updated_host_info = false; } @@ -745,10 +752,9 @@ framework::ThreadWaitInfo DistributorStripe::doCriticalTick(framework::ThreadIndex) { _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; - if (_use_legacy_mode) { - enableNextDistribution(); - enableNextConfig(); - } + assert(_use_legacy_mode); + enableNextDistribution(); + enableNextConfig(); fetchStatusRequests(); fetchExternalMessages(); return _tickResult; @@ -758,6 +764,11 @@ framework::ThreadWaitInfo DistributorStripe::doNonCriticalTick(framework::ThreadIndex) { _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; + if (!_use_legacy_mode) { + std::lock_guard lock(_external_message_mutex); + fetchStatusRequests(); + fetchExternalMessages(); + } handleStatusRequests(); startExternalOperations(); if (initializing()) { @@ -920,14 +931,19 @@ DistributorStripe::reportStatus(std::ostream& out, return true; } +// TODO STRIPE remove this; delegated to top-level Distributor only bool DistributorStripe::handleStatusRequest(const DelegatedStatusRequest& request) const { auto wrappedRequest = std::make_shared<DistributorStatus>(request); - { + if (_use_legacy_mode) { framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks()); _statusToDo.push_back(wrappedRequest); guard.broadcast(); + } else { + std::lock_guard lock(_external_message_mutex); + _statusToDo.push_back(wrappedRequest); + // FIXME won't be woken up explicitly, but will be processed after 1ms anyway. } wrappedRequest->waitForCompletion(); return true; diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h index 7b34367cecb..7b0cfc66a1b 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe.h @@ -22,6 +22,7 @@ #include <vespa/storageapi/message/state.h> #include <vespa/storageframework/generic/metric/metricupdatehook.h> #include <vespa/storageframework/generic/thread/tickingthread.h> +#include <mutex> #include <queue> #include <unordered_map> @@ -298,6 +299,7 @@ private: std::vector<std::shared_ptr<api::StorageMessage>>, IndirectHigherPriority >; + mutable std::mutex _external_message_mutex; MessageQueue _messageQueue; ClientRequestPriorityQueue _client_request_priority_queue; MessageQueue _fetchedMessages; diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h index 9149482cd5d..4ac52b0ede8 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h @@ -62,10 +62,10 @@ public: void park_all_threads() noexcept; void unpark_all_threads() noexcept; - [[nodiscard]] const DistributorStripeThread& stripe(size_t idx) const noexcept { + [[nodiscard]] const DistributorStripeThread& stripe_thread(size_t idx) const noexcept { return *_stripes[idx]; } - [[nodiscard]] DistributorStripeThread& stripe(size_t idx) noexcept { + [[nodiscard]] DistributorStripeThread& stripe_thread(size_t idx) noexcept { return *_stripes[idx]; } [[nodiscard]] size_t stripe_count() const noexcept { return _stripes.size(); } diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_thread.h b/storage/src/vespa/storage/distributor/distributor_stripe_thread.h index b02d733895e..60f10889afd 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_thread.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe_thread.h @@ -53,6 +53,9 @@ public: TickableStripe* operator->() noexcept { return &_stripe; } const TickableStripe* operator->() const noexcept { return &_stripe; } + + TickableStripe& stripe() noexcept { return _stripe; } + const TickableStripe& stripe() const noexcept { return _stripe; } private: [[nodiscard]] bool should_stop_thread_relaxed() const noexcept { return _should_stop.load(std::memory_order_relaxed); diff --git a/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp index 0c6c0206608..29b71436fe8 100644 --- a/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp +++ b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.cpp @@ -14,6 +14,10 @@ LegacySingleStripeAccessGuard::~LegacySingleStripeAccessGuard() { _accessor.mark_guard_released(); } +void LegacySingleStripeAccessGuard::flush_and_close() { + _stripe.flush_and_close(); +} + void LegacySingleStripeAccessGuard::update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) { _stripe.update_total_distributor_config(std::move(config)); } diff --git a/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h index caf1e397e5b..f119462038e 100644 --- a/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h +++ b/storage/src/vespa/storage/distributor/legacy_single_stripe_accessor.h @@ -21,6 +21,8 @@ public: DistributorStripe& stripe); ~LegacySingleStripeAccessGuard() override; + void flush_and_close() override; + void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) override; void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) override; diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp index 6bc9c03158a..03e47d2cb67 100644 --- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp +++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp @@ -21,6 +21,10 @@ MultiThreadedStripeAccessGuard::~MultiThreadedStripeAccessGuard() { _accessor.mark_guard_released(); } +void MultiThreadedStripeAccessGuard::flush_and_close() { + first_stripe().flush_and_close(); +} + void MultiThreadedStripeAccessGuard::update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) { // TODO STRIPE multiple stripes first_stripe().update_total_distributor_config(std::move(config)); @@ -95,7 +99,7 @@ void MultiThreadedStripeAccessGuard::clear_read_only_bucket_repo_databases() { } DistributorStripe& MultiThreadedStripeAccessGuard::first_stripe() noexcept { - return dynamic_cast<DistributorStripe&>(_stripe_pool.stripe(0)); + return dynamic_cast<DistributorStripe&>(_stripe_pool.stripe_thread(0).stripe()); } std::unique_ptr<StripeAccessGuard> MultiThreadedStripeAccessor::rendezvous_and_hold_all() { diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h index 376eccd1c4a..03e36c29bba 100644 --- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h +++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h @@ -27,6 +27,8 @@ public: DistributorStripePool& stripe_pool); ~MultiThreadedStripeAccessGuard() override; + void flush_and_close() override; + void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) override; void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) override; @@ -65,7 +67,7 @@ class MultiThreadedStripeAccessor : public StripeAccessor { friend class MultiThreadedStripeAccessGuard; public: - MultiThreadedStripeAccessor(DistributorStripePool& stripe_pool) + explicit MultiThreadedStripeAccessor(DistributorStripePool& stripe_pool) : _stripe_pool(stripe_pool), _guard_held(false) {} diff --git a/storage/src/vespa/storage/distributor/stripe_access_guard.h b/storage/src/vespa/storage/distributor/stripe_access_guard.h index 69aae755dec..1d570cbb3bc 100644 --- a/storage/src/vespa/storage/distributor/stripe_access_guard.h +++ b/storage/src/vespa/storage/distributor/stripe_access_guard.h @@ -28,6 +28,8 @@ class StripeAccessGuard { public: virtual ~StripeAccessGuard() = default; + virtual void flush_and_close() = 0; + virtual void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) = 0; virtual void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) = 0; |