diff options
author | Henning Baldersheim <balder@oath.com> | 2018-04-03 20:17:09 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@oath.com> | 2018-04-03 20:17:09 +0200 |
commit | 26e56f2bca05f82d8aa0c9e5da59fbfc00918161 (patch) | |
tree | 68239daea817457e0d8a5b918f03d3fa8673b499 /storage | |
parent | 69c50d6c403f1857cbdd081b1d6bf67581e5cbc6 (diff) |
Follow up on code review comments and move some shared_ptr's
Diffstat (limited to 'storage')
11 files changed, 201 insertions, 464 deletions
diff --git a/storage/src/vespa/storage/distributor/activecopy.cpp b/storage/src/vespa/storage/distributor/activecopy.cpp index 234fddacfdf..4174ddb726f 100644 --- a/storage/src/vespa/storage/distributor/activecopy.cpp +++ b/storage/src/vespa/storage/distributor/activecopy.cpp @@ -22,8 +22,8 @@ namespace std { namespace storage::distributor { -ActiveCopy::ActiveCopy(uint16_t node, BucketDatabase::Entry& e, const std::vector<uint16_t>& idealState) : - nodeIndex(node), +ActiveCopy::ActiveCopy(uint16_t node, const BucketDatabase::Entry& e, const std::vector<uint16_t>& idealState) : + _nodeIndex(node), _ideal(0xffff) { const BucketCopy* copy = e->getNode(node); @@ -32,7 +32,10 @@ ActiveCopy::ActiveCopy(uint16_t node, BucketDatabase::Entry& e, const std::vecto _trusted = copy->trusted(); _active = copy->active(); for (uint32_t i=0; i<idealState.size(); ++i) { - if (idealState[i] == node) _ideal = i; + if (idealState[i] == node) { + _ideal = i; + break; + } } } @@ -63,7 +66,7 @@ ActiveCopy::getReason() const { std::ostream& operator<<(std::ostream& out, const ActiveCopy & e) { - out << "Entry(Node " << e.nodeIndex; + out << "Entry(Node " << e._nodeIndex; if (e._ready) out << ", ready"; if (e._trusted) out << ", trusted"; if (e._ideal < 0xffff) out << ", ideal pri " << e._ideal; @@ -79,7 +82,7 @@ namespace { if (e1._trusted != e2._trusted) return e1._trusted; if (e1._ideal != e2._ideal) return e1._ideal < e2._ideal; if (e1._active != e2._active) return e1._active; - return e1.nodeIndex < e2.nodeIndex; + return e1._nodeIndex < e2._nodeIndex; } }; @@ -96,8 +99,8 @@ namespace { const std::vector<uint16_t>& idealState, std::vector<ActiveCopy>& result) { - for (uint32_t i=0; i<nodeIndexes.size(); ++i) { - result.push_back(ActiveCopy(nodeIndexes[i], e, idealState)); + for (uint16_t nodeIndex : nodeIndexes) { + result.emplace_back(nodeIndex, e, idealState); } } } @@ -115,11 +118,10 @@ ActiveCopy::calculate(const std::vector<uint16_t>& idealState, BucketDatabase::Entry& e) { DEBUG(std::cerr << "Ideal state is " << idealState << "\n"); - std::vector<ActiveCopy> result; std::vector<uint16_t> validNodesWithCopy; buildValidNodeIndexList(e, validNodesWithCopy); if (validNodesWithCopy.empty()) { - return result; + return ActiveList(); } typedef std::vector<uint16_t> IndexList; std::vector<IndexList> groups; @@ -128,6 +130,7 @@ ActiveCopy::calculate(const std::vector<uint16_t>& idealState, } else { groups.push_back(validNodesWithCopy); } + std::vector<ActiveCopy> result; for (uint32_t i=0; i<groups.size(); ++i) { std::vector<ActiveCopy> entries; buildNodeList(e, groups[i], idealState, entries); @@ -136,7 +139,7 @@ ActiveCopy::calculate(const std::vector<uint16_t>& idealState, DEBUG(std::cerr << "Best copy " << *best << "\n"); result.push_back(ActiveCopy(*best)); } - return ActiveList(result); + return ActiveList(std::move(result)); } void @@ -147,13 +150,13 @@ ActiveList::print(std::ostream& out, bool verbose, if (verbose) { for (size_t i=0; i<_v.size(); ++i) { out << "\n" << indent << " " - << _v[i].nodeIndex << " " << _v[i].getReason(); + << _v[i]._nodeIndex << " " << _v[i].getReason(); } if (!_v.empty()) out << "\n" << indent; } else { - if (!_v.empty()) out << _v[0].nodeIndex; + if (!_v.empty()) out << _v[0]._nodeIndex; for (size_t i=1; i<_v.size(); ++i) { - out << " " << _v[i].nodeIndex; + out << " " << _v[i]._nodeIndex; } } out << "]"; @@ -163,7 +166,7 @@ bool ActiveList::contains(uint16_t node) const { for (uint32_t i=0; i<_v.size(); ++i) { - if (node == _v[i].nodeIndex) return true; + if (node == _v[i]._nodeIndex) return true; } return false; } diff --git a/storage/src/vespa/storage/distributor/activecopy.h b/storage/src/vespa/storage/distributor/activecopy.h index 8fcaf814947..d9f83be3748 100644 --- a/storage/src/vespa/storage/distributor/activecopy.h +++ b/storage/src/vespa/storage/distributor/activecopy.h @@ -10,9 +10,8 @@ namespace storage::distributor { class ActiveList; struct ActiveCopy { -public: - ActiveCopy() : nodeIndex(-1), _ideal(-1), _ready(false), _trusted(false), _active(false) { } - ActiveCopy(uint16_t node, BucketDatabase::Entry& e, const std::vector<uint16_t>& idealState); + ActiveCopy() : _nodeIndex(-1), _ideal(-1), _ready(false), _trusted(false), _active(false) { } + ActiveCopy(uint16_t node, const BucketDatabase::Entry& e, const std::vector<uint16_t>& idealState); vespalib::string getReason() const; friend std::ostream& operator<<(std::ostream& out, const ActiveCopy& e); @@ -20,7 +19,7 @@ public: static ActiveList calculate(const std::vector<uint16_t>& idealState, const lib::Distribution&, BucketDatabase::Entry&); - uint16_t nodeIndex; + uint16_t _nodeIndex; uint16_t _ideal; bool _ready; bool _trusted; @@ -32,7 +31,7 @@ class ActiveList : public vespalib::Printable { public: ActiveList() {} - ActiveList(std::vector<ActiveCopy>& v) { _v.swap(v); } + ActiveList(std::vector<ActiveCopy>&& v) : _v(std::move(v)) { } ActiveCopy& operator[](size_t i) { return _v[i]; } const ActiveCopy& operator[](size_t i) const { return _v[i]; } diff --git a/storage/src/vespa/storage/distributor/distributormessagesender.cpp b/storage/src/vespa/storage/distributor/distributormessagesender.cpp index d87488b76c4..d40bd4bd9c2 100644 --- a/storage/src/vespa/storage/distributor/distributormessagesender.cpp +++ b/storage/src/vespa/storage/distributor/distributormessagesender.cpp @@ -6,20 +6,14 @@ namespace storage::distributor { uint64_t -DistributorMessageSender::sendToNode( - const lib::NodeType& nodeType, - uint16_t node, - const std::shared_ptr<api::StorageCommand> & cmd, - bool useDocumentAPI) +DistributorMessageSender::sendToNode(const lib::NodeType& nodeType, uint16_t node, + const std::shared_ptr<api::StorageCommand> & cmd, bool useDocumentAPI) { cmd->setSourceIndex(getDistributorIndex()); - cmd->setAddress(api::StorageMessageAddress( - getClusterName(), - nodeType, - node, - (useDocumentAPI - ? api::StorageMessageAddress::DOCUMENT - : api::StorageMessageAddress::STORAGE))); + cmd->setAddress(api::StorageMessageAddress(getClusterName(), nodeType, node, + (useDocumentAPI + ? api::StorageMessageAddress::DOCUMENT + : api::StorageMessageAddress::STORAGE))); uint64_t msgId = cmd->getMsgId(); sendCommand(cmd); return msgId; diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp index 0c4c67bb94f..bb8c2b0608a 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp @@ -25,22 +25,18 @@ LOG_SETUP(".distributor.manager"); namespace storage::distributor { -ExternalOperationHandler::ExternalOperationHandler( - Distributor& owner, - DistributorBucketSpaceRepo& bucketSpaceRepo, - const MaintenanceOperationGenerator& gen, - DistributorComponentRegister& compReg) +ExternalOperationHandler::ExternalOperationHandler(Distributor& owner, DistributorBucketSpaceRepo& bucketSpaceRepo, + const MaintenanceOperationGenerator& gen, + DistributorComponentRegister& compReg) : DistributorComponent(owner, bucketSpaceRepo, compReg, "External operation handler"), _operationGenerator(gen), _rejectFeedBeforeTimeReached() // At epoch { } -ExternalOperationHandler::~ExternalOperationHandler() { } +ExternalOperationHandler::~ExternalOperationHandler() = default; bool -ExternalOperationHandler::handleMessage( - const std::shared_ptr<api::StorageMessage>& msg, - Operation::SP& op) +ExternalOperationHandler::handleMessage(const std::shared_ptr<api::StorageMessage>& msg, Operation::SP& op) { _op = Operation::SP(); bool retVal = msg->callHandler(*this, msg); @@ -52,10 +48,8 @@ api::ReturnCode ExternalOperationHandler::makeSafeTimeRejectionResult(TimePoint unsafeTime) { std::ostringstream ss; - auto now_sec(std::chrono::duration_cast<std::chrono::seconds>( - unsafeTime.time_since_epoch())); - auto future_sec(std::chrono::duration_cast<std::chrono::seconds>( - _rejectFeedBeforeTimeReached.time_since_epoch())); + auto now_sec(std::chrono::duration_cast<std::chrono::seconds>(unsafeTime.time_since_epoch())); + auto future_sec(std::chrono::duration_cast<std::chrono::seconds>(_rejectFeedBeforeTimeReached.time_since_epoch())); ss << "Operation received at time " << now_sec.count() << ", which is before bucket ownership transfer safe time of " << future_sec.count(); @@ -65,8 +59,7 @@ ExternalOperationHandler::makeSafeTimeRejectionResult(TimePoint unsafeTime) bool ExternalOperationHandler::checkSafeTimeReached(api::StorageCommand& cmd) { - const auto now = TimePoint(std::chrono::seconds( - getClock().getTimeInSeconds().getTime())); + const auto now = TimePoint(std::chrono::seconds(getClock().getTimeInSeconds().getTime())); if (now < _rejectFeedBeforeTimeReached) { api::StorageReply::UP reply(cmd.makeReply()); reply->setResult(makeSafeTimeRejectionResult(now)); @@ -77,18 +70,14 @@ ExternalOperationHandler::checkSafeTimeReached(api::StorageCommand& cmd) } bool -ExternalOperationHandler::checkTimestampMutationPreconditions( - api::StorageCommand& cmd, - const document::BucketId &bucketId, - PersistenceOperationMetricSet& persistenceMetrics) +ExternalOperationHandler::checkTimestampMutationPreconditions(api::StorageCommand& cmd, + const document::BucketId &bucketId, + PersistenceOperationMetricSet& persistenceMetrics) { document::Bucket bucket(cmd.getBucket().getBucketSpace(), bucketId); if (!checkDistribution(cmd, bucket)) { - LOG(debug, - "Distributor manager received %s, bucket %s with wrong " - "distribution", - cmd.toString().c_str(), - bucket.toString().c_str()); + LOG(debug, "Distributor manager received %s, bucket %s with wrong distribution", + cmd.toString().c_str(), bucket.toString().c_str()); persistenceMetrics.failures.wrongdistributor++; return false; @@ -101,13 +90,12 @@ ExternalOperationHandler::checkTimestampMutationPreconditions( } std::shared_ptr<api::StorageMessage> -ExternalOperationHandler::makeConcurrentMutationRejectionReply( - api::StorageCommand& cmd, - const document::DocumentId& docId, - PersistenceOperationMetricSet& persistenceMetrics) const { - auto err_msg = vespalib::make_string( - "A mutating operation for document '%s' is already in progress", - docId.toString().c_str()); +ExternalOperationHandler::makeConcurrentMutationRejectionReply(api::StorageCommand& cmd, + const document::DocumentId& docId, + PersistenceOperationMetricSet& persistenceMetrics) const +{ + auto err_msg = vespalib::make_string("A mutating operation for document '%s' is already in progress", + docId.toString().c_str()); LOG(debug, "Aborting incoming %s operation: %s", cmd.getType().toString().c_str(), err_msg.c_str()); persistenceMetrics.failures.concurrent_mutations++; api::StorageReply::UP reply(cmd.makeReply()); @@ -184,12 +172,8 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Remove) auto handle = _mutationSequencer.try_acquire(cmd->getDocumentId()); if (allowMutation(handle)) { auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace())); - _op = std::make_shared<RemoveOperation>( - *this, - distributorBucketSpace, - cmd, - getMetrics().removes[cmd->getLoadType()], - std::move(handle)); + _op = std::make_shared<RemoveOperation>(*this, distributorBucketSpace, cmd, + getMetrics().removes[cmd->getLoadType()], std::move(handle)); } else { sendUp(makeConcurrentMutationRejectionReply(*cmd, cmd->getDocumentId(), metrics)); } @@ -204,20 +188,14 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, RemoveLocation) document::Bucket bucket(cmd->getBucket().getBucketSpace(), bid); if (!checkDistribution(*cmd, bucket)) { - LOG(debug, - "Distributor manager received %s with wrong distribution", - cmd->toString().c_str()); + LOG(debug, "Distributor manager received %s with wrong distribution", cmd->toString().c_str()); - getMetrics().removelocations[cmd->getLoadType()]. - failures.wrongdistributor++; + getMetrics().removelocations[cmd->getLoadType()].failures.wrongdistributor++; return true; } - _op = Operation::SP(new RemoveLocationOperation( - *this, - _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()), - cmd, - getMetrics().removelocations[cmd->getLoadType()])); + _op = std::make_shared<RemoveLocationOperation>(*this, _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()), + cmd, getMetrics().removelocations[cmd->getLoadType()]); return true; } @@ -225,21 +203,15 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get) { document::Bucket bucket(cmd->getBucket().getBucketSpace(), getBucketId(cmd->getDocumentId())); if (!checkDistribution(*cmd, bucket)) { - LOG(debug, - "Distributor manager received get for %s, " - "bucket %s with wrong distribution", - cmd->getDocumentId().toString().c_str(), - bucket.toString().c_str()); + LOG(debug, "Distributor manager received get for %s, bucket %s with wrong distribution", + cmd->getDocumentId().toString().c_str(), bucket.toString().c_str()); getMetrics().gets[cmd->getLoadType()].failures.wrongdistributor++; return true; } - _op = Operation::SP(new GetOperation( - *this, - _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()), - cmd, - getMetrics().gets[cmd->getLoadType()])); + _op = std::make_shared<GetOperation>(*this, _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()), + cmd, getMetrics().gets[cmd->getLoadType()]); return true; } @@ -249,7 +221,7 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, StatBucket) return true; } auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace())); - _op = Operation::SP(new StatBucketOperation(*this, distributorBucketSpace, cmd)); + _op = std::make_shared<StatBucketOperation>(*this, distributorBucketSpace, cmd); return true; } @@ -261,16 +233,14 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, GetBucketList) auto bucketSpace(cmd->getBucket().getBucketSpace()); auto &distributorBucketSpace(_bucketSpaceRepo.get(bucketSpace)); auto &bucketDatabase(distributorBucketSpace.getBucketDatabase()); - _op = Operation::SP(new StatBucketListOperation( - bucketDatabase, _operationGenerator, getIndex(), cmd)); + _op = std::make_shared<StatBucketListOperation>(bucketDatabase, _operationGenerator, getIndex(), cmd); return true; } IMPL_MSG_COMMAND_H(ExternalOperationHandler, CreateVisitor) { const DistributorConfiguration& config(getDistributor().getConfig()); - VisitorOperation::Config visitorConfig(config.getMinBucketsPerVisitor(), - config.getMaxVisitorsPerNodePerClientVisitor()); + VisitorOperation::Config visitorConfig(config.getMinBucketsPerVisitor(), config.getMaxVisitorsPerNodePerClientVisitor()); auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace())); _op = Operation::SP(new VisitorOperation(*this, distributorBucketSpace, cmd, visitorConfig, getMetrics().visits[cmd->getLoadType()])); return true; diff --git a/storage/src/vespa/storage/distributor/messagetracker.h b/storage/src/vespa/storage/distributor/messagetracker.h index 017979c16c0..626335e1ba6 100644 --- a/storage/src/vespa/storage/distributor/messagetracker.h +++ b/storage/src/vespa/storage/distributor/messagetracker.h @@ -16,8 +16,8 @@ class MessageTracker { public: class ToSend { public: - ToSend(const std::shared_ptr<api::BucketCommand>& msg, uint16_t target) : - _msg(msg), _target(target) {}; + ToSend(std::shared_ptr<api::BucketCommand> msg, uint16_t target) : + _msg(std::move(msg)), _target(target) {}; std::shared_ptr<api::BucketCommand> _msg; uint16_t _target; @@ -29,7 +29,7 @@ public: ~MessageTracker(); void queueCommand(std::shared_ptr<api::BucketCommand> msg, uint16_t target) { - _commandQueue.push_back(ToSend(msg, target)); + _commandQueue.emplace_back(std::move(msg), target); } void flushQueue(MessageSender& sender); diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp index 246020c191c..5b1e585a6f8 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp @@ -14,9 +14,7 @@ using document::BucketSpace; namespace storage::distributor { -GetOperation::GroupId::GroupId(const document::BucketId& id, - uint32_t checksum, - int node) +GetOperation::GroupId::GroupId(const document::BucketId& id, uint32_t checksum, int node) : _id(id), _checksum(checksum), _node(node) @@ -48,12 +46,12 @@ GetOperation::GroupId::operator==(const GroupId& other) const GetOperation::GetOperation(DistributorComponent& manager, DistributorBucketSpace &bucketSpace, - const std::shared_ptr<api::GetCommand> & msg, + std::shared_ptr<api::GetCommand> msg, PersistenceOperationMetricSet& metric) : Operation(), _manager(manager), _bucketSpace(bucketSpace), - _msg(msg), + _msg(std::move(msg)), _returnCode(api::ReturnCode::OK), _doc((document::Document*)NULL), _lastModified(0), @@ -66,8 +64,7 @@ GetOperation::GetOperation(DistributorComponent& manager, void GetOperation::onClose(DistributorMessageSender& sender) { - _returnCode = api::ReturnCode(api::ReturnCode::ABORTED, - "Process is shutting down"); + _returnCode = api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down"); sendReply(sender); } @@ -96,30 +93,19 @@ GetOperation::findBestUnsentTarget(const GroupVector& candidates) const } bool -GetOperation::sendForChecksum(DistributorMessageSender& sender, - const document::BucketId& id, - GroupVector& res) +GetOperation::sendForChecksum(DistributorMessageSender& sender, const document::BucketId& id, GroupVector& res) { const int best = findBestUnsentTarget(res); if (best != -1) { document::Bucket bucket(_msg->getBucket().getBucketSpace(), id); - std::shared_ptr<api::GetCommand> command( - std::make_shared<api::GetCommand>( - bucket, - _msg->getDocumentId(), - _msg->getFieldSet(), - _msg->getBeforeTimestamp())); + auto command = std::make_shared<api::GetCommand>(bucket, _msg->getDocumentId(), + _msg->getFieldSet(), _msg->getBeforeTimestamp()); copyMessageSettings(*_msg, *command); - LOG(spam, - "Sending %s to node %d", - command->toString(true).c_str(), - res[best].copy.getNode()); + LOG(spam, "Sending %s to node %d", command->toString(true).c_str(), res[best].copy.getNode()); - res[best].sent = sender.sendToNode(lib::NodeType::STORAGE, - res[best].copy.getNode(), - command); + res[best].sent = sender.sendToNode(lib::NodeType::STORAGE, res[best].copy.getNode(), command); return true; } @@ -145,8 +131,7 @@ GetOperation::onStart(DistributorMessageSender& sender) }; void -GetOperation::onReceive(DistributorMessageSender& sender, - const std::shared_ptr<api::StorageReply>& msg) +GetOperation::onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply>& msg) { api::GetReply* getreply = dynamic_cast<api::GetReply*>(msg.get()); assert(getreply != nullptr); @@ -179,9 +164,7 @@ GetOperation::onReceive(DistributorMessageSender& sender, } // Try to send to another node in this checksum group. - bool sent = sendForChecksum(sender, - iter->first.getBucketId(), - iter->second); + bool sent = sendForChecksum(sender, iter->first.getBucketId(), iter->second); if (sent) { allDone = false; } @@ -197,8 +180,7 @@ GetOperation::onReceive(DistributorMessageSender& sender, if (allDone) { LOG(debug, "Get on %s done, returning reply %s", - _msg->getDocumentId().toString().c_str(), - _returnCode.toString().c_str()); + _msg->getDocumentId().toString().c_str(), _returnCode.toString().c_str()); sendReply(sender); } } @@ -207,8 +189,7 @@ void GetOperation::sendReply(DistributorMessageSender& sender) { if (_msg.get()) { - std::shared_ptr<api::GetReply> repl( - std::make_shared<api::GetReply>(*_msg, _doc, _lastModified)); + auto repl = std::make_shared<api::GetReply>(*_msg, _doc, _lastModified); repl->setResult(_returnCode); if (_returnCode.success()) { @@ -271,11 +252,9 @@ GetOperation::assignTargetNodeGroups() const BucketCopy& copy = e->getNodeRef(i); if (!copy.valid()) { - _responses[GroupId(e.getBucketId(), copy.getChecksum(), copy.getNode())]. - push_back(copy); + _responses[GroupId(e.getBucketId(), copy.getChecksum(), copy.getNode())].push_back(copy); } else if (!copy.empty()) { - _responses[GroupId(e.getBucketId(), copy.getChecksum(), -1)]. - push_back(copy); + _responses[GroupId(e.getBucketId(), copy.getChecksum(), -1)].push_back(copy); } } } diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h index 03279a87152..198c588dfd1 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h @@ -23,10 +23,8 @@ class DistributorBucketSpace; class GetOperation : public Operation { public: - GetOperation(DistributorComponent& manager, - DistributorBucketSpace &bucketSpace, - const std::shared_ptr<api::GetCommand> & msg, - PersistenceOperationMetricSet& metric); + GetOperation(DistributorComponent& manager, DistributorBucketSpace &bucketSpace, + std::shared_ptr<api::GetCommand> msg, PersistenceOperationMetricSet& metric); void onClose(DistributorMessageSender& sender) override; void onStart(DistributorMessageSender& sender) override; diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp index d08afdacfad..2b1baa1e0d6 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp @@ -3,7 +3,6 @@ #include "putoperation.h" #include <vespa/document/fieldvalue/document.h> -#include <vespa/log/log.h> #include <vespa/storage/distributor/activecopy.h> #include <vespa/storage/distributor/operationtargetresolverimpl.h> #include <vespa/storage/distributor/pendingmessagetracker.h> @@ -11,6 +10,7 @@ #include <vespa/vdslib/distribution/idealnodecalculatorimpl.h> #include <vespa/storage/distributor/distributor_bucket_space.h> +#include <vespa/log/log.h> LOG_SETUP(".distributor.callback.doc.put"); @@ -18,22 +18,17 @@ using namespace storage::distributor; using namespace storage; using document::BucketSpace; -PutOperation::PutOperation(DistributorComponent& manager, - DistributorBucketSpace &bucketSpace, - const std::shared_ptr<api::PutCommand> & msg, - PersistenceOperationMetricSet& metric, - SequencingHandle sequencingHandle) +PutOperation::PutOperation(DistributorComponent& manager, DistributorBucketSpace &bucketSpace, + std::shared_ptr<api::PutCommand> msg, + PersistenceOperationMetricSet& metric, SequencingHandle sequencingHandle) : SequencedOperation(std::move(sequencingHandle)), - _trackerInstance(metric, - std::shared_ptr<api::BucketInfoReply>(new api::PutReply(*msg)), - manager, - msg->getTimestamp()), + _trackerInstance(metric, std::make_shared<api::PutReply>(*msg), manager, msg->getTimestamp()), _tracker(_trackerInstance), - _msg(msg), + _msg(std::move(msg)), _manager(manager), _bucketSpace(bucketSpace) { -}; +} namespace { @@ -50,11 +45,8 @@ bool hasNode(const std::vector<uint16_t>& vec, uint16_t value) { } void -PutOperation::getTargetNodes(const std::vector<uint16_t>& idealNodes, - std::vector<uint16_t>& targetNodes, - std::vector<uint16_t>& createNodes, - const BucketInfo& bucketInfo, - uint32_t redundancy) +PutOperation::getTargetNodes(const std::vector<uint16_t>& idealNodes, std::vector<uint16_t>& targetNodes, + std::vector<uint16_t>& createNodes, const BucketInfo& bucketInfo, uint32_t redundancy) { // First insert all nodes that are trusted or already in the ideal state. for (uint32_t i = 0; i < bucketInfo.getNodeCount(); i++) { @@ -78,8 +70,7 @@ PutOperation::getTargetNodes(const std::vector<uint16_t>& idealNodes, for (uint32_t i = 0; targetNodes.size() < redundancy && i < idealNodes.size(); i++) { if (!hasNode(targetNodes, idealNodes[i])) { targetNodes.push_back(idealNodes[i]); - LOG(spam, "Adding target+create node %u it's in ideal state", - idealNodes[i]); + LOG(spam, "Adding target+create node %u it's in ideal state", idealNodes[i]); createNodes.push_back(idealNodes[i]); } } @@ -88,100 +79,23 @@ PutOperation::getTargetNodes(const std::vector<uint16_t>& idealNodes, std::sort(createNodes.begin(), createNodes.end()); } -// FIXME: deprecated! remove as soon as multoperationoperation is merely -// a haunting memory of the past since it's only used by that component! -bool -PutOperation::checkCreateBucket(const lib::Distribution& dist, - const lib::ClusterState& state, - BucketDatabase::Entry& entry, - std::vector<uint16_t>& targetNodes, - std::vector<MessageTracker::ToSend>& messagesToSend, - const api::StorageCommand& originalCommand) -{ - BucketInfo& info = entry.getBucketInfo(); - - std::vector<uint16_t> createNodes; - std::vector<uint16_t> idealNodes( - dist.getIdealStorageNodes(state, entry.getBucketId(), "ui")); - - getTargetNodes(idealNodes, - targetNodes, - createNodes, - info, - dist.getRedundancy()); - - ActiveList active(ActiveCopy::calculate(idealNodes, dist, entry)); - LOG(debug, "Active copies for bucket %s: %s", - entry.getBucketId().toString().c_str(), active.toString().c_str()); - // Send create buckets for all nodes in ideal state where we don't - // currently have copies. - for (uint32_t i = 0; i < createNodes.size(); i++) { - document::Bucket bucket(originalCommand.getBucket().getBucketSpace(), entry.getBucketId()); - std::shared_ptr<api::CreateBucketCommand> cbc( - new api::CreateBucketCommand(bucket)); - if (active.contains(createNodes[i])) { - BucketCopy copy(*entry->getNode(createNodes[i])); - copy.setActive(true); - entry->updateNode(copy); - cbc->setActive(true); - } - LOG(debug, "Creating bucket on node %u: %s", - createNodes[i], cbc->toString().c_str()); - - copyMessageSettings(originalCommand, *cbc); - messagesToSend.push_back(MessageTracker::ToSend(cbc, createNodes[i])); - } - - // All nodes that we are not feeding to now will no longer be trusted. - // TODO: Refactor? - bool mustWrite = false; - for (uint32_t i = 0; i < info.getNodeCount(); i++) { - bool found = false; - for (uint32_t j = 0; j < targetNodes.size(); j++) { - if (info.getNodeRef(i).getNode() == targetNodes[j]) { - LOG(spam, - "Found matching target node %u in %s", - targetNodes[i], - info.getNodeRef(i).toString().c_str()); - found = true; - break; - } - } - - if (!found && info.getNodeRef(i).trusted()) { - LOG(spam, - "Setting mustWrite=true since %s is trusted", - info.getNodeRef(i).toString().c_str()); - - info.clearTrusted(info.getNodeRef(i).getNode()); - mustWrite = true; - } - } - - return mustWrite; -} - void -PutOperation::insertDatabaseEntryAndScheduleCreateBucket( - const OperationTargetList& copies, - bool setOneActive, - const api::StorageCommand& originalCommand, - std::vector<MessageTracker::ToSend>& messagesToSend) +PutOperation::insertDatabaseEntryAndScheduleCreateBucket(const OperationTargetList& copies, bool setOneActive, + const api::StorageCommand& originalCommand, + std::vector<MessageTracker::ToSend>& messagesToSend) { document::BucketId lastBucket; bool multipleBuckets = false; for (uint32_t i=0, n=copies.size(); i<n; ++i) { if (!copies[i].isNewCopy()) continue; - if (lastBucket.getRawId() != 0 && copies[i].getBucketId() != lastBucket) - { + if (lastBucket.getRawId() != 0 && copies[i].getBucketId() != lastBucket) { multipleBuckets = true; } lastBucket = copies[i].getBucketId(); // Fake that we have a non-empty bucket so it isn't deleted. // Copy is inserted with timestamp 0 such that any actual bucket info // subsequently arriving from the storage node will always overwrite it. - BucketCopy copy(BucketCopy::recentlyCreatedCopy( - 0, copies[i].getNode().getIndex())); + BucketCopy copy(BucketCopy::recentlyCreatedCopy(0, copies[i].getNode().getIndex())); _manager.updateBucketDatabase(document::Bucket(originalCommand.getBucket().getBucketSpace(), lastBucket), copy, DatabaseUpdate::CREATE_IF_NONEXISTING); } @@ -189,17 +103,13 @@ PutOperation::insertDatabaseEntryAndScheduleCreateBucket( if (setOneActive) { assert(!multipleBuckets); (void) multipleBuckets; - BucketDatabase::Entry entry( - _bucketSpace.getBucketDatabase().get(lastBucket)); + BucketDatabase::Entry entry(_bucketSpace.getBucketDatabase().get(lastBucket)); std::vector<uint16_t> idealState( - _bucketSpace.getDistribution().getIdealStorageNodes( - _bucketSpace.getClusterState(), lastBucket, "ui")); - active = ActiveCopy::calculate(idealState, _bucketSpace.getDistribution(), - entry); - LOG(debug, "Active copies for bucket %s: %s", - entry.getBucketId().toString().c_str(), active.toString().c_str()); + _bucketSpace.getDistribution().getIdealStorageNodes(_bucketSpace.getClusterState(), lastBucket, "ui")); + active = ActiveCopy::calculate(idealState, _bucketSpace.getDistribution(), entry); + LOG(debug, "Active copies for bucket %s: %s", entry.getBucketId().toString().c_str(), active.toString().c_str()); for (uint32_t i=0; i<active.size(); ++i) { - BucketCopy copy(*entry->getNode(active[i].nodeIndex)); + BucketCopy copy(*entry->getNode(active[i]._nodeIndex)); copy.setActive(true); entry->updateNode(copy); } @@ -208,8 +118,7 @@ PutOperation::insertDatabaseEntryAndScheduleCreateBucket( for (uint32_t i=0, n=copies.size(); i<n; ++i) { if (!copies[i].isNewCopy()) continue; document::Bucket bucket(originalCommand.getBucket().getBucketSpace(), copies[i].getBucketId()); - std::shared_ptr<api::CreateBucketCommand> cbc( - new api::CreateBucketCommand(bucket)); + auto cbc = std::make_shared<api::CreateBucketCommand>(bucket); if (setOneActive && active.contains(copies[i].getNode().getIndex())) { cbc->setActive(true); } @@ -217,33 +126,22 @@ PutOperation::insertDatabaseEntryAndScheduleCreateBucket( copies[i].getNode().getIndex(), cbc->toString().c_str()); copyMessageSettings(originalCommand, *cbc); - messagesToSend.push_back(MessageTracker::ToSend( - cbc, copies[i].getNode().getIndex())); + messagesToSend.emplace_back(std::move(cbc), copies[i].getNode().getIndex()); } } void -PutOperation::sendPutToBucketOnNode( - document::BucketSpace bucketSpace, - const document::BucketId& bucketId, - const uint16_t node, - std::vector<PersistenceMessageTracker::ToSend>& putBatch) +PutOperation::sendPutToBucketOnNode(document::BucketSpace bucketSpace, const document::BucketId& bucketId, + const uint16_t node, std::vector<PersistenceMessageTracker::ToSend>& putBatch) { document::Bucket bucket(bucketSpace, bucketId); - std::shared_ptr<api::PutCommand> command( - new api::PutCommand( - bucket, - _msg->getDocument(), - _msg->getTimestamp())); - LOG(debug, - "Sending %s to node %u", - command->toString().c_str(), - node); + auto command = std::make_shared<api::PutCommand>(bucket, _msg->getDocument(), _msg->getTimestamp()); + LOG(debug, "Sending %s to node %u", command->toString().c_str(), node); copyMessageSettings(*_msg, *command); command->setUpdateTimestamp(_msg->getUpdateTimestamp()); command->setCondition(_msg->getCondition()); - putBatch.push_back(MessageTracker::ToSend(command, node)); + putBatch.emplace_back(std::move(command), node); } @@ -253,10 +151,7 @@ PutOperation::onStart(DistributorMessageSender& sender) document::BucketIdFactory bucketIdFactory; document::BucketId bid = bucketIdFactory.getBucketId(_msg->getDocumentId()); - LOG(debug, - "Received PUT %s for bucket %s", - _msg->getDocumentId().toString().c_str(), - bid.toString().c_str()); + LOG(debug, "Received PUT %s for bucket %s", _msg->getDocumentId().toString().c_str(), bid.toString().c_str()); lib::ClusterState systemState = _bucketSpace.getClusterState(); @@ -276,24 +171,19 @@ PutOperation::onStart(DistributorMessageSender& sender) lib::IdealNodeCalculatorImpl idealNodeCalculator; idealNodeCalculator.setDistribution(_bucketSpace.getDistribution()); idealNodeCalculator.setClusterState(_bucketSpace.getClusterState()); - OperationTargetResolverImpl targetResolver( - _bucketSpace.getBucketDatabase(), - idealNodeCalculator, + OperationTargetResolverImpl targetResolver(_bucketSpace.getBucketDatabase(), idealNodeCalculator, _manager.getDistributor().getConfig().getMinimalBucketSplit(), _bucketSpace.getDistribution().getRedundancy(), _msg->getBucket().getBucketSpace()); - OperationTargetList targets(targetResolver.getTargets( - OperationTargetResolver::PUT, bid)); + OperationTargetList targets(targetResolver.getTargets(OperationTargetResolver::PUT, bid)); for (size_t i = 0; i < targets.size(); ++i) { if (_manager.getDistributor().getPendingMessageTracker(). - hasPendingMessage(targets[i].getNode().getIndex(), - targets[i].getBucket(), + hasPendingMessage(targets[i].getNode().getIndex(), targets[i].getBucket(), api::MessageType::DELETEBUCKET_ID)) { _tracker.fail(sender, api::ReturnCode(api::ReturnCode::BUCKET_DELETED, - "Bucket was being deleted while we got a PUT, failing " - "operation to be safe")); + "Bucket was being deleted while we got a PUT, failing operation to be safe")); return; } } @@ -304,11 +194,8 @@ PutOperation::onStart(DistributorMessageSender& sender) std::vector<PersistenceMessageTracker::ToSend> createBucketBatch; if (targets.hasAnyNewCopies()) { - insertDatabaseEntryAndScheduleCreateBucket( - targets, - shouldImplicitlyActivateReplica(targets), - *_msg, - createBucketBatch); + insertDatabaseEntryAndScheduleCreateBucket(targets, shouldImplicitlyActivateReplica(targets), + *_msg, createBucketBatch); } if (!createBucketBatch.empty()) { @@ -320,9 +207,8 @@ PutOperation::onStart(DistributorMessageSender& sender) // Now send PUTs for (uint32_t i = 0; i < targets.size(); i++) { const OperationTarget& target(targets[i]); - sendPutToBucketOnNode(_msg->getBucket().getBucketSpace(), - target.getBucketId(), target.getNode().getIndex(), - putBatch); + sendPutToBucketOnNode(_msg->getBucket().getBucketSpace(), target.getBucketId(), + target.getNode().getIndex(), putBatch); } if (putBatch.size()) { @@ -330,18 +216,15 @@ PutOperation::onStart(DistributorMessageSender& sender) } else { const char* error = "Can't store document: No storage nodes available"; LOG(debug, "%s", error); - _tracker.fail(sender, - api::ReturnCode(api::ReturnCode::NOT_CONNECTED, error)); + _tracker.fail(sender, api::ReturnCode(api::ReturnCode::NOT_CONNECTED, error)); return; } // Check whether buckets are large enough to be split. // TODO(vekterli): only check entries for sendToExisting? for (uint32_t i = 0; i < entries.size(); ++i) { - _manager.getDistributor().checkBucketForSplit( - _msg->getBucket().getBucketSpace(), - entries[i], - _msg->getPriority()); + _manager.getDistributor().checkBucketForSplit(_msg->getBucket().getBucketSpace(), + entries[i], _msg->getPriority()); } _tracker.flushQueue(sender); @@ -355,8 +238,7 @@ PutOperation::onStart(DistributorMessageSender& sender) } bool -PutOperation::shouldImplicitlyActivateReplica( - const OperationTargetList& targets) const +PutOperation::shouldImplicitlyActivateReplica(const OperationTargetList& targets) const { const auto& config(_manager.getDistributor().getConfig()); if (config.isBucketActivationDisabled()) { @@ -366,8 +248,7 @@ PutOperation::shouldImplicitlyActivateReplica( } void -PutOperation::onReceive(DistributorMessageSender& sender, - const std::shared_ptr<api::StorageReply> & msg) +PutOperation::onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg) { LOG(debug, "Received %s", msg->toString(true).c_str()); _tracker.receiveReply(sender, static_cast<api::BucketInfoReply&>(*msg)); @@ -380,4 +261,3 @@ PutOperation::onClose(DistributorMessageSender& sender) LOG(debug, "%s", error); _tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, error)); } - diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h index c27f2ee2266..745a4f57a35 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h @@ -24,10 +24,8 @@ class OperationTargetList; class PutOperation : public SequencedOperation { public: - PutOperation(DistributorComponent& manager, - DistributorBucketSpace &bucketSpace, - const std::shared_ptr<api::PutCommand> & msg, - PersistenceOperationMetricSet& metric, + PutOperation(DistributorComponent& manager, DistributorBucketSpace &bucketSpace, + std::shared_ptr<api::PutCommand> msg, PersistenceOperationMetricSet& metric, SequencingHandle sequencingHandle = SequencingHandle()); void onStart(DistributorMessageSender& sender) override; @@ -36,39 +34,18 @@ public: void onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply> &) override; void onClose(DistributorMessageSender& sender) override; - /** - * Gets the ideal state of the given bucket, and adds all nodes from the - * ideal state to targetNodes. Also schedules create bucket messages for - * all buckets currently not in the nodes list, and sets nodes in the node - * list not in the ideal state to untrusted. - */ - static bool checkCreateBucket(const lib::Distribution& distribution, - const lib::ClusterState& state, - BucketDatabase::Entry& e, - std::vector<uint16_t>& targetNodes, - std::vector<MessageTracker::ToSend>& messagesToSend, - const api::StorageCommand& originalCommand); - - static void getTargetNodes(const std::vector<uint16_t>& idealNodes, - std::vector<uint16_t>& targetNodes, - std::vector<uint16_t>& createNodes, - const BucketInfo& bucketInfo, - uint32_t redundancy); + static void getTargetNodes(const std::vector<uint16_t>& idealNodes, std::vector<uint16_t>& targetNodes, + std::vector<uint16_t>& createNodes, const BucketInfo& bucketInfo, uint32_t redundancy); private: PersistenceMessageTrackerImpl _trackerInstance; PersistenceMessageTracker& _tracker; - void insertDatabaseEntryAndScheduleCreateBucket( - const OperationTargetList& copies, - bool setOneActive, - const api::StorageCommand& originalCommand, - std::vector<MessageTracker::ToSend>& messagesToSend); + void insertDatabaseEntryAndScheduleCreateBucket(const OperationTargetList& copies, bool setOneActive, + const api::StorageCommand& originalCommand, + std::vector<MessageTracker::ToSend>& messagesToSend); - void sendPutToBucketOnNode( - document::BucketSpace bucketSpace, - const document::BucketId& bucketId, - const uint16_t node, - std::vector<PersistenceMessageTracker::ToSend>& putBatch); + void sendPutToBucketOnNode(document::BucketSpace bucketSpace, const document::BucketId& bucketId, + const uint16_t node, std::vector<PersistenceMessageTracker::ToSend>& putBatch); bool shouldImplicitlyActivateReplica(const OperationTargetList& targets) const; diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp index db120880267..0b0fc05763f 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp @@ -51,9 +51,7 @@ struct IntermediateMessageSender : DistributorMessageSender { DistributorMessageSender& forward; std::shared_ptr<api::StorageReply> _reply; - IntermediateMessageSender(SentMessageMap& mm, - const std::shared_ptr<Operation>& cb, - DistributorMessageSender & fwd); + IntermediateMessageSender(SentMessageMap& mm, std::shared_ptr<Operation> cb, DistributorMessageSender & fwd); ~IntermediateMessageSender(); void sendCommand(const std::shared_ptr<api::StorageCommand>& cmd) override { @@ -79,14 +77,13 @@ struct IntermediateMessageSender : DistributorMessageSender { }; IntermediateMessageSender::IntermediateMessageSender(SentMessageMap& mm, - const std::shared_ptr<Operation>& cb, + std::shared_ptr<Operation> cb, DistributorMessageSender & fwd) : msgMap(mm), - callback(cb), + callback(std::move(cb)), forward(fwd) -{ -} -IntermediateMessageSender::~IntermediateMessageSender() { } +{ } +IntermediateMessageSender::~IntermediateMessageSender() = default; } @@ -161,13 +158,10 @@ void TwoPhaseUpdateOperation::startFastPathUpdate(DistributorMessageSender& sender) { _mode = Mode::FAST_PATH; - std::shared_ptr<UpdateOperation> updateOperation( - new UpdateOperation(_manager, _bucketSpace, _updateCmd, _updateMetric)); - - IntermediateMessageSender intermediate( - _sentMessageMap, updateOperation, sender); - updateOperation->start(intermediate, - _manager.getClock().getTimeInMillis()); + auto updateOperation = std::make_shared<UpdateOperation>(_manager, _bucketSpace, _updateCmd, _updateMetric); + UpdateOperation & op = *updateOperation; + IntermediateMessageSender intermediate(_sentMessageMap, std::move(updateOperation), sender); + op.start(intermediate, _manager.getClock().getTimeInMillis()); transitionTo(SendState::UPDATES_SENT); if (intermediate._reply.get()) { @@ -178,30 +172,21 @@ TwoPhaseUpdateOperation::startFastPathUpdate(DistributorMessageSender& sender) void TwoPhaseUpdateOperation::startSafePathUpdate(DistributorMessageSender& sender) { - LOG(debug, "Update(%s) safe path: sending Get commands", - _updateCmd->getDocumentId().toString().c_str()); + LOG(debug, "Update(%s) safe path: sending Get commands", _updateCmd->getDocumentId().toString().c_str()); _mode = Mode::SLOW_PATH; document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), document::BucketId(0)); - std::shared_ptr<api::GetCommand> get( - std::make_shared<api::GetCommand>( - bucket, - _updateCmd->getDocumentId(), - "[all]")); + auto get = std::make_shared<api::GetCommand>(bucket, _updateCmd->getDocumentId(),"[all]"); copyMessageSettings(*_updateCmd, *get); - std::shared_ptr<GetOperation> getOperation( - std::make_shared<GetOperation>(_manager, _bucketSpace, get, _getMetric)); - - IntermediateMessageSender intermediate( - _sentMessageMap, getOperation, sender); - getOperation->start(intermediate, - _manager.getClock().getTimeInMillis()); + auto getOperation = std::make_shared<GetOperation>(_manager, _bucketSpace, get, _getMetric); + GetOperation & op = *getOperation; + IntermediateMessageSender intermediate(_sentMessageMap, std::move(getOperation), sender); + op.start(intermediate, _manager.getClock().getTimeInMillis()); transitionTo(SendState::GETS_SENT); if (intermediate._reply.get()) { assert(intermediate._reply->getType() == api::MessageType::GET_REPLY); - handleSafePathReceivedGet( - sender, static_cast<api::GetReply&>(*intermediate._reply)); + handleSafePathReceivedGet(sender, static_cast<api::GetReply&>(*intermediate._reply)); } } @@ -226,17 +211,14 @@ bool TwoPhaseUpdateOperation::lostBucketOwnershipBetweenPhases() const { document::Bucket updateDocBucket(_updateCmd->getBucket().getBucketSpace(), _updateDocBucketId); - BucketOwnership bo(_manager.checkOwnershipInPendingAndCurrentState( - updateDocBucket)); + BucketOwnership bo(_manager.checkOwnershipInPendingAndCurrentState(updateDocBucket)); return !bo.isOwned(); } void -TwoPhaseUpdateOperation::sendLostOwnershipTransientErrorReply( - DistributorMessageSender& sender) +TwoPhaseUpdateOperation::sendLostOwnershipTransientErrorReply(DistributorMessageSender& sender) { - sendReplyWithResult( - sender, + sendReplyWithResult(sender, api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND, "Distributor lost ownership of bucket between " "executing the read and write phases of a two-" @@ -244,31 +226,24 @@ TwoPhaseUpdateOperation::sendLostOwnershipTransientErrorReply( } void -TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument( - std::shared_ptr<document::Document> doc, - api::Timestamp putTimestamp, - DistributorMessageSender& sender) +TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument(std::shared_ptr<document::Document> doc, + api::Timestamp putTimestamp, DistributorMessageSender& sender) { if (lostBucketOwnershipBetweenPhases()) { sendLostOwnershipTransientErrorReply(sender); return; } document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), document::BucketId(0)); - std::shared_ptr<api::PutCommand> put( - new api::PutCommand(bucket, doc, putTimestamp)); + auto put = std::make_shared<api::PutCommand>(bucket, doc, putTimestamp); copyMessageSettings(*_updateCmd, *put); - std::shared_ptr<PutOperation> putOperation( - new PutOperation(_manager, _bucketSpace, put, _putMetric)); - - IntermediateMessageSender intermediate( - _sentMessageMap, putOperation, sender); - putOperation->start(intermediate, - _manager.getClock().getTimeInMillis()); + auto putOperation = std::make_shared<PutOperation>(_manager, _bucketSpace, std::move(put), _putMetric); + PutOperation & op = *putOperation; + IntermediateMessageSender intermediate(_sentMessageMap, std::move(putOperation), sender); + op.start(intermediate, _manager.getClock().getTimeInMillis()); transitionTo(SendState::PUTS_SENT); LOG(debug, "Update(%s): sending Put commands with doc %s", - _updateCmd->getDocumentId().toString().c_str(), - doc->toString(true).c_str()); + _updateCmd->getDocumentId().toString().c_str(), doc->toString(true).c_str()); if (intermediate._reply.get()) { sendReplyWithResult(sender, intermediate._reply->getResult()); @@ -276,9 +251,7 @@ TwoPhaseUpdateOperation::schedulePutsWithUpdatedDocument( } void -TwoPhaseUpdateOperation::onReceive( - DistributorMessageSender& sender, - const std::shared_ptr<api::StorageReply>& msg) +TwoPhaseUpdateOperation::onReceive(DistributorMessageSender& sender, const std::shared_ptr<api::StorageReply>& msg) { if (_mode == Mode::FAST_PATH) { handleFastPathReceive(sender, msg); @@ -288,9 +261,8 @@ TwoPhaseUpdateOperation::onReceive( } void -TwoPhaseUpdateOperation::handleFastPathReceive( - DistributorMessageSender& sender, - const std::shared_ptr<api::StorageReply>& msg) +TwoPhaseUpdateOperation::handleFastPathReceive(DistributorMessageSender& sender, + const std::shared_ptr<api::StorageReply>& msg) { if (msg->getType() == api::MessageType::GET_REPLY) { assert(_sendState == SendState::GETS_SENT); @@ -308,50 +280,40 @@ TwoPhaseUpdateOperation::handleFastPathReceive( if (!getReply.getDocument().get()) { // Weird, document is no longer there ... Just fail. - sendReplyWithResult(sender, api::ReturnCode( - api::ReturnCode::INTERNAL_FAILURE, "")); + sendReplyWithResult(sender, api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, "")); return; } - schedulePutsWithUpdatedDocument(getReply.getDocument(), - _manager.getUniqueTimestamp(), - sender); + schedulePutsWithUpdatedDocument(getReply.getDocument(), _manager.getUniqueTimestamp(), sender); return; } std::shared_ptr<Operation> callback = _sentMessageMap.pop(msg->getMsgId()); assert(callback.get()); - IntermediateMessageSender intermediate(_sentMessageMap, callback, sender); - callback->receive(intermediate, msg); + Operation & callbackOp = *callback; + IntermediateMessageSender intermediate(_sentMessageMap, std::move(callback), sender); + callbackOp.receive(intermediate, msg); if (msg->getType() == api::MessageType::UPDATE_REPLY) { if (intermediate._reply.get()) { assert(_sendState == SendState::UPDATES_SENT); addTraceFromReply(*intermediate._reply); - UpdateOperation& cb = static_cast<UpdateOperation&> (*callback); + UpdateOperation& cb = static_cast<UpdateOperation&> (callbackOp); - std::pair<document::BucketId, uint16_t> bestNode = - cb.getNewestTimestampLocation(); + std::pair<document::BucketId, uint16_t> bestNode = cb.getNewestTimestampLocation(); if (!intermediate._reply->getResult().success() || bestNode.first == document::BucketId(0)) { // Failed or was consistent sendReply(sender, intermediate._reply); } else { - LOG(debug, "Update(%s) fast path: was inconsistent!", - _updateCmd->getDocumentId().toString().c_str()); + LOG(debug, "Update(%s) fast path: was inconsistent!", _updateCmd->getDocumentId().toString().c_str()); _updateReply = intermediate._reply; document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), bestNode.first); - std::shared_ptr<api::GetCommand> cmd( - new api::GetCommand(bucket, - _updateCmd->getDocumentId(), - "[all]")); + auto cmd = std::make_shared<api::GetCommand>(bucket, _updateCmd->getDocumentId(), "[all]"); copyMessageSettings(*_updateCmd, *cmd); - sender.sendToNode( - lib::NodeType::STORAGE, - bestNode.second, - cmd); + sender.sendToNode(lib::NodeType::STORAGE, bestNode.second, cmd); transitionTo(SendState::GETS_SENT); } } @@ -365,15 +327,15 @@ TwoPhaseUpdateOperation::handleFastPathReceive( } void -TwoPhaseUpdateOperation::handleSafePathReceive( - DistributorMessageSender& sender, - const std::shared_ptr<api::StorageReply>& msg) +TwoPhaseUpdateOperation::handleSafePathReceive(DistributorMessageSender& sender, + const std::shared_ptr<api::StorageReply>& msg) { std::shared_ptr<Operation> callback = _sentMessageMap.pop(msg->getMsgId()); assert(callback.get()); + Operation & callbackOp = *callback; - IntermediateMessageSender intermediate(_sentMessageMap, callback, sender); - callback->receive(intermediate, msg); + IntermediateMessageSender intermediate(_sentMessageMap, std::move(callback), sender); + callbackOp.receive(intermediate, msg); if (!intermediate._reply.get()) { return; // Not enough replies received yet or we're draining callbacks. @@ -381,21 +343,17 @@ TwoPhaseUpdateOperation::handleSafePathReceive( addTraceFromReply(*intermediate._reply); if (_sendState == SendState::GETS_SENT) { assert(intermediate._reply->getType() == api::MessageType::GET_REPLY); - handleSafePathReceivedGet( - sender, static_cast<api::GetReply&>(*intermediate._reply)); + handleSafePathReceivedGet(sender, static_cast<api::GetReply&>(*intermediate._reply)); } else if (_sendState == SendState::PUTS_SENT) { assert(intermediate._reply->getType() == api::MessageType::PUT_REPLY); - handleSafePathReceivedPut( - sender, static_cast<api::PutReply&>(*intermediate._reply)); + handleSafePathReceivedPut(sender, static_cast<api::PutReply&>(*intermediate._reply)); } else { assert(!"Unknown state"); } } void -TwoPhaseUpdateOperation::handleSafePathReceivedGet( - DistributorMessageSender& sender, - api::GetReply& reply) +TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorMessageSender& sender, api::GetReply& reply) { LOG(debug, "Update(%s): got Get reply with code %s", _updateCmd->getDocumentId().toString().c_str(), @@ -411,9 +369,8 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet( if (reply.getDocument().get()) { api::Timestamp receivedTimestamp = reply.getLastModifiedTimestamp(); if (!satisfiesUpdateTimestampConstraint(receivedTimestamp)) { - sendReplyWithResult(sender, api::ReturnCode( - api::ReturnCode::OK, - "No document with requested timestamp found")); + sendReplyWithResult(sender, api::ReturnCode(api::ReturnCode::OK, + "No document with requested timestamp found")); return; } if (!processAndMatchTasCondition(sender, *reply.getDocument())) { @@ -425,9 +382,7 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet( replyWithTasFailure(sender, "Document did not exist"); return; } else if (shouldCreateIfNonExistent()) { - LOG(debug, - "No existing documents found for %s, creating blank " - "document to update", + LOG(debug, "No existing documents found for %s, creating blank document to update", _updateCmd->getUpdate()->getId().toString().c_str()); docToUpdate = createBlankDocument(); setUpdatedForTimestamp(putTimestamp); @@ -439,22 +394,19 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet( applyUpdateToDocument(*docToUpdate); schedulePutsWithUpdatedDocument(docToUpdate, putTimestamp, sender); } catch (vespalib::Exception& e) { - sendReplyWithResult(sender, api::ReturnCode( - api::ReturnCode::INTERNAL_FAILURE, e.getMessage())); + sendReplyWithResult(sender, api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, e.getMessage())); } } bool -TwoPhaseUpdateOperation::processAndMatchTasCondition( - DistributorMessageSender& sender, - const document::Document& candidateDoc) +TwoPhaseUpdateOperation::processAndMatchTasCondition(DistributorMessageSender& sender, + const document::Document& candidateDoc) { if (!hasTasCondition()) { return true; // No condition; nothing to do here. } - document::select::Parser parser(*_manager.getTypeRepo(), - _manager.getBucketIdFactory()); + document::select::Parser parser(*_manager.getTypeRepo(), _manager.getBucketIdFactory()); std::unique_ptr<document::select::Node> selection; try { selection = parser.parse(_updateCmd->getCondition().getSelection()); @@ -479,12 +431,9 @@ TwoPhaseUpdateOperation::hasTasCondition() const noexcept } void -TwoPhaseUpdateOperation::replyWithTasFailure( - DistributorMessageSender& sender, - vespalib::stringref message) +TwoPhaseUpdateOperation::replyWithTasFailure(DistributorMessageSender& sender, vespalib::stringref message) { - sendReplyWithResult(sender, api::ReturnCode( - api::ReturnCode::TEST_AND_SET_CONDITION_FAILED, message)); + sendReplyWithResult(sender, api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED, message)); } void @@ -502,9 +451,7 @@ TwoPhaseUpdateOperation::createBlankDocument() const } void -TwoPhaseUpdateOperation::handleSafePathReceivedPut( - DistributorMessageSender& sender, - const api::PutReply& reply) +TwoPhaseUpdateOperation::handleSafePathReceivedPut(DistributorMessageSender& sender, const api::PutReply& reply) { sendReplyWithResult(sender, reply.getResult()); } @@ -522,11 +469,9 @@ TwoPhaseUpdateOperation::shouldCreateIfNonExistent() const } bool -TwoPhaseUpdateOperation::satisfiesUpdateTimestampConstraint( - api::Timestamp ts) const +TwoPhaseUpdateOperation::satisfiesUpdateTimestampConstraint(api::Timestamp ts) const { - return (_updateCmd->getOldTimestamp() == 0 - || _updateCmd->getOldTimestamp() == ts); + return (_updateCmd->getOldTimestamp() == 0 || _updateCmd->getOldTimestamp() == ts); } void @@ -540,11 +485,8 @@ TwoPhaseUpdateOperation::onClose(DistributorMessageSender& sender) { while (true) { std::shared_ptr<Operation> cb = _sentMessageMap.pop(); - if (cb.get()) { - IntermediateMessageSender intermediate( - _sentMessageMap, - std::shared_ptr<Operation > (), - sender); + if (cb) { + IntermediateMessageSender intermediate(_sentMessageMap, std::shared_ptr<Operation > (), sender); cb->onClose(intermediate); // We will _only_ forward UpdateReply instances up, since those // are created by UpdateOperation and are bound to the original @@ -552,9 +494,7 @@ TwoPhaseUpdateOperation::onClose(DistributorMessageSender& sender) { // to synthetic commands created for gets/puts and should never be // propagated to the outside world. auto candidateReply = std::move(intermediate._reply); - if (candidateReply - && candidateReply->getType() == api::MessageType::UPDATE_REPLY) - { + if (candidateReply && candidateReply->getType() == api::MessageType::UPDATE_REPLY) { assert(_mode == Mode::FAST_PATH); sendReply(sender, candidateReply); // Sets _replySent } diff --git a/storage/src/vespa/storage/distributor/statecheckers.cpp b/storage/src/vespa/storage/distributor/statecheckers.cpp index cafac5b67a5..0a0abb8d417 100644 --- a/storage/src/vespa/storage/distributor/statecheckers.cpp +++ b/storage/src/vespa/storage/distributor/statecheckers.cpp @@ -1003,7 +1003,7 @@ BucketStateStateChecker::shouldSkipActivationDueToMaintenance( const StateChecker::Context& c) const { for (uint32_t i = 0; i < activeNodes.size(); ++i) { - const BucketCopy* cp(c.entry->getNode(activeNodes[i].nodeIndex)); + const BucketCopy* cp(c.entry->getNode(activeNodes[i]._nodeIndex)); if (!cp || cp->active()) { continue; } @@ -1050,12 +1050,12 @@ BucketStateStateChecker::check(StateChecker::Context& c) vespalib::asciistream reason; std::vector<uint16_t> operationNodes; for (uint32_t i=0; i<activeNodes.size(); ++i) { - const BucketCopy* cp = c.entry->getNode(activeNodes[i].nodeIndex); + const BucketCopy* cp = c.entry->getNode(activeNodes[i]._nodeIndex); if (cp == 0 || cp->active()) { continue; } - operationNodes.push_back(activeNodes[i].nodeIndex); - reason << "[Setting node " << activeNodes[i].nodeIndex << " as active: " + operationNodes.push_back(activeNodes[i]._nodeIndex); + reason << "[Setting node " << activeNodes[i]._nodeIndex << " as active: " << activeNodes[i].getReason() << "]"; } @@ -1067,7 +1067,7 @@ BucketStateStateChecker::check(StateChecker::Context& c) } bool shouldBeActive = false; for (uint32_t j=0; j<activeNodes.size(); ++j) { - if (activeNodes[j].nodeIndex == cp.getNode()) { + if (activeNodes[j]._nodeIndex == cp.getNode()) { shouldBeActive = true; } } @@ -1083,7 +1083,7 @@ BucketStateStateChecker::check(StateChecker::Context& c) std::vector<uint16_t> activeNodeIndexes; for (uint32_t i=0; i<activeNodes.size(); ++i) { - activeNodeIndexes.push_back(activeNodes[i].nodeIndex); + activeNodeIndexes.push_back(activeNodes[i]._nodeIndex); } auto op = std::make_unique<SetBucketStateOperation>( c.component.getClusterName(), @@ -1094,11 +1094,9 @@ BucketStateStateChecker::check(StateChecker::Context& c) // we currently always send high pri activations. // Otherwise, only > 1 operationNodes if we have copies to deactivate. if (activeNodes.size() > 1 || operationNodes.size() == 1) { - op->setPriority(c.distributorConfig.getMaintenancePriorities() - .activateNoExistingActive); + op->setPriority(c.distributorConfig.getMaintenancePriorities().activateNoExistingActive); } else { - op->setPriority(c.distributorConfig.getMaintenancePriorities() - .activateWithExistingActive); + op->setPriority(c.distributorConfig.getMaintenancePriorities().activateWithExistingActive); } op->setDetailedReason(reason.str()); return Result::createStoredResult(std::move(op), MaintenancePriority::VERY_HIGH); @@ -1137,8 +1135,7 @@ GarbageCollectionStateChecker::check(Context& c) << ", configured interval " << c.distributorConfig.getGarbageCollectionInterval() << "]"; - op->setPriority(c.distributorConfig.getMaintenancePriorities() - .garbageCollection); + op->setPriority(c.distributorConfig.getMaintenancePriorities().garbageCollection); op->setDetailedReason(reason.c_str()); return Result::createStoredResult(std::move(op), MaintenancePriority::VERY_LOW); } else { |