summary refs log tree commit diff stats
diff options
context:
space:
mode:
author Henning Baldersheim <balder@yahoo-inc.com> 2023-08-10 10:57:54 +0200
committer GitHub <noreply@github.com> 2023-08-10 10:57:54 +0200
commit 16ade10496858b0046e45d44052170be381e503f (patch)
tree 37920210efcda4acc590f6f7be676985bbdfe540
parent 2e58bbfce1044058a098e7ec7430c126cd0a01ee (diff)
parent 465b85e3855897d39faa27511033500307c46a41 (diff)
Merge pull request #28008 from vespa-engine/balder/use-hash_map-for-message-tracking
Use faster and lighter hash_map for message tracking in distributor.
-rw-r--r-- storage/src/vespa/storage/distributor/messagetracker.cpp 27
-rw-r--r-- storage/src/vespa/storage/distributor/messagetracker.h 21
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/putoperation.cpp 6
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp 8
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp 9
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp 9
-rw-r--r-- storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp 2
-rw-r--r-- storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp 5
-rw-r--r-- storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp 15
-rw-r--r-- storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp 6
-rw-r--r-- storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp 2
-rw-r--r-- storage/src/vespa/storage/distributor/persistencemessagetracker.cpp 11
-rw-r--r-- storage/src/vespa/storage/distributor/persistencemessagetracker.h 4
13 files changed, 57 insertions, 68 deletions
diff --git a/storage/src/vespa/storage/distributor/messagetracker.cpp b/storage/src/vespa/storage/distributor/messagetracker.cpp
index 8830e5ecabc..28fbaad4619 100644
--- a/storage/src/vespa/storage/distributor/messagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/messagetracker.cpp
@@ -3,6 +3,7 @@
#include "messagetracker.h"
#include <vespa/storageapi/messageapi/bucketcommand.h>
#include <vespa/storageapi/messageapi/bucketreply.h>
+#include <vespa/vespalib/stllike/hash_map.hpp>
#include <cinttypes>
#include <vespa/log/log.h>
@@ -19,10 +20,11 @@ MessageTracker::~MessageTracker() = default;
void
MessageTracker::flushQueue(MessageSender& sender)
{
- for (uint32_t i = 0; i < _commandQueue.size(); i++) {
- _commandQueue[i]._msg->setAddress(api::StorageMessageAddress::create(_cluster_ctx.cluster_name_ptr(), lib::NodeType::STORAGE, _commandQueue[i]._target));
- _sentMessages[_commandQueue[i]._msg->getMsgId()] = _commandQueue[i]._target;
- sender.sendCommand(_commandQueue[i]._msg);
+ _sentMessages.resize(_commandQueue.size());
+ for (const auto & toSend : _commandQueue) {
+ toSend._msg->setAddress(api::StorageMessageAddress::create(_cluster_ctx.cluster_name_ptr(), lib::NodeType::STORAGE, toSend._target));
+ _sentMessages[toSend._msg->getMsgId()] = toSend._target;
+ sender.sendCommand(toSend._msg);
}
_commandQueue.clear();
@@ -31,21 +33,14 @@ MessageTracker::flushQueue(MessageSender& sender)
uint16_t
MessageTracker::handleReply(api::BucketReply& reply)
{
- std::map<uint64_t, uint16_t>::iterator found = _sentMessages.find(reply.getMsgId());
- if (found == _sentMessages.end()) {
+ const auto found = _sentMessages.find(reply.getMsgId());
+ if (found == _sentMessages.end()) [[unlikely]] {
LOG(warning, "Received reply %" PRIu64 " for callback which we have no recollection of", reply.getMsgId());
return (uint16_t)-1;
- } else {
- uint16_t node = found->second;
- _sentMessages.erase(found);
- return node;
}
-}
-
-bool
-MessageTracker::finished()
-{
- return _sentMessages.empty();
+ uint16_t node = found->second;
+ _sentMessages.erase(found);
+ return node;
}
}
diff --git a/storage/src/vespa/storage/distributor/messagetracker.h b/storage/src/vespa/storage/distributor/messagetracker.h
index 73e2461eb7a..51d29551de0 100644
--- a/storage/src/vespa/storage/distributor/messagetracker.h
+++ b/storage/src/vespa/storage/distributor/messagetracker.h
@@ -4,8 +4,7 @@
#include <vespa/storage/common/cluster_context.h>
#include <vespa/storage/common/messagesender.h>
#include <vespa/vespalib/stllike/string.h>
-#include <vector>
-#include <map>
+#include <vespa/vespalib/stllike/hash_map.h>
namespace storage::api {
class BucketCommand;
@@ -18,8 +17,9 @@ class MessageTracker {
public:
class ToSend {
public:
- ToSend(std::shared_ptr<api::BucketCommand> msg, uint16_t target) noexcept :
- _msg(std::move(msg)), _target(target) {};
+ ToSend(std::shared_ptr<api::BucketCommand> msg, uint16_t target) noexcept
+ : _msg(std::move(msg)), _target(target)
+ {}
std::shared_ptr<api::BucketCommand> _msg;
uint16_t _target;
@@ -35,6 +35,9 @@ public:
void queueCommand(std::shared_ptr<api::BucketCommand> msg, uint16_t target) {
_commandQueue.emplace_back(std::move(msg), target);
}
+ void reserve_more_commands(size_t sz) {
+ _commandQueue.reserve(_commandQueue.size() + sz);
+ }
void flushQueue(MessageSender& sender);
@@ -46,13 +49,15 @@ public:
/**
Returns true if all messages sent have been received.
*/
- bool finished();
+ bool finished() const noexcept {
+ return _sentMessages.empty();
+ }
protected:
- std::vector<ToSend> _commandQueue;
+ std::vector<ToSend> _commandQueue;
// Keeps track of which node a message was sent to.
- std::map<uint64_t, uint16_t> _sentMessages;
- const ClusterContext& _cluster_ctx;
+ vespalib::hash_map<uint64_t, uint16_t> _sentMessages;
+ const ClusterContext& _cluster_ctx;
};
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
index 324cb3691e4..f76b6495443 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
@@ -210,11 +210,11 @@ void PutOperation::start_direct_put_dispatch(DistributorStripeMessageSender& sen
}
if (!createBucketBatch.empty()) {
- _tracker.queueMessageBatch(createBucketBatch);
+ _tracker.queueMessageBatch(std::move(createBucketBatch));
}
std::vector<PersistenceMessageTracker::ToSend> putBatch;
-
+ putBatch.reserve(targets.size());
// Now send PUTs
for (const auto& target : targets) {
sendPutToBucketOnNode(_msg->getBucket().getBucketSpace(), target.getBucketId(),
@@ -222,7 +222,7 @@ void PutOperation::start_direct_put_dispatch(DistributorStripeMessageSender& sen
}
if (!putBatch.empty()) {
- _tracker.queueMessageBatch(putBatch);
+ _tracker.queueMessageBatch(std::move(putBatch));
} else {
const char* error = "Can't store document: No storage nodes available";
LOG(debug, "%s", error);
diff --git a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp
index dd6e1e93791..5f52a8208fc 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp
@@ -79,13 +79,11 @@ RemoveLocationOperation::onStart(DistributorStripeMessageSender& sender)
std::vector<uint16_t> nodes = e->getNodes();
for (uint32_t i = 0; i < nodes.size(); i++) {
- std::shared_ptr<api::RemoveLocationCommand> command(
- new api::RemoveLocationCommand(
- _msg->getDocumentSelection(),
- document::Bucket(_msg->getBucket().getBucketSpace(), e.getBucketId())));
+ auto command = std::make_shared<api::RemoveLocationCommand>(_msg->getDocumentSelection(),
+ document::Bucket(_msg->getBucket().getBucketSpace(), e.getBucketId()));
copyMessageSettings(*_msg, *command);
- _tracker.queueCommand(command, nodes[i]);
+ _tracker.queueCommand(std::move(command), nodes[i]);
sent = true;
}
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
index 96182b0744f..42d8e318f47 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
@@ -65,9 +65,7 @@ void RemoveOperation::start_conditional_remove(DistributorStripeMessageSender& s
void RemoveOperation::start_direct_remove_dispatch(DistributorStripeMessageSender& sender) {
LOG(spam, "Started remove on document %s", _msg->getDocumentId().toString().c_str());
- document::BucketId bucketId(
- _node_ctx.bucket_id_factory().getBucketId(
- _msg->getDocumentId()));
+ document::BucketId bucketId(_node_ctx.bucket_id_factory().getBucketId(_msg->getDocumentId()));
std::vector<BucketDatabase::Entry> entries;
_bucket_space.getBucketDatabase().getParents(bucketId, entries);
@@ -79,8 +77,7 @@ void RemoveOperation::start_direct_remove_dispatch(DistributorStripeMessageSende
messages.reserve(e->getNodeCount());
for (uint32_t i = 0; i < e->getNodeCount(); i++) {
auto command = std::make_shared<api::RemoveCommand>(document::Bucket(_msg->getBucket().getBucketSpace(), e.getBucketId()),
- _msg->getDocumentId(),
- _msg->getTimestamp());
+ _msg->getDocumentId(), _msg->getTimestamp());
copyMessageSettings(*_msg, *command);
command->getTrace().setLevel(_msg->getTrace().getLevel());
@@ -90,7 +87,7 @@ void RemoveOperation::start_direct_remove_dispatch(DistributorStripeMessageSende
sent = true;
}
- _tracker.queueMessageBatch(messages);
+ _tracker.queueMessageBatch(std::move(messages));
}
if (!sent) {
diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
index 8988f2589ce..f43a6092372 100644
--- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
@@ -106,19 +106,18 @@ UpdateOperation::onStart(DistributorStripeMessageSender& sender)
const std::vector<uint16_t>& nodes = entry->getNodes();
std::vector<MessageTracker::ToSend> messages;
+ messages.reserve(nodes.size());
for (uint16_t node : nodes) {
- auto command = std::make_shared<api::UpdateCommand>(
- document::Bucket(_msg->getBucket().getBucketSpace(), entry.getBucketId()),
- _msg->getUpdate(),
- _msg->getTimestamp());
+ auto command = std::make_shared<api::UpdateCommand>(document::Bucket(_msg->getBucket().getBucketSpace(), entry.getBucketId()),
+ _msg->getUpdate(), _msg->getTimestamp());
copyMessageSettings(*_msg, *command);
command->setOldTimestamp(_msg->getOldTimestamp());
command->setCondition(_msg->getCondition());
messages.emplace_back(std::move(command), node);
}
- _tracker.queueMessageBatch(messages);
+ _tracker.queueMessageBatch(std::move(messages));
}
_tracker.flushQueue(sender);
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
index 5599f9fb51e..2e6d0e95ec9 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/garbagecollectionoperation.cpp
@@ -87,7 +87,7 @@ void GarbageCollectionOperation::send_current_phase_remove_locations(Distributor
command->setPriority((_phase != Phase::WriteRemovesPhase)
? _priority
: _manager->operation_context().distributor_config().default_external_feed_priority());
- _tracker.queueCommand(command, nodes[i]);
+ _tracker.queueCommand(std::move(command), nodes[i]);
}
_tracker.flushQueue(sender);
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
index 0e9873f3434..616c4962dca 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/joinoperation.cpp
@@ -81,12 +81,11 @@ JoinOperation::enqueueJoinMessagePerTargetNode(
return false;
}
for (const auto& node : nodeToBuckets) {
- std::shared_ptr<api::JoinBucketsCommand> msg(
- new api::JoinBucketsCommand(getBucket()));
+ auto msg = std::make_shared<api::JoinBucketsCommand>(getBucket());
msg->getSourceBuckets() = node.second;
msg->setTimeout(MAX_TIMEOUT);
setCommandMeta(*msg);
- _tracker.queueCommand(msg, node.first);
+ _tracker.queueCommand(std::move(msg), node.first);
}
return true;
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
index e46ccebffba..7bec6bbe53a 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/removebucketoperation.cpp
@@ -24,16 +24,11 @@ RemoveBucketOperation::onStartInternal(DistributorStripeMessageSender& sender)
uint16_t node = getNodes()[i];
const BucketCopy* copy(entry->getNode(node));
if (!copy) {
- LOG(debug, "Node %u was removed between scheduling remove "
- "operation and starting it; not sending DeleteBucket to it",
- node);
+ LOG(debug, "Node %u was removed between scheduling remove operation and starting it; not sending DeleteBucket to it", node);
continue;
}
- LOG(debug, "Sending DeleteBucket for %s to node %u",
- getBucketId().toString().c_str(),
- node);
- std::shared_ptr<api::DeleteBucketCommand> msg(
- new api::DeleteBucketCommand(getBucket()));
+ LOG(debug, "Sending DeleteBucket for %s to node %u", getBucketId().toString().c_str(), node);
+ auto msg = std::make_shared<api::DeleteBucketCommand>(getBucket());
setCommandMeta(*msg);
msg->setBucketInfo(copy->getBucketInfo());
msgs.push_back(std::make_pair(node, msg));
@@ -42,8 +37,8 @@ RemoveBucketOperation::onStartInternal(DistributorStripeMessageSender& sender)
_ok = true;
if (!getNodes().empty()) {
_manager->operation_context().remove_nodes_from_bucket_database(getBucket(), getNodes());
- for (uint32_t i = 0; i < msgs.size(); ++i) {
- _tracker.queueCommand(msgs[i].second, msgs[i].first);
+ for (auto & msg : msgs) {
+ _tracker.queueCommand(std::move(msg.second), msg.first);
}
_tracker.flushQueue(sender);
}
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp
index 00906d22ea4..9547bee6583 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/setbucketstateoperation.cpp
@@ -26,11 +26,9 @@ SetBucketStateOperation::enqueueSetBucketStateCommand(uint16_t node, bool active
active
? api::SetBucketStateCommand::ACTIVE
: api::SetBucketStateCommand::INACTIVE);
- LOG(debug, "Enqueuing %s for %s to node %u",
- active ? "Activate" : "Deactivate",
- getBucketId().toString().c_str(), node);
+ LOG(debug, "Enqueuing %s for %s to node %u", active ? "Activate" : "Deactivate", getBucketId().toString().c_str(), node);
setCommandMeta(*msg);
- _tracker.queueCommand(msg, node);
+ _tracker.queueCommand(std::move(msg), node);
}
bool
diff --git a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
index 8e64fb227a7..d704a42e96b 100644
--- a/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/idealstate/splitoperation.cpp
@@ -35,7 +35,7 @@ SplitOperation::onStart(DistributorStripeMessageSender& sender)
msg->setMinByteSize(_splitSize);
msg->setTimeout(MAX_TIMEOUT);
setCommandMeta(*msg);
- _tracker.queueCommand(msg, entry->getNodeRef(i).getNode());
+ _tracker.queueCommand(std::move(msg), entry->getNodeRef(i).getNode());
_ok = true;
}
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
index a30663bde2f..fe384a68d72 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
@@ -101,15 +101,18 @@ PersistenceMessageTrackerImpl::revert(
}
void
-PersistenceMessageTrackerImpl::queueMessageBatch(const std::vector<MessageTracker::ToSend>& messages) {
+PersistenceMessageTrackerImpl::queueMessageBatch(std::vector<MessageTracker::ToSend> messages) {
_messageBatches.emplace_back();
- for (const auto & message : messages) {
+ auto & batch = _messageBatches.back();
+ batch.reserve(messages.size());
+ reserve_more_commands(messages.size());
+ for (auto & message : messages) {
if (_reply) {
message._msg->getTrace().setLevel(_reply->getTrace().getLevel());
}
- _messageBatches.back().push_back(message._msg->getMsgId());
- queueCommand(message._msg, message._target);
+ batch.push_back(message._msg->getMsgId());
+ queueCommand(std::move(message._msg), message._target);
}
}
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.h b/storage/src/vespa/storage/distributor/persistencemessagetracker.h
index 923ecf45649..ecc4732696b 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.h
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.h
@@ -16,7 +16,7 @@ struct PersistenceMessageTracker {
using ToSend = MessageTracker::ToSend;
virtual void fail(MessageSender&, const api::ReturnCode&) = 0;
- virtual void queueMessageBatch(const std::vector<ToSend>&) = 0;
+ virtual void queueMessageBatch(std::vector<ToSend> messages) = 0;
virtual uint16_t receiveReply(MessageSender&, api::BucketInfoReply&) = 0;
virtual std::shared_ptr<api::BucketInfoReply>& getReply() = 0;
virtual void updateFromReply(MessageSender&, api::BucketInfoReply&, uint16_t node) = 0;
@@ -65,7 +65,7 @@ public:
have at most (messages.size() - initial redundancy) messages left in the
queue and have it's first message be done.
*/
- void queueMessageBatch(const std::vector<MessageTracker::ToSend>& messages) override;
+ void queueMessageBatch(std::vector<MessageTracker::ToSend> messages) override;
private:
using MessageBatch = std::vector<uint64_t>;