// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include using document::Bucket; using document::BucketId; using document::test::makeDocumentBucket; using document::test::makeBucketSpace; using namespace ::testing; namespace storage::distributor { using namespace std::chrono_literals; struct PendingMessageTrackerTest : Test { std::vector> insertMessages(PendingMessageTracker& tracker); }; namespace { class RequestBuilder { uint16_t _toNode; std::chrono::milliseconds _atTime; public: RequestBuilder() noexcept : _toNode(0), _atTime() {} RequestBuilder& atTime(std::chrono::milliseconds t) { _atTime = t; return *this; } RequestBuilder& toNode(uint16_t node) { _toNode = node; return *this; } uint16_t toNode() const { return _toNode; } std::chrono::milliseconds atTime() const { return _atTime; } }; api::StorageMessageAddress makeStorageAddress(uint16_t node) { static vespalib::string _storage("storage"); return {&_storage, lib::NodeType::STORAGE, node}; } class Fixture { StorageComponentRegisterImpl _compReg; framework::defaultimplementation::FakeClock _clock; std::unique_ptr _tracker; document::TestDocMan _testDocMan; public: Fixture(); ~Fixture(); std::shared_ptr sendPut(const RequestBuilder& builder) { assignMockedTime(builder.atTime()); auto put = createPutToNode(builder.toNode()); _tracker->insert(put); return put; } void sendPutReply(api::PutCommand& putCmd, const RequestBuilder& builder, const api::ReturnCode& result = api::ReturnCode()) { assignMockedTime(builder.atTime()); auto putReply = putCmd.makeReply(); putReply->setResult(result); _tracker->reply(*putReply); } std::shared_ptr createPutToNode(uint16_t node) const { document::BucketId bucket(16, 1234); auto cmd = std::make_shared( makeDocumentBucket(bucket), createDummyDocumentForBucket(bucket), api::Timestamp(123456)); cmd->setAddress(makeStorageAddress(node)); return cmd; } std::shared_ptr create_get_to_node(uint16_t node) const { document::BucketId bucket(16, 1234); auto cmd = std::make_shared( makeDocumentBucket(bucket), document::DocumentId("id::testdoctype1:n=1234:foo"), "[all]"); cmd->setAddress(makeStorageAddress(node)); return cmd; } PendingMessageTracker& tracker() { return *_tracker; } auto& clock() { return _clock; } std::vector enumerate_msg_ids(const std::function& bucket_predicate) const { std::vector enumerated_ids; auto insert_enumerated_ids = [&](uint64_t msg_id) { enumerated_ids.emplace_back(msg_id); }; _tracker->enumerate_matching_pending_bucket_ops(bucket_predicate, insert_enumerated_ids); return enumerated_ids; } private: static std::string createDummyIdString(const document::BucketId& bucket) { std::ostringstream id; id << "id:foo:testdoctype1:n=" << bucket.getId() << ":foo"; return id.str(); } document::Document::SP createDummyDocumentForBucket(const document::BucketId& bucket) const { return _testDocMan.createDocument("foobar", createDummyIdString(bucket)); } std::shared_ptr createRemoveToNode(uint16_t node) const { document::BucketId bucket(16, 1234); auto cmd = std::make_shared( makeDocumentBucket(bucket), document::DocumentId(createDummyIdString(bucket)), api::Timestamp(123456)); cmd->setAddress(makeStorageAddress(node)); return cmd; } void assignMockedTime(std::chrono::milliseconds time) { _clock.setAbsoluteTimeInMicroSeconds(time.count() * 1000); } }; Fixture::Fixture() : _compReg(), _clock(), _tracker(), _testDocMan() { _compReg.setClock(_clock); _clock.setAbsoluteTimeInSeconds(1); // Have to set clock in compReg before constructing tracker, or it'll // flip out and die on an explicit nullptr check. _tracker = std::make_unique(_compReg, 0); } Fixture::~Fixture() = default; } // anonymous namespace TEST_F(PendingMessageTrackerTest, simple) { StorageComponentRegisterImpl compReg; framework::defaultimplementation::FakeClock clock; compReg.setClock(clock); clock.setAbsoluteTimeInSeconds(1); PendingMessageTracker tracker(compReg, 0); std::ostringstream dummy; // Enable time tracking tracker.reportStatus(dummy, framework::HttpUrlPath("/pendingmessages?order=bucket")); auto remove = std::make_shared( makeDocumentBucket(document::BucketId(16, 1234)), document::DocumentId("id:footype:testdoc:n=1234:foo"), 1001); remove->setAddress(makeStorageAddress(0)); tracker.insert(remove); { std::ostringstream ost; tracker.reportStatus(ost, framework::HttpUrlPath("/pendingmessages?order=bucket")); EXPECT_THAT(ost.str(), HasSubstr( "Bucket(BucketSpace(0x0000000000000001), BucketId(0x40000000000004d2))\n" "
    \n" "
  • Node 0: 1970-01-01 00:00:01.000 UTC " "Remove(BucketId(0x40000000000004d2), priority=127)
  • \n" "
