diff options
Diffstat (limited to 'storage/src/tests/distributor/distributortest.cpp')
-rw-r--r-- | storage/src/tests/distributor/distributortest.cpp | 60 |
1 files changed, 60 insertions, 0 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index 3aedd31f574..12a4118aa08 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -48,6 +48,8 @@ class Distributor_Test : public CppUnit::TestFixture, CPPUNIT_TEST(sequencing_config_is_propagated_to_distributor_config); CPPUNIT_TEST(merge_busy_inhibit_duration_config_is_propagated_to_distributor_config); CPPUNIT_TEST(merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker); + CPPUNIT_TEST(external_client_requests_are_handled_individually_in_priority_order); + CPPUNIT_TEST(internal_messages_are_started_in_fifo_order_batch); CPPUNIT_TEST_SUITE_END(); protected: @@ -77,6 +79,8 @@ protected: void sequencing_config_is_propagated_to_distributor_config(); void merge_busy_inhibit_duration_config_is_propagated_to_distributor_config(); void merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker(); + void external_client_requests_are_handled_individually_in_priority_order(); + void internal_messages_are_started_in_fifo_order_batch(); public: void setUp() override { @@ -867,6 +871,62 @@ void Distributor_Test::merge_busy_inhibit_duration_is_propagated_to_pending_mess CPPUNIT_ASSERT(!node_info.isBusy(0)); } +void Distributor_Test::external_client_requests_are_handled_individually_in_priority_order() { + setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); + addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1/t/a"); + + std::vector<api::StorageMessage::Priority> priorities({50, 255, 10, 40, 0}); + document::DocumentId id("id:foo:testdoctype1:n=1:foo"); + vespalib::stringref field_set = ""; + for (auto pri : priorities) { + auto cmd = std::make_shared<api::GetCommand>(document::BucketId(), id, field_set); + cmd->setPriority(pri); + // onDown appends to internal message FIFO queue, awaiting hand-off. + _distributor->onDown(cmd); + } + // At the hand-off point we expect client requests to be prioritized. + // For each tick, a priority-order client request is processed and sent off. + for (size_t i = 1; i <= priorities.size(); ++i) { + tickDistributorNTimes(1); + CPPUNIT_ASSERT_EQUAL(size_t(i), _sender.commands.size()); + } + + std::vector<int> expected({0, 10, 40, 50, 255}); + std::vector<int> actual; + for (auto& msg : _sender.commands) { + actual.emplace_back(static_cast<int>(msg->getPriority())); + } + CPPUNIT_ASSERT_EQUAL(expected, actual); +} + +void Distributor_Test::internal_messages_are_started_in_fifo_order_batch() { + // To test internal request ordering, we use NotifyBucketChangeCommand + // for the reason that it explicitly updates the bucket database for + // each individual invocation. + setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1"); + document::BucketId bucket(16, 1); + addNodesToBucketDB(bucket, "0=1/1/1/t"); + + std::vector<api::StorageMessage::Priority> priorities({50, 255, 10, 40, 1}); + for (auto pri : priorities) { + api::BucketInfo fake_info(pri, pri, pri); + auto cmd = std::make_shared<api::NotifyBucketChangeCommand>(bucket, fake_info); + cmd->setSourceIndex(0); + cmd->setPriority(pri); + _distributor->onDown(cmd); + } + + // Doing a single tick should process all internal requests in one batch + tickDistributorNTimes(1); + CPPUNIT_ASSERT_EQUAL(size_t(5), _sender.replies.size()); + + // The bucket info for priority 1 (last FIFO-order change command received, but + // highest priority) should be the end-state of the bucket database, _not_ that + // of lowest priority 255. + BucketDatabase::Entry e(getBucket(bucket)); + CPPUNIT_ASSERT_EQUAL(api::BucketInfo(1, 1, 1), e.getBucketInfo().getNode(0)->getBucketInfo()); +} + } } |