From d39b461d4259dc3b4d8f405ac730334e13002ee2 Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Tue, 17 Oct 2023 14:44:56 +0000 Subject: Improve thread safety of MessageBus ContentPolicy Updates of distribution config and cached cluster state are now both thread safe. Move to `shared_ptr` to allow for taking immutable strong refs. Also remove pointless two-phased config switch-over in favor of directly updating value inside lock. --- documentapi/src/tests/policies/policies_test.cpp | 10 ++-- .../messagebus/policies/contentpolicy.cpp | 59 +++++++++++++++------- .../messagebus/policies/contentpolicy.h | 45 ++++++++++------- 3 files changed, 71 insertions(+), 43 deletions(-) diff --git a/documentapi/src/tests/policies/policies_test.cpp b/documentapi/src/tests/policies/policies_test.cpp index 9dd73a71920..7091b63b6b3 100644 --- a/documentapi/src/tests/policies/policies_test.cpp +++ b/documentapi/src/tests/policies/policies_test.cpp @@ -806,7 +806,7 @@ Test::requireThatContentPolicyIsRandomWithoutState() ContentPolicy &policy = setupContentPolicy( frame, param, "storage/cluster.mycluster/distributor/*/default", 5); - ASSERT_TRUE(policy.getSystemState() == nullptr); + ASSERT_FALSE(policy.getSystemState()); std::set lst; for (uint32_t i = 0; i < 666; i++) { @@ -858,12 +858,12 @@ Test::requireThatContentPolicyIsTargetedWithState() "cluster=mycluster;slobroks=tcp/localhost:%d;clusterconfigid=%s;syncinit", slobrok.port(), getDefaultDistributionConfig(2, 5).c_str()); ContentPolicy &policy = setupContentPolicy(frame, param, "storage/cluster.mycluster/distributor/*/default", 5); - ASSERT_TRUE(policy.getSystemState() == nullptr); + ASSERT_FALSE(policy.getSystemState()); { std::vector leaf; ASSERT_TRUE(frame.select(leaf, 1)); leaf[0]->handleReply(std::make_unique("distributor:5 storage:5")); - ASSERT_TRUE(policy.getSystemState() != nullptr); + ASSERT_TRUE(policy.getSystemState()); EXPECT_EQUAL(policy.getSystemState()->toString(), "distributor:5 storage:5"); } std::set lst; @@ -897,12 +897,12 @@ Test::requireThatContentPolicyCombinesSystemAndSlobrokState() ContentPolicy &policy = setupContentPolicy( frame, param, "storage/cluster.mycluster/distributor/*/default", 1); - ASSERT_TRUE(policy.getSystemState() == nullptr); + ASSERT_FALSE(policy.getSystemState()); { std::vector leaf; ASSERT_TRUE(frame.select(leaf, 1)); leaf[0]->handleReply(std::make_unique("distributor:99 storage:99")); - ASSERT_TRUE(policy.getSystemState() != nullptr); + ASSERT_TRUE(policy.getSystemState()); EXPECT_EQUAL(policy.getSystemState()->toString(), "distributor:99 storage:99"); } for (int i = 0; i < 666; i++) { diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp index e391699b750..c13f32f2df5 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp @@ -43,7 +43,7 @@ namespace { class CallBack : public config::IFetcherCallback { public: - CallBack(ContentPolicy & policy) : _policy(policy) { } + explicit CallBack(ContentPolicy & policy) : _policy(policy) { } void configure(std::unique_ptr config) override { _policy.configure(std::move(config)); } @@ -78,13 +78,13 @@ string ContentPolicy::init() ContentPolicy::~ContentPolicy() = default; string -ContentPolicy::createConfigId(const string & clusterName) const +ContentPolicy::createConfigId(const string & clusterName) { return clusterName; } string -ContentPolicy::createPattern(const string & clusterName, int distributor) const +ContentPolicy::createPattern(const string & clusterName, int distributor) { vespalib::asciistream ost; @@ -103,7 +103,8 @@ void ContentPolicy::configure(std::unique_ptr config) { try { - _nextDistribution = std::make_unique(*config); + std::lock_guard guard(_lock); + _distribution = std::make_unique(*config); } catch (const std::exception& e) { LOG(warning, "Got exception when configuring distribution, config id was %s", _clusterConfigId.c_str()); throw e; @@ -116,8 +117,9 @@ ContentPolicy::doSelect(mbus::RoutingContext &context) const mbus::Message &msg = context.getMessage(); int distributor = -1; + auto [cur_state, cur_distribution] = internal_state_snapshot(); - if (_state.get()) { + if (cur_state) { document::BucketId id; switch(msg.getType()) { case DocumentProtocol::MESSAGE_PUTDOCUMENT: @@ -168,15 +170,10 @@ ContentPolicy::doSelect(mbus::RoutingContext &context) // Pick a distributor using ideal state algorithm try { - // Update distribution here, to make it not take lock in average case - if (_nextDistribution) { - _distribution = std::move(_nextDistribution); - _nextDistribution.reset(); - } - assert(_distribution.get()); - distributor = _distribution->getIdealDistributorNode(*_state, id); + assert(cur_distribution); + distributor = cur_distribution->getIdealDistributorNode(*cur_state, id); } catch (storage::lib::TooFewBucketBitsInUseException& e) { - auto reply = std::make_unique(_state->toString()); + auto reply = std::make_unique(cur_state->toString()); reply->addError(mbus::Error( DocumentProtocol::ERROR_WRONG_DISTRIBUTION, "Too few distribution bits used for given cluster state")); @@ -185,7 +182,7 @@ ContentPolicy::doSelect(mbus::RoutingContext &context) } catch (storage::lib::NoDistributorsAvailableException& e) { // No distributors available in current cluster state. Remove // cluster state we cannot use and send to random target - _state.reset(); + reset_state(); distributor = -1; } } @@ -216,7 +213,7 @@ ContentPolicy::getRecipient(mbus::RoutingContext& context, int distributor) return mbus::Hop::parse(entries[random() % entries.size()].second + "/default"); } - return mbus::Hop(); + return {}; } void @@ -226,9 +223,9 @@ ContentPolicy::merge(mbus::RoutingContext &context) mbus::Reply::UP reply = it.removeReply(); if (reply->getType() == DocumentProtocol::REPLY_WRONGDISTRIBUTION) { - updateStateFromReply(static_cast(*reply)); + updateStateFromReply(dynamic_cast(*reply)); } else if (reply->hasErrors()) { - _state.reset(); + reset_state(); } context.setReply(std::move(reply)); @@ -237,8 +234,8 @@ ContentPolicy::merge(mbus::RoutingContext &context) void ContentPolicy::updateStateFromReply(WrongDistributionReply& wdr) { - std::unique_ptr newState( - new storage::lib::ClusterState(wdr.getSystemState())); + auto newState = std::make_unique(wdr.getSystemState()); + std::lock_guard guard(_lock); if (!_state || newState->getVersion() >= _state->getVersion()) { if (_state) { wdr.getTrace().trace(1, make_string("System state changed from version %u to %u", @@ -256,4 +253,28 @@ ContentPolicy::updateStateFromReply(WrongDistributionReply& wdr) } } +ContentPolicy::StateSnapshot +ContentPolicy::internal_state_snapshot() +{ + std::lock_guard guard(_lock); + return {_state, _distribution}; +} + +std::shared_ptr +ContentPolicy::getSystemState() const noexcept +{ + std::lock_guard guard(_lock); + return _state; +} + +void +ContentPolicy::reset_state() +{ + // It's possible for the caller to race between checking and resetting the state, + // but this should never lead to a worse outcome than sending to a random distributor + // as if no state had been cached prior. + std::lock_guard guard(_lock); + _state.reset(); +} + } // documentapi diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h index e49ad378b90..182b35a0e98 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h +++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h @@ -6,55 +6,62 @@ #include #include #include +#include namespace config { class ICallback; class ConfigFetcher; } -namespace storage { -namespace lib { +namespace storage::lib { class Distribution; class ClusterState; } -} namespace documentapi { class ContentPolicy : public ExternSlobrokPolicy { private: - document::BucketIdFactory _bucketIdFactory; - std::unique_ptr _state; - string _clusterName; - string _clusterConfigId; - std::unique_ptr _callBack; - std::unique_ptr _configFetcher; - std::unique_ptr _distribution; - std::unique_ptr _nextDistribution; + document::BucketIdFactory _bucketIdFactory; + mutable std::mutex _lock; + std::shared_ptr _state; + string _clusterName; + string _clusterConfigId; + std::unique_ptr _callBack; + std::unique_ptr _configFetcher; + std::shared_ptr _distribution; + + using StateSnapshot = std::pair, + std::shared_ptr>; + + // Acquires _lock + [[nodiscard]] StateSnapshot internal_state_snapshot(); mbus::Hop getRecipient(mbus::RoutingContext& context, int distributor); + // Acquires _lock + void updateStateFromReply(WrongDistributionReply& reply); + // Acquires _lock + void reset_state(); public: - ContentPolicy(const string& param); - ~ContentPolicy(); + explicit ContentPolicy(const string& param); + ~ContentPolicy() override; void doSelect(mbus::RoutingContext &context) override; void merge(mbus::RoutingContext &context) override; - void updateStateFromReply(WrongDistributionReply& reply); - /** * @return a pointer to the system state registered with this policy. If - * we haven't received a system state yet, returns NULL. + * we haven't received a system state yet, returns nullptr. */ - const storage::lib::ClusterState* getSystemState() const { return _state.get(); } + std::shared_ptr getSystemState() const noexcept; virtual void configure(std::unique_ptr config); string init() override; private: - string createConfigId(const string & clusterName) const; - string createPattern(const string & clusterName, int distributor) const; + static string createConfigId(const string & clusterName); + static string createPattern(const string & clusterName, int distributor); }; } -- cgit v1.2.3 From a6d24b3ac1d0416847dddbfd686e953552ca68f6 Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Tue, 17 Oct 2023 15:51:30 +0000 Subject: Use `shared_mutex` to allow non-contending reads (common case) --- .../vespa/documentapi/messagebus/policies/contentpolicy.cpp | 10 +++++----- .../src/vespa/documentapi/messagebus/policies/contentpolicy.h | 4 ++-- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp index c13f32f2df5..ea27d42e790 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp @@ -103,7 +103,7 @@ void ContentPolicy::configure(std::unique_ptr config) { try { - std::lock_guard guard(_lock); + std::lock_guard guard(_rw_lock); _distribution = std::make_unique(*config); } catch (const std::exception& e) { LOG(warning, "Got exception when configuring distribution, config id was %s", _clusterConfigId.c_str()); @@ -235,7 +235,7 @@ void ContentPolicy::updateStateFromReply(WrongDistributionReply& wdr) { auto newState = std::make_unique(wdr.getSystemState()); - std::lock_guard guard(_lock); + std::lock_guard guard(_rw_lock); if (!_state || newState->getVersion() >= _state->getVersion()) { if (_state) { wdr.getTrace().trace(1, make_string("System state changed from version %u to %u", @@ -256,14 +256,14 @@ ContentPolicy::updateStateFromReply(WrongDistributionReply& wdr) ContentPolicy::StateSnapshot ContentPolicy::internal_state_snapshot() { - std::lock_guard guard(_lock); + std::shared_lock guard(_rw_lock); return {_state, _distribution}; } std::shared_ptr ContentPolicy::getSystemState() const noexcept { - std::lock_guard guard(_lock); + std::shared_lock guard(_rw_lock); return _state; } @@ -273,7 +273,7 @@ ContentPolicy::reset_state() // It's possible for the caller to race between checking and resetting the state, // but this should never lead to a worse outcome than sending to a random distributor // as if no state had been cached prior. - std::lock_guard guard(_lock); + std::lock_guard guard(_rw_lock); _state.reset(); } diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h index 182b35a0e98..7a3675c3001 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h +++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h @@ -6,7 +6,7 @@ #include #include #include -#include +#include namespace config { class ICallback; @@ -24,7 +24,7 @@ class ContentPolicy : public ExternSlobrokPolicy { private: document::BucketIdFactory _bucketIdFactory; - mutable std::mutex _lock; + mutable std::shared_mutex _rw_lock; std::shared_ptr _state; string _clusterName; string _clusterConfigId; -- cgit v1.2.3