diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-09-08 13:42:12 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-09-14 13:22:14 +0000 |
commit | 54150406f3c490463bc5371c9480452da168bd5c (patch) | |
tree | 73721c84df79dd649c967a5fe9f0525431476af6 /documentapi | |
parent | ea22ec7cb6e35cf591d08ffe898388a7f08593cc (diff) |
Basic functionality for direct RPC for StorageAPI communication
This has several advantages:
* Completely bypasses all MessageBus indirections
* Explicit setup of RPC thread pool
* Direct dispatch from RPC thread to persistence queue pool
* Better control of encoding/decoding and buffer usage
Diffstat (limited to 'documentapi')
-rw-r--r-- | documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp | 26 |
1 files changed, 11 insertions, 15 deletions
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp index 472440d7863..166c3be5e66 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp @@ -51,11 +51,11 @@ namespace { string StoragePolicy::init() { string error = ExternSlobrokPolicy::init(); - if (error.length() > 0) { + if (!error.empty()) { return error; } - if (!_clusterConfigId.length()) { + if (_clusterConfigId.empty()) { _clusterConfigId = createConfigId(_clusterName); } @@ -72,9 +72,7 @@ string StoragePolicy::init() return ""; } -StoragePolicy::~StoragePolicy() -{ -} +StoragePolicy::~StoragePolicy() = default; string StoragePolicy::createConfigId(const string & clusterName) const @@ -102,7 +100,7 @@ void StoragePolicy::configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config) { try { - _nextDistribution.reset(new storage::lib::Distribution(*config)); + _nextDistribution = std::make_unique<storage::lib::Distribution>(*config); } catch (const std::exception& e) { LOG(warning, "Got exception when configuring distribution, config id was %s", _clusterConfigId.c_str()); throw e; @@ -167,25 +165,23 @@ StoragePolicy::doSelect(mbus::RoutingContext &context) // Pick a distributor using ideal state algorithm try { - // Update distribution here, to make it not take lock in average case - if (_nextDistribution.get() != 0) { + // Update distribution here, to make it not take lock in average case + if (_nextDistribution) { _distribution = std::move(_nextDistribution); _nextDistribution.reset(); } assert(_distribution.get()); distributor = _distribution->getIdealDistributorNode(*_state, id); } catch (storage::lib::TooFewBucketBitsInUseException& e) { - mbus::Reply::UP reply( - new WrongDistributionReply(_state->toString())); + auto reply = std::make_unique<WrongDistributionReply>(_state->toString()); reply->addError(mbus::Error( DocumentProtocol::ERROR_WRONG_DISTRIBUTION, "Too few distribution bits used for given cluster state")); context.setReply(std::move(reply)); return; - } catch (storage::lib::NoDistributorsAvailableException& e) { - // No distributors available in current cluster state. Remove - // cluster state we cannot use and send to random target + // No distributors available in current cluster state. Remove + // cluster state we cannot use and send to random target _state.reset(); distributor = -1; } @@ -240,8 +236,8 @@ StoragePolicy::updateStateFromReply(WrongDistributionReply& wdr) { std::unique_ptr<storage::lib::ClusterState> newState( new storage::lib::ClusterState(wdr.getSystemState())); - if (_state.get() == 0 || newState->getVersion() >= _state->getVersion()) { - if (_state.get()) { + if (!_state || newState->getVersion() >= _state->getVersion()) { + if (_state) { wdr.getTrace().trace(1, make_string("System state changed from version %u to %u", _state->getVersion(), newState->getVersion())); } else { |