\n")); } api::RemoveReply reply(*remove); tracker.reply(reply); { std::ostringstream ost; tracker.reportStatus(ost, framework::HttpUrlPath("/pendingmessages?order=bucket")); EXPECT_THAT(ost.str(), Not(HasSubstr("id:"))); } } std::vector> PendingMessageTrackerTest::insertMessages(PendingMessageTracker& tracker) { std::vector> inserted; for (uint32_t i = 0; i < 4; i++) { std::ostringstream ost; ost << "id:footype:testdoc:n=1234:" << i; auto remove = std::make_shared( makeDocumentBucket(document::BucketId(16, 1234)), document::DocumentId(ost.str()), 1000 + i); remove->setAddress(makeStorageAddress(i % 2)); tracker.insert(remove); inserted.emplace_back(std::move(remove)); } for (uint32_t i = 0; i < 4; i++) { std::ostringstream ost; ost << "id:footype:testdoc:n=4567:" << i; auto remove = std::make_shared(makeDocumentBucket(document::BucketId(16, 4567)), document::DocumentId(ost.str()), 2000 + i); remove->setAddress(makeStorageAddress(i % 2)); tracker.insert(remove); inserted.emplace_back(std::move(remove)); } return inserted; } TEST_F(PendingMessageTrackerTest, start_page) { StorageComponentRegisterImpl compReg; framework::defaultimplementation::FakeClock clock; compReg.setClock(clock); PendingMessageTracker tracker(compReg, 3); { std::ostringstream ost; tracker.reportStatus(ost, framework::HttpUrlPath("/pendingmessages3")); EXPECT_THAT(ost.str(), HasSubstr( "

Pending messages to storage nodes (stripe 3)

