diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-08-10 10:57:54 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-08-10 10:57:54 +0200 |
commit | 16ade10496858b0046e45d44052170be381e503f (patch) | |
tree | 37920210efcda4acc590f6f7be676985bbdfe540 | |
parent | 2e58bbfce1044058a098e7ec7430c126cd0a01ee (diff) | |
parent | 465b85e3855897d39faa27511033500307c46a41 (diff) |
Merge pull request #28008 from vespa-engine/balder/use-hash_map-for-message-tracking
Use faster and lighter hash_map for message tracking in distributor.
13 files changed, 57 insertions, 68 deletions
diff --git a/storage/src/vespa/storage/distributor/messagetracker.cpp b/storage/src/vespa/storage/distributor/messagetracker.cpp index 8830e5ecabc..28fbaad4619 100644 --- a/storage/src/vespa/storage/distributor/messagetracker.cpp +++ b/storage/src/vespa/storage/distributor/messagetracker.cpp @@ -3,6 +3,7 @@ #include "messagetracker.h" #include <vespa/storageapi/messageapi/bucketcommand.h> #include <vespa/storageapi/messageapi/bucketreply.h> +#include <vespa/vespalib/stllike/hash_map.hpp> #include <cinttypes> #include <vespa/log/log.h> @@ -19,10 +20,11 @@ MessageTracker::~MessageTracker() = default; void MessageTracker::flushQueue(MessageSender& sender) { - for (uint32_t i = 0; i < _commandQueue.size(); i++) { - _commandQueue[i]._msg->setAddress(api::StorageMessageAddress::create(_cluster_ctx.cluster_name_ptr(), lib::NodeType::STORAGE, _commandQueue[i]._target)); - _sentMessages[_commandQueue[i]._msg->getMsgId()] = _commandQueue[i]._target; - sender.sendCommand(_commandQueue[i]._msg); + _sentMessages.resize(_commandQueue.size()); + for (const auto & toSend : _commandQueue) { + toSend._msg->setAddress(api::StorageMessageAddress::create(_cluster_ctx.cluster_name_ptr(), lib::NodeType::STORAGE, toSend._target)); + _sentMessages[toSend._msg->getMsgId()] = toSend._target; + sender.sendCommand(toSend._msg); } _commandQueue.clear(); @@ -31,21 +33,14 @@ MessageTracker::flushQueue(MessageSender& sender) uint16_t MessageTracker::handleReply(api::BucketReply& reply) { - std::map<uint64_t, uint16_t>::iterator found = _sentMessages.find(reply.getMsgId()); - if (found == _sentMessages.end()) { + const auto found = _sentMessages.find(reply.getMsgId()); + if (found == _sentMessages.end()) [[unlikely]] { LOG(warning, "Received reply %" PRIu64 " for callback which we have no recollection of", reply.getMsgId()); return (uint16_t)-1; - } else { - uint16_t node = found->second; - _sentMessages.erase(found); - return node; } -} - -bool -MessageTracker::finished() -{ - return _sentMessages.empty(); + uint16_t node = found->second; + _sentMessages.erase(found); + return node; } } diff --git a/storage/src/vespa/storage/distributor/messagetracker.h b/storage/src/vespa/storage/distributor/messagetracker.h index 73e2461eb7a..51d29551de0 100644 --- a/storage/src/vespa/storage/distributor/messagetracker.h +++ b/storage/src/vespa/storage/distributor/messagetracker.h @@ -4,8 +4,7 @@ #include <vespa/storage/common/cluster_context.h> #include <vespa/storage/common/messagesender.h> #include <vespa/vespalib/stllike/string.h> -#include <vector> -#include <map> +#include <vespa/vespalib/stllike/hash_map.h> namespace storage::api { class BucketCommand; @@ -18,8 +17,9 @@ class MessageTracker { public: class ToSend { public: - ToSend(std::shared_ptr<api::BucketCommand> msg, uint16_t target) noexcept : - _msg(std::move(msg)), _target(target) {}; + ToSend(std::shared_ptr<api::BucketCommand> msg, uint16_t target) noexcept + : _msg(std::move(msg)), _target(target) + {} std::shared_ptr<api::BucketCommand> _msg; uint16_t _target; @@ -35,6 +35,9 @@ public: void queueCommand(std::shared_ptr<api::BucketCommand> msg, uint16_t target) { _commandQueue.emplace_back(std::move(msg), target); } + void reserve_more_commands(size_t sz) { + _commandQueue.reserve(_commandQueue.size() + sz); + } void flushQueue(MessageSender& sender); @@ -46,13 +49,15 @@ public: /** Returns true if all messages sent have been received. */ - bool finished(); + bool finished() const noexcept { + return _sentMessages.empty(); + } protected: - std::vector<ToSend> _commandQueue; + std::vector<ToSend> _commandQueue; // Keeps track of which node a message was sent to. - std::map<uint64_t, uint16_t> _sentMessages; - const ClusterContext& _cluster_ctx; + vespalib::hash_map<uint64_t, uint16_t> _sentMessages; + const ClusterContext& _cluster_ctx; }; } diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp index 324cb3691e4..f76b6495443 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp @@ -210,11 +210,11 @@ void PutOperation::start_direct_put_dispatch(DistributorStripeMessageSender& sen } if (!createBucketBatch.empty()) { - _tracker.queueMessageBatch(createBucketBatch); + _tracker.queueMessageBatch(std::move(createBucketBatch)); } std::vector<PersistenceMessageTracker::ToSend> putBatch; - + putBatch.reserve(targets.size()); // Now send PUTs for (const auto& target : targets) { sendPutToBucketOnNode(_msg->getBucket().getBucketSpace(), target.getBucketId(), @@ -222,7 +222,7 @@ void PutOperation::start_direct_put_dispatch(DistributorStripeMessageSender& sen } if (!putBatch.empty()) { - _tracker.queueMessageBatch(putBatch); + _tracker.queueMessageBatch(std::move(putBatch)); } else { const char* error = "Can't store document: No storage nodes available"; LOG(debug, "%s", error); diff --git a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp index dd6e1e93791..5f52a8208fc 100644 --- a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp @@ -79,13 +79,11 @@ RemoveLocationOperation::onStart(DistributorStripeMessageSender& sender) std::vector<uint16_t> nodes = e->getNodes(); for (uint32_t i = 0; i < nodes.size(); i++) { - std::shared_ptr<api::RemoveLocationCommand> command( - new api::RemoveLocationCommand( - _msg->getDocumentSelection(), - document::Bucket(_msg->getBucket().getBucketSpace(), e.getBucketId()))); + auto command = std::make_shared<api::RemoveLocationCommand>(_msg->getDocumentSelection(), + document::Bucket(_msg->getBucket().getBucketSpace(), e.getBucketId())); copyMessageSettings(*_msg, *command); - _tracker.queueCommand(command, nodes[i]); + _tracker.queueCommand(std::move(command), nodes[i]); sent = true; } } diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp index 96182b0744f..42d8e318f47 100644 --- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp @@ -65,9 +65,7 @@ void RemoveOperation::start_conditional_remove(DistributorStripeMessageSender& s void RemoveOperation::start_direct_remove_dispatch(DistributorStripeMessageSender& sender) { LOG(spam, "Started remove on document %s", _msg->getDocumentId().toString().c_str()); - document::BucketId bucketId( - _node_ctx.bucket_id_factory().getBucketId( - _msg->getDocumentId())); + document::BucketId bucketId(_node_ctx.bucket_id_factory().getBucketId(_msg->getDocumentId())); std::vector<BucketDatabase::Entry> entries; _bucket_space.getBucketDatabase().getParents(bucketId, entries); @@ -79,8 +77,7 @@ void RemoveOperation::start_direct_remove_dispatch(DistributorStripeMessageSende messages.reserve(e->getNodeCount()); for (uint32_t i = 0; i < e->getNodeCount(); i++) { auto command = std::make_shared<api::RemoveCommand>(document::Bucket(_msg->getBucket().getBucketSpace(), e.getBucketId()), - _msg->getDocumentId(), - _msg->getTimestamp()); + _msg->getDocumentId(), _msg->getTimestamp()); copyMessageSettings(*_msg, *command); command->getTrace().setLevel(_msg->getTrace().getLevel()); @@ -90,7 +87,7 @@ void RemoveOperation::start_direct_remove_dispatch(DistributorStripeMessageSende sent = true; } - _tracker.queueMessageBatch(messages); + _tracker.queueMessageBatch(std::move(messages)); } if (!sent) { diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp index 8988f2589ce..f43a6092372 100644 --- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp @@ -106,19 +106,18 @@ UpdateOperation::onStart(DistributorStripeMessageSender& sender) const std::vector<uint16_t>& nodes = entry->getNodes(); std::vector<MessageTracker::ToSend> messages; + messages.reserve(nodes.size()); for (uint16_t node : nodes) { - auto command = std::make_shared<api::UpdateCommand>( - document::Bucket(_msg->getBucket().getBucketSpace(), entry.getBucketId()), - _msg->getUpdate(), - _msg->getTimestamp()); + auto command = std::make_shared<api::UpdateCommand>(document::Bucket(_msg->getBucket().getBucketSpace(), entry.getBucketId()), + _msg->getUpdate(), _msg->getTimestamp()); copyMessageSettings(*_msg, *command); command->setOldTimestamp(_msg->getOldTimestamp()); command->setCondition(_msg->getCondition()); messages.emplace_back(std::move(command), node); } - _tracker.queueMessageBatch(messages); + _tracker.queueMessageBatch(std::move(messages)); } _tracker.flushQueue(sender); diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp index 5599f9fb51e..2e6d0e95ec9 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp @@ -87,7 +87,7 @@ void GarbageCollectionOperation::send_current_phase_remove_locations(Distributor command->setPriority((_phase != Phase::WriteRemovesPhase) ? _priority : _manager->operation_context().distributor_config().default_external_feed_priority()); - _tracker.queueCommand(command, nodes[i]); + _tracker.queueCommand(std::move(command), nodes[i]); } _tracker.flushQueue(sender); } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp index 0e9873f3434..616c4962dca 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp @@ -81,12 +81,11 @@ JoinOperation::enqueueJoinMessagePerTargetNode( return false; } for (const auto& node : nodeToBuckets) { - std::shared_ptr<api::JoinBucketsCommand> msg( - new api::JoinBucketsCommand(getBucket())); + auto msg = std::make_shared<api::JoinBucketsCommand>(getBucket()); msg->getSourceBuckets() = node.second; msg->setTimeout(MAX_TIMEOUT); setCommandMeta(*msg); - _tracker.queueCommand(msg, node.first); + _tracker.queueCommand(std::move(msg), node.first); } return true; } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp index e46ccebffba..7bec6bbe53a 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp @@ -24,16 +24,11 @@ RemoveBucketOperation::onStartInternal(DistributorStripeMessageSender& sender) uint16_t node = getNodes()[i]; const BucketCopy* copy(entry->getNode(node)); if (!copy) { - LOG(debug, "Node %u was removed between scheduling remove " - "operation and starting it; not sending DeleteBucket to it", - node); + LOG(debug, "Node %u was removed between scheduling remove operation and starting it; not sending DeleteBucket to it", node); continue; } - LOG(debug, "Sending DeleteBucket for %s to node %u", - getBucketId().toString().c_str(), - node); - std::shared_ptr<api::DeleteBucketCommand> msg( - new api::DeleteBucketCommand(getBucket())); + LOG(debug, "Sending DeleteBucket for %s to node %u", getBucketId().toString().c_str(), node); + auto msg = std::make_shared<api::DeleteBucketCommand>(getBucket()); setCommandMeta(*msg); msg->setBucketInfo(copy->getBucketInfo()); msgs.push_back(std::make_pair(node, msg)); @@ -42,8 +37,8 @@ RemoveBucketOperation::onStartInternal(DistributorStripeMessageSender& sender) _ok = true; if (!getNodes().empty()) { _manager->operation_context().remove_nodes_from_bucket_database(getBucket(), getNodes()); - for (uint32_t i = 0; i < msgs.size(); ++i) { - _tracker.queueCommand(msgs[i].second, msgs[i].first); + for (auto & msg : msgs) { + _tracker.queueCommand(std::move(msg.second), msg.first); } _tracker.flushQueue(sender); } diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp index 00906d22ea4..9547bee6583 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp @@ -26,11 +26,9 @@ SetBucketStateOperation::enqueueSetBucketStateCommand(uint16_t node, bool active active ? api::SetBucketStateCommand::ACTIVE : api::SetBucketStateCommand::INACTIVE); - LOG(debug, "Enqueuing %s for %s to node %u", - active ? "Activate" : "Deactivate", - getBucketId().toString().c_str(), node); + LOG(debug, "Enqueuing %s for %s to node %u", active ? "Activate" : "Deactivate", getBucketId().toString().c_str(), node); setCommandMeta(*msg); - _tracker.queueCommand(msg, node); + _tracker.queueCommand(std::move(msg), node); } bool diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp index 8e64fb227a7..d704a42e96b 100644 --- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp @@ -35,7 +35,7 @@ SplitOperation::onStart(DistributorStripeMessageSender& sender) msg->setMinByteSize(_splitSize); msg->setTimeout(MAX_TIMEOUT); setCommandMeta(*msg); - _tracker.queueCommand(msg, entry->getNodeRef(i).getNode()); + _tracker.queueCommand(std::move(msg), entry->getNodeRef(i).getNode()); _ok = true; } diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp index a30663bde2f..fe384a68d72 100644 --- a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp +++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp @@ -101,15 +101,18 @@ PersistenceMessageTrackerImpl::revert( } void -PersistenceMessageTrackerImpl::queueMessageBatch(const std::vector<MessageTracker::ToSend>& messages) { +PersistenceMessageTrackerImpl::queueMessageBatch(std::vector<MessageTracker::ToSend> messages) { _messageBatches.emplace_back(); - for (const auto & message : messages) { + auto & batch = _messageBatches.back(); + batch.reserve(messages.size()); + reserve_more_commands(messages.size()); + for (auto & message : messages) { if (_reply) { message._msg->getTrace().setLevel(_reply->getTrace().getLevel()); } - _messageBatches.back().push_back(message._msg->getMsgId()); - queueCommand(message._msg, message._target); + batch.push_back(message._msg->getMsgId()); + queueCommand(std::move(message._msg), message._target); } } diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.h b/storage/src/vespa/storage/distributor/persistencemessagetracker.h index 923ecf45649..ecc4732696b 100644 --- a/storage/src/vespa/storage/distributor/persistencemessagetracker.h +++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.h @@ -16,7 +16,7 @@ struct PersistenceMessageTracker { using ToSend = MessageTracker::ToSend; virtual void fail(MessageSender&, const api::ReturnCode&) = 0; - virtual void queueMessageBatch(const std::vector<ToSend>&) = 0; + virtual void queueMessageBatch(std::vector<ToSend> messages) = 0; virtual uint16_t receiveReply(MessageSender&, api::BucketInfoReply&) = 0; virtual std::shared_ptr<api::BucketInfoReply>& getReply() = 0; virtual void updateFromReply(MessageSender&, api::BucketInfoReply&, uint16_t node) = 0; @@ -65,7 +65,7 @@ public: have at most (messages.size() - initial redundancy) messages left in the queue and have it's first message be done. */ - void queueMessageBatch(const std::vector<MessageTracker::ToSend>& messages) override; + void queueMessageBatch(std::vector<MessageTracker::ToSend> messages) override; private: using MessageBatch = std::vector<uint64_t>; |