summaryrefslogtreecommitdiffstats
path: root/documentapi
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-09-08 13:42:12 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2020-09-14 13:22:14 +0000
commit54150406f3c490463bc5371c9480452da168bd5c (patch)
tree73721c84df79dd649c967a5fe9f0525431476af6 /documentapi
parentea22ec7cb6e35cf591d08ffe898388a7f08593cc (diff)
Basic functionality for direct RPC for StorageAPI communication
This has several advantages: * Completely bypasses all MessageBus indirections * Explicit setup of RPC thread pool * Direct dispatch from RPC thread to persistence queue pool * Better control of encoding/decoding and buffer usage
Diffstat (limited to 'documentapi')
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp26
1 files changed, 11 insertions, 15 deletions
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp
index 472440d7863..166c3be5e66 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/storagepolicy.cpp
@@ -51,11 +51,11 @@ namespace {
string StoragePolicy::init()
{
string error = ExternSlobrokPolicy::init();
- if (error.length() > 0) {
+ if (!error.empty()) {
return error;
}
- if (!_clusterConfigId.length()) {
+ if (_clusterConfigId.empty()) {
_clusterConfigId = createConfigId(_clusterName);
}
@@ -72,9 +72,7 @@ string StoragePolicy::init()
return "";
}
-StoragePolicy::~StoragePolicy()
-{
-}
+StoragePolicy::~StoragePolicy() = default;
string
StoragePolicy::createConfigId(const string & clusterName) const
@@ -102,7 +100,7 @@ void
StoragePolicy::configure(std::unique_ptr<vespa::config::content::StorDistributionConfig> config)
{
try {
- _nextDistribution.reset(new storage::lib::Distribution(*config));
+ _nextDistribution = std::make_unique<storage::lib::Distribution>(*config);
} catch (const std::exception& e) {
LOG(warning, "Got exception when configuring distribution, config id was %s", _clusterConfigId.c_str());
throw e;
@@ -167,25 +165,23 @@ StoragePolicy::doSelect(mbus::RoutingContext &context)
// Pick a distributor using ideal state algorithm
try {
- // Update distribution here, to make it not take lock in average case
- if (_nextDistribution.get() != 0) {
+ // Update distribution here, to make it not take lock in average case
+ if (_nextDistribution) {
_distribution = std::move(_nextDistribution);
_nextDistribution.reset();
}
assert(_distribution.get());
distributor = _distribution->getIdealDistributorNode(*_state, id);
} catch (storage::lib::TooFewBucketBitsInUseException& e) {
- mbus::Reply::UP reply(
- new WrongDistributionReply(_state->toString()));
+ auto reply = std::make_unique<WrongDistributionReply>(_state->toString());
reply->addError(mbus::Error(
DocumentProtocol::ERROR_WRONG_DISTRIBUTION,
"Too few distribution bits used for given cluster state"));
context.setReply(std::move(reply));
return;
-
} catch (storage::lib::NoDistributorsAvailableException& e) {
- // No distributors available in current cluster state. Remove
- // cluster state we cannot use and send to random target
+ // No distributors available in current cluster state. Remove
+ // cluster state we cannot use and send to random target
_state.reset();
distributor = -1;
}
@@ -240,8 +236,8 @@ StoragePolicy::updateStateFromReply(WrongDistributionReply& wdr)
{
std::unique_ptr<storage::lib::ClusterState> newState(
new storage::lib::ClusterState(wdr.getSystemState()));
- if (_state.get() == 0 || newState->getVersion() >= _state->getVersion()) {
- if (_state.get()) {
+ if (!_state || newState->getVersion() >= _state->getVersion()) {
+ if (_state) {
wdr.getTrace().trace(1, make_string("System state changed from version %u to %u",
_state->getVersion(), newState->getVersion()));
} else {