From d8b880a01c106f84270e91ef2e16d0639d817bd9 Mon Sep 17 00:00:00 2001
From: Tor Brede Vekterli
Date: Thu, 12 Oct 2017 14:27:07 +0000
Subject: Use priority queue for external client operations

Internal operations and replies are handled in FIFO order as before.
---
 storage/src/tests/distributor/distributortest.cpp | 60 ++++++++++++++++++++
 .../src/vespa/storage/distributor/distributor.cpp | 64 ++++++++++++++++++----
 .../src/vespa/storage/distributor/distributor.h   | 16 +++++-
 3 files changed, 128 insertions(+), 12 deletions(-)

(limited to 'storage')

diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 3aedd31f574..12a4118aa08 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -48,6 +48,8 @@ class Distributor_Test : public CppUnit::TestFixture,
     CPPUNIT_TEST(sequencing_config_is_propagated_to_distributor_config);
     CPPUNIT_TEST(merge_busy_inhibit_duration_config_is_propagated_to_distributor_config);
     CPPUNIT_TEST(merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker);
+    CPPUNIT_TEST(external_client_requests_are_handled_individually_in_priority_order);
+    CPPUNIT_TEST(internal_messages_are_started_in_fifo_order_batch);
     CPPUNIT_TEST_SUITE_END();
 
 protected:
@@ -77,6 +79,8 @@ protected:
     void sequencing_config_is_propagated_to_distributor_config();
     void merge_busy_inhibit_duration_config_is_propagated_to_distributor_config();
     void merge_busy_inhibit_duration_is_propagated_to_pending_message_tracker();
+    void external_client_requests_are_handled_individually_in_priority_order();
+    void internal_messages_are_started_in_fifo_order_batch();
 
 public:
     void setUp() override {
@@ -867,6 +871,62 @@ void Distributor_Test::merge_busy_inhibit_duration_is_propagated_to_pending_mess
     CPPUNIT_ASSERT(!node_info.isBusy(0));
 }
 
+void Distributor_Test::external_client_requests_are_handled_individually_in_priority_order() {
+    setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
+    addNodesToBucketDB(document::BucketId(16, 1), "0=1/1/1/t/a");
+
+    std::vector<api::StorageMessage::Priority> priorities({50, 255, 10, 40, 0});
+    document::DocumentId id("id:foo:testdoctype1:n=1:foo");
+    vespalib::stringref field_set = "";
+    for (auto pri : priorities) {
+        auto cmd = std::make_shared<api::GetCommand>(document::BucketId(), id, field_set);
+        cmd->setPriority(pri);
+        // onDown appends to internal message FIFO queue, awaiting hand-off.
+        _distributor->onDown(cmd);
+    }
+    // At the hand-off point we expect client requests to be processed in
+    // priority order, with exactly one request processed and sent off per tick.
+    for (size_t i = 1; i <= priorities.size(); ++i) {
+        tickDistributorNTimes(1);
+        CPPUNIT_ASSERT_EQUAL(size_t(i), _sender.commands.size());
+    }
+
+    std::vector<int> expected({0, 10, 40, 50, 255});
+    std::vector<int> actual;
+    for (auto& msg : _sender.commands) {
+        actual.emplace_back(static_cast<int>(msg->getPriority()));
+    }
+    CPPUNIT_ASSERT_EQUAL(expected, actual);
+}
+
+void Distributor_Test::internal_messages_are_started_in_fifo_order_batch() {
+    // To test internal request ordering, we use NotifyBucketChangeCommand
+    // because it explicitly updates the bucket database for
+    // each individual invocation.
+    setupDistributor(Redundancy(1), NodeCount(1), "storage:1 distributor:1");
+    document::BucketId bucket(16, 1);
+    addNodesToBucketDB(bucket, "0=1/1/1/t");
+
+    std::vector<api::StorageMessage::Priority> priorities({50, 255, 10, 40, 1});
+    for (auto pri : priorities) {
+        api::BucketInfo fake_info(pri, pri, pri);
+        auto cmd = std::make_shared<api::NotifyBucketChangeCommand>(bucket, fake_info);
+        cmd->setSourceIndex(0);
+        cmd->setPriority(pri);
+        _distributor->onDown(cmd);
+    }
+
+    // Doing a single tick should process all internal requests in one batch.
+    tickDistributorNTimes(1);
+    CPPUNIT_ASSERT_EQUAL(size_t(5), _sender.replies.size());
+
+    // The bucket info for priority 1 (the last change command received in FIFO
+    // order, but the one with the highest priority) should be the end state of
+    // the bucket database, _not_ that of the lowest priority, 255.
+    BucketDatabase::Entry e(getBucket(bucket));
+    CPPUNIT_ASSERT_EQUAL(api::BucketInfo(1, 1, 1), e.getBucketInfo().getNode(0)->getBucketInfo());
+}
+
 }
 
 }
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 0dc52650131..53df19fd10c 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -572,19 +572,61 @@ Distributor::workWasDone()
     return !_tickResult.waitWanted();
 }
 
