diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2018-01-30 13:27:50 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-01-30 13:27:50 +0100 |
commit | 3c0eb51abccc00f6710d35c7768db8b23c8b5840 (patch) | |
tree | f9c5f07ac29df667f47a73aa84f9118cfa06df1c | |
parent | 08b55bd6f23e3dccf896febf8de24db273f2b0e8 (diff) | |
parent | 6a1d08777d0ce8fd4ee7baed9fa73199c9ccb5ca (diff) |
Merge pull request #4813 from vespa-engine/vekterli/replace-communication-manager-priority-queue-with-fifo
Use FIFO for incoming messages instead of a priority queue
3 files changed, 20 insertions, 78 deletions
diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp index e7e08f28ce3..aa20f32450d 100644 --- a/storage/src/tests/storageserver/communicationmanagertest.cpp +++ b/storage/src/tests/storageserver/communicationmanagertest.cpp @@ -23,7 +23,7 @@ struct CommunicationManagerTest : public CppUnit::TestFixture { void testSimple(); void testDistPendingLimitConfigsArePropagatedToMessageBus(); void testStorPendingLimitConfigsArePropagatedToMessageBus(); - void testCommandsAreDequeuedInPriorityOrder(); + void testCommandsAreDequeuedInFifoOrder(); void testRepliesAreDequeuedInFifoOrder(); void bucket_space_config_can_be_updated_live(); void unmapped_bucket_space_documentapi_request_returns_error_reply(); @@ -47,7 +47,7 @@ struct CommunicationManagerTest : public CppUnit::TestFixture { CPPUNIT_TEST(testSimple); CPPUNIT_TEST(testDistPendingLimitConfigsArePropagatedToMessageBus); CPPUNIT_TEST(testStorPendingLimitConfigsArePropagatedToMessageBus); - CPPUNIT_TEST(testCommandsAreDequeuedInPriorityOrder); + CPPUNIT_TEST(testCommandsAreDequeuedInFifoOrder); CPPUNIT_TEST(testRepliesAreDequeuedInFifoOrder); CPPUNIT_TEST(bucket_space_config_can_be_updated_live); CPPUNIT_TEST(unmapped_bucket_space_documentapi_request_returns_error_reply); @@ -175,7 +175,7 @@ CommunicationManagerTest::testStorPendingLimitConfigsArePropagatedToMessageBus() } void -CommunicationManagerTest::testCommandsAreDequeuedInPriorityOrder() +CommunicationManagerTest::testCommandsAreDequeuedInFifoOrder() { mbus::Slobrok slobrok; vdstestlib::DirConfig storConfig(getStandardConfig(true)); @@ -190,8 +190,8 @@ CommunicationManagerTest::testCommandsAreDequeuedInPriorityOrder() // Message dequeing does not start before we invoke `open` on the storage // link chain, so we enqueue messages in randomized priority order before - // doing so. After starting the thread, we should then get messages down - // the chain in a deterministic, prioritized order. + // doing so. After starting the thread, we should get messages down + // the chain in a deterministic FIFO order and _not_ priority-order. // Lower number == higher priority. std::vector<api::StorageMessage::Priority> pris{200, 0, 255, 128}; for (auto pri : pris) { @@ -200,7 +200,6 @@ CommunicationManagerTest::testCommandsAreDequeuedInPriorityOrder() storage.open(); storageLink->waitForMessages(pris.size(), MESSAGE_WAIT_TIME_SEC); - std::sort(pris.begin(), pris.end()); for (size_t i = 0; i < pris.size(); ++i) { // Casting is just to avoid getting mismatched values printed to the // output verbatim as chars. diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 4cf4839319f..b61717e5b67 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -26,23 +26,16 @@ using vespalib::make_string; namespace storage { -PriorityQueue::PriorityQueue() : - _queue(), - _queueMonitor(), - _msgCounter(0) -{ } - -PriorityQueue::~PriorityQueue() -{ } +Queue::Queue() = default; +Queue::~Queue() = default; -bool PriorityQueue::getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout) -{ +bool Queue::getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout) { vespalib::MonitorGuard sync(_queueMonitor); bool first = true; while (true) { // Max twice if (!_queue.empty()) { LOG(spam, "Picking message from queue"); - msg = _queue.top().second; + msg = _queue.front(); _queue.pop(); return true; } @@ -56,31 +49,18 @@ bool PriorityQueue::getNext(std::shared_ptr<api::StorageMessage>& msg, int timeo return false; } -void -PriorityQueue::enqueue(const std::shared_ptr<api::StorageMessage>& msg) -{ +void Queue::enqueue(const std::shared_ptr<api::StorageMessage>& msg) { vespalib::MonitorGuard sync(_queueMonitor); - const uint8_t priority(msg->getType().isReply() - ? FIXED_REPLY_PRIORITY - : msg->getPriority()); - Key key(priority, _msgCounter); - // We make a simplifying--though reasonable--assumption that we'll never - // process more than UINT64_MAX replies before process restart. - ++_msgCounter; - _queue.push(std::make_pair(key, msg)); + _queue.emplace(msg); sync.unsafeSignalUnlock(); } -void -PriorityQueue::signal() -{ +void Queue::signal() { vespalib::MonitorGuard sync(_queueMonitor); sync.unsafeSignalUnlock(); } -int -PriorityQueue::size() -{ +size_t Queue::size() const { vespalib::MonitorGuard sync(_queueMonitor); return _queue.size(); } diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h index e2c890bfc9b..ff41e59846a 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.h +++ b/storage/src/vespa/storage/storageserver/communicationmanager.h @@ -43,49 +43,15 @@ class VisitorThread; class FNetListener; class RPCRequestWrapper; -class PriorityQueue { +class Queue { private: - struct Key { - uint8_t priority {255}; - uint64_t seqNum {0}; - - Key(uint8_t pri, uint64_t seq) - : priority(pri), seqNum(seq) - { - } - }; - using ValueType = std::pair<Key, api::StorageMessage::SP>; - - struct PriorityThenFifoCmp { - bool operator()(const ValueType& lhs, - const ValueType& rhs) const noexcept - { - // priority_queue has largest element on top, so reverse order - // since our semantics have 0 as the highest priority. - if (lhs.first.priority != rhs.first.priority) { - return (lhs.first.priority > rhs.first.priority); - } - return (lhs.first.seqNum > rhs.first.seqNum); - } - }; - - using QueueType = std::priority_queue< - ValueType, - std::vector<ValueType>, - PriorityThenFifoCmp>; - - // Sneakily chosen priority such that effectively only RPC commands are - // allowed in front of replies. Replies must have the same effective - // priority or they will get reordered and all hell breaks loose. - static constexpr uint8_t FIXED_REPLY_PRIORITY = 1; - + using QueueType = std::queue<api::StorageMessage::SP>; QueueType _queue; vespalib::Monitor _queueMonitor; - uint64_t _msgCounter; public: - PriorityQueue(); - virtual ~PriorityQueue(); + Queue(); + ~Queue(); /** * Returns the next event from the event queue @@ -97,17 +63,14 @@ public: bool getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout); /** - * If `msg` is a StorageCommand, enqueues it using the priority stored in - * the command. If it's a reply, enqueues it using a fixed but very high - * priority that ensure replies are processed before commands but also - * ensures that replies are FIFO-ordered relative to each other. + * Enqueue msg in FIFO order. */ void enqueue(const std::shared_ptr<api::StorageMessage>& msg); /** Signal queue monitor. */ void signal(); - int size(); + size_t size() const; }; class StorageTransportContext : public api::TransportContext { @@ -137,7 +100,7 @@ private: CommunicationManagerMetrics _metrics; std::unique_ptr<FNetListener> _listener; - PriorityQueue _eventQueue; + Queue _eventQueue; // XXX: Should perhaps use a configsubscriber and poll from StorageComponent ? std::unique_ptr<config::ConfigFetcher> _configFetcher; using EarlierProtocol = std::pair<framework::SecondTime, mbus::IProtocol::SP>; |