diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-08-09 22:29:01 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2023-08-09 22:29:01 +0000 |
commit | 465b85e3855897d39faa27511033500307c46a41 (patch) | |
tree | cd4535b4cf790dd28e9d2c447c4e15e37cb01d6d | |
parent | 8e9c8cea1f11ef7f07ef4a084b170f180b662e2d (diff) |
Move shared_ptrs where possible and reserve space in the vectors once.
13 files changed, 47 insertions, 59 deletions
diff --git a/storage/src/vespa/storage/distributor/messagetracker.cpp b/storage/src/vespa/storage/distributor/messagetracker.cpp index a9c1a56212c..28fbaad4619 100644 --- a/storage/src/vespa/storage/distributor/messagetracker.cpp +++ b/storage/src/vespa/storage/distributor/messagetracker.cpp @@ -33,21 +33,14 @@ MessageTracker::flushQueue(MessageSender& sender) uint16_t MessageTracker::handleReply(api::BucketReply& reply) { - std::map<uint64_t, uint16_t>::iterator found = _sentMessages.find(reply.getMsgId()); - if (found == _sentMessages.end()) { + const auto found = _sentMessages.find(reply.getMsgId()); + if (found == _sentMessages.end()) [[unlikely]] { LOG(warning, "Received reply %" PRIu64 " for callback which we have no recollection of", reply.getMsgId()); return (uint16_t)-1; - } else { - uint16_t node = found->second; - _sentMessages.erase(found); - return node; } -} - -bool -MessageTracker::finished() -{ - return _sentMessages.empty(); + uint16_t node = found->second; + _sentMessages.erase(found); + return node; } } diff --git a/storage/src/vespa/storage/distributor/messagetracker.h b/storage/src/vespa/storage/distributor/messagetracker.h index 07afc37ffa2..51d29551de0 100644 --- a/storage/src/vespa/storage/distributor/messagetracker.h +++ b/storage/src/vespa/storage/distributor/messagetracker.h @@ -17,8 +17,9 @@ class MessageTracker { public: class ToSend { public: - ToSend(std::shared_ptr<api::BucketCommand> msg, uint16_t target) noexcept : - _msg(std::move(msg)), _target(target) {}; + ToSend(std::shared_ptr<api::BucketCommand> msg, uint16_t target) noexcept + : _msg(std::move(msg)), _target(target) + {} std::shared_ptr<api::BucketCommand> _msg; uint16_t _target; @@ -34,6 +35,9 @@ public: void queueCommand(std::shared_ptr<api::BucketCommand> msg, uint16_t target) { _commandQueue.emplace_back(std::move(msg), target); } + void reserve_more_commands(size_t sz) { + _commandQueue.reserve(_commandQueue.size() + sz); + } void flushQueue(MessageSender& sender); @@ -45,7 +49,9 @@ public: /** Returns true if all messages sent have been received. */ - bool finished(); + bool finished() const noexcept { + return _sentMessages.empty(); + } protected: std::vector<ToSend> _commandQueue; diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp index 8c6fdb314f3..6e5a06591fd 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp @@ -211,11 +211,11 @@ void PutOperation::start_direct_put_dispatch(DistributorStripeMessageSender& sen } if (!createBucketBatch.empty()) { - _tracker.queueMessageBatch(createBucketBatch); + _tracker.queueMessageBatch(std::move(createBucketBatch)); } std::vector<PersistenceMessageTracker::ToSend> putBatch; - + putBatch.reserve(targets.size()); // Now send PUTs for (const auto& target : targets) { sendPutToBucketOnNode(_msg->getBucket().getBucketSpace(), target.getBucketId(), @@ -223,7 +223,7 @@ void PutOperation::start_direct_put_dispatch(DistributorStripeMessageSender& sen } if (!putBatch.empty()) { - _tracker.queueMessageBatch(putBatch); + _tracker.queueMessageBatch(std::move(putBatch)); } else { const char* error = "Can't store document: No storage nodes available"; LOG(debug, "%s", error); diff --git a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp index dd6e1e93791..5f52a8208fc 100644 --- a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp @@ -79,13 +79,11 @@ RemoveLocationOperation::onStart(DistributorStripeMessageSender& sender) std::vector<uint16_t> nodes = e->getNodes(); for (uint32_t i = 0; i < nodes.size(); i++) { - std::shared_ptr<api::RemoveLocationCommand> command( - new api::RemoveLocationCommand( - _msg->getDocumentSelection(), - document::Bucket(_msg->getBucket().getBucketSpace(), e.getBucketId()))); + auto command = std::make_shared<api::RemoveLocationCommand>(_msg->getDocumentSelection(), + document::Bucket(_msg->getBucket().getBucketSpace(), e.getBucketId())); copyMessageSettings(*_msg, *command); - _tracker.queueCommand(command, nodes[i]); + _tracker.queueCommand(std::move(command), nodes[i]); sent = true; } } diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp index 96182b0744f..42d8e318f47 100644 --- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp @@ -65,9 +65,7 @@ void RemoveOperation::start_conditional_remove(DistributorStripeMessageSender& s void RemoveOperation::start_direct_remove_dispatch(DistributorStripeMessageSender& sender) { LOG(spam, "Started remove on document %s", _msg->getDocumentId().toString().c_str()); - document::BucketId bucketId( - _node_ctx.bucket_id_factory().getBucketId( - _msg->getDocumentId())); + document::BucketId bucketId(_node_ctx.bucket_id_factory().getBucketId(_msg->getDocumentId())); std::vector<BucketDatabase::Entry> entries; _bucket_space.getBucketDatabase().getParents(bucketId, entries); @@ -79,8 +77,7 @@ void RemoveOperation::start_direct_remove_dispatch(DistributorStripeMessageSende messages.reserve(e->getNodeCount()); for (uint32_t i = 0; i < e->getNodeCount(); i++) { auto command = std::make_shared<api::RemoveCommand>(document::Bucket(_msg->getBucket().getBucketSpace(), e.getBucketId()), - _msg->getDocumentId(), - _msg->getTimestamp()); + _msg->getDocumentId(), _msg->getTimestamp()); copyMessageSettings(*_msg, *command); command->getTrace().setLevel(_msg->getTrace().getLevel()); @@ -90,7 +87,7 @@ void RemoveOperation::start_direct_remove_dispatch(DistributorStripeMessageSende sent = true; } - _tracker.queueMessageBatch(messages); + _tracker.queueMessageBatch(std::move(messages)); } if (!sent) { diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp index 8988f2589ce..f43a6092372 100644 --- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp @@ -106,19 +106,18 @@ UpdateOperation::onStart(DistributorStripeMessageSender& sender) const std::vector<uint16_t>& nodes = entry->getNodes(); std::vector<MessageTracker::ToSend> messages; + messages.reserve(nodes.size()); for (uint16_t node : nodes) { - auto command = std::make_shared<api::UpdateCommand>( - document::Bucket(_msg->getBucket().getBucketSpace(), entry.getBucketId()), - _msg->getUpdate(), - _msg->getTimestamp()); + auto command = std::make_shared<api::UpdateCommand>(document::Bucket(_msg->getBucket().getBucketSpace(), entry.getBucketId()), + _msg->getUpdate(), _msg->getTimestamp()); copyMessageSettings(*_msg, *command); command->setOldTimestamp(_msg->getOldTimestamp()); command->setCondition(_msg->getCondition()); messages.emplace_back(std::move(command), node); } - _tracker.queueMessageBatch(messages); + _tracker.queueMessageBatch(std::move(messages)); } _tracker.flushQueue(sender); diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp index 5599f9fb51e..2e6d0e95ec9 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp @@ -87,7 +87,7 @@ void GarbageCollectionOperation::send_current_phase_remove_locations(Distributor command->setPriority((_phase != Phase::WriteRemovesPhase) ? _priority : _manager->operation_context().distributor_config().default_external_feed_priority()); - _tracker.queueCommand(command, nodes[i]); + _tracker.queueCommand(std::move(command), nodes[i]); } _tracker.flushQueue(sender); } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp index 0e9873f3434..616c4962dca 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp @@ -81,12 +81,11 @@ JoinOperation::enqueueJoinMessagePerTargetNode( return false; } for (const auto& node : nodeToBuckets) { - std::shared_ptr<api::JoinBucketsCommand> msg( - new api::JoinBucketsCommand(getBucket())); + auto msg = std::make_shared<api::JoinBucketsCommand>(getBucket()); msg->getSourceBuckets() = node.second; msg->setTimeout(MAX_TIMEOUT); setCommandMeta(*msg); - _tracker.queueCommand(msg, node.first); + _tracker.queueCommand(std::move(msg), node.first); } return true; } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp index e46ccebffba..7bec6bbe53a 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp @@ -24,16 +24,11 @@ RemoveBucketOperation::onStartInternal(DistributorStripeMessageSender& sender) uint16_t node = getNodes()[i]; const BucketCopy* copy(entry->getNode(node)); if (!copy) { - LOG(debug, "Node %u was removed between scheduling remove " - "operation and starting it; not sending DeleteBucket to it", - node); + LOG(debug, "Node %u was removed between scheduling remove operation and starting it; not sending DeleteBucket to it", node); continue; } - LOG(debug, "Sending DeleteBucket for %s to node %u", - getBucketId().toString().c_str(), - node); - std::shared_ptr<api::DeleteBucketCommand> msg( - new api::DeleteBucketCommand(getBucket())); + LOG(debug, "Sending DeleteBucket for %s to node %u", getBucketId().toString().c_str(), node); + auto msg = std::make_shared<api::DeleteBucketCommand>(getBucket()); setCommandMeta(*msg); msg->setBucketInfo(copy->getBucketInfo()); msgs.push_back(std::make_pair(node, msg)); @@ -42,8 +37,8 @@ RemoveBucketOperation::onStartInternal(DistributorStripeMessageSender& sender) _ok = true; if (!getNodes().empty()) { _manager->operation_context().remove_nodes_from_bucket_database(getBucket(), getNodes()); - for (uint32_t i = 0; i < msgs.size(); ++i) { - _tracker.queueCommand(msgs[i].second, msgs[i].first); + for (auto & msg : msgs) { + _tracker.queueCommand(std::move(msg.second), msg.first); } _tracker.flushQueue(sender); } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp index 00906d22ea4..9547bee6583 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp @@ -26,11 +26,9 @@ SetBucketStateOperation::enqueueSetBucketStateCommand(uint16_t node, bool active active ? api::SetBucketStateCommand::ACTIVE : api::SetBucketStateCommand::INACTIVE); - LOG(debug, "Enqueuing %s for %s to node %u", - active ? "Activate" : "Deactivate", - getBucketId().toString().c_str(), node); + LOG(debug, "Enqueuing %s for %s to node %u", active ? "Activate" : "Deactivate", getBucketId().toString().c_str(), node); setCommandMeta(*msg); - _tracker.queueCommand(msg, node); + _tracker.queueCommand(std::move(msg), node); } bool diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp index 8e64fb227a7..d704a42e96b 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp @@ -35,7 +35,7 @@ SplitOperation::onStart(DistributorStripeMessageSender& sender) msg->setMinByteSize(_splitSize); msg->setTimeout(MAX_TIMEOUT); setCommandMeta(*msg); - _tracker.queueCommand(msg, entry->getNodeRef(i).getNode()); + _tracker.queueCommand(std::move(msg), entry->getNodeRef(i).getNode()); _ok = true; } diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp index a30663bde2f..fe384a68d72 100644 --- a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp +++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp @@ -101,15 +101,18 @@ PersistenceMessageTrackerImpl::revert( } void -PersistenceMessageTrackerImpl::queueMessageBatch(const std::vector<MessageTracker::ToSend>& messages) { +PersistenceMessageTrackerImpl::queueMessageBatch(std::vector<MessageTracker::ToSend> messages) { _messageBatches.emplace_back(); - for (const auto & message : messages) { + auto & batch = _messageBatches.back(); + batch.reserve(messages.size()); + reserve_more_commands(messages.size()); + for (auto & message : messages) { if (_reply) { message._msg->getTrace().setLevel(_reply->getTrace().getLevel()); } - _messageBatches.back().push_back(message._msg->getMsgId()); - queueCommand(message._msg, message._target); + batch.push_back(message._msg->getMsgId()); + queueCommand(std::move(message._msg), message._target); } } diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.h b/storage/src/vespa/storage/distributor/persistencemessagetracker.h index 923ecf45649..ecc4732696b 100644 --- a/storage/src/vespa/storage/distributor/persistencemessagetracker.h +++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.h @@ -16,7 +16,7 @@ struct PersistenceMessageTracker { using ToSend = MessageTracker::ToSend; virtual void fail(MessageSender&, const api::ReturnCode&) = 0; - virtual void queueMessageBatch(const std::vector<ToSend>&) = 0; + virtual void queueMessageBatch(std::vector<ToSend> messages) = 0; virtual uint16_t receiveReply(MessageSender&, api::BucketInfoReply&) = 0; virtual std::shared_ptr<api::BucketInfoReply>& getReply() = 0; virtual void updateFromReply(MessageSender&, api::BucketInfoReply&, uint16_t node) = 0; @@ -65,7 +65,7 @@ public: have at most (messages.size() - initial redundancy) messages left in the queue and have it's first message be done. */ - void queueMessageBatch(const std::vector<MessageTracker::ToSend>& messages) override; + void queueMessageBatch(std::vector<MessageTracker::ToSend> messages) override; private: using MessageBatch = std::vector<uint64_t>; |