-void
-Distributor::startExternalOperations()
-{
-    for (uint32_t i=0; i<_fetchedMessages.size(); ++i) {
-        MBUS_TRACE(_fetchedMessages[i]->getTrace(), 9,
-                   "Distributor: Grabbed from queue to be processed.");
-        if (!handleMessage(_fetchedMessages[i])) {
-            MBUS_TRACE(_fetchedMessages[i]->getTrace(), 9,
-                       "Distributor: Not handling it. Sending further down.");
-            sendDown(_fetchedMessages[i]);
+namespace {
+
+bool is_client_request(const api::StorageMessage& msg) noexcept {
+    // Despite having been converted to StorageAPI messages, the following
+    // set of message types is only ever sent to the distributor by client
+    // processes.
+    switch (msg.getType().getId()) {
+    case api::MessageType::GET_ID:
+    case api::MessageType::PUT_ID:
+    case api::MessageType::REMOVE_ID:
+    case api::MessageType::VISITOR_CREATE_ID:
+    case api::MessageType::VISITOR_DESTROY_ID:
+    case api::MessageType::MULTIOPERATION_ID: // Deprecated
+    case api::MessageType::GETBUCKETLIST_ID:
+    case api::MessageType::STATBUCKET_ID:
+    case api::MessageType::UPDATE_ID:
+    case api::MessageType::REMOVELOCATION_ID:
+    case api::MessageType::BATCHPUTREMOVE_ID: // Deprecated
+    case api::MessageType::BATCHDOCUMENTUPDATE_ID: // Deprecated
+        return true;
+    default:
+        return false;
+    }
+}
+
+}
+
+void Distributor::handle_or_propagate_message(const std::shared_ptr<api::StorageMessage>& msg) {
+    if (!handleMessage(msg)) {
+        MBUS_TRACE(msg->getTrace(), 9, "Distributor: Not handling it. Sending further down.");
Sending further down."); + sendDown(msg); + } +} + +void Distributor::startExternalOperations() { + for (auto& msg : _fetchedMessages) { + if (is_client_request(*msg)) { + MBUS_TRACE(msg->getTrace(), 9, "Distributor: adding to client request priority queue"); + _client_request_priority_queue.emplace(std::move(msg)); + } else { + MBUS_TRACE(msg->getTrace(), 9, "Distributor: Grabbed from queue to be processed."); + handle_or_propagate_message(msg); } } - if (!_fetchedMessages.empty()) { + + const bool start_single_client_request = !_client_request_priority_queue.empty(); + if (start_single_client_request) { + auto& msg = _client_request_priority_queue.top(); + MBUS_TRACE(msg->getTrace(), 9, "Distributor: Grabbed from " + "client request priority queue to be processed."); + handle_or_propagate_message(msg); // TODO move() once we've move-enabled our message chains + _client_request_priority_queue.pop(); + } + + if (!_fetchedMessages.empty() || start_single_client_request) { signalWorkWasDone(); } _fetchedMessages.clear(); diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index caf3a13d113..438001acc40 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -23,6 +23,7 @@ #include #include #include +#include namespace storage { @@ -191,6 +192,7 @@ private: bool isMaintenanceReply(const api::StorageReply& reply) const; void handleStatusRequests(); + void handle_or_propagate_message(const std::shared_ptr& msg); void startExternalOperations(); /** @@ -252,8 +254,20 @@ private: mutable std::shared_ptr _distribution; std::shared_ptr _nextDistribution; - typedef std::vector > MessageQueue; + using MessageQueue = std::vector>; + struct IndirectHigherPriority { + template + bool operator()(const Lhs& lhs, const Rhs& rhs) const noexcept { + return lhs->getPriority() > rhs->getPriority(); + } + }; + using ClientRequestPriorityQueue = std::priority_queue< + std::shared_ptr, + std::vector>, + IndirectHigherPriority + >; MessageQueue _messageQueue; + ClientRequestPriorityQueue _client_request_priority_queue; MessageQueue _fetchedMessages; framework::TickingThreadPool& _threadPool; vespalib::Monitor _statusMonitor; -- cgit v1.2.3