summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@oath.com>2018-04-03 20:17:09 +0200
committerHenning Baldersheim <balder@oath.com>2018-04-03 20:17:09 +0200
commit26e56f2bca05f82d8aa0c9e5da59fbfc00918161 (patch)
tree68239daea817457e0d8a5b918f03d3fa8673b499 /storage
parent69c50d6c403f1857cbdd081b1d6bf67581e5cbc6 (diff)
Follow up on code review comments and move some shared_ptr's
Diffstat (limited to 'storage')
-rw-r--r--storage/src/vespa/storage/distributor/activecopy.cpp31
-rw-r--r--storage/src/vespa/storage/distributor/activecopy.h9
-rw-r--r--storage/src/vespa/storage/distributor/distributormessagesender.cpp18
-rw-r--r--storage/src/vespa/storage/distributor/externaloperationhandler.cpp94
-rw-r--r--storage/src/vespa/storage/distributor/messagetracker.h6
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.cpp51
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.h6
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.cpp202
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.h41
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp186
-rw-r--r--storage/src/vespa/storage/distributor/statecheckers.cpp21
11 files changed, 201 insertions, 464 deletions
diff --git a/storage/src/vespa/storage/distributor/activecopy.cpp b/storage/src/vespa/storage/distributor/activecopy.cpp
index 234fddacfdf..4174ddb726f 100644
--- a/storage/src/vespa/storage/distributor/activecopy.cpp
+++ b/storage/src/vespa/storage/distributor/activecopy.cpp
@@ -22,8 +22,8 @@ namespace std {
namespace storage::distributor {
-ActiveCopy::ActiveCopy(uint16_t node, BucketDatabase::Entry& e, const std::vector<uint16_t>& idealState) :
- nodeIndex(node),
+ActiveCopy::ActiveCopy(uint16_t node, const BucketDatabase::Entry& e, const std::vector<uint16_t>& idealState) :
+ _nodeIndex(node),
_ideal(0xffff)
{
const BucketCopy* copy = e->getNode(node);
@@ -32,7 +32,10 @@ ActiveCopy::ActiveCopy(uint16_t node, BucketDatabase::Entry& e, const std::vecto
_trusted = copy->trusted();
_active = copy->active();
for (uint32_t i=0; i<idealState.size(); ++i) {
- if (idealState[i] == node) _ideal = i;
+ if (idealState[i] == node) {
+ _ideal = i;
+ break;
+ }
}
}
@@ -63,7 +66,7 @@ ActiveCopy::getReason() const {
std::ostream&
operator<<(std::ostream& out, const ActiveCopy & e) {
- out << "Entry(Node " << e.nodeIndex;
+ out << "Entry(Node " << e._nodeIndex;
if (e._ready) out << ", ready";
if (e._trusted) out << ", trusted";
if (e._ideal < 0xffff) out << ", ideal pri " << e._ideal;
@@ -79,7 +82,7 @@ namespace {
if (e1._trusted != e2._trusted) return e1._trusted;
if (e1._ideal != e2._ideal) return e1._ideal < e2._ideal;
if (e1._active != e2._active) return e1._active;
- return e1.nodeIndex < e2.nodeIndex;
+ return e1._nodeIndex < e2._nodeIndex;
}
};
@@ -96,8 +99,8 @@ namespace {
const std::vector<uint16_t>& idealState,
std::vector<ActiveCopy>& result)
{
- for (uint32_t i=0; i<nodeIndexes.size(); ++i) {
- result.push_back(ActiveCopy(nodeIndexes[i], e, idealState));
+ for (uint16_t nodeIndex : nodeIndexes) {
+ result.emplace_back(nodeIndex, e, idealState);
}
}
}
@@ -115,11 +118,10 @@ ActiveCopy::calculate(const std::vector<uint16_t>& idealState,
BucketDatabase::Entry& e)
{
DEBUG(std::cerr << "Ideal state is " << idealState << "\n");
- std::vector<ActiveCopy> result;
std::vector<uint16_t> validNodesWithCopy;
buildValidNodeIndexList(e, validNodesWithCopy);
if (validNodesWithCopy.empty()) {
- return result;
+ return ActiveList();
}
typedef std::vector<uint16_t> IndexList;
std::vector<IndexList> groups;
@@ -128,6 +130,7 @@ ActiveCopy::calculate(const std::vector<uint16_t>& idealState,
} else {
groups.push_back(validNodesWithCopy);
}
+ std::vector<ActiveCopy> result;
for (uint32_t i=0; i<groups.size(); ++i) {
std::vector<ActiveCopy> entries;
buildNodeList(e, groups[i], idealState, entries);
@@ -136,7 +139,7 @@ ActiveCopy::calculate(const std::vector<uint16_t>& idealState,
DEBUG(std::cerr << "Best copy " << *best << "\n");
result.push_back(ActiveCopy(*best));
}
- return ActiveList(result);
+ return ActiveList(std::move(result));
}
void
@@ -147,13 +150,13 @@ ActiveList::print(std::ostream& out, bool verbose,
if (verbose) {
for (size_t i=0; i<_v.size(); ++i) {
out << "\n" << indent << " "
- << _v[i].nodeIndex << " " << _v[i].getReason();
+ << _v[i]._nodeIndex << " " << _v[i].getReason();
}
if (!_v.empty()) out << "\n" << indent;
} else {
- if (!_v.empty()) out << _v[0].nodeIndex;
+ if (!_v.empty()) out << _v[0]._nodeIndex;
for (size_t i=1; i<_v.size(); ++i) {
- out << " " << _v[i].nodeIndex;
+ out << " " << _v[i]._nodeIndex;
}
}
out << "]";
@@ -163,7 +166,7 @@ bool
ActiveList::contains(uint16_t node) const
{
for (uint32_t i=0; i<_v.size(); ++i) {
- if (node == _v[i].nodeIndex) return true;
+ if (node == _v[i]._nodeIndex) return true;
}
return false;
}
diff --git a/storage/src/vespa/storage/distributor/activecopy.h b/storage/src/vespa/storage/distributor/activecopy.h
index 8fcaf814947..d9f83be3748 100644
--- a/storage/src/vespa/storage/distributor/activecopy.h
+++ b/storage/src/vespa/storage/distributor/activecopy.h
@@ -10,9 +10,8 @@ namespace storage::distributor {
class ActiveList;
struct ActiveCopy {
-public:
- ActiveCopy() : nodeIndex(-1), _ideal(-1), _ready(false), _trusted(false), _active(false) { }
- ActiveCopy(uint16_t node, BucketDatabase::Entry& e, const std::vector<uint16_t>& idealState);
+ ActiveCopy() : _nodeIndex(-1), _ideal(-1), _ready(false), _trusted(false), _active(false) { }
+ ActiveCopy(uint16_t node, const BucketDatabase::Entry& e, const std::vector<uint16_t>& idealState);
vespalib::string getReason() const;
friend std::ostream& operator<<(std::ostream& out, const ActiveCopy& e);
@@ -20,7 +19,7 @@ public:
static ActiveList calculate(const std::vector<uint16_t>& idealState,
const lib::Distribution&, BucketDatabase::Entry&);
- uint16_t nodeIndex;
+ uint16_t _nodeIndex;
uint16_t _ideal;
bool _ready;
bool _trusted;
@@ -32,7 +31,7 @@ class ActiveList : public vespalib::Printable {
public:
ActiveList() {}
- ActiveList(std::vector<ActiveCopy>& v) { _v.swap(v); }
+ ActiveList(std::vector<ActiveCopy>&& v) : _v(std::move(v)) { }
ActiveCopy& operator[](size_t i) { return _v[i]; }
const ActiveCopy& operator[](size_t i) const { return _v[i]; }
diff --git a/storage/src/vespa/storage/distributor/distributormessagesender.cpp b/storage/src/vespa/storage/distributor/distributormessagesender.cpp
index d87488b76c4..d40bd4bd9c2 100644
--- a/storage/src/vespa/storage/distributor/distributormessagesender.cpp
+++ b/storage/src/vespa/storage/distributor/distributormessagesender.cpp
@@ -6,20 +6,14 @@
namespace storage::distributor {
uint64_t
-DistributorMessageSender::sendToNode(
- const lib::NodeType& nodeType,
- uint16_t node,
- const std::shared_ptr<api::StorageCommand> & cmd,
- bool useDocumentAPI)
+DistributorMessageSender::sendToNode(const lib::NodeType& nodeType, uint16_t node,
+ const std::shared_ptr<api::StorageCommand> & cmd, bool useDocumentAPI)
{
cmd->setSourceIndex(getDistributorIndex());
- cmd->setAddress(api::StorageMessageAddress(
- getClusterName(),
- nodeType,
- node,
- (useDocumentAPI
- ? api::StorageMessageAddress::DOCUMENT
- : api::StorageMessageAddress::STORAGE)));
+ cmd->setAddress(api::StorageMessageAddress(getClusterName(), nodeType, node,
+ (useDocumentAPI
+ ? api::StorageMessageAddress::DOCUMENT
+ : api::StorageMessageAddress::STORAGE)));
uint64_t msgId = cmd->getMsgId();
sendCommand(cmd);
return msgId;
diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
index 0c4c67bb94f..bb8c2b0608a 100644
--- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
+++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp
@@ -25,22 +25,18 @@ LOG_SETUP(".distributor.manager");
namespace storage::distributor {
-ExternalOperationHandler::ExternalOperationHandler(
- Distributor& owner,
- DistributorBucketSpaceRepo& bucketSpaceRepo,
- const MaintenanceOperationGenerator& gen,
- DistributorComponentRegister& compReg)
+ExternalOperationHandler::ExternalOperationHandler(Distributor& owner, DistributorBucketSpaceRepo& bucketSpaceRepo,
+ const MaintenanceOperationGenerator& gen,
+ DistributorComponentRegister& compReg)
: DistributorComponent(owner, bucketSpaceRepo, compReg, "External operation handler"),
_operationGenerator(gen),
_rejectFeedBeforeTimeReached() // At epoch
{ }
-ExternalOperationHandler::~ExternalOperationHandler() { }
+ExternalOperationHandler::~ExternalOperationHandler() = default;
bool
-ExternalOperationHandler::handleMessage(
- const std::shared_ptr<api::StorageMessage>& msg,
- Operation::SP& op)
+ExternalOperationHandler::handleMessage(const std::shared_ptr<api::StorageMessage>& msg, Operation::SP& op)
{
_op = Operation::SP();
bool retVal = msg->callHandler(*this, msg);
@@ -52,10 +48,8 @@ api::ReturnCode
ExternalOperationHandler::makeSafeTimeRejectionResult(TimePoint unsafeTime)
{
std::ostringstream ss;
- auto now_sec(std::chrono::duration_cast<std::chrono::seconds>(
- unsafeTime.time_since_epoch()));
- auto future_sec(std::chrono::duration_cast<std::chrono::seconds>(
- _rejectFeedBeforeTimeReached.time_since_epoch()));
+ auto now_sec(std::chrono::duration_cast<std::chrono::seconds>(unsafeTime.time_since_epoch()));
+ auto future_sec(std::chrono::duration_cast<std::chrono::seconds>(_rejectFeedBeforeTimeReached.time_since_epoch()));
ss << "Operation received at time " << now_sec.count()
<< ", which is before bucket ownership transfer safe time of "
<< future_sec.count();
@@ -65,8 +59,7 @@ ExternalOperationHandler::makeSafeTimeRejectionResult(TimePoint unsafeTime)
bool
ExternalOperationHandler::checkSafeTimeReached(api::StorageCommand& cmd)
{
- const auto now = TimePoint(std::chrono::seconds(
- getClock().getTimeInSeconds().getTime()));
+ const auto now = TimePoint(std::chrono::seconds(getClock().getTimeInSeconds().getTime()));
if (now < _rejectFeedBeforeTimeReached) {
api::StorageReply::UP reply(cmd.makeReply());
reply->setResult(makeSafeTimeRejectionResult(now));
@@ -77,18 +70,14 @@ ExternalOperationHandler::checkSafeTimeReached(api::StorageCommand& cmd)
}
bool
-ExternalOperationHandler::checkTimestampMutationPreconditions(
- api::StorageCommand& cmd,
- const document::BucketId &bucketId,
- PersistenceOperationMetricSet& persistenceMetrics)
+ExternalOperationHandler::checkTimestampMutationPreconditions(api::StorageCommand& cmd,
+ const document::BucketId &bucketId,
+ PersistenceOperationMetricSet& persistenceMetrics)
{
document::Bucket bucket(cmd.getBucket().getBucketSpace(), bucketId);
if (!checkDistribution(cmd, bucket)) {
- LOG(debug,
- "Distributor manager received %s, bucket %s with wrong "
- "distribution",
- cmd.toString().c_str(),
- bucket.toString().c_str());
+ LOG(debug, "Distributor manager received %s, bucket %s with wrong distribution",
+ cmd.toString().c_str(), bucket.toString().c_str());
persistenceMetrics.failures.wrongdistributor++;
return false;
@@ -101,13 +90,12 @@ ExternalOperationHandler::checkTimestampMutationPreconditions(
}
std::shared_ptr<api::StorageMessage>
-ExternalOperationHandler::makeConcurrentMutationRejectionReply(
- api::StorageCommand& cmd,
- const document::DocumentId& docId,
- PersistenceOperationMetricSet& persistenceMetrics) const {
- auto err_msg = vespalib::make_string(
- "A mutating operation for document '%s' is already in progress",
- docId.toString().c_str());
+ExternalOperationHandler::makeConcurrentMutationRejectionReply(api::StorageCommand& cmd,
+ const document::DocumentId& docId,
+ PersistenceOperationMetricSet& persistenceMetrics) const
+{
+ auto err_msg = vespalib::make_string("A mutating operation for document '%s' is already in progress",
+ docId.toString().c_str());
LOG(debug, "Aborting incoming %s operation: %s", cmd.getType().toString().c_str(), err_msg.c_str());
persistenceMetrics.failures.concurrent_mutations++;
api::StorageReply::UP reply(cmd.makeReply());
@@ -184,12 +172,8 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Remove)
auto handle = _mutationSequencer.try_acquire(cmd->getDocumentId());
if (allowMutation(handle)) {
auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()));
- _op = std::make_shared<RemoveOperation>(
- *this,
- distributorBucketSpace,
- cmd,
- getMetrics().removes[cmd->getLoadType()],
- std::move(handle));
+ _op = std::make_shared<RemoveOperation>(*this, distributorBucketSpace, cmd,
+ getMetrics().removes[cmd->getLoadType()], std::move(handle));
} else {
sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId(), metrics));
}
@@ -204,20 +188,14 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, RemoveLocation)
document::Bucket bucket(cmd->getBucket().getBucketSpace(), bid);
if (!checkDistribution(*cmd, bucket)) {
- LOG(debug,
- "Distributor manager received %s with wrong distribution",
- cmd->toString().c_str());
+ LOG(debug, "Distributor manager received %s with wrong distribution", cmd->toString().c_str());
- getMetrics().removelocations[cmd->getLoadType()].
- failures.wrongdistributor++;
+ getMetrics().removelocations[cmd->getLoadType()].failures.wrongdistributor++;
return true;
}
- _op = Operation::SP(new RemoveLocationOperation(
- *this,
- _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()),
- cmd,
- getMetrics().removelocations[cmd->getLoadType()]));
+ _op = std::make_shared<RemoveLocationOperation>(*this, _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()),
+ cmd, getMetrics().removelocations[cmd->getLoadType()]);
return true;
}
@@ -225,21 +203,15 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get)
{
document::Bucket bucket(cmd->getBucket().getBucketSpace(), getBucketId(cmd->getDocumentId()));
if (!checkDistribution(*cmd, bucket)) {
- LOG(debug,
- "Distributor manager received get for %s, "
- "bucket %s with wrong distribution",
- cmd->getDocumentId().toString().c_str(),
- bucket.toString().c_str());
+ LOG(debug, "Distributor manager received get for %s, bucket %s with wrong distribution",
+ cmd->getDocumentId().toString().c_str(), bucket.toString().c_str());
getMetrics().gets[cmd->getLoadType()].failures.wrongdistributor++;
return true;
}
- _op = Operation::SP(new GetOperation(
- *this,
- _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()),
- cmd,
- getMetrics().gets[cmd->getLoadType()]));
+ _op = std::make_shared<GetOperation>(*this, _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()),
+ cmd, getMetrics().gets[cmd->getLoadType()]);
return true;
}
@@ -249,7 +221,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, StatBucket)
return true;
}
auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()));
- _op = Operation::SP(new StatBucketOperation(*this, distributorBucketSpace, cmd));
+ _op = std::make_shared<StatBucketOperation>(*this, distributorBucketSpace, cmd);
return true;
}
@@ -261,16 +233,14 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, GetBucketList)
auto bucketSpace(cmd->getBucket().getBucketSpace());
auto &distributorBucketSpace(_bucketSpaceRepo.get(bucketSpace));
auto &bucketDatabase(distributorBucketSpace.getBucketDatabase());
- _op = Operation::SP(new StatBucketListOperation(
- bucketDatabase, _operationGenerator, getIndex(), cmd));
+ _op = std::make_shared<StatBucketListOperation>(bucketDatabase, _operationGenerator, getIndex(), cmd);
return true;
}
IMPL_MSG_COMMAND_H(ExternalOperationHandler, CreateVisitor)
{
const DistributorConfiguration& config(getDistributor().getConfig());
- VisitorOperation::Config visitorConfig(config.getMinBucketsPerVisitor(),
- config.getMaxVisitorsPerNodePerClientVisitor());
+ VisitorOperation::Config visitorConfig(config.getMinBucketsPerVisitor(), config.getMaxVisitorsPerNodePerClientVisitor());
auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()));
_op = Operation::SP(new VisitorOperation(*this, distributorBucketSpace, cmd, visitorConfig, getMetrics().visits[cmd->getLoadType()]));
return true;
diff --git a/storage/src/vespa/storage/distributor/messagetracker.h b/storage/src/vespa/storage/distributor/messagetracker.h
index 017979c16c0..626335e1ba6 100644
--- a/storage/src/vespa/storage/distributor/messagetracker.h
+++ b/storage/src/vespa/storage/distributor/messagetracker.h
@@ -16,8 +16,8 @@ class MessageTracker {
public:
class ToSend {
public:
- ToSend(const std::shared_ptr<api::BucketCommand>& msg, uint16_t target) :
- _msg(msg), _target(target) {};
+ ToSend(std::shared_ptr<api::BucketCommand> msg, uint16_t target) :
+ _msg(std::move(msg)), _target(target) {};
std::shared_ptr<api::BucketCommand> _msg;
uint16_t _target;
@@ -29,7 +29,7 @@ public:
~MessageTracker();
void queueCommand(std::shared_ptr<api::BucketCommand> msg, uint16_t target) {
- _commandQueue.push_back(ToSend(msg, target));
+ _commandQueue.emplace_back(std::move(msg), target);
}
void flushQueue(MessageSender& sender);
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
index 246020c191c..5b1e585a6f8 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
@@ -14,9 +14,7 @@ using document::BucketSpace;
namespace storage::distributor {
-GetOperation::GroupId::GroupId(const document::BucketId& id,
- uint32_t checksum,
- int node)
+GetOperation::GroupId::GroupId(const document::BucketId& id, uint32_t checksum, int node)
: _id(id),
_checksum(checksum),
_node(node)
@@ -48,12 +46,12 @@ GetOperation::GroupId::operator==(const GroupId& other) const
GetOperation::GetOperation(DistributorComponent& manager,
DistributorBucketSpace &bucketSpace,
- const std::shared_ptr<api::GetCommand> & msg,
+ std::shared_ptr<api::GetCommand> msg,
PersistenceOperationMetricSet& metric)
: Operation(),
_manager(manager),
_bucketSpace(bucketSpace),
- _msg(msg),
+ _msg(std::move(msg)),
_returnCode(api::ReturnCode::OK),
_doc((document::Document*)NULL),
_lastModified(0),
@@ -66,8 +64,7 @@ GetOperation::GetOperation(DistributorComponent& manager,
void
GetOperation::onClose(DistributorMessageSender& sender)
{
- _returnCode = api::ReturnCode(api::ReturnCode::ABORTED,
- "Process is shutting down");
+ _returnCode = api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down");
sendReply(sender);
}
@@ -96,30 +93,19 @@ GetOperation::findBestUnsentTarget(const GroupVector& candidates) const
}
bool
-GetOperation::sendForChecksum(DistributorMessageSender& sender,
- const document::BucketId& id,
- GroupVector& res)
+GetOperation::sendForChecksum(DistributorMessageSender& sender, const document::BucketId& id, GroupVector& res)
{
const int best = findBestUnsentTarget(res);
if (best != -1) {
document::Bucket bucket(_msg->getBucket().getBucketSpace(), id);
- std::shared_ptr<api::GetCommand> command(
- std::make_shared<api::GetCommand>(
- bucket,
- _msg->getDocumentId(),
- _msg->getFieldSet(),
- _msg->getBeforeTimestamp()));
+ auto command = std::make_shared<api::GetCommand>(bucket, _msg->getDocumentId(),
+ _msg->getFieldSet(), _msg->getBeforeTimestamp());
copyMessageSettings(*_msg, *command);
- LOG(spam,
- "Sending %s to node %d",
- command->toString(true).c_str(),
- res[best].copy.getNode());
+ LOG(spam, "Sending %s to node %d", command->toString(true).c_str(), res[best].copy.getNode());
- res[best].sent = sender.sendToNode(lib::NodeType::STORAGE,
- res[best].copy.getNode(),
- command);
+ res[best].sent = sender.sendToNode(lib::NodeType::STORAGE, res[best].copy.getNode(), command);
return true;
}
@@ -145,8 +131,7 @@ GetOperation::onStart(DistributorMessageSender& sender)
};
void
-GetOperation::onReceive(DistributorMessageSender& sender,
- const std::shared_ptr<api::StorageReply>& msg)
+GetOperation::onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply>& msg)
{
api::GetReply* getreply = dynamic_cast<api::GetReply*>(msg.get());
assert(getreply != nullptr);
@@ -179,9 +164,7 @@ GetOperation::onReceive(DistributorMessageSender& sender,
}
// Try to send to another node in this checksum group.
- bool sent = sendForChecksum(sender,
- iter->first.getBucketId(),
- iter->second);
+ bool sent = sendForChecksum(sender, iter->first.getBucketId(), iter->second);
if (sent) {
allDone = false;
}
@@ -197,8 +180,7 @@ GetOperation::onReceive(DistributorMessageSender& sender,
if (allDone) {
LOG(debug, "Get on %s done, returning reply %s",
- _msg->getDocumentId().toString().c_str(),
- _returnCode.toString().c_str());
+ _msg->getDocumentId().toString().c_str(), _returnCode.toString().c_str());
sendReply(sender);
}
}
@@ -207,8 +189,7 @@ void
GetOperation::sendReply(DistributorMessageSender& sender)
{
if (_msg.get()) {
- std::shared_ptr<api::GetReply> repl(
- std::make_shared<api::GetReply>(*_msg, _doc, _lastModified));
+ auto repl = std::make_shared<api::GetReply>(*_msg, _doc, _lastModified);
repl->setResult(_returnCode);
if (_returnCode.success()) {
@@ -271,11 +252,9 @@ GetOperation::assignTargetNodeGroups()
const BucketCopy& copy = e->getNodeRef(i);
if (!copy.valid()) {
- _responses[GroupId(e.getBucketId(), copy.getChecksum(), copy.getNode())].
- push_back(copy);
+ _responses[GroupId(e.getBucketId(), copy.getChecksum(), copy.getNode())].push_back(copy);
} else if (!copy.empty()) {
- _responses[GroupId(e.getBucketId(), copy.getChecksum(), -1)].
- push_back(copy);
+ _responses[GroupId(e.getBucketId(), copy.getChecksum(), -1)].push_back(copy);
}
}
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
index 03279a87152..198c588dfd1 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
@@ -23,10 +23,8 @@ class DistributorBucketSpace;
class GetOperation : public Operation
{
public:
- GetOperation(DistributorComponent& manager,
- DistributorBucketSpace &bucketSpace,
- const std::shared_ptr<api::GetCommand> & msg,
- PersistenceOperationMetricSet& metric);
+ GetOperation(DistributorComponent& manager, DistributorBucketSpace &bucketSpace,
+ std::shared_ptr<api::GetCommand> msg, PersistenceOperationMetricSet& metric);
void onClose(DistributorMessageSender& sender) override;
void onStart(DistributorMessageSender& sender) override;
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
index d08afdacfad..2b1baa1e0d6 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
@@ -3,7 +3,6 @@
#include "putoperation.h"
#include <vespa/document/fieldvalue/document.h>
-#include <vespa/log/log.h>
#include <vespa/storage/distributor/activecopy.h>
#include <vespa/storage/distributor/operationtargetresolverimpl.h>
#include <vespa/storage/distributor/pendingmessagetracker.h>
@@ -11,6 +10,7 @@
#include <vespa/vdslib/distribution/idealnodecalculatorimpl.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
+#include <vespa/log/log.h>
LOG_SETUP(".distributor.callback.doc.put");
@@ -18,22 +18,17 @@ using namespace storage::distributor;
using namespace storage;
using document::BucketSpace;
-PutOperation::PutOperation(DistributorComponent& manager,
- DistributorBucketSpace &bucketSpace,
- const std::shared_ptr<api::PutCommand> & msg,
- PersistenceOperationMetricSet& metric,
- SequencingHandle sequencingHandle)
+PutOperation::PutOperation(DistributorComponent& manager, DistributorBucketSpace &bucketSpace,
+ std::shared_ptr<api::PutCommand> msg,
+ PersistenceOperationMetricSet& metric, SequencingHandle sequencingHandle)
: SequencedOperation(std::move(sequencingHandle)),
- _trackerInstance(metric,
- std::shared_ptr<api::BucketInfoReply>(new api::PutReply(*msg)),
- manager,
- msg->getTimestamp()),
+ _trackerInstance(metric, std::make_shared<api::PutReply>(*msg), manager, msg->getTimestamp()),
_tracker(_trackerInstance),
- _msg(msg),
+ _msg(std::move(msg)),
_manager(manager),
_bucketSpace(bucketSpace)
{
-};
+}
namespace {
@@ -50,11 +45,8 @@ bool hasNode(const std::vector<uint16_t>& vec, uint16_t value) {
}
void
-PutOperation::getTargetNodes(const std::vector<uint16_t>& idealNodes,
- std::vector<uint16_t>& targetNodes,
- std::vector<uint16_t>& createNodes,
- const BucketInfo& bucketInfo,
- uint32_t redundancy)
+PutOperation::getTargetNodes(const std::vector<uint16_t>& idealNodes, std::vector<uint16_t>& targetNodes,
+ std::vector<uint16_t>& createNodes, const BucketInfo& bucketInfo, uint32_t redundancy)
{
// First insert all nodes that are trusted or already in the ideal state.
for (uint32_t i = 0; i < bucketInfo.getNodeCount(); i++) {
@@ -78,8 +70,7 @@ PutOperation::getTargetNodes(const std::vector<uint16_t>& idealNodes,
for (uint32_t i = 0; targetNodes.size() < redundancy && i < idealNodes.size(); i++) {
if (!hasNode(targetNodes, idealNodes[i])) {
targetNodes.push_back(idealNodes[i]);
- LOG(spam, "Adding target+create node %u it's in ideal state",
- idealNodes[i]);
+ LOG(spam, "Adding target+create node %u it's in ideal state", idealNodes[i]);
createNodes.push_back(idealNodes[i]);
}
}
@@ -88,100 +79,23 @@ PutOperation::getTargetNodes(const std::vector<uint16_t>& idealNodes,
std::sort(createNodes.begin(), createNodes.end());
}
-// FIXME: deprecated! remove as soon as multoperationoperation is merely
-// a haunting memory of the past since it's only used by that component!
-bool
-PutOperation::checkCreateBucket(const lib::Distribution& dist,
- const lib::ClusterState& state,
- BucketDatabase::Entry& entry,
- std::vector<uint16_t>& targetNodes,
- std::vector<MessageTracker::ToSend>& messagesToSend,
- const api::StorageCommand& originalCommand)
-{
- BucketInfo& info = entry.getBucketInfo();
-
- std::vector<uint16_t> createNodes;
- std::vector<uint16_t> idealNodes(
- dist.getIdealStorageNodes(state, entry.getBucketId(), "ui"));
-
- getTargetNodes(idealNodes,
- targetNodes,
- createNodes,
- info,
- dist.getRedundancy());
-
- ActiveList active(ActiveCopy::calculate(idealNodes, dist, entry));
- LOG(debug, "Active copies for bucket %s: %s",
- entry.getBucketId().toString().c_str(), active.toString().c_str());
- // Send create buckets for all nodes in ideal state where we don't
- // currently have copies.
- for (uint32_t i = 0; i < createNodes.size(); i++) {
- document::Bucket bucket(originalCommand.getBucket().getBucketSpace(), entry.getBucketId());
- std::shared_ptr<api::CreateBucketCommand> cbc(
- new api::CreateBucketCommand(bucket));
- if (active.contains(createNodes[i])) {
- BucketCopy copy(*entry->getNode(createNodes[i]));
- copy.setActive(true);
- entry->updateNode(copy);
- cbc->setActive(true);
- }
- LOG(debug, "Creating bucket on node %u: %s",
- createNodes[i], cbc->toString().c_str());
-
- copyMessageSettings(originalCommand, *cbc);
- messagesToSend.push_back(MessageTracker::ToSend(cbc, createNodes[i]));
- }
-
- // All nodes that we are not feeding to now will no longer be trusted.
- // TODO: Refactor?
- bool mustWrite = false;
- for (uint32_t i = 0; i < info.getNodeCount(); i++) {
- bool found = false;
- for (uint32_t j = 0; j < targetNodes.size(); j++) {
- if (info.getNodeRef(i).getNode() == targetNodes[j]) {
- LOG(spam,
- "Found matching target node %u in %s",
- targetNodes[i],
- info.getNodeRef(i).toString().c_str());
- found = true;
- break;
- }
- }
-
- if (!found && info.getNodeRef(i).trusted()) {
- LOG(spam,
- "Setting mustWrite=true since %s is trusted",
- info.getNodeRef(i).toString().c_str());
-
- info.clearTrusted(info.getNodeRef(i).getNode());
- mustWrite = true;
- }
- }
-
- return mustWrite;
-}
-
void
-PutOperation::insertDatabaseEntryAndScheduleCreateBucket(
- const OperationTargetList& copies,
- bool setOneActive,
- const api::StorageCommand& originalCommand,
- std::vector<MessageTracker::ToSend>& messagesToSend)
+PutOperation::insertDatabaseEntryAndScheduleCreateBucket(const OperationTargetList& copies, bool setOneActive,
+ const api::StorageCommand& originalCommand,
+ std::vector<MessageTracker::ToSend>& messagesToSend)
{
document::BucketId lastBucket;
bool multipleBuckets = false;
for (uint32_t i=0, n=copies.size(); i<n; ++i) {
if (!copies[i].isNewCopy()) continue;
- if (lastBucket.getRawId() != 0 && copies[i].getBucketId() != lastBucket)
- {
+ if (lastBucket.getRawId() != 0 && copies[i].getBucketId() != lastBucket) {
multipleBuckets = true;
}
lastBucket = copies[i].getBucketId();
// Fake that we have a non-empty bucket so it isn't deleted.
// Copy is inserted with timestamp 0 such that any actual bucket info
// subsequently arriving from the storage node will always overwrite it.
- BucketCopy copy(BucketCopy::recentlyCreatedCopy(
- 0, copies[i].getNode().getIndex()));
+ BucketCopy copy(BucketCopy::recentlyCreatedCopy(0, copies[i].getNode().getIndex()));
_manager.updateBucketDatabase(document::Bucket(originalCommand.getBucket().getBucketSpace(), lastBucket), copy,
DatabaseUpdate::CREATE_IF_NONEXISTING);
}
@@ -189,17 +103,13 @@ PutOperation::insertDatabaseEntryAndScheduleCreateBucket(
if (setOneActive) {
assert(!multipleBuckets);
(void) multipleBuckets;
- BucketDatabase::Entry entry(
- _bucketSpace.getBucketDatabase().get(lastBucket));
+ BucketDatabase::Entry entry(_bucketSpace.getBucketDatabase().get(lastBucket));
std::vector<uint16_t> idealState(
- _bucketSpace.getDistribution().getIdealStorageNodes(
- _bucketSpace.getClusterState(), lastBucket, "ui"));
- active = ActiveCopy::calculate(idealState, _bucketSpace.getDistribution(),
- entry);
- LOG(debug, "Active copies for bucket %s: %s",
- entry.getBucketId().toString().c_str(), active.toString().c_str());
+ _bucketSpace.getDistribution().getIdealStorageNodes(_bucketSpace.getClusterState(), lastBucket, "ui"));
+ active = ActiveCopy::calculate(idealState, _bucketSpace.getDistribution(), entry);
+ LOG(debug, "Active copies for bucket %s: %s", entry.getBucketId().toString().c_str(), active.toString().c_str());
for (uint32_t i=0; i<active.size(); ++i) {
- BucketCopy copy(*entry->getNode(active[i].nodeIndex));
+ BucketCopy copy(*entry->getNode(active[i]._nodeIndex));
copy.setActive(true);
entry->updateNode(copy);
}
@@ -208,8 +118,7 @@ PutOperation::insertDatabaseEntryAndScheduleCreateBucket(
for (uint32_t i=0, n=copies.size(); i<n; ++i) {
if (!copies[i].isNewCopy()) continue;
document::Bucket bucket(originalCommand.getBucket().getBucketSpace(), copies[i].getBucketId());
- std::shared_ptr<api::CreateBucketCommand> cbc(
- new api::CreateBucketCommand(bucket));
+ auto cbc = std::make_shared<api::CreateBucketCommand>(bucket);
if (setOneActive && active.contains(copies[i].getNode().getIndex())) {
cbc->setActive(true);
}
@@ -217,33 +126,22 @@ PutOperation::insertDatabaseEntryAndScheduleCreateBucket(
copies[i].getNode().getIndex(), cbc->toString().c_str());
copyMessageSettings(originalCommand, *cbc);
- messagesToSend.push_back(MessageTracker::ToSend(
- cbc, copies[i].getNode().getIndex()));
+ messagesToSend.emplace_back(std::move(cbc), copies[i].getNode().getIndex());
}
}
void
-PutOperation::sendPutToBucketOnNode(
- document::BucketSpace bucketSpace,
- const document::BucketId& bucketId,
- const uint16_t node,
- std::vector<PersistenceMessageTracker::ToSend>& putBatch)
+PutOperation::sendPutToBucketOnNode(document::BucketSpace bucketSpace, const document::BucketId& bucketId,
+ const uint16_t node, std::vector<PersistenceMessageTracker::ToSend>& putBatch)
{
document::Bucket bucket(bucketSpace, bucketId);
- std::shared_ptr<api::PutCommand> command(
- new api::PutCommand(
- bucket,
- _msg->getDocument(),
- _msg->getTimestamp()));
- LOG(debug,
- "Sending %s to node %u",
- command->toString().c_str(),
- node);
+ auto command = std::make_shared<api::PutCommand>(bucket, _msg->getDocument(), _msg->getTimestamp());
+ LOG(debug, "Sending %s to node %u", command->toString().c_str(), node);
copyMessageSettings(*_msg, *command);
command->setUpdateTimestamp(_msg->getUpdateTimestamp());
command->setCondition(_msg->getCondition());
- putBatch.push_back(MessageTracker::ToSend(command, node));
+ putBatch.emplace_back(std::move(command), node);
}
@@ -253,10 +151,7 @@ PutOperation::onStart(DistributorMessageSender& sender)
document::BucketIdFactory bucketIdFactory;
document::BucketId bid = bucketIdFactory.getBucketId(_msg->getDocumentId());
- LOG(debug,
- "Received PUT %s for bucket %s",
- _msg->getDocumentId().toString().c_str(),
- bid.toString().c_str());
+ LOG(debug, "Received PUT %s for bucket %s", _msg->getDocumentId().toString().c_str(), bid.toString().c_str());
lib::ClusterState systemState = _bucketSpace.getClusterState();
@@ -276,24 +171,19 @@ PutOperation::onStart(DistributorMessageSender& sender)
lib::IdealNodeCalculatorImpl idealNodeCalculator;
idealNodeCalculator.setDistribution(_bucketSpace.getDistribution());
idealNodeCalculator.setClusterState(_bucketSpace.getClusterState());
- OperationTargetResolverImpl targetResolver(
- _bucketSpace.getBucketDatabase(),
- idealNodeCalculator,
+ OperationTargetResolverImpl targetResolver(_bucketSpace.getBucketDatabase(), idealNodeCalculator,
_manager.getDistributor().getConfig().getMinimalBucketSplit(),
_bucketSpace.getDistribution().getRedundancy(),
_msg->getBucket().getBucketSpace());
- OperationTargetList targets(targetResolver.getTargets(
- OperationTargetResolver::PUT, bid));
+ OperationTargetList targets(targetResolver.getTargets(OperationTargetResolver::PUT, bid));
for (size_t i = 0; i < targets.size(); ++i) {
if (_manager.getDistributor().getPendingMessageTracker().
- hasPendingMessage(targets[i].getNode().getIndex(),
- targets[i].getBucket(),
+ hasPendingMessage(targets[i].getNode().getIndex(), targets[i].getBucket(),
api::MessageType::DELETEBUCKET_ID))
{
_tracker.fail(sender, api::ReturnCode(api::ReturnCode::BUCKET_DELETED,
- "Bucket was being deleted while we got a PUT, failing "
- "operation to be safe"));
+ "Bucket was being deleted while we got a PUT, failing operation to be safe"));
return;
}
}
@@ -304,11 +194,8 @@ PutOperation::onStart(DistributorMessageSender& sender)
std::vector<PersistenceMessageTracker::ToSend> createBucketBatch;
if (targets.hasAnyNewCopies()) {
- insertDatabaseEntryAndScheduleCreateBucket(
- targets,
- shouldImplicitlyActivateReplica(targets),
- *_msg,
- createBucketBatch);
+ insertDatabaseEntryAndScheduleCreateBucket(targets, shouldImplicitlyActivateReplica(targets),
+ *_msg, createBucketBatch);
}
if (!createBucketBatch.empty()) {
@@ -320,9 +207,8 @@ PutOperation::onStart(DistributorMessageSender& sender)
// Now send PUTs
for (uint32_t i = 0; i < targets.size(); i++) {
const OperationTarget& target(targets[i]);
- sendPutToBucketOnNode(_msg->getBucket().getBucketSpace(),
- target.getBucketId(), target.getNode().getIndex(),
- putBatch);
+ sendPutToBucketOnNode(_msg->getBucket().getBucketSpace(), target.getBucketId(),
+ target.getNode().getIndex(), putBatch);
}
if (putBatch.size()) {
@@ -330,18 +216,15 @@ PutOperation::onStart(DistributorMessageSender& sender)
} else {
const char* error = "Can't store document: No storage nodes available";
LOG(debug, "%s", error);
- _tracker.fail(sender,
- api::ReturnCode(api::ReturnCode::NOT_CONNECTED, error));
+ _tracker.fail(sender, api::ReturnCode(api::ReturnCode::NOT_CONNECTED, error));
return;
}
// Check whether buckets are large enough to be split.
// TODO(vekterli): only check entries for sendToExisting?
for (uint32_t i = 0; i < entries.size(); ++i) {
- _manager.getDistributor().checkBucketForSplit(
- _msg->getBucket().getBucketSpace(),
- entries[i],
- _msg->getPriority());
+ _manager.getDistributor().checkBucketForSplit(_msg->getBucket().getBucketSpace(),
+ entries[i], _msg->getPriority());
}
_tracker.flushQueue(sender);
@@ -355,8 +238,7 @@ PutOperation::onStart(DistributorMessageSender& sender)
}
bool
-PutOperation::shouldImplicitlyActivateReplica(
- const OperationTargetList& targets) const
+PutOperation::shouldImplicitlyActivateReplica(const OperationTargetList& targets) const
{
const auto& config(_manager.getDistributor().getConfig());
if (config.isBucketActivationDisabled()) {
@@ -366,8 +248,7 @@ PutOperation::shouldImplicitlyActivateReplica(
}
void
-PutOperation::onReceive(DistributorMessageSender& sender,
- const std::shared_ptr<api::StorageReply> & msg)
+PutOperation::onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg)
{
LOG(debug, "Received %s", msg->toString(true).c_str());
_tracker.receiveReply(sender, static_cast<api::BucketInfoReply&>(*msg));
@@ -380,4 +261,3 @@ PutOperation::onClose(DistributorMessageSender& sender)
LOG(debug, "%s", error);
_tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, error));
}
-
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
index c27f2ee2266..745a4f57a35 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
@@ -24,10 +24,8 @@ class OperationTargetList;
class PutOperation : public SequencedOperation
{
public:
- PutOperation(DistributorComponent& manager,
- DistributorBucketSpace &bucketSpace,
- const std::shared_ptr<api::PutCommand> & msg,
- PersistenceOperationMetricSet& metric,
+ PutOperation(DistributorComponent& manager, DistributorBucketSpace &bucketSpace,
+ std::shared_ptr<api::PutCommand> msg, PersistenceOperationMetricSet& metric,
SequencingHandle sequencingHandle = SequencingHandle());
void onStart(DistributorMessageSender& sender) override;
@@ -36,39 +34,18 @@ public:
void onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override;
void onClose(DistributorMessageSender& sender) override;
- /**
- * Gets the ideal state of the given bucket, and adds all nodes from the
- * ideal state to targetNodes. Also schedules create bucket messages for
- * all buckets currently not in the nodes list, and sets nodes in the node
- * list not in the ideal state to untrusted.
- */
- static bool checkCreateBucket(const lib::Distribution& distribution,
- const lib::ClusterState& state,
- BucketDatabase::Entry& e,
- std::vector<uint16_t>& targetNodes,
- std::vector<MessageTracker::ToSend>& messagesToSend,
- const api::StorageCommand& originalCommand);
-
- static void getTargetNodes(const std::vector<uint16_t>& idealNodes,
- std::vector<uint16_t>& targetNodes,
- std::vector<uint16_t>& createNodes,
- const BucketInfo& bucketInfo,
- uint32_t redundancy);
+ static void getTargetNodes(const std::vector<uint16_t>& idealNodes, std::vector<uint16_t>& targetNodes,
+ std::vector<uint16_t>& createNodes, const BucketInfo& bucketInfo, uint32_t redundancy);
private:
PersistenceMessageTrackerImpl _trackerInstance;
PersistenceMessageTracker& _tracker;
- void insertDatabaseEntryAndScheduleCreateBucket(
- const OperationTargetList& copies,
- bool setOneActive,
- const api::StorageCommand& originalCommand,
- std::vector<MessageTracker::ToSend>& messagesToSend);
+ void insertDatabaseEntryAndScheduleCreateBucket(const OperationTargetList& copies, bool setOneActive,
+ const api::StorageCommand& originalCommand,
+ std::vector<MessageTracker::ToSend>& messagesToSend);
- void sendPutToBucketOnNode(
- document::BucketSpace bucketSpace,
- const document::BucketId& bucketId,
- const uint16_t node,
- std::vector<PersistenceMessageTracker::ToSend>& putBatch);
+ void sendPutToBucketOnNode(document::BucketSpace bucketSpace, const document::BucketId& bucketId,
+ const uint16_t node, std::vector<PersistenceMessageTracker::ToSend>& putBatch);
bool shouldImplicitlyActivateReplica(const OperationTargetList& targets) const;
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index db120880267..0b0fc05763f 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -51,9 +51,7 @@ struct IntermediateMessageSender : DistributorMessageSender {
DistributorMessageSender& forward;
std::shared_ptr<api::StorageReply> _reply;
- IntermediateMessageSender(SentMessageMap& mm,
- const std::shared_ptr<Operation>& cb,
- DistributorMessageSender & fwd);
+ IntermediateMessageSender(SentMessageMap& mm, std::shared_ptr<Operation> cb, DistributorMessageSender & fwd);
~IntermediateMessageSender();
void sendCommand(const std::shared_ptr<api::StorageCommand>& cmd) override {
@@ -79,14 +77,13 @@ struct IntermediateMessageSender : DistributorMessageSender {
};
IntermediateMessageSender::IntermediateMessageSender(SentMessageMap& mm,
- const std::shared_ptr<Operation>& cb,
+ std::shared_ptr<Operation> cb,
DistributorMessageSender & fwd)
: msgMap(mm),
- callback(cb),
+ callback(std::move(cb)),
forward(fwd)
-{
-}
-IntermediateMessageSender::~IntermediateMessageSender() { }
+{ }
+IntermediateMessageSender::~IntermediateMessageSender() = default;
}
@@ -161,13 +158,10 @@ void
TwoPhaseUpdateOperation::startFastPathUpdate(DistributorMessageSender& sender)
{
_mode = Mode::FAST_PATH;
- std::shared_ptr<UpdateOperation> updateOperation(
- new UpdateOperation(_manager, _bucketSpace, _updateCmd, _updateMetric));
-
- IntermediateMessageSender intermediate(
- _sentMessageMap, updateOperation, sender);
- updateOperation->start(intermediate,
- _manager.getClock().getTimeInMillis());
+ auto updateOperation = std::make_shared<UpdateOperation>(_manager, _bucketSpace, _updateCmd, _updateMetric);
+ UpdateOperation & op = *updateOperation;
+ IntermediateMessageSender intermediate(_sentMessageMap, std::move(updateOperation), sender);
+ op.start(intermediate, _manager.getClock().getTimeInMillis());
transitionTo(SendState::UPDATES_SENT);
if (intermediate._reply.get()) {
@@ -178,30 +172,21 @@ TwoPhaseUpdateOperation::startFastPathUpdate(DistributorMessageSender& sender)
void
TwoPhaseUpdateOperation::startSafePathUpdate(DistributorMessageSender& sender)
{
- LOG(debug, "Update(%s) safe path: sending Get commands",
- _updateCmd->getDocumentId().toString().c_str());
+ LOG(debug, "Update(%s) safe path: sending Get commands", _updateCmd->getDocumentId().toString().c_str());
_mode = Mode::SLOW_PATH;
document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), document::BucketId(0));
- std::shared_ptr<api::GetCommand> get(
- std::make_shared<api::GetCommand>(
- bucket,
- _updateCmd->getDocumentId(),
- "[all]"));
+ auto get = std::make_shared<api::GetCommand>(bucket, _updateCmd->getDocumentId(),"[all]");
copyMessageSettings(*_updateCmd, *get);
- std::shared_ptr<GetOperation> getOperation(
- std::make_shared<GetOperation>(_manager, _bucketSpace, get, _getMetric));
-
- IntermediateMessageSender intermediate(
- _sentMessageMap, getOperation, sender);
- getOperation->start(intermediate,
- _manager.getClock().getTimeInMillis());
+ auto getOperation = std::make_shared<GetOperation>(_manager, _bucketSpace, get, _getMetric);
+ GetOperation & op = *getOperation;
+ IntermediateMessageSender intermediate(_sentMessageMap, std::move(getOperation), sender);
+ op.start(intermediate, _manager.getClock().getTimeInMillis());
transitionTo(SendState::GETS_SENT);
if (intermediate._reply.get()) {
assert(intermediate._reply->getType() == api::MessageType::GET_REPLY);
- handleSafePathReceivedGet(
- sender, static_cast<api::GetReply&>(*intermediate._reply));
+ handleSafePathReceivedGet(sender, static_cast<api::GetReply&>(*intermediate._reply));
}
}
@@ -226,17 +211,14 @@ bool
TwoPhaseUpdateOperation::lostBucketOwnershipBetweenPhases() const
{
document::Bucket updateDocBucket(_updateCmd->getBucket().getBucketSpace(), _updateDocBucketId);
- BucketOwnership bo(_manager.checkOwnershipInPendingAndCurrentState(
- updateDocBucket));
+ BucketOwnership bo(_manager.checkOwnershipInPendingAndCurrentState(updateDocBucket));
return !bo.isOwned();
}
void
-TwoPhaseUpdateOperation::sendLostOwnershipTransientErrorReply(
- DistributorMessageSender& sender)
+TwoPhaseUpdateOperation::sendLostOwnershipTransientErrorReply(DistributorMessageSender& sender)
{
- sendReplyWithResult(
- sender,
+ sendReplyWithResult(sender,
api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND,
"Distributor lost ownership of bucket between "
"executing the read and write phases of a two-"
@@ -244,31 +226,24 @@ TwoPhaseUpdateOperation::sendLostOwnershipTransientErrorReply(
}
void
-TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(
- std::shared_ptr<document::Document> doc,
- api::Timestamp putTimestamp,
- DistributorMessageSender& sender)
+TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(std::shared_ptr<document::Document> doc,
+ api::Timestamp putTimestamp, DistributorMessageSender& sender)
{
if (lostBucketOwnershipBetweenPhases()) {
sendLostOwnershipTransientErrorReply(sender);
return;
}
document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), document::BucketId(0));
- std::shared_ptr<api::PutCommand> put(
- new api::PutCommand(bucket, doc, putTimestamp));
+ auto put = std::make_shared<api::PutCommand>(bucket, doc, putTimestamp);
copyMessageSettings(*_updateCmd, *put);
- std::shared_ptr<PutOperation> putOperation(
- new PutOperation(_manager, _bucketSpace, put, _putMetric));
-
- IntermediateMessageSender intermediate(
- _sentMessageMap, putOperation, sender);
- putOperation->start(intermediate,
- _manager.getClock().getTimeInMillis());
+ auto putOperation = std::make_shared<PutOperation>(_manager, _bucketSpace, std::move(put), _putMetric);
+ PutOperation & op = *putOperation;
+ IntermediateMessageSender intermediate(_sentMessageMap, std::move(putOperation), sender);
+ op.start(intermediate, _manager.getClock().getTimeInMillis());
transitionTo(SendState::PUTS_SENT);
LOG(debug, "Update(%s): sending Put commands with doc %s",
- _updateCmd->getDocumentId().toString().c_str(),
- doc->toString(true).c_str());
+ _updateCmd->getDocumentId().toString().c_str(), doc->toString(true).c_str());
if (intermediate._reply.get()) {
sendReplyWithResult(sender, intermediate._reply->getResult());
@@ -276,9 +251,7 @@ TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(
}
void
-TwoPhaseUpdateOperation::onReceive(
- DistributorMessageSender& sender,
- const std::shared_ptr<api::StorageReply>& msg)
+TwoPhaseUpdateOperation::onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply>& msg)
{
if (_mode == Mode::FAST_PATH) {
handleFastPathReceive(sender, msg);
@@ -288,9 +261,8 @@ TwoPhaseUpdateOperation::onReceive(
}
void
-TwoPhaseUpdateOperation::handleFastPathReceive(
- DistributorMessageSender& sender,
- const std::shared_ptr<api::StorageReply>& msg)
+TwoPhaseUpdateOperation::handleFastPathReceive(DistributorMessageSender& sender,
+ const std::shared_ptr<api::StorageReply>& msg)
{
if (msg->getType() == api::MessageType::GET_REPLY) {
assert(_sendState == SendState::GETS_SENT);
@@ -308,50 +280,40 @@ TwoPhaseUpdateOperation::handleFastPathReceive(
if (!getReply.getDocument().get()) {
// Weird, document is no longer there ... Just fail.
- sendReplyWithResult(sender, api::ReturnCode(
- api::ReturnCode::INTERNAL_FAILURE, ""));
+ sendReplyWithResult(sender, api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, ""));
return;
}
- schedulePutsWithUpdatedDocument(getReply.getDocument(),
- _manager.getUniqueTimestamp(),
- sender);
+ schedulePutsWithUpdatedDocument(getReply.getDocument(), _manager.getUniqueTimestamp(), sender);
return;
}
std::shared_ptr<Operation> callback = _sentMessageMap.pop(msg->getMsgId());
assert(callback.get());
- IntermediateMessageSender intermediate(_sentMessageMap, callback, sender);
- callback->receive(intermediate, msg);
+ Operation & callbackOp = *callback;
+ IntermediateMessageSender intermediate(_sentMessageMap, std::move(callback), sender);
+ callbackOp.receive(intermediate, msg);
if (msg->getType() == api::MessageType::UPDATE_REPLY) {
if (intermediate._reply.get()) {
assert(_sendState == SendState::UPDATES_SENT);
addTraceFromReply(*intermediate._reply);
- UpdateOperation& cb = static_cast<UpdateOperation&> (*callback);
+ UpdateOperation& cb = static_cast<UpdateOperation&> (callbackOp);
- std::pair<document::BucketId, uint16_t> bestNode =
- cb.getNewestTimestampLocation();
+ std::pair<document::BucketId, uint16_t> bestNode = cb.getNewestTimestampLocation();
if (!intermediate._reply->getResult().success() ||
bestNode.first == document::BucketId(0)) {
// Failed or was consistent
sendReply(sender, intermediate._reply);
} else {
- LOG(debug, "Update(%s) fast path: was inconsistent!",
- _updateCmd->getDocumentId().toString().c_str());
+ LOG(debug, "Update(%s) fast path: was inconsistent!", _updateCmd->getDocumentId().toString().c_str());
_updateReply = intermediate._reply;
document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), bestNode.first);
- std::shared_ptr<api::GetCommand> cmd(
- new api::GetCommand(bucket,
- _updateCmd->getDocumentId(),
- "[all]"));
+ auto cmd = std::make_shared<api::GetCommand>(bucket, _updateCmd->getDocumentId(), "[all]");
copyMessageSettings(*_updateCmd, *cmd);
- sender.sendToNode(
- lib::NodeType::STORAGE,
- bestNode.second,
- cmd);
+ sender.sendToNode(lib::NodeType::STORAGE, bestNode.second, cmd);
transitionTo(SendState::GETS_SENT);
}
}
@@ -365,15 +327,15 @@ TwoPhaseUpdateOperation::handleFastPathReceive(
}
void
-TwoPhaseUpdateOperation::handleSafePathReceive(
- DistributorMessageSender& sender,
- const std::shared_ptr<api::StorageReply>& msg)
+TwoPhaseUpdateOperation::handleSafePathReceive(DistributorMessageSender& sender,
+ const std::shared_ptr<api::StorageReply>& msg)
{
std::shared_ptr<Operation> callback = _sentMessageMap.pop(msg->getMsgId());
assert(callback.get());
+ Operation & callbackOp = *callback;
- IntermediateMessageSender intermediate(_sentMessageMap, callback, sender);
- callback->receive(intermediate, msg);
+ IntermediateMessageSender intermediate(_sentMessageMap, std::move(callback), sender);
+ callbackOp.receive(intermediate, msg);
if (!intermediate._reply.get()) {
return; // Not enough replies received yet or we're draining callbacks.
@@ -381,21 +343,17 @@ TwoPhaseUpdateOperation::handleSafePathReceive(
addTraceFromReply(*intermediate._reply);
if (_sendState == SendState::GETS_SENT) {
assert(intermediate._reply->getType() == api::MessageType::GET_REPLY);
- handleSafePathReceivedGet(
- sender, static_cast<api::GetReply&>(*intermediate._reply));
+ handleSafePathReceivedGet(sender, static_cast<api::GetReply&>(*intermediate._reply));
} else if (_sendState == SendState::PUTS_SENT) {
assert(intermediate._reply->getType() == api::MessageType::PUT_REPLY);
- handleSafePathReceivedPut(
- sender, static_cast<api::PutReply&>(*intermediate._reply));
+ handleSafePathReceivedPut(sender, static_cast<api::PutReply&>(*intermediate._reply));
} else {
assert(!"Unknown state");
}
}
void
-TwoPhaseUpdateOperation::handleSafePathReceivedGet(
- DistributorMessageSender& sender,
- api::GetReply& reply)
+TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sender, api::GetReply& reply)
{
LOG(debug, "Update(%s): got Get reply with code %s",
_updateCmd->getDocumentId().toString().c_str(),
@@ -411,9 +369,8 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(
if (reply.getDocument().get()) {
api::Timestamp receivedTimestamp = reply.getLastModifiedTimestamp();
if (!satisfiesUpdateTimestampConstraint(receivedTimestamp)) {
- sendReplyWithResult(sender, api::ReturnCode(
- api::ReturnCode::OK,
- "No document with requested timestamp found"));
+ sendReplyWithResult(sender, api::ReturnCode(api::ReturnCode::OK,
+ "No document with requested timestamp found"));
return;
}
if (!processAndMatchTasCondition(sender, *reply.getDocument())) {
@@ -425,9 +382,7 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(
replyWithTasFailure(sender, "Document did not exist");
return;
} else if (shouldCreateIfNonExistent()) {
- LOG(debug,
- "No existing documents found for %s, creating blank "
- "document to update",
+ LOG(debug, "No existing documents found for %s, creating blank document to update",
_updateCmd->getUpdate()->getId().toString().c_str());
docToUpdate = createBlankDocument();
setUpdatedForTimestamp(putTimestamp);
@@ -439,22 +394,19 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(
applyUpdateToDocument(*docToUpdate);
schedulePutsWithUpdatedDocument(docToUpdate, putTimestamp, sender);
} catch (vespalib::Exception& e) {
- sendReplyWithResult(sender, api::ReturnCode(
- api::ReturnCode::INTERNAL_FAILURE, e.getMessage()));
+ sendReplyWithResult(sender, api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.getMessage()));
}
}
bool
-TwoPhaseUpdateOperation::processAndMatchTasCondition(
- DistributorMessageSender& sender,
- const document::Document& candidateDoc)
+TwoPhaseUpdateOperation::processAndMatchTasCondition(DistributorMessageSender& sender,
+ const document::Document& candidateDoc)
{
if (!hasTasCondition()) {
return true; // No condition; nothing to do here.
}
- document::select::Parser parser(*_manager.getTypeRepo(),
- _manager.getBucketIdFactory());
+ document::select::Parser parser(*_manager.getTypeRepo(), _manager.getBucketIdFactory());
std::unique_ptr<document::select::Node> selection;
try {
selection = parser.parse(_updateCmd->getCondition().getSelection());
@@ -479,12 +431,9 @@ TwoPhaseUpdateOperation::hasTasCondition() const noexcept
}
void
-TwoPhaseUpdateOperation::replyWithTasFailure(
- DistributorMessageSender& sender,
- vespalib::stringref message)
+TwoPhaseUpdateOperation::replyWithTasFailure(DistributorMessageSender& sender, vespalib::stringref message)
{
- sendReplyWithResult(sender, api::ReturnCode(
- api::ReturnCode::TEST_AND_SET_CONDITION_FAILED, message));
+ sendReplyWithResult(sender, api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED, message));
}
void
@@ -502,9 +451,7 @@ TwoPhaseUpdateOperation::createBlankDocument() const
}
void
-TwoPhaseUpdateOperation::handleSafePathReceivedPut(
- DistributorMessageSender& sender,
- const api::PutReply& reply)
+TwoPhaseUpdateOperation::handleSafePathReceivedPut(DistributorMessageSender& sender, const api::PutReply& reply)
{
sendReplyWithResult(sender, reply.getResult());
}
@@ -522,11 +469,9 @@ TwoPhaseUpdateOperation::shouldCreateIfNonExistent() const
}
bool
-TwoPhaseUpdateOperation::satisfiesUpdateTimestampConstraint(
- api::Timestamp ts) const
+TwoPhaseUpdateOperation::satisfiesUpdateTimestampConstraint(api::Timestamp ts) const
{
- return (_updateCmd->getOldTimestamp() == 0
- || _updateCmd->getOldTimestamp() == ts);
+ return (_updateCmd->getOldTimestamp() == 0 || _updateCmd->getOldTimestamp() == ts);
}
void
@@ -540,11 +485,8 @@ TwoPhaseUpdateOperation::onClose(DistributorMessageSender& sender) {
while (true) {
std::shared_ptr<Operation> cb = _sentMessageMap.pop();
- if (cb.get()) {
- IntermediateMessageSender intermediate(
- _sentMessageMap,
- std::shared_ptr<Operation > (),
- sender);
+ if (cb) {
+ IntermediateMessageSender intermediate(_sentMessageMap, std::shared_ptr<Operation > (), sender);
cb->onClose(intermediate);
// We will _only_ forward UpdateReply instances up, since those
// are created by UpdateOperation and are bound to the original
@@ -552,9 +494,7 @@ TwoPhaseUpdateOperation::onClose(DistributorMessageSender& sender) {
// to synthetic commands created for gets/puts and should never be
// propagated to the outside world.
auto candidateReply = std::move(intermediate._reply);
- if (candidateReply
- && candidateReply->getType() == api::MessageType::UPDATE_REPLY)
- {
+ if (candidateReply && candidateReply->getType() == api::MessageType::UPDATE_REPLY) {
assert(_mode == Mode::FAST_PATH);
sendReply(sender, candidateReply); // Sets _replySent
}
diff --git a/storage/src/vespa/storage/distributor/statecheckers.cpp b/storage/src/vespa/storage/distributor/statecheckers.cpp
index cafac5b67a5..0a0abb8d417 100644
--- a/storage/src/vespa/storage/distributor/statecheckers.cpp
+++ b/storage/src/vespa/storage/distributor/statecheckers.cpp
@@ -1003,7 +1003,7 @@ BucketStateStateChecker::shouldSkipActivationDueToMaintenance(
const StateChecker::Context& c) const
{
for (uint32_t i = 0; i < activeNodes.size(); ++i) {
- const BucketCopy* cp(c.entry->getNode(activeNodes[i].nodeIndex));
+ const BucketCopy* cp(c.entry->getNode(activeNodes[i]._nodeIndex));
if (!cp || cp->active()) {
continue;
}
@@ -1050,12 +1050,12 @@ BucketStateStateChecker::check(StateChecker::Context& c)
vespalib::asciistream reason;
std::vector<uint16_t> operationNodes;
for (uint32_t i=0; i<activeNodes.size(); ++i) {
- const BucketCopy* cp = c.entry->getNode(activeNodes[i].nodeIndex);
+ const BucketCopy* cp = c.entry->getNode(activeNodes[i]._nodeIndex);
if (cp == 0 || cp->active()) {
continue;
}
- operationNodes.push_back(activeNodes[i].nodeIndex);
- reason << "[Setting node " << activeNodes[i].nodeIndex << " as active: "
+ operationNodes.push_back(activeNodes[i]._nodeIndex);
+ reason << "[Setting node " << activeNodes[i]._nodeIndex << " as active: "
<< activeNodes[i].getReason() << "]";
}
@@ -1067,7 +1067,7 @@ BucketStateStateChecker::check(StateChecker::Context& c)
}
bool shouldBeActive = false;
for (uint32_t j=0; j<activeNodes.size(); ++j) {
- if (activeNodes[j].nodeIndex == cp.getNode()) {
+ if (activeNodes[j]._nodeIndex == cp.getNode()) {
shouldBeActive = true;
}
}
@@ -1083,7 +1083,7 @@ BucketStateStateChecker::check(StateChecker::Context& c)
std::vector<uint16_t> activeNodeIndexes;
for (uint32_t i=0; i<activeNodes.size(); ++i) {
- activeNodeIndexes.push_back(activeNodes[i].nodeIndex);
+ activeNodeIndexes.push_back(activeNodes[i]._nodeIndex);
}
auto op = std::make_unique<SetBucketStateOperation>(
c.component.getClusterName(),
@@ -1094,11 +1094,9 @@ BucketStateStateChecker::check(StateChecker::Context& c)
// we currently always send high pri activations.
// Otherwise, only > 1 operationNodes if we have copies to deactivate.
if (activeNodes.size() > 1 || operationNodes.size() == 1) {
- op->setPriority(c.distributorConfig.getMaintenancePriorities()
- .activateNoExistingActive);
+ op->setPriority(c.distributorConfig.getMaintenancePriorities().activateNoExistingActive);
} else {
- op->setPriority(c.distributorConfig.getMaintenancePriorities()
- .activateWithExistingActive);
+ op->setPriority(c.distributorConfig.getMaintenancePriorities().activateWithExistingActive);
}
op->setDetailedReason(reason.str());
return Result::createStoredResult(std::move(op), MaintenancePriority::VERY_HIGH);
@@ -1137,8 +1135,7 @@ GarbageCollectionStateChecker::check(Context& c)
<< ", configured interval "
<< c.distributorConfig.getGarbageCollectionInterval() << "]";
- op->setPriority(c.distributorConfig.getMaintenancePriorities()
- .garbageCollection);
+ op->setPriority(c.distributorConfig.getMaintenancePriorities().garbageCollection);
op->setDetailedReason(reason.c_str());
return Result::createStoredResult(std::move(op), MaintenancePriority::VERY_LOW);
} else {