diff options
Diffstat (limited to 'storage/src/tests/distributor/pendingmessagetrackertest.cpp')
-rw-r--r-- | storage/src/tests/distributor/pendingmessagetrackertest.cpp | 125 |
1 files changed, 91 insertions, 34 deletions
diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp index 8277281206d..35dc072b953 100644 --- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp +++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp @@ -1,19 +1,24 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <tests/common/dummystoragelink.h> #include <vespa/document/base/testdocman.h> -#include <vespa/document/test/make_document_bucket.h> +#include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/document/test/make_bucket_space.h> +#include <vespa/document/test/make_document_bucket.h> #include <vespa/storage/distributor/pendingmessagetracker.h> #include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageframework/defaultimplementation/clock/fakeclock.h> -#include <tests/common/dummystoragelink.h> #include <vespa/vdslib/state/clusterstate.h> #include <vespa/vespalib/util/lambdatask.h> -#include <vespa/vespalib/gtest/gtest.h> #include <gmock/gmock.h> +#include <gtest/gtest.h> +#include <functional> +#include <vector> +using document::Bucket; +using document::BucketId; using document::test::makeDocumentBucket; using document::test::makeBucketSpace; using namespace ::testing; @@ -23,8 +28,7 @@ namespace storage::distributor { using namespace std::chrono_literals; struct PendingMessageTrackerTest : Test { - void insertMessages(PendingMessageTracker& tracker); - + std::vector<std::shared_ptr<api::StorageCommand>> insertMessages(PendingMessageTracker& tracker); }; namespace { @@ -33,11 +37,7 @@ class RequestBuilder { uint16_t _toNode; std::chrono::milliseconds _atTime; public: - RequestBuilder() - : _toNode(0), - _atTime() - { - } + RequestBuilder() noexcept : _toNode(0), _atTime() {} RequestBuilder& atTime(std::chrono::milliseconds t) { _atTime = t; @@ -59,14 +59,12 @@ makeStorageAddress(uint16_t node) { return {&_storage, lib::NodeType::STORAGE, node}; } -class Fixture -{ - StorageComponentRegisterImpl _compReg; +class Fixture { + StorageComponentRegisterImpl _compReg; framework::defaultimplementation::FakeClock _clock; - std::unique_ptr<PendingMessageTracker> _tracker; - document::TestDocMan _testDocMan; + std::unique_ptr<PendingMessageTracker> _tracker; + document::TestDocMan _testDocMan; public: - Fixture(); ~Fixture(); @@ -110,21 +108,26 @@ public: PendingMessageTracker& tracker() { return *_tracker; } auto& clock() { return _clock; } + std::vector<uint64_t> enumerate_msg_ids(const std::function<bool(const document::Bucket&)>& bucket_predicate) const { + std::vector<uint64_t> enumerated_ids; + auto insert_enumerated_ids = [&](uint64_t msg_id) { enumerated_ids.emplace_back(msg_id); }; + + _tracker->enumerate_matching_pending_bucket_ops(bucket_predicate, insert_enumerated_ids); + return enumerated_ids; + } + private: - std::string createDummyIdString(const document::BucketId& bucket) const { + static std::string createDummyIdString(const document::BucketId& bucket) { std::ostringstream id; id << "id:foo:testdoctype1:n=" << bucket.getId() << ":foo"; return id.str(); } - document::Document::SP createDummyDocumentForBucket(const document::BucketId& bucket) const - { + document::Document::SP createDummyDocumentForBucket(const document::BucketId& bucket) const { return _testDocMan.createDocument("foobar", createDummyIdString(bucket)); } - std::shared_ptr<api::RemoveCommand> createRemoveToNode( - uint16_t node) const - { + std::shared_ptr<api::RemoveCommand> createRemoveToNode(uint16_t node) const { document::BucketId bucket(16, 1234); auto cmd = std::make_shared<api::RemoveCommand>( makeDocumentBucket(bucket), @@ -195,9 +198,10 @@ TEST_F(PendingMessageTrackerTest, simple) { } } -void +std::vector<std::shared_ptr<api::StorageCommand>> PendingMessageTrackerTest::insertMessages(PendingMessageTracker& tracker) { + std::vector<std::shared_ptr<api::StorageCommand>> inserted; for (uint32_t i = 0; i < 4; i++) { std::ostringstream ost; ost << "id:footype:testdoc:n=1234:" << i; @@ -206,15 +210,19 @@ PendingMessageTrackerTest::insertMessages(PendingMessageTracker& tracker) document::DocumentId(ost.str()), 1000 + i); remove->setAddress(makeStorageAddress(i % 2)); tracker.insert(remove); + inserted.emplace_back(std::move(remove)); } for (uint32_t i = 0; i < 4; i++) { std::ostringstream ost; ost << "id:footype:testdoc:n=4567:" << i; - auto remove = std::make_shared<api::RemoveCommand>(makeDocumentBucket(document::BucketId(16, 4567)), document::DocumentId(ost.str()), 2000 + i); + auto remove = std::make_shared<api::RemoveCommand>(makeDocumentBucket(document::BucketId(16, 4567)), + document::DocumentId(ost.str()), 2000 + i); remove->setAddress(makeStorageAddress(i % 2)); tracker.insert(remove); + inserted.emplace_back(std::move(remove)); } + return inserted; } TEST_F(PendingMessageTrackerTest, start_page) { @@ -291,15 +299,13 @@ TEST_F(PendingMessageTrackerTest, multiple_messages) { namespace { -class TestChecker : public PendingMessageTracker::Checker -{ +class TestChecker : public PendingMessageTracker::Checker { public: uint8_t pri; - TestChecker() : pri(UINT8_MAX) {} + TestChecker() noexcept : pri(UINT8_MAX) {} - bool check(uint32_t msgType, uint16_t node, uint8_t p) override { - (void) node; + bool check(uint32_t msgType, [[maybe_unused]] uint16_t node, uint8_t p) override { if (msgType == api::MessageType::REMOVE_ID) { pri = p; return false; @@ -309,7 +315,6 @@ public: } }; - } TEST_F(PendingMessageTrackerTest, get_pending_message_types) { @@ -370,12 +375,10 @@ TEST_F(PendingMessageTrackerTest, has_pending_message) { namespace { -class OperationEnumerator : public PendingMessageTracker::Checker -{ +class OperationEnumerator : public PendingMessageTracker::Checker { std::ostringstream ss; public: - bool check(uint32_t msgType, uint16_t node, uint8_t p) override { - (void) p; + bool check(uint32_t msgType, uint16_t node, [[maybe_unused]] uint8_t p) override { ss << api::MessageType::get(static_cast<api::MessageType::Id>(msgType)).getName() << " -> " << node << "\n"; @@ -574,4 +577,58 @@ TEST_F(PendingMessageTrackerTest, request_bucket_info_with_bucket_tracked_with_s } } +namespace { + +auto bucket_id_eq_fn(BucketId matched_id) { + return [matched_id](auto& bucket) noexcept { + return bucket.getBucketId() == matched_id; + }; +} + +auto bucket_eq_fn(Bucket matched_bucket) { + return [matched_bucket](auto& bucket) noexcept { + return bucket == matched_bucket; + }; +} + +} + +TEST_F(PendingMessageTrackerTest, can_enumerate_all_message_ids_for_ops_to_matching_buckets) { + Fixture f; + BucketId bucket_id_a(16, 1234); + BucketId bucket_id_b(16, 4567); + // This inserts 4 ops for bucket {16, 1234} (bucket 'a') and 4 ops for {16, 4567} (bucket 'b') + auto inserted_ops = insertMessages(f.tracker()); + std::vector<uint64_t> bucket_a_msgs_ids, bucket_b_msgs_ids; + for (auto& op : inserted_ops) { + if (op->getBucketId() == bucket_id_a) { + bucket_a_msgs_ids.emplace_back(op->getMsgId()); + } else { + bucket_b_msgs_ids.emplace_back(op->getMsgId()); + } + } + + // Match all for bucket 'a' + EXPECT_THAT(f.enumerate_msg_ids(bucket_id_eq_fn(bucket_id_a)), UnorderedElementsAreArray(bucket_a_msgs_ids)); + // Match all for bucket 'b' + EXPECT_THAT(f.enumerate_msg_ids(bucket_id_eq_fn(bucket_id_b)), UnorderedElementsAreArray(bucket_b_msgs_ids)); + // Match no buckets + EXPECT_THAT(f.enumerate_msg_ids(bucket_id_eq_fn(BucketId(16, 7890))), IsEmpty()); + + const auto default_space = document::FixedBucketSpaces::default_space(); + const auto global_space = document::FixedBucketSpaces::global_space(); + + // Message to global bucket space (the former messages were all in the default space) + auto global_msg = std::make_shared<api::RequestBucketInfoCommand>(global_space, std::vector<BucketId>({bucket_id_a})); + global_msg->setAddress(makeStorageAddress(3)); + f.tracker().insert(global_msg); + + // Default space has the expected existing entries + EXPECT_THAT(f.enumerate_msg_ids(bucket_eq_fn(Bucket(default_space, bucket_id_a))), + UnorderedElementsAreArray(bucket_a_msgs_ids)); + // Global space has only 1 entry + EXPECT_THAT(f.enumerate_msg_ids(bucket_eq_fn(Bucket(global_space, bucket_id_a))), + ElementsAre(global_msg->getMsgId())); +} + } |