summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2021-05-05 13:23:06 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2021-05-05 13:23:06 +0000
commitdd1e1988e9462d549513873383294576516c0141 (patch)
tree32c170d92c0380574a65e392aca7425d01ca9cd3 /storage
parent5acb5ecef019dea31e337ea419b7787962decff0 (diff)
Dispatch messages to be handled by BucketDBUpdater to main distributor thread
Required to ensure no race conditions can happen from processing such messages from arbitrary RPC/CommunicationManager threads.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp49
-rw-r--r--storage/src/vespa/storage/distributor/distributor.h8
2 files changed, 50 insertions, 7 deletions
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 2f524f00e0e..cd42b0ecc1b 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -62,6 +62,8 @@ Distributor::Distributor(DistributorComponentRegister& compReg,
_stripe_pool(),
_stripes(),
_stripe_accessor(),
+ _message_queue(),
+ _fetched_messages(),
_component(*this, compReg, "distributor"),
_total_config(_component.total_distributor_config_sp()),
_bucket_db_updater(),
@@ -312,10 +314,9 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg)
if (_use_legacy_mode) {
return _stripe->handle_or_enqueue_message(msg);
} else {
- // FIXME STRIPE this MUST be in a separate thread to enforce processing in a single thread
- // regardless of what RPC thread (comm mgr, FRT...) this is called from!
if (should_be_handled_by_top_level_bucket_db_updater(*msg)) {
- return msg->callHandler(*_bucket_db_updater, msg);
+ dispatch_to_main_distributor_thread_queue(msg);
+ return true;
}
assert(_stripes.size() == 1);
assert(_stripe_pool->stripe_count() == 1);
@@ -331,9 +332,7 @@ Distributor::onDown(const std::shared_ptr<api::StorageMessage>& msg)
bool
Distributor::handleReply(const std::shared_ptr<api::StorageReply>& reply)
{
- if (!_use_legacy_mode && should_be_handled_by_top_level_bucket_db_updater(*reply)) {
- return reply->callHandler(*_bucket_db_updater, reply);
- }
+ // TODO STRIPE this is used by tests. Do we need to invoke top-level BucketDBUpdater for any of them?
assert(_use_legacy_mode);
return _stripe->handleReply(reply);
}
@@ -485,12 +484,47 @@ Distributor::scanAllBuckets()
_stripe->scanAllBuckets();
}
+void
+Distributor::dispatch_to_main_distributor_thread_queue(const std::shared_ptr<api::StorageMessage>& msg)
+{
+ MBUS_TRACE(msg->getTrace(), 9, "Distributor: Added to main thread message queue");
+ framework::TickingLockGuard guard(_threadPool.freezeCriticalTicks());
+ _message_queue.emplace_back(msg);
+ guard.broadcast();
+}
+
+void
+Distributor::fetch_external_messages()
+{
+ assert(!_use_legacy_mode);
+ assert(_fetched_messages.empty());
+ _fetched_messages.swap(_message_queue);
+}
+
+void
+Distributor::process_fetched_external_messages()
+{
+ assert(!_use_legacy_mode);
+ for (auto& msg : _fetched_messages) {
+ MBUS_TRACE(msg->getTrace(), 9, "Distributor: Processing message in main thread");
+ if (!msg->callHandler(*_bucket_db_updater, msg)) {
+ MBUS_TRACE(msg->getTrace(), 9, "Distributor: Not handling it. Sending further down");
+ sendDown(msg);
+ }
+ }
+ if (!_fetched_messages.empty()) {
+ _fetched_messages.clear();
+ _tickResult = framework::ThreadWaitInfo::MORE_WORK_ENQUEUED;
+ }
+}
+
framework::ThreadWaitInfo
Distributor::doCriticalTick(framework::ThreadIndex idx)
{
_tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN;
if (!_use_legacy_mode) {
enableNextDistribution();
+ fetch_external_messages();
}
// Propagates any new configs down to stripe(s)
enableNextConfig();
@@ -508,8 +542,9 @@ Distributor::doNonCriticalTick(framework::ThreadIndex idx)
_stripe->doNonCriticalTick(idx);
_tickResult = _stripe->_tickResult;
} else {
- _bucket_db_updater->resend_delayed_messages();
_tickResult = framework::ThreadWaitInfo::NO_MORE_CRITICAL_WORK_KNOWN;
+ process_fetched_external_messages();
+ _bucket_db_updater->resend_delayed_messages();
}
return _tickResult;
}
diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h
index d9aada78c79..f96160cf13c 100644
--- a/storage/src/vespa/storage/distributor/distributor.h
+++ b/storage/src/vespa/storage/distributor/distributor.h
@@ -171,6 +171,12 @@ private:
void enableNextDistribution();
void propagateDefaultDistribution(std::shared_ptr<const lib::Distribution>);
+ void dispatch_to_main_distributor_thread_queue(const std::shared_ptr<api::StorageMessage>& msg);
+ void fetch_external_messages();
+ void process_fetched_external_messages();
+
+ using MessageQueue = std::vector<std::shared_ptr<api::StorageMessage>>;
+
DistributorComponentRegister& _comp_reg;
std::shared_ptr<DistributorMetricSet> _metrics;
ChainedMessageSender* _messageSender;
@@ -180,6 +186,8 @@ private:
std::unique_ptr<DistributorStripePool> _stripe_pool;
std::vector<std::unique_ptr<DistributorStripe>> _stripes;
std::unique_ptr<StripeAccessor> _stripe_accessor;
+ MessageQueue _message_queue; // Queue for top-level ops
+ MessageQueue _fetched_messages;
distributor::DistributorComponent _component;
std::shared_ptr<const DistributorConfiguration> _total_config;
std::unique_ptr<BucketDBUpdater> _bucket_db_updater;