diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2023-09-05 13:14:04 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2023-09-05 13:19:57 +0000 |
commit | 7b9ba34cd00ab2b9416fc55f1eef31a7fc43f717 (patch) | |
tree | 1f15ccb31d1bfdc8574e58fde742e3e856f25d2b /storage | |
parent | 58a82398bcfe66c119faced12bf68cdfc6407536 (diff) |
Enumerate pending message IDs on a bucket predicate basis
Lets a caller selectively enumerate all IDs of messages pending
towards buckets that match a caller-provided predicate function.
A separate message ID callback is invoked per distinct message.
Also remove hard-coded multi-index numeric indices in favor of
named constants.
Diffstat (limited to 'storage')
3 files changed, 155 insertions, 62 deletions
diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp index 8277281206d..35dc072b953 100644 --- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp +++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp @@ -1,19 +1,24 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <tests/common/dummystoragelink.h> #include <vespa/document/base/testdocman.h> -#include <vespa/document/test/make_document_bucket.h> +#include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/document/test/make_bucket_space.h> +#include <vespa/document/test/make_document_bucket.h> #include <vespa/storage/distributor/pendingmessagetracker.h> #include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageframework/defaultimplementation/clock/fakeclock.h> -#include <tests/common/dummystoragelink.h> #include <vespa/vdslib/state/clusterstate.h> #include <vespa/vespalib/util/lambdatask.h> -#include <vespa/vespalib/gtest/gtest.h> #include <gmock/gmock.h> +#include <gtest/gtest.h> +#include <functional> +#include <vector> +using document::Bucket; +using document::BucketId; using document::test::makeDocumentBucket; using document::test::makeBucketSpace; using namespace ::testing; @@ -23,8 +28,7 @@ namespace storage::distributor { using namespace std::chrono_literals; struct PendingMessageTrackerTest : Test { - void insertMessages(PendingMessageTracker& tracker); - + std::vector<std::shared_ptr<api::StorageCommand>> insertMessages(PendingMessageTracker& tracker); }; namespace { @@ -33,11 +37,7 @@ class RequestBuilder { uint16_t _toNode; std::chrono::milliseconds _atTime; public: - RequestBuilder() - : _toNode(0), - _atTime() - { - } + RequestBuilder() noexcept : _toNode(0), _atTime() {} RequestBuilder& atTime(std::chrono::milliseconds t) { _atTime = t; @@ -59,14 +59,12 @@ makeStorageAddress(uint16_t node) { return {&_storage, lib::NodeType::STORAGE, node}; } -class Fixture -{ - StorageComponentRegisterImpl _compReg; +class Fixture { + StorageComponentRegisterImpl _compReg; framework::defaultimplementation::FakeClock _clock; - std::unique_ptr<PendingMessageTracker> _tracker; - document::TestDocMan _testDocMan; + std::unique_ptr<PendingMessageTracker> _tracker; + document::TestDocMan _testDocMan; public: - Fixture(); ~Fixture(); @@ -110,21 +108,26 @@ public: PendingMessageTracker& tracker() { return *_tracker; } auto& clock() { return _clock; } + std::vector<uint64_t> enumerate_msg_ids(const std::function<bool(const document::Bucket&)>& bucket_predicate) const { + std::vector<uint64_t> enumerated_ids; + auto insert_enumerated_ids = [&](uint64_t msg_id) { enumerated_ids.emplace_back(msg_id); }; + + _tracker->enumerate_matching_pending_bucket_ops(bucket_predicate, insert_enumerated_ids); + return enumerated_ids; + } + private: - std::string createDummyIdString(const document::BucketId& bucket) const { + static std::string createDummyIdString(const document::BucketId& bucket) { std::ostringstream id; id << "id:foo:testdoctype1:n=" << bucket.getId() << ":foo"; return id.str(); } - document::Document::SP createDummyDocumentForBucket(const document::BucketId& bucket) const - { + document::Document::SP createDummyDocumentForBucket(const document::BucketId& bucket) const { return _testDocMan.createDocument("foobar", createDummyIdString(bucket)); } - std::shared_ptr<api::RemoveCommand> createRemoveToNode( - uint16_t node) const - { + std::shared_ptr<api::RemoveCommand> createRemoveToNode(uint16_t node) const { document::BucketId bucket(16, 1234); auto cmd = std::make_shared<api::RemoveCommand>( makeDocumentBucket(bucket), @@ -195,9 +198,10 @@ TEST_F(PendingMessageTrackerTest, simple) { } } -void +std::vector<std::shared_ptr<api::StorageCommand>> PendingMessageTrackerTest::insertMessages(PendingMessageTracker& tracker) { + std::vector<std::shared_ptr<api::StorageCommand>> inserted; for (uint32_t i = 0; i < 4; i++) { std::ostringstream ost; ost << "id:footype:testdoc:n=1234:" << i; @@ -206,15 +210,19 @@ PendingMessageTrackerTest::insertMessages(PendingMessageTracker& tracker) document::DocumentId(ost.str()), 1000 + i); remove->setAddress(makeStorageAddress(i % 2)); tracker.insert(remove); + inserted.emplace_back(std::move(remove)); } for (uint32_t i = 0; i < 4; i++) { std::ostringstream ost; ost << "id:footype:testdoc:n=4567:" << i; - auto remove = std::make_shared<api::RemoveCommand>(makeDocumentBucket(document::BucketId(16, 4567)), document::DocumentId(ost.str()), 2000 + i); + auto remove = std::make_shared<api::RemoveCommand>(makeDocumentBucket(document::BucketId(16, 4567)), + document::DocumentId(ost.str()), 2000 + i); remove->setAddress(makeStorageAddress(i % 2)); tracker.insert(remove); + inserted.emplace_back(std::move(remove)); } + return inserted; } TEST_F(PendingMessageTrackerTest, start_page) { @@ -291,15 +299,13 @@ TEST_F(PendingMessageTrackerTest, multiple_messages) { namespace { -class TestChecker : public PendingMessageTracker::Checker -{ +class TestChecker : public PendingMessageTracker::Checker { public: uint8_t pri; - TestChecker() : pri(UINT8_MAX) {} + TestChecker() noexcept : pri(UINT8_MAX) {} - bool check(uint32_t msgType, uint16_t node, uint8_t p) override { - (void) node; + bool check(uint32_t msgType, [[maybe_unused]] uint16_t node, uint8_t p) override { if (msgType == api::MessageType::REMOVE_ID) { pri = p; return false; @@ -309,7 +315,6 @@ public: } }; - } TEST_F(PendingMessageTrackerTest, get_pending_message_types) { @@ -370,12 +375,10 @@ TEST_F(PendingMessageTrackerTest, has_pending_message) { namespace { -class OperationEnumerator : public PendingMessageTracker::Checker -{ +class OperationEnumerator : public PendingMessageTracker::Checker { std::ostringstream ss; public: - bool check(uint32_t msgType, uint16_t node, uint8_t p) override { - (void) p; + bool check(uint32_t msgType, uint16_t node, [[maybe_unused]] uint8_t p) override { ss << api::MessageType::get(static_cast<api::MessageType::Id>(msgType)).getName() << " -> " << node << "\n"; @@ -574,4 +577,58 @@ TEST_F(PendingMessageTrackerTest, request_bucket_info_with_bucket_tracked_with_s } } +namespace { + +auto bucket_id_eq_fn(BucketId matched_id) { + return [matched_id](auto& bucket) noexcept { + return bucket.getBucketId() == matched_id; + }; +} + +auto bucket_eq_fn(Bucket matched_bucket) { + return [matched_bucket](auto& bucket) noexcept { + return bucket == matched_bucket; + }; +} + +} + +TEST_F(PendingMessageTrackerTest, can_enumerate_all_message_ids_for_ops_to_matching_buckets) { + Fixture f; + BucketId bucket_id_a(16, 1234); + BucketId bucket_id_b(16, 4567); + // This inserts 4 ops for bucket {16, 1234} (bucket 'a') and 4 ops for {16, 4567} (bucket 'b') + auto inserted_ops = insertMessages(f.tracker()); + std::vector<uint64_t> bucket_a_msgs_ids, bucket_b_msgs_ids; + for (auto& op : inserted_ops) { + if (op->getBucketId() == bucket_id_a) { + bucket_a_msgs_ids.emplace_back(op->getMsgId()); + } else { + bucket_b_msgs_ids.emplace_back(op->getMsgId()); + } + } + + // Match all for bucket 'a' + EXPECT_THAT(f.enumerate_msg_ids(bucket_id_eq_fn(bucket_id_a)), UnorderedElementsAreArray(bucket_a_msgs_ids)); + // Match all for bucket 'b' + EXPECT_THAT(f.enumerate_msg_ids(bucket_id_eq_fn(bucket_id_b)), UnorderedElementsAreArray(bucket_b_msgs_ids)); + // Match no buckets + EXPECT_THAT(f.enumerate_msg_ids(bucket_id_eq_fn(BucketId(16, 7890))), IsEmpty()); + + const auto default_space = document::FixedBucketSpaces::default_space(); + const auto global_space = document::FixedBucketSpaces::global_space(); + + // Message to global bucket space (the former messages were all in the default space) + auto global_msg = std::make_shared<api::RequestBucketInfoCommand>(global_space, std::vector<BucketId>({bucket_id_a})); + global_msg->setAddress(makeStorageAddress(3)); + f.tracker().insert(global_msg); + + // Default space has the expected existing entries + EXPECT_THAT(f.enumerate_msg_ids(bucket_eq_fn(Bucket(default_space, bucket_id_a))), + UnorderedElementsAreArray(bucket_a_msgs_ids)); + // Global space has only 1 entry + EXPECT_THAT(f.enumerate_msg_ids(bucket_eq_fn(Bucket(global_space, bucket_id_a))), + ElementsAre(global_msg->getMsgId())); +} + } diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp index 7b3cdacf702..c32b3b83c05 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.cpp @@ -40,7 +40,8 @@ PendingMessageTracker::MessageEntry::toHtml() const { vespalib::asciistream ss; ss << "<li><i>Node " << nodeIdx << "</i>: " << "<b>" << vespalib::to_string(timeStamp) << "</b> " - << api::MessageType::get(api::MessageType::Id(msgType)).getName() << "(" << bucket.getBucketId() << ", priority=" << priority << ")</li>\n"; + << api::MessageType::get(api::MessageType::Id(msgType)).getName() + << "(" << bucket.getBucketId() << ", priority=" << priority << ")</li>\n"; return ss.str(); } @@ -83,7 +84,7 @@ std::vector<uint64_t> PendingMessageTracker::clearMessagesForNode(uint16_t node) { std::lock_guard guard(_lock); - MessagesByNodeAndBucket& idx(boost::multi_index::get<1>(_messages)); + auto& idx = boost::multi_index::get<IndexByNodeAndBucket>(_messages); auto range = pairAsRange(idx.equal_range(boost::make_tuple(node))); std::vector<uint64_t> erasedIds; @@ -97,6 +98,27 @@ PendingMessageTracker::clearMessagesForNode(uint16_t node) } void +PendingMessageTracker::enumerate_matching_pending_bucket_ops( + const std::function<bool(const document::Bucket&)>& bucket_predicate, + const std::function<void(uint64_t)>& msg_id_callback) const +{ + std::lock_guard guard(_lock); + const auto& idx = boost::multi_index::get<IndexByBucketAndType>(_messages); + auto iter = idx.begin(); + const auto last = idx.end(); + while (iter != last) { + const auto check_bucket = iter->bucket; + const bool match = bucket_predicate(check_bucket); + do { + if (match) { + msg_id_callback(iter->msgId); + } + ++iter; + } while ((iter != last) && (iter->bucket == check_bucket)); + } +} + +void PendingMessageTracker::insert(const std::shared_ptr<api::StorageMessage>& msg) { if (msg->getAddress()) { @@ -126,8 +148,8 @@ PendingMessageTracker::reply(const api::StorageReply& r) uint64_t msgId = r.getMsgId(); std::unique_lock guard(_lock); - MessagesByMsgId& msgs = boost::multi_index::get<0>(_messages); - MessagesByMsgId::iterator iter = msgs.find(msgId); + auto& msgs = boost::multi_index::get<IndexByMessageId>(_messages); + auto iter = msgs.find(msgId); if (iter != msgs.end()) { bucket = iter->bucket; _nodeInfo.decPending(r.getAddress()->getIndex()); @@ -184,7 +206,7 @@ bool range_is_empty_or_only_has_read_ops(const Range& range) noexcept { bool PendingMessageTracker::bucket_has_no_pending_write_ops(const document::Bucket& bucket) const noexcept { - auto& bucket_idx = boost::multi_index::get<2>(_messages); + auto& bucket_idx = boost::multi_index::get<IndexByBucketAndType>(_messages); auto pending_tasks_for_bucket = bucket_idx.equal_range(bucket); return range_is_empty_or_only_has_read_ops(pending_tasks_for_bucket); } @@ -243,30 +265,30 @@ runCheckerOnRange(PendingMessageTracker::Checker& checker, const Range& range) } void -PendingMessageTracker::checkPendingMessages(uint16_t node, const document::Bucket &bucket, Checker& checker) const +PendingMessageTracker::checkPendingMessages(uint16_t node, const document::Bucket& bucket, Checker& checker) const { std::lock_guard guard(_lock); - const MessagesByNodeAndBucket& msgs(boost::multi_index::get<1>(_messages)); + const auto& msgs = boost::multi_index::get<IndexByNodeAndBucket>(_messages); auto range = pairAsRange(msgs.equal_range(boost::make_tuple(node, bucket))); runCheckerOnRange(checker, range); } void -PendingMessageTracker::checkPendingMessages(const document::Bucket &bucket, Checker& checker) const +PendingMessageTracker::checkPendingMessages(const document::Bucket& bucket, Checker& checker) const { std::lock_guard guard(_lock); - const MessagesByBucketAndType& msgs(boost::multi_index::get<2>(_messages)); + const auto& msgs = boost::multi_index::get<IndexByBucketAndType>(_messages); auto range = pairAsRange(msgs.equal_range(boost::make_tuple(bucket))); runCheckerOnRange(checker, range); } bool -PendingMessageTracker::hasPendingMessage(uint16_t node, const document::Bucket &bucket, uint32_t messageType) const +PendingMessageTracker::hasPendingMessage(uint16_t node, const document::Bucket& bucket, uint32_t messageType) const { std::lock_guard guard(_lock); - const MessagesByNodeAndBucket& msgs(boost::multi_index::get<1>(_messages)); + const auto& msgs = boost::multi_index::get<IndexByNodeAndBucket>(_messages); auto range = msgs.equal_range(boost::make_tuple(node, bucket, messageType)); return (range.first != range.second); @@ -283,7 +305,7 @@ void PendingMessageTracker::getStatusPerBucket(std::ostream& out) const { std::lock_guard guard(_lock); - const MessagesByNodeAndBucket& msgs = boost::multi_index::get<1>(_messages); + const auto& msgs = boost::multi_index::get<IndexByNodeAndBucket>(_messages); using BucketMap = std::map<document::Bucket, std::vector<vespalib::string>>; BucketMap perBucketMsgs; for (const auto& msg : msgs) { @@ -312,9 +334,9 @@ void PendingMessageTracker::getStatusPerNode(std::ostream& out) const { std::lock_guard guard(_lock); - const MessagesByNodeAndBucket& msgs = boost::multi_index::get<1>(_messages); + const auto& msgs = boost::multi_index::get<IndexByNodeAndBucket>(_messages); int lastNode = -1; - for (const auto & node : msgs) { + for (const auto& node : msgs) { if (node.nodeIdx != lastNode) { if (lastNode != -1) { out << "</ul>\n"; diff --git a/storage/src/vespa/storage/distributor/pendingmessagetracker.h b/storage/src/vespa/storage/distributor/pendingmessagetracker.h index 4b5655d3f3c..736f2918401 100644 --- a/storage/src/vespa/storage/distributor/pendingmessagetracker.h +++ b/storage/src/vespa/storage/distributor/pendingmessagetracker.h @@ -2,22 +2,23 @@ #pragma once #include "nodeinfo.h" -#include <vespa/storageframework/generic/status/htmlstatusreporter.h> -#include <vespa/storageframework/generic/component/componentregister.h> -#include <vespa/storageframework/generic/component/component.h> #include <vespa/storageapi/message/bucket.h> +#include <vespa/storageframework/generic/component/component.h> +#include <vespa/storageframework/generic/component/componentregister.h> +#include <vespa/storageframework/generic/status/htmlstatusreporter.h> #include <vespa/vespalib/stllike/hash_set.h> -#include <boost/multi_index_container.hpp> +#include <boost/multi_index/composite_key.hpp> #include <boost/multi_index/identity.hpp> -#include <boost/multi_index/member.hpp> #include <boost/multi_index/mem_fun.hpp> +#include <boost/multi_index/member.hpp> #include <boost/multi_index/ordered_index.hpp> #include <boost/multi_index/sequenced_index.hpp> -#include <boost/multi_index/composite_key.hpp> -#include <set> -#include <unordered_map> +#include <boost/multi_index_container.hpp> #include <chrono> +#include <functional> #include <mutex> +#include <set> +#include <unordered_map> namespace storage::distributor { @@ -84,20 +85,20 @@ public: * passing it to the given type checker. * Breaks when the checker returns false. */ - void checkPendingMessages(uint16_t node, const document::Bucket &bucket, Checker& checker) const; + void checkPendingMessages(uint16_t node, const document::Bucket& bucket, Checker& checker) const; /** * Goes through each pending message (across all nodes) for the given bucket * and invokes the given checker with the node, message type and priority. * Breaks when the checker returns false. */ - void checkPendingMessages(const document::Bucket &bucket, Checker& checker) const; + void checkPendingMessages(const document::Bucket& bucket, Checker& checker) const; /** * Utility function for checking if there's a message of type * messageType pending to bucket bid on the given node. */ - bool hasPendingMessage(uint16_t node, const document::Bucket &bucket, uint32_t messageType) const; + bool hasPendingMessage(uint16_t node, const document::Bucket& bucket, uint32_t messageType) const; /** * Returns a vector containing the number of pending messages to each storage node. @@ -119,6 +120,17 @@ public: void run_once_no_pending_for_bucket(const document::Bucket& bucket, std::unique_ptr<DeferredTask> task); void abort_deferred_tasks(); + + /** + * For each distinct bucket with at least one pending message towards it: + * + * Iff `bucket_predicate(bucket) == true`, `msg_id_callback` is invoked once for _each_ + * message towards `bucket`, with the message ID as the argument. + * + * Note: `bucket_predicate` is only invoked once per distinct bucket. + */ + void enumerate_matching_pending_bucket_ops(const std::function<bool(const document::Bucket&)>& bucket_predicate, + const std::function<void(uint64_t)>& msg_id_callback) const; private: struct MessageEntry { TimePoint timeStamp; @@ -169,9 +181,11 @@ private: > >; - using MessagesByMsgId = Messages::nth_index<0>::type; - using MessagesByNodeAndBucket = Messages::nth_index<1>::type; - using MessagesByBucketAndType = Messages::nth_index<2>::type; + // Must match Messages::nth_index<N> + static constexpr uint32_t IndexByMessageId = 0; + static constexpr uint32_t IndexByNodeAndBucket = 1; + static constexpr uint32_t IndexByBucketAndType = 2; + using DeferredBucketTaskMap = std::unordered_multimap< document::Bucket, std::unique_ptr<DeferredTask>, |