summaryrefslogtreecommitdiffstats
path: root/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp')
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp59
1 files changed, 40 insertions, 19 deletions
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp
index e391699b750..ea27d42e790 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp
@@ -43,7 +43,7 @@ namespace {
class CallBack : public config::IFetcherCallback<storage::lib::Distribution::DistributionConfig>
{
public:
- CallBack(ContentPolicy & policy) : _policy(policy) { }
+ explicit CallBack(ContentPolicy & policy) : _policy(policy) { }
void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config) override {
_policy.configure(std::move(config));
}
@@ -78,13 +78,13 @@ string ContentPolicy::init()
ContentPolicy::~ContentPolicy() = default;
string
-ContentPolicy::createConfigId(const string & clusterName) const
+ContentPolicy::createConfigId(const string & clusterName)
{
return clusterName;
}
string
-ContentPolicy::createPattern(const string & clusterName, int distributor) const
+ContentPolicy::createPattern(const string & clusterName, int distributor)
{
vespalib::asciistream ost;
@@ -103,7 +103,8 @@ void
ContentPolicy::configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config)
{
try {
- _nextDistribution = std::make_unique<storage::lib::Distribution>(*config);
+ std::lock_guard guard(_rw_lock);
+ _distribution = std::make_unique<storage::lib::Distribution>(*config);
} catch (const std::exception& e) {
LOG(warning, "Got exception when configuring distribution, config id was %s", _clusterConfigId.c_str());
throw e;
@@ -116,8 +117,9 @@ ContentPolicy::doSelect(mbus::RoutingContext &context)
const mbus::Message &msg = context.getMessage();
int distributor = -1;
+ auto [cur_state, cur_distribution] = internal_state_snapshot();
- if (_state.get()) {
+ if (cur_state) {
document::BucketId id;
switch(msg.getType()) {
case DocumentProtocol::MESSAGE_PUTDOCUMENT:
@@ -168,15 +170,10 @@ ContentPolicy::doSelect(mbus::RoutingContext &context)
// Pick a distributor using ideal state algorithm
try {
- // Update distribution here, to make it not take lock in average case
- if (_nextDistribution) {
- _distribution = std::move(_nextDistribution);
- _nextDistribution.reset();
- }
- assert(_distribution.get());
- distributor = _distribution->getIdealDistributorNode(*_state, id);
+ assert(cur_distribution);
+ distributor = cur_distribution->getIdealDistributorNode(*cur_state, id);
} catch (storage::lib::TooFewBucketBitsInUseException& e) {
- auto reply = std::make_unique<WrongDistributionReply>(_state->toString());
+ auto reply = std::make_unique<WrongDistributionReply>(cur_state->toString());
reply->addError(mbus::Error(
DocumentProtocol::ERROR_WRONG_DISTRIBUTION,
"Too few distribution bits used for given cluster state"));
@@ -185,7 +182,7 @@ ContentPolicy::doSelect(mbus::RoutingContext &context)
} catch (storage::lib::NoDistributorsAvailableException& e) {
// No distributors available in current cluster state. Remove
// cluster state we cannot use and send to random target
- _state.reset();
+ reset_state();
distributor = -1;
}
}
@@ -216,7 +213,7 @@ ContentPolicy::getRecipient(mbus::RoutingContext& context, int distributor)
return mbus::Hop::parse(entries[random() % entries.size()].second + "/default");
}
- return mbus::Hop();
+ return {};
}
void
@@ -226,9 +223,9 @@ ContentPolicy::merge(mbus::RoutingContext &context)
mbus::Reply::UP reply = it.removeReply();
if (reply->getType() == DocumentProtocol::REPLY_WRONGDISTRIBUTION) {
- updateStateFromReply(static_cast<WrongDistributionReply&>(*reply));
+ updateStateFromReply(dynamic_cast<WrongDistributionReply&>(*reply));
} else if (reply->hasErrors()) {
- _state.reset();
+ reset_state();
}
context.setReply(std::move(reply));
@@ -237,8 +234,8 @@ ContentPolicy::merge(mbus::RoutingContext &context)
void
ContentPolicy::updateStateFromReply(WrongDistributionReply& wdr)
{
- std::unique_ptr<storage::lib::ClusterState> newState(
- new storage::lib::ClusterState(wdr.getSystemState()));
+ auto newState = std::make_unique<storage::lib::ClusterState>(wdr.getSystemState());
+ std::lock_guard guard(_rw_lock);
if (!_state || newState->getVersion() >= _state->getVersion()) {
if (_state) {
wdr.getTrace().trace(1, make_string("System state changed from version %u to %u",
@@ -256,4 +253,28 @@ ContentPolicy::updateStateFromReply(WrongDistributionReply& wdr)
}
}
+ContentPolicy::StateSnapshot
+ContentPolicy::internal_state_snapshot()
+{
+ std::shared_lock guard(_rw_lock);
+ return {_state, _distribution};
+}
+
+std::shared_ptr<const storage::lib::ClusterState>
+ContentPolicy::getSystemState() const noexcept
+{
+ std::shared_lock guard(_rw_lock);
+ return _state;
+}
+
+void
+ContentPolicy::reset_state()
+{
+ // It's possible for the caller to race between checking and resetting the state,
+ // but this should never lead to a worse outcome than sending to a random distributor
+ // as if no state had been cached prior.
+ std::lock_guard guard(_rw_lock);
+ _state.reset();
+}
+
} // documentapi