summaryrefslogtreecommitdiffstats
path: root/storage/src/tests/distributor/distributortest.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/src/tests/distributor/distributortest.cpp')
-rw-r--r--storage/src/tests/distributor/distributortest.cpp60
1 files changed, 60 insertions, 0 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 3aedd31f574..12a4118aa08 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -48,6 +48,8 @@ class Distributor_Test : public CppUnit::TestFixture,
CPPUNIT_TEST(sequencing_config_is_propagated_to_distributor_config);
CPPUNIT_TEST(merge_busy_inhibit_duration_config_is_propagated_to_distributor_config);
CPPUNIT_TEST(merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker);
+ CPPUNIT_TEST(external_client_requests_are_handled_individually_in_priority_order);
+ CPPUNIT_TEST(internal_messages_are_started_in_fifo_order_batch);
CPPUNIT_TEST_SUITE_END();
protected:
@@ -77,6 +79,8 @@ protected:
void sequencing_config_is_propagated_to_distributor_config();
void merge_busy_inhibit_duration_config_is_propagated_to_distributor_config();
void merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker();
+ void external_client_requests_are_handled_individually_in_priority_order();
+ void internal_messages_are_started_in_fifo_order_batch();
public:
void setUp() override {
@@ -867,6 +871,62 @@ void Distributor_Test::merge_busy_inhibit_duration_is_propagated_to_pending_mess
CPPUNIT_ASSERT(!node_info.isBusy(0));
}
+void Distributor_Test::external_client_requests_are_handled_individually_in_priority_order() {
+ setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
+ addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1/t/a");
+
+ std::vector<api::StorageMessage::Priority> priorities({50, 255, 10, 40, 0});
+ document::DocumentId id("id:foo:testdoctype1:n=1:foo");
+ vespalib::stringref field_set = "";
+ for (auto pri : priorities) {
+ auto cmd = std::make_shared<api::GetCommand>(document::BucketId(), id, field_set);
+ cmd->setPriority(pri);
+ // onDown appends to internal message FIFO queue, awaiting hand-off.
+ _distributor->onDown(cmd);
+ }
+ // At the hand-off point we expect client requests to be prioritized.
+ // For each tick, a priority-order client request is processed and sent off.
+ for (size_t i = 1; i <= priorities.size(); ++i) {
+ tickDistributorNTimes(1);
+ CPPUNIT_ASSERT_EQUAL(size_t(i), _sender.commands.size());
+ }
+
+ std::vector<int> expected({0, 10, 40, 50, 255});
+ std::vector<int> actual;
+ for (auto& msg : _sender.commands) {
+ actual.emplace_back(static_cast<int>(msg->getPriority()));
+ }
+ CPPUNIT_ASSERT_EQUAL(expected, actual);
+}
+
+void Distributor_Test::internal_messages_are_started_in_fifo_order_batch() {
+ // To test internal request ordering, we use NotifyBucketChangeCommand
+ // for the reason that it explicitly updates the bucket database for
+ // each individual invocation.
+ setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
+ document::BucketId bucket(16, 1);
+ addNodesToBucketDB(bucket, "0=1/1/1/t");
+
+ std::vector<api::StorageMessage::Priority> priorities({50, 255, 10, 40, 1});
+ for (auto pri : priorities) {
+ api::BucketInfo fake_info(pri, pri, pri);
+ auto cmd = std::make_shared<api::NotifyBucketChangeCommand>(bucket, fake_info);
+ cmd->setSourceIndex(0);
+ cmd->setPriority(pri);
+ _distributor->onDown(cmd);
+ }
+
+ // Doing a single tick should process all internal requests in one batch
+ tickDistributorNTimes(1);
+ CPPUNIT_ASSERT_EQUAL(size_t(5), _sender.replies.size());
+
+ // The bucket info for priority 1 (last FIFO-order change command received, but
+ // highest priority) should be the end-state of the bucket database, _not_ that
+ // of lowest priority 255.
+ BucketDatabase::Entry e(getBucket(bucket));
+ CPPUNIT_ASSERT_EQUAL(api::BucketInfo(1, 1, 1), e.getBucketInfo().getNode(0)->getBucketInfo());
+}
+
}
}