\n" "View:\n" "
    \n" "
  • Group by bucket
  • " "
  • Group by node
  • ")); } } TEST_F(PendingMessageTrackerTest, multiple_messages) { StorageComponentRegisterImpl compReg; framework::defaultimplementation::FakeClock clock; compReg.setClock(clock); clock.setAbsoluteTimeInSeconds(1); PendingMessageTracker tracker(compReg, 0); std::ostringstream dummy; // Enable time tracking tracker.reportStatus(dummy, framework::HttpUrlPath("/pendingmessages?order=bucket")); insertMessages(tracker); { std::ostringstream ost; tracker.reportStatus(ost, framework::HttpUrlPath("/pendingmessages?order=bucket")); EXPECT_THAT(ost.str(), HasSubstr( "Bucket(BucketSpace(0x0000000000000001), BucketId(0x40000000000004d2))\n" "
      \n" "
    • Node 0: 1970-01-01 00:00:01.000 UTC Remove(BucketId(0x40000000000004d2), priority=127)
    • \n" "
    • Node 0: 1970-01-01 00:00:01.000 UTC Remove(BucketId(0x40000000000004d2), priority=127)
    • \n" "
    • Node 1: 1970-01-01 00:00:01.000 UTC Remove(BucketId(0x40000000000004d2), priority=127)
    • \n" "
    • Node 1: 1970-01-01 00:00:01.000 UTC Remove(BucketId(0x40000000000004d2), priority=127)
    • \n" "
    \n" "Bucket(BucketSpace(0x0000000000000001), BucketId(0x40000000000011d7))\n" "
      \n" "
    • Node 0: 1970-01-01 00:00:01.000 UTC Remove(BucketId(0x40000000000011d7), priority=127)
    • \n" "
    • Node 0: 1970-01-01 00:00:01.000 UTC Remove(BucketId(0x40000000000011d7), priority=127)
    • \n" "
    • Node 1: 1970-01-01 00:00:01.000 UTC Remove(BucketId(0x40000000000011d7), priority=127)
    • \n" "
    • Node 1: 1970-01-01 00:00:01.000 UTC Remove(BucketId(0x40000000000011d7), priority=127)
    • \n" "
    \n")); } { std::ostringstream ost; tracker.reportStatus(ost, framework::HttpUrlPath("/pendingmessages?order=node")); EXPECT_THAT(ost.str(), HasSubstr( "Node 0 (pending count: 4)\n" "
      \n" "
    • Node 0: 1970-01-01 00:00:01.000 UTC Remove(BucketId(0x40000000000004d2), priority=127)
    • \n" "
    • Node 0: 1970-01-01 00:00:01.000 UTC Remove(BucketId(0x40000000000004d2), priority=127)
    • \n" "
    • Node 0: 1970-01-01 00:00:01.000 UTC Remove(BucketId(0x40000000000011d7), priority=127)
    • \n" "
    • Node 0: 1970-01-01 00:00:01.000 UTC Remove(BucketId(0x40000000000011d7), priority=127)
    • \n" "
    \n" "Node 1 (pending count: 4)\n" "
      \n" "
    • Node 1: 1970-01-01 00:00:01.000 UTC Remove(BucketId(0x40000000000004d2), priority=127)
    • \n" "
    • Node 1: 1970-01-01 00:00:01.000 UTC Remove(BucketId(0x40000000000004d2), priority=127)
    • \n" "
    • Node 1: 1970-01-01 00:00:01.000 UTC Remove(BucketId(0x40000000000011d7), priority=127)
    • \n" "
    • Node 1: 1970-01-01 00:00:01.000 UTC Remove(BucketId(0x40000000000011d7), priority=127)
    • \n" "
    \n")); } } namespace { class TestChecker : public PendingMessageTracker::Checker { public: uint8_t pri; TestChecker() noexcept : pri(UINT8_MAX) {} bool check(uint32_t msgType, [[maybe_unused]] uint16_t node, uint8_t p) override { if (msgType == api::MessageType::REMOVE_ID) { pri = p; return false; } return true; } }; } TEST_F(PendingMessageTrackerTest, get_pending_message_types) { StorageComponentRegisterImpl compReg; framework::defaultimplementation::FakeClock clock; compReg.setClock(clock); clock.setAbsoluteTimeInSeconds(1); PendingMessageTracker tracker(compReg, 0); document::BucketId bid(16, 1234); auto remove = std::make_shared(makeDocumentBucket(bid), document::DocumentId("id:footype:testdoc:n=1234:foo"), 1001); remove->setAddress(makeStorageAddress(0)); tracker.insert(remove); { TestChecker checker; tracker.checkPendingMessages(0, makeDocumentBucket(bid), checker); EXPECT_EQ(127, static_cast(checker.pri)); } { TestChecker checker; tracker.checkPendingMessages(0, makeDocumentBucket(document::BucketId(16, 1235)), checker); EXPECT_EQ(255, static_cast(checker.pri)); } { TestChecker checker; tracker.checkPendingMessages(1, makeDocumentBucket(bid), checker); EXPECT_EQ(255, static_cast(checker.pri)); } } TEST_F(PendingMessageTrackerTest, has_pending_message) { StorageComponentRegisterImpl compReg; framework::defaultimplementation::FakeClock clock; compReg.setClock(clock); clock.setAbsoluteTimeInSeconds(1); PendingMessageTracker tracker(compReg, 0); document::BucketId bid(16, 1234); EXPECT_FALSE(tracker.hasPendingMessage(1, makeDocumentBucket(bid), api::MessageType::REMOVE_ID)); { auto remove = std::make_shared(makeDocumentBucket(bid), document::DocumentId("id:footype:testdoc:n=1234:foo"), 1001); remove->setAddress(makeStorageAddress(1)); tracker.insert(remove); } EXPECT_TRUE(tracker.hasPendingMessage(1, makeDocumentBucket(bid), api::MessageType::REMOVE_ID)); EXPECT_FALSE(tracker.hasPendingMessage(0, makeDocumentBucket(bid), api::MessageType::REMOVE_ID)); EXPECT_FALSE(tracker.hasPendingMessage(2, makeDocumentBucket(bid), api::MessageType::REMOVE_ID)); EXPECT_FALSE(tracker.hasPendingMessage(1, makeDocumentBucket(document::BucketId(16, 1233)), api::MessageType::REMOVE_ID)); EXPECT_FALSE(tracker.hasPendingMessage(1, makeDocumentBucket(bid), api::MessageType::DELETEBUCKET_ID)); } namespace { class OperationEnumerator : public PendingMessageTracker::Checker { std::ostringstream ss; public: bool check(uint32_t msgType, uint16_t node, [[maybe_unused]] uint8_t p) override { ss << api::MessageType::get(static_cast(msgType)).getName() << " -> " << node << "\n"; return true; } std::string str() const { return ss.str(); } }; } // anon ns TEST_F(PendingMessageTrackerTest, get_all_messages_for_single_bucket) { StorageComponentRegisterImpl compReg; framework::defaultimplementation::FakeClock clock; compReg.setClock(clock); clock.setAbsoluteTimeInSeconds(1); PendingMessageTracker tracker(compReg, 0); insertMessages(tracker); { OperationEnumerator enumerator; tracker.checkPendingMessages(makeDocumentBucket(document::BucketId(16, 1234)), enumerator); EXPECT_EQ("Remove -> 0\n" "Remove -> 0\n" "Remove -> 1\n" "Remove -> 1\n", enumerator.str()); } { OperationEnumerator enumerator; tracker.checkPendingMessages(makeDocumentBucket(document::BucketId(16, 9876)), enumerator); EXPECT_EQ("", enumerator.str()); } } // TODO don't set busy for visitor replies? These will mark the node as busy today, // but have the same actual semantics as busy merges (i.e. "queue is full", not "node // is too busy to accept new requests in general"). TEST_F(PendingMessageTrackerTest, busy_reply_marks_node_as_busy) { Fixture f; auto cmd = f.sendPut(RequestBuilder().toNode(0)); EXPECT_FALSE(f.tracker().getNodeInfo().isBusy(0)); f.sendPutReply(*cmd, RequestBuilder(), api::ReturnCode(api::ReturnCode::BUSY)); EXPECT_TRUE(f.tracker().getNodeInfo().isBusy(0)); EXPECT_FALSE(f.tracker().getNodeInfo().isBusy(1)); } TEST_F(PendingMessageTrackerTest, busy_node_duration_can_be_adjusted) { Fixture f; auto cmd = f.sendPut(RequestBuilder().toNode(0)); f.tracker().setNodeBusyDuration(10s); f.sendPutReply(*cmd, RequestBuilder(), api::ReturnCode(api::ReturnCode::BUSY)); EXPECT_TRUE(f.tracker().getNodeInfo().isBusy(0)); f.clock().addSecondsToTime(11); EXPECT_FALSE(f.tracker().getNodeInfo().isBusy(0)); } namespace { document::BucketId bucket_of(const document::DocumentId& id) { return document::BucketId(16, id.getGlobalId().convertToBucketId().getId()); } } TEST_F(PendingMessageTrackerTest, start_deferred_task_immediately_if_no_pending_write_ops) { Fixture f; auto cmd = f.createPutToNode(0); auto bucket_id = bucket_of(cmd->getDocumentId()); auto state = TaskRunState::Aborted; f.tracker().run_once_no_pending_for_bucket(makeDocumentBucket(bucket_id), make_deferred_task([&](TaskRunState s){ state = s; })); EXPECT_EQ(state, TaskRunState::OK); } TEST_F(PendingMessageTrackerTest, start_deferred_task_immediately_if_only_pending_read_ops) { Fixture f; auto cmd = f.create_get_to_node(0); f.tracker().insert(cmd); auto bucket_id = bucket_of(cmd->getDocumentId()); auto state = TaskRunState::Aborted; f.tracker().run_once_no_pending_for_bucket(makeDocumentBucket(bucket_id), make_deferred_task([&](TaskRunState s){ state = s; })); EXPECT_EQ(state, TaskRunState::OK); } TEST_F(PendingMessageTrackerTest, deferred_task_not_started_before_pending_ops_completed) { Fixture f; auto cmd = f.sendPut(RequestBuilder().toNode(0)); auto bucket_id = bucket_of(cmd->getDocumentId()); auto state = TaskRunState::Aborted; f.tracker().run_once_no_pending_for_bucket(makeDocumentBucket(bucket_id), make_deferred_task([&](TaskRunState s){ state = s; })); EXPECT_EQ(state, TaskRunState::Aborted); f.sendPutReply(*cmd, RequestBuilder()); // Deferred task should be run as part of this. EXPECT_EQ(state, TaskRunState::OK); } TEST_F(PendingMessageTrackerTest, deferred_task_can_be_started_with_pending_read_op) { Fixture f; auto cmd = f.sendPut(RequestBuilder().toNode(0)); auto bucket_id = bucket_of(cmd->getDocumentId()); auto state = TaskRunState::Aborted; f.tracker().run_once_no_pending_for_bucket(makeDocumentBucket(bucket_id), make_deferred_task([&](TaskRunState s){ state = s; })); EXPECT_EQ(state, TaskRunState::Aborted); f.tracker().insert(f.create_get_to_node(0)); // Concurrent Get and Put f.sendPutReply(*cmd, RequestBuilder()); // Deferred task should be allowed to run EXPECT_EQ(state, TaskRunState::OK); } TEST_F(PendingMessageTrackerTest, abort_invokes_deferred_tasks_with_aborted_status) { Fixture f; auto cmd = f.sendPut(RequestBuilder().toNode(0)); auto bucket_id = bucket_of(cmd->getDocumentId()); auto state = TaskRunState::OK; f.tracker().run_once_no_pending_for_bucket(makeDocumentBucket(bucket_id), make_deferred_task([&](TaskRunState s){ state = s; })); EXPECT_EQ(state, TaskRunState::OK); f.tracker().abort_deferred_tasks(); EXPECT_EQ(state, TaskRunState::Aborted); } TEST_F(PendingMessageTrackerTest, request_bucket_info_with_no_buckets_tracked_as_null_bucket) { Fixture f; auto msg = std::make_shared(makeBucketSpace(), 0, lib::ClusterState(), ""); msg->setAddress(makeStorageAddress(2)); f.tracker().insert(msg); // Tracked as null bucket { OperationEnumerator enumerator; f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId()), enumerator); EXPECT_EQ("Request bucket info -> 2\n", enumerator.str()); } // Nothing to a specific bucket { OperationEnumerator enumerator; f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId(16, 1234)), enumerator); EXPECT_EQ("", enumerator.str()); } auto reply = std::shared_ptr(msg->makeReply()); f.tracker().reply(*reply); // No longer tracked as null bucket { OperationEnumerator enumerator; f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId()), enumerator); EXPECT_EQ("", enumerator.str()); } } TEST_F(PendingMessageTrackerTest, request_bucket_info_with_bucket_tracked_with_superbucket) { Fixture f; document::BucketId bucket(16, 1234); auto msg = std::make_shared(makeBucketSpace(), std::vector({bucket})); msg->setAddress(makeStorageAddress(3)); f.tracker().insert(msg); // Not tracked as null bucket { OperationEnumerator enumerator; f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId()), enumerator); EXPECT_EQ("", enumerator.str()); } // Tracked for superbucket { OperationEnumerator enumerator; f.tracker().checkPendingMessages(makeDocumentBucket(bucket), enumerator); EXPECT_EQ("Request bucket info -> 3\n", enumerator.str()); } // Not tracked for other buckets { OperationEnumerator enumerator; f.tracker().checkPendingMessages(makeDocumentBucket(document::BucketId(16, 2345)), enumerator); EXPECT_EQ("", enumerator.str()); } auto reply = std::shared_ptr(msg->makeReply()); f.tracker().reply(*reply); // No longer tracked for specified bucket { OperationEnumerator enumerator; f.tracker().checkPendingMessages(makeDocumentBucket(bucket), enumerator); EXPECT_EQ("", enumerator.str()); } } namespace { auto bucket_id_eq_fn(BucketId matched_id) { return [matched_id](auto& bucket) noexcept { return bucket.getBucketId() == matched_id; }; } auto bucket_eq_fn(Bucket matched_bucket) { return [matched_bucket](auto& bucket) noexcept { return bucket == matched_bucket; }; } } TEST_F(PendingMessageTrackerTest, can_enumerate_all_message_ids_for_ops_to_matching_buckets) { Fixture f; BucketId bucket_id_a(16, 1234); BucketId bucket_id_b(16, 4567); // This inserts 4 ops for bucket {16, 1234} (bucket 'a') and 4 ops for {16, 4567} (bucket 'b') auto inserted_ops = insertMessages(f.tracker()); std::vector bucket_a_msgs_ids, bucket_b_msgs_ids; for (auto& op : inserted_ops) { if (op->getBucketId() == bucket_id_a) { bucket_a_msgs_ids.emplace_back(op->getMsgId()); } else { bucket_b_msgs_ids.emplace_back(op->getMsgId()); } } // Match all for bucket 'a' EXPECT_THAT(f.enumerate_msg_ids(bucket_id_eq_fn(bucket_id_a)), UnorderedElementsAreArray(bucket_a_msgs_ids)); // Match all for bucket 'b' EXPECT_THAT(f.enumerate_msg_ids(bucket_id_eq_fn(bucket_id_b)), UnorderedElementsAreArray(bucket_b_msgs_ids)); // Match no buckets EXPECT_THAT(f.enumerate_msg_ids(bucket_id_eq_fn(BucketId(16, 7890))), IsEmpty()); const auto default_space = document::FixedBucketSpaces::default_space(); const auto global_space = document::FixedBucketSpaces::global_space(); // Message to global bucket space (the former messages were all in the default space) auto global_msg = std::make_shared(global_space, std::vector({bucket_id_a})); global_msg->setAddress(makeStorageAddress(3)); f.tracker().insert(global_msg); // Default space has the expected existing entries EXPECT_THAT(f.enumerate_msg_ids(bucket_eq_fn(Bucket(default_space, bucket_id_a))), UnorderedElementsAreArray(bucket_a_msgs_ids)); // Global space has only 1 entry EXPECT_THAT(f.enumerate_msg_ids(bucket_eq_fn(Bucket(global_space, bucket_id_a))), ElementsAre(global_msg->getMsgId())); } }