summaryrefslogtreecommitdiffstats
path: root/storage/src/tests/distributor/pendingmessagetrackertest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/tests/distributor/pendingmessagetrackertest.cpp')
-rw-r--r--storage/src/tests/distributor/pendingmessagetrackertest.cpp125
1 files changed, 91 insertions, 34 deletions
diff --git a/storage/src/tests/distributor/pendingmessagetrackertest.cpp b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
index 8277281206d..35dc072b953 100644
--- a/storage/src/tests/distributor/pendingmessagetrackertest.cpp
+++ b/storage/src/tests/distributor/pendingmessagetrackertest.cpp
@@ -1,19 +1,24 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <tests/common/dummystoragelink.h>
#include <vespa/document/base/testdocman.h>
-#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/document/test/make_bucket_space.h>
+#include <vespa/document/test/make_document_bucket.h>
#include <vespa/storage/distributor/pendingmessagetracker.h>
#include <vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageframework/defaultimplementation/clock/fakeclock.h>
-#include <tests/common/dummystoragelink.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/util/lambdatask.h>
-#include <vespa/vespalib/gtest/gtest.h>
#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+#include <functional>
+#include <vector>
+using document::Bucket;
+using document::BucketId;
using document::test::makeDocumentBucket;
using document::test::makeBucketSpace;
using namespace ::testing;
@@ -23,8 +28,7 @@ namespace storage::distributor {
using namespace std::chrono_literals;
struct PendingMessageTrackerTest : Test {
- void insertMessages(PendingMessageTracker& tracker);
-
+ std::vector<std::shared_ptr<api::StorageCommand>> insertMessages(PendingMessageTracker& tracker);
};
namespace {
@@ -33,11 +37,7 @@ class RequestBuilder {
uint16_t _toNode;
std::chrono::milliseconds _atTime;
public:
- RequestBuilder()
- : _toNode(0),
- _atTime()
- {
- }
+ RequestBuilder() noexcept : _toNode(0), _atTime() {}
RequestBuilder& atTime(std::chrono::milliseconds t) {
_atTime = t;
@@ -59,14 +59,12 @@ makeStorageAddress(uint16_t node) {
return {&_storage, lib::NodeType::STORAGE, node};
}
-class Fixture
-{
- StorageComponentRegisterImpl _compReg;
+class Fixture {
+ StorageComponentRegisterImpl _compReg;
framework::defaultimplementation::FakeClock _clock;
- std::unique_ptr<PendingMessageTracker> _tracker;
- document::TestDocMan _testDocMan;
+ std::unique_ptr<PendingMessageTracker> _tracker;
+ document::TestDocMan _testDocMan;
public:
-
Fixture();
~Fixture();
@@ -110,21 +108,26 @@ public:
PendingMessageTracker& tracker() { return *_tracker; }
auto& clock() { return _clock; }
+ std::vector<uint64_t> enumerate_msg_ids(const std::function<bool(const document::Bucket&)>& bucket_predicate) const {
+ std::vector<uint64_t> enumerated_ids;
+ auto insert_enumerated_ids = [&](uint64_t msg_id) { enumerated_ids.emplace_back(msg_id); };
+
+ _tracker->enumerate_matching_pending_bucket_ops(bucket_predicate, insert_enumerated_ids);
+ return enumerated_ids;
+ }
+
private:
- std::string createDummyIdString(const document::BucketId& bucket) const {
+ static std::string createDummyIdString(const document::BucketId& bucket) {
std::ostringstream id;
id << "id:foo:testdoctype1:n=" << bucket.getId() << ":foo";
return id.str();
}
- document::Document::SP createDummyDocumentForBucket(const document::BucketId& bucket) const
- {
+ document::Document::SP createDummyDocumentForBucket(const document::BucketId& bucket) const {
return _testDocMan.createDocument("foobar", createDummyIdString(bucket));
}
- std::shared_ptr<api::RemoveCommand> createRemoveToNode(
- uint16_t node) const
- {
+ std::shared_ptr<api::RemoveCommand> createRemoveToNode(uint16_t node) const {
document::BucketId bucket(16, 1234);
auto cmd = std::make_shared<api::RemoveCommand>(
makeDocumentBucket(bucket),
@@ -195,9 +198,10 @@ TEST_F(PendingMessageTrackerTest, simple) {
}
}
-void
+std::vector<std::shared_ptr<api::StorageCommand>>
PendingMessageTrackerTest::insertMessages(PendingMessageTracker& tracker)
{
+ std::vector<std::shared_ptr<api::StorageCommand>> inserted;
for (uint32_t i = 0; i < 4; i++) {
std::ostringstream ost;
ost << "id:footype:testdoc:n=1234:" << i;
@@ -206,15 +210,19 @@ PendingMessageTrackerTest::insertMessages(PendingMessageTracker& tracker)
document::DocumentId(ost.str()), 1000 + i);
remove->setAddress(makeStorageAddress(i % 2));
tracker.insert(remove);
+ inserted.emplace_back(std::move(remove));
}
for (uint32_t i = 0; i < 4; i++) {
std::ostringstream ost;
ost << "id:footype:testdoc:n=4567:" << i;
- auto remove = std::make_shared<api::RemoveCommand>(makeDocumentBucket(document::BucketId(16, 4567)), document::DocumentId(ost.str()), 2000 + i);
+ auto remove = std::make_shared<api::RemoveCommand>(makeDocumentBucket(document::BucketId(16, 4567)),
+ document::DocumentId(ost.str()), 2000 + i);
remove->setAddress(makeStorageAddress(i % 2));
tracker.insert(remove);
+ inserted.emplace_back(std::move(remove));
}
+ return inserted;
}
TEST_F(PendingMessageTrackerTest, start_page) {
@@ -291,15 +299,13 @@ TEST_F(PendingMessageTrackerTest, multiple_messages) {
namespace {
-class TestChecker : public PendingMessageTracker::Checker
-{
+class TestChecker : public PendingMessageTracker::Checker {
public:
uint8_t pri;
- TestChecker() : pri(UINT8_MAX) {}
+ TestChecker() noexcept : pri(UINT8_MAX) {}
- bool check(uint32_t msgType, uint16_t node, uint8_t p) override {
- (void) node;
+ bool check(uint32_t msgType, [[maybe_unused]] uint16_t node, uint8_t p) override {
if (msgType == api::MessageType::REMOVE_ID) {
pri = p;
return false;
@@ -309,7 +315,6 @@ public:
}
};
-
}
TEST_F(PendingMessageTrackerTest, get_pending_message_types) {
@@ -370,12 +375,10 @@ TEST_F(PendingMessageTrackerTest, has_pending_message) {
namespace {
-class OperationEnumerator : public PendingMessageTracker::Checker
-{
+class OperationEnumerator : public PendingMessageTracker::Checker {
std::ostringstream ss;
public:
- bool check(uint32_t msgType, uint16_t node, uint8_t p) override {
- (void) p;
+ bool check(uint32_t msgType, uint16_t node, [[maybe_unused]] uint8_t p) override {
ss << api::MessageType::get(static_cast<api::MessageType::Id>(msgType)).getName()
<< " -> " << node << "\n";
@@ -574,4 +577,58 @@ TEST_F(PendingMessageTrackerTest, request_bucket_info_with_bucket_tracked_with_s
}
}
+namespace {
+
+auto bucket_id_eq_fn(BucketId matched_id) {
+ return [matched_id](auto& bucket) noexcept {
+ return bucket.getBucketId() == matched_id;
+ };
+}
+
+auto bucket_eq_fn(Bucket matched_bucket) {
+ return [matched_bucket](auto& bucket) noexcept {
+ return bucket == matched_bucket;
+ };
+}
+
+}
+
+TEST_F(PendingMessageTrackerTest, can_enumerate_all_message_ids_for_ops_to_matching_buckets) {
+ Fixture f;
+ BucketId bucket_id_a(16, 1234);
+ BucketId bucket_id_b(16, 4567);
+ // This inserts 4 ops for bucket {16, 1234} (bucket 'a') and 4 ops for {16, 4567} (bucket 'b')
+ auto inserted_ops = insertMessages(f.tracker());
+ std::vector<uint64_t> bucket_a_msgs_ids, bucket_b_msgs_ids;
+ for (auto& op : inserted_ops) {
+ if (op->getBucketId() == bucket_id_a) {
+ bucket_a_msgs_ids.emplace_back(op->getMsgId());
+ } else {
+ bucket_b_msgs_ids.emplace_back(op->getMsgId());
+ }
+ }
+
+ // Match all for bucket 'a'
+ EXPECT_THAT(f.enumerate_msg_ids(bucket_id_eq_fn(bucket_id_a)), UnorderedElementsAreArray(bucket_a_msgs_ids));
+ // Match all for bucket 'b'
+ EXPECT_THAT(f.enumerate_msg_ids(bucket_id_eq_fn(bucket_id_b)), UnorderedElementsAreArray(bucket_b_msgs_ids));
+ // Match no buckets
+ EXPECT_THAT(f.enumerate_msg_ids(bucket_id_eq_fn(BucketId(16, 7890))), IsEmpty());
+
+ const auto default_space = document::FixedBucketSpaces::default_space();
+ const auto global_space = document::FixedBucketSpaces::global_space();
+
+ // Message to global bucket space (the former messages were all in the default space)
+ auto global_msg = std::make_shared<api::RequestBucketInfoCommand>(global_space, std::vector<BucketId>({bucket_id_a}));
+ global_msg->setAddress(makeStorageAddress(3));
+ f.tracker().insert(global_msg);
+
+ // Default space has the expected existing entries
+ EXPECT_THAT(f.enumerate_msg_ids(bucket_eq_fn(Bucket(default_space, bucket_id_a))),
+ UnorderedElementsAreArray(bucket_a_msgs_ids));
+ // Global space has only 1 entry
+ EXPECT_THAT(f.enumerate_msg_ids(bucket_eq_fn(Bucket(global_space, bucket_id_a))),
+ ElementsAre(global_msg->getMsgId()));
+}
+
}