diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2021-05-05 13:23:06 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2021-05-05 13:23:06 +0000 |
commit | dd1e1988e9462d549513873383294576516c0141 (patch) | |
tree | 32c170d92c0380574a65e392aca7425d01ca9cd3 /storage | |
parent | 5acb5ecef019dea31e337ea419b7787962decff0 (diff) |
Dispatch messages to be handled by BucketDBUpdater to main distributor thread
Required to ensure no race conditions can happen from processing such
messages from arbitrary RPC/CommunicationManager threads.
Diffstat (limited to 'storage')
-rw-r--r-- | storage/src/vespa/storage/distributor/distributor.cpp | 49 | ||||
-rw-r--r-- | storage/src/vespa/storage/distributor/distributor.h | 8 |
2 files changed, 50 insertions, 7 deletions
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 2f524f00e0e..cd42b0ecc1b 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -62,6 +62,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _stripe_pool(), _stripes(), _stripe_accessor(), + _message_queue(), + _fetched_messages(), _component(*this, compReg, "distributor"), _total_config(_component.total_distributor_config_sp()), _bucket_db_updater(), @@ -312,10 +314,9 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg) if (_use_legacy_mode) { return _stripe->handle_or_enqueue_message(msg); } else { - // FIXME STRIPE this MUST be in a separate thread to enforce processing in a single thread - // regardless of what RPC thread (comm mgr, FRT...) this is called from! if (should_be_handled_by_top_level_bucket_db_updater(*msg)) { - return msg->callHandler(*_bucket_db_updater, msg); + dispatch_to_main_distributor_thread_queue(msg); + return true; } assert(_stripes.size() == 1); assert(_stripe_pool->stripe_count() == 1); @@ -331,9 +332,7 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg) bool Distributor::handleReply(const std::shared_ptr<api::StorageReply>& reply) { - if (!_use_legacy_mode && should_be_handled_by_top_level_bucket_db_updater(*reply)) { - return reply->callHandler(*_bucket_db_updater, reply); - } + // TODO STRIPE this is used by tests. Do we need to invoke top-level BucketDBUpdater for any of them? assert(_use_legacy_mode); return _stripe->handleReply(reply); } @@ -485,12 +484,47 @@ Distributor::scanAllBuckets() _stripe->scanAllBuckets(); } +void +Distributor::dispatch_to_main_distributor_thread_queue(const std::shared_ptr<api::StorageMessage>& msg) +{ + MBUS_TRACE(msg->getTrace(), 9, "Distributor: Added to main thread message queue"); + framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks()); + _message_queue.emplace_back(msg); + guard.broadcast(); +} + +void +Distributor::fetch_external_messages() +{ + assert(!_use_legacy_mode); + assert(_fetched_messages.empty()); + _fetched_messages.swap(_message_queue); +} + +void +Distributor::process_fetched_external_messages() +{ + assert(!_use_legacy_mode); + for (auto& msg : _fetched_messages) { + MBUS_TRACE(msg->getTrace(), 9, "Distributor: Processing message in main thread"); + if (!msg->callHandler(*_bucket_db_updater, msg)) { + MBUS_TRACE(msg->getTrace(), 9, "Distributor: Not handling it. Sending further down"); + sendDown(msg); + } + } + if (!_fetched_messages.empty()) { + _fetched_messages.clear(); + _tickResult = framework::ThreadWaitInfo::MORE_WORK_ENQUEUED; + } +} + framework::ThreadWaitInfo Distributor::doCriticalTick(framework::ThreadIndex idx) { _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; if (!_use_legacy_mode) { enableNextDistribution(); + fetch_external_messages(); } // Propagates any new configs down to stripe(s) enableNextConfig(); @@ -508,8 +542,9 @@ Distributor::doNonCriticalTick(framework::ThreadIndex idx) _stripe->doNonCriticalTick(idx); _tickResult = _stripe->_tickResult; } else { - _bucket_db_updater->resend_delayed_messages(); _tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN; + process_fetched_external_messages(); + _bucket_db_updater->resend_delayed_messages(); } return _tickResult; } diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index d9aada78c79..f96160cf13c 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -171,6 +171,12 @@ private: void enableNextDistribution(); void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>); + void dispatch_to_main_distributor_thread_queue(const std::shared_ptr<api::StorageMessage>& msg); + void fetch_external_messages(); + void process_fetched_external_messages(); + + using MessageQueue = std::vector<std::shared_ptr<api::StorageMessage>>; + DistributorComponentRegister& _comp_reg; std::shared_ptr<DistributorMetricSet> _metrics; ChainedMessageSender* _messageSender; @@ -180,6 +186,8 @@ private: std::unique_ptr<DistributorStripePool> _stripe_pool; std::vector<std::unique_ptr<DistributorStripe>> _stripes; std::unique_ptr<StripeAccessor> _stripe_accessor; + MessageQueue _message_queue; // Queue for top-level ops + MessageQueue _fetched_messages; distributor::DistributorComponent _component; std::shared_ptr<const DistributorConfiguration> _total_config; std::unique_ptr<BucketDBUpdater> _bucket_db_updater; |