diff options
Diffstat (limited to 'documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp')
-rw-r--r-- | documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp | 59 |
1 files changed, 40 insertions, 19 deletions
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp index e391699b750..ea27d42e790 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp @@ -43,7 +43,7 @@ namespace { class CallBack : public config::IFetcherCallback<storage::lib::Distribution::DistributionConfig> { public: - CallBack(ContentPolicy & policy) : _policy(policy) { } + explicit CallBack(ContentPolicy & policy) : _policy(policy) { } void configure(std::unique_ptr<storage::lib::Distribution::DistributionConfig> config) override { _policy.configure(std::move(config)); } @@ -78,13 +78,13 @@ string ContentPolicy::init() ContentPolicy::~ContentPolicy() = default; string -ContentPolicy::createConfigId(const string & clusterName) const +ContentPolicy::createConfigId(const string & clusterName) { return clusterName; } string -ContentPolicy::createPattern(const string & clusterName, int distributor) const +ContentPolicy::createPattern(const string & clusterName, int distributor) { vespalib::asciistream ost; @@ -103,7 +103,8 @@ void ContentPolicy::configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config) { try { - _nextDistribution = std::make_unique<storage::lib::Distribution>(*config); + std::lock_guard guard(_rw_lock); + _distribution = std::make_unique<storage::lib::Distribution>(*config); } catch (const std::exception& e) { LOG(warning, "Got exception when configuring distribution, config id was %s", _clusterConfigId.c_str()); throw e; @@ -116,8 +117,9 @@ ContentPolicy::doSelect(mbus::RoutingContext &context) const mbus::Message &msg = context.getMessage(); int distributor = -1; + auto [cur_state, cur_distribution] = internal_state_snapshot(); - if (_state.get()) { + if (cur_state) { document::BucketId id; switch(msg.getType()) { case DocumentProtocol::MESSAGE_PUTDOCUMENT: @@ -168,15 +170,10 @@ ContentPolicy::doSelect(mbus::RoutingContext &context) // Pick a distributor using ideal state algorithm try { - // Update distribution here, to make it not take lock in average case - if (_nextDistribution) { - _distribution = std::move(_nextDistribution); - _nextDistribution.reset(); - } - assert(_distribution.get()); - distributor = _distribution->getIdealDistributorNode(*_state, id); + assert(cur_distribution); + distributor = cur_distribution->getIdealDistributorNode(*cur_state, id); } catch (storage::lib::TooFewBucketBitsInUseException& e) { - auto reply = std::make_unique<WrongDistributionReply>(_state->toString()); + auto reply = std::make_unique<WrongDistributionReply>(cur_state->toString()); reply->addError(mbus::Error( DocumentProtocol::ERROR_WRONG_DISTRIBUTION, "Too few distribution bits used for given cluster state")); @@ -185,7 +182,7 @@ ContentPolicy::doSelect(mbus::RoutingContext &context) } catch (storage::lib::NoDistributorsAvailableException& e) { // No distributors available in current cluster state. Remove // cluster state we cannot use and send to random target - _state.reset(); + reset_state(); distributor = -1; } } @@ -216,7 +213,7 @@ ContentPolicy::getRecipient(mbus::RoutingContext& context, int distributor) return mbus::Hop::parse(entries[random() % entries.size()].second + "/default"); } - return mbus::Hop(); + return {}; } void @@ -226,9 +223,9 @@ ContentPolicy::merge(mbus::RoutingContext &context) mbus::Reply::UP reply = it.removeReply(); if (reply->getType() == DocumentProtocol::REPLY_WRONGDISTRIBUTION) { - updateStateFromReply(static_cast<WrongDistributionReply&>(*reply)); + updateStateFromReply(dynamic_cast<WrongDistributionReply&>(*reply)); } else if (reply->hasErrors()) { - _state.reset(); + reset_state(); } context.setReply(std::move(reply)); @@ -237,8 +234,8 @@ ContentPolicy::merge(mbus::RoutingContext &context) void ContentPolicy::updateStateFromReply(WrongDistributionReply& wdr) { - std::unique_ptr<storage::lib::ClusterState> newState( - new storage::lib::ClusterState(wdr.getSystemState())); + auto newState = std::make_unique<storage::lib::ClusterState>(wdr.getSystemState()); + std::lock_guard guard(_rw_lock); if (!_state || newState->getVersion() >= _state->getVersion()) { if (_state) { wdr.getTrace().trace(1, make_string("System state changed from version %u to %u", @@ -256,4 +253,28 @@ ContentPolicy::updateStateFromReply(WrongDistributionReply& wdr) } } +ContentPolicy::StateSnapshot +ContentPolicy::internal_state_snapshot() +{ + std::shared_lock guard(_rw_lock); + return {_state, _distribution}; +} + +std::shared_ptr<const storage::lib::ClusterState> +ContentPolicy::getSystemState() const noexcept +{ + std::shared_lock guard(_rw_lock); + return _state; +} + +void +ContentPolicy::reset_state() +{ + // It's possible for the caller to race between checking and resetting the state, + // but this should never lead to a worse outcome than sending to a random distributor + // as if no state had been cached prior. + std::lock_guard guard(_rw_lock); + _state.reset(); +} + } // documentapi |