aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Henning Baldersheim <balder@yahoo-inc.com> 2023-10-17 19:44:06 +0200
committer GitHub <noreply@github.com> 2023-10-17 19:44:06 +0200
commit 771b853b8a0b13a476854eed4e98317d02232ebc (patch)
tree 6099599c299d5540614f4e98d252a11ecfea95c3
parent 91ae05e15643b067d46a8fa9593d93c8b89bd1ab (diff)
parent a6d24b3ac1d0416847dddbfd686e953552ca68f6 (diff)
Merge pull request #28985 from vespa-engine/vekterli/improve-content-policy-thread-safety v8.244.20
Improve thread safety of MessageBus (DocumentAPI) ContentPolicy
-rw-r--r-- documentapi/src/tests/policies/policies_test.cpp 10
-rw-r--r-- documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp 59
-rw-r--r-- documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h 45
3 files changed, 71 insertions, 43 deletions
diff --git a/documentapi/src/tests/policies/policies_test.cpp b/documentapi/src/tests/policies/policies_test.cpp
index 9dd73a71920..7091b63b6b3 100644
--- a/documentapi/src/tests/policies/policies_test.cpp
+++ b/documentapi/src/tests/policies/policies_test.cpp
@@ -806,7 +806,7 @@ Test::requireThatContentPolicyIsRandomWithoutState()
ContentPolicy &policy = setupContentPolicy(
frame, param,
"storage/cluster.mycluster/distributor/*/default", 5);
- ASSERT_TRUE(policy.getSystemState() == nullptr);
+ ASSERT_FALSE(policy.getSystemState());
std::set<string> lst;
for (uint32_t i = 0; i < 666; i++) {
@@ -858,12 +858,12 @@ Test::requireThatContentPolicyIsTargetedWithState()
"cluster=mycluster;slobroks=tcp/localhost:%d;clusterconfigid=%s;syncinit",
slobrok.port(), getDefaultDistributionConfig(2, 5).c_str());
ContentPolicy &policy = setupContentPolicy(frame, param, "storage/cluster.mycluster/distributor/*/default", 5);
- ASSERT_TRUE(policy.getSystemState() == nullptr);
+ ASSERT_FALSE(policy.getSystemState());
{
std::vector<mbus::RoutingNode*> leaf;
ASSERT_TRUE(frame.select(leaf, 1));
leaf[0]->handleReply(std::make_unique<WrongDistributionReply>("distributor:5 storage:5"));
- ASSERT_TRUE(policy.getSystemState() != nullptr);
+ ASSERT_TRUE(policy.getSystemState());
EXPECT_EQUAL(policy.getSystemState()->toString(), "distributor:5 storage:5");
}
std::set<string> lst;
@@ -897,12 +897,12 @@ Test::requireThatContentPolicyCombinesSystemAndSlobrokState()
ContentPolicy &policy = setupContentPolicy(
frame, param,
"storage/cluster.mycluster/distributor/*/default", 1);
- ASSERT_TRUE(policy.getSystemState() == nullptr);
+ ASSERT_FALSE(policy.getSystemState());
{
std::vector<mbus::RoutingNode*> leaf;
ASSERT_TRUE(frame.select(leaf, 1));
leaf[0]->handleReply(std::make_unique<WrongDistributionReply>("distributor:99 storage:99"));
- ASSERT_TRUE(policy.getSystemState() != nullptr);
+ ASSERT_TRUE(policy.getSystemState());
EXPECT_EQUAL(policy.getSystemState()->toString(), "distributor:99 storage:99");
}
for (int i = 0; i < 666; i++) {
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp
index e391699b750..ea27d42e790 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp
@@ -43,7 +43,7 @@ namespace {
class CallBack : public config::IFetcherCallback<storage::lib::Distribution::DistributionConfig>
{
public:
- CallBack(ContentPolicy & policy) : _policy(policy) { }
+ explicit CallBack(ContentPolicy & policy) : _policy(policy) { }
void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config) override {
_policy.configure(std::move(config));
}
@@ -78,13 +78,13 @@ string ContentPolicy::init()
ContentPolicy::~ContentPolicy() = default;
string
-ContentPolicy::createConfigId(const string & clusterName) const
+ContentPolicy::createConfigId(const string & clusterName)
{
return clusterName;
}
string
-ContentPolicy::createPattern(const string & clusterName, int distributor) const
+ContentPolicy::createPattern(const string & clusterName, int distributor)
{
vespalib::asciistream ost;
@@ -103,7 +103,8 @@ void
ContentPolicy::configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config)
{
try {
- _nextDistribution = std::make_unique<storage::lib::Distribution>(*config);
+ std::lock_guard guard(_rw_lock);
+ _distribution = std::make_unique<storage::lib::Distribution>(*config);
} catch (const std::exception& e) {
LOG(warning, "Got exception when configuring distribution, config id was %s", _clusterConfigId.c_str());
throw e;
@@ -116,8 +117,9 @@ ContentPolicy::doSelect(mbus::RoutingContext &context)
const mbus::Message &msg = context.getMessage();
int distributor = -1;
+ auto [cur_state, cur_distribution] = internal_state_snapshot();
- if (_state.get()) {
+ if (cur_state) {
document::BucketId id;
switch(msg.getType()) {
case DocumentProtocol::MESSAGE_PUTDOCUMENT:
@@ -168,15 +170,10 @@ ContentPolicy::doSelect(mbus::RoutingContext &context)
// Pick a distributor using ideal state algorithm
try {
- // Update distribution here, to make it not take lock in average case
- if (_nextDistribution) {
- _distribution = std::move(_nextDistribution);
- _nextDistribution.reset();
- }
- assert(_distribution.get());
- distributor = _distribution->getIdealDistributorNode(*_state, id);
+ assert(cur_distribution);
+ distributor = cur_distribution->getIdealDistributorNode(*cur_state, id);
} catch (storage::lib::TooFewBucketBitsInUseException& e) {
- auto reply = std::make_unique<WrongDistributionReply>(_state->toString());
+ auto reply = std::make_unique<WrongDistributionReply>(cur_state->toString());
reply->addError(mbus::Error(
DocumentProtocol::ERROR_WRONG_DISTRIBUTION,
"Too few distribution bits used for given cluster state"));
@@ -185,7 +182,7 @@ ContentPolicy::doSelect(mbus::RoutingContext &context)
} catch (storage::lib::NoDistributorsAvailableException& e) {
// No distributors available in current cluster state. Remove
// cluster state we cannot use and send to random target
- _state.reset();
+ reset_state();
distributor = -1;
}
}
@@ -216,7 +213,7 @@ ContentPolicy::getRecipient(mbus::RoutingContext& context, int distributor)
return mbus::Hop::parse(entries[random() % entries.size()].second + "/default");
}
- return mbus::Hop();
+ return {};
}
void
@@ -226,9 +223,9 @@ ContentPolicy::merge(mbus::RoutingContext &context)
mbus::Reply::UP reply = it.removeReply();
if (reply->getType() == DocumentProtocol::REPLY_WRONGDISTRIBUTION) {
- updateStateFromReply(static_cast<WrongDistributionReply&>(*reply));
+ updateStateFromReply(dynamic_cast<WrongDistributionReply&>(*reply));
} else if (reply->hasErrors()) {
- _state.reset();
+ reset_state();
}
context.setReply(std::move(reply));
@@ -237,8 +234,8 @@ ContentPolicy::merge(mbus::RoutingContext &context)
void
ContentPolicy::updateStateFromReply(WrongDistributionReply& wdr)
{
- std::unique_ptr<storage::lib::ClusterState> newState(
- new storage::lib::ClusterState(wdr.getSystemState()));
+ auto newState = std::make_unique<storage::lib::ClusterState>(wdr.getSystemState());
+ std::lock_guard guard(_rw_lock);
if (!_state || newState->getVersion() >= _state->getVersion()) {
if (_state) {
wdr.getTrace().trace(1, make_string("System state changed from version %u to %u",
@@ -256,4 +253,28 @@ ContentPolicy::updateStateFromReply(WrongDistributionReply& wdr)
}
}
+ContentPolicy::StateSnapshot
+ContentPolicy::internal_state_snapshot()
+{
+ std::shared_lock guard(_rw_lock);
+ return {_state, _distribution};
+}
+
+std::shared_ptr<const storage::lib::ClusterState>
+ContentPolicy::getSystemState() const noexcept
+{
+ std::shared_lock guard(_rw_lock);
+ return _state;
+}
+
+void
+ContentPolicy::reset_state()
+{
+ // It's possible for the caller to race between checking and resetting the state,
+ // but this should never lead to a worse outcome than sending to a random distributor
+ // as if no state had been cached prior.
+ std::lock_guard guard(_rw_lock);
+ _state.reset();
+}
+
} // documentapi
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h
index e49ad378b90..7a3675c3001 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.h
@@ -6,55 +6,62 @@
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/document/bucket/bucketidfactory.h>
#include <vespa/messagebus/routing/hop.h>
+#include <shared_mutex>
namespace config {
class ICallback;
class ConfigFetcher;
}
-namespace storage {
-namespace lib {
+namespace storage::lib {
class Distribution;
class ClusterState;
}
-}
namespace documentapi {
class ContentPolicy : public ExternSlobrokPolicy
{
private:
- document::BucketIdFactory _bucketIdFactory;
- std::unique_ptr<storage::lib::ClusterState> _state;
- string _clusterName;
- string _clusterConfigId;
- std::unique_ptr<config::ICallback> _callBack;
- std::unique_ptr<config::ConfigFetcher> _configFetcher;
- std::unique_ptr<storage::lib::Distribution> _distribution;
- std::unique_ptr<storage::lib::Distribution> _nextDistribution;
+ document::BucketIdFactory _bucketIdFactory;
+ mutable std::shared_mutex _rw_lock;
+ std::shared_ptr<const storage::lib::ClusterState> _state;
+ string _clusterName;
+ string _clusterConfigId;
+ std::unique_ptr<config::ICallback> _callBack;
+ std::unique_ptr<config::ConfigFetcher> _configFetcher;
+ std::shared_ptr<const storage::lib::Distribution> _distribution;
+
+ using StateSnapshot = std::pair<std::shared_ptr<const storage::lib::ClusterState>,
+ std::shared_ptr<const storage::lib::Distribution>>;
+
+ // Acquires _rw_lock
+ [[nodiscard]] StateSnapshot internal_state_snapshot();
mbus::Hop getRecipient(mbus::RoutingContext& context, int distributor);
+ // Acquires _rw_lock
+ void updateStateFromReply(WrongDistributionReply& reply);
+ // Acquires _rw_lock
+ void reset_state();
public:
- ContentPolicy(const string& param);
- ~ContentPolicy();
+ explicit ContentPolicy(const string& param);
+ ~ContentPolicy() override;
void doSelect(mbus::RoutingContext &context) override;
void merge(mbus::RoutingContext &context) override;
- void updateStateFromReply(WrongDistributionReply& reply);
-
/**
* @return a pointer to the system state registered with this policy. If
- * we haven't received a system state yet, returns NULL.
+ * we haven't received a system state yet, returns nullptr.
*/
- const storage::lib::ClusterState* getSystemState() const { return _state.get(); }
+ std::shared_ptr<const storage::lib::ClusterState> getSystemState() const noexcept;
virtual void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config);
string init() override;
private:
- string createConfigId(const string & clusterName) const;
- string createPattern(const string & clusterName, int distributor) const;
+ static string createConfigId(const string & clusterName);
+ static string createPattern(const string & clusterName, int distributor);
};
}