summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2018-01-30 13:27:50 +0100
committerGitHub <noreply@github.com>2018-01-30 13:27:50 +0100
commit3c0eb51abccc00f6710d35c7768db8b23c8b5840 (patch)
treef9c5f07ac29df667f47a73aa84f9118cfa06df1c
parent08b55bd6f23e3dccf896febf8de24db273f2b0e8 (diff)
parent6a1d08777d0ce8fd4ee7baed9fa73199c9ccb5ca (diff)
Merge pull request #4813 from vespa-engine/vekterli/replace-communication-manager-priority-queue-with-fifo
Use FIFO for incoming messages instead of a priority queue
-rw-r--r--storage/src/tests/storageserver/communicationmanagertest.cpp11
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp36
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h51
3 files changed, 20 insertions, 78 deletions
diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp
index e7e08f28ce3..aa20f32450d 100644
--- a/storage/src/tests/storageserver/communicationmanagertest.cpp
+++ b/storage/src/tests/storageserver/communicationmanagertest.cpp
@@ -23,7 +23,7 @@ struct CommunicationManagerTest : public CppUnit::TestFixture {
void testSimple();
void testDistPendingLimitConfigsArePropagatedToMessageBus();
void testStorPendingLimitConfigsArePropagatedToMessageBus();
- void testCommandsAreDequeuedInPriorityOrder();
+ void testCommandsAreDequeuedInFifoOrder();
void testRepliesAreDequeuedInFifoOrder();
void bucket_space_config_can_be_updated_live();
void unmapped_bucket_space_documentapi_request_returns_error_reply();
@@ -47,7 +47,7 @@ struct CommunicationManagerTest : public CppUnit::TestFixture {
CPPUNIT_TEST(testSimple);
CPPUNIT_TEST(testDistPendingLimitConfigsArePropagatedToMessageBus);
CPPUNIT_TEST(testStorPendingLimitConfigsArePropagatedToMessageBus);
- CPPUNIT_TEST(testCommandsAreDequeuedInPriorityOrder);
+ CPPUNIT_TEST(testCommandsAreDequeuedInFifoOrder);
CPPUNIT_TEST(testRepliesAreDequeuedInFifoOrder);
CPPUNIT_TEST(bucket_space_config_can_be_updated_live);
CPPUNIT_TEST(unmapped_bucket_space_documentapi_request_returns_error_reply);
@@ -175,7 +175,7 @@ CommunicationManagerTest::testStorPendingLimitConfigsArePropagatedToMessageBus()
}
void
-CommunicationManagerTest::testCommandsAreDequeuedInPriorityOrder()
+CommunicationManagerTest::testCommandsAreDequeuedInFifoOrder()
{
mbus::Slobrok slobrok;
vdstestlib::DirConfig storConfig(getStandardConfig(true));
@@ -190,8 +190,8 @@ CommunicationManagerTest::testCommandsAreDequeuedInPriorityOrder()
// Message dequeing does not start before we invoke `open` on the storage
// link chain, so we enqueue messages in randomized priority order before
- // doing so. After starting the thread, we should then get messages down
- // the chain in a deterministic, prioritized order.
+ // doing so. After starting the thread, we should get messages down
+ // the chain in a deterministic FIFO order and _not_ priority-order.
// Lower number == higher priority.
std::vector<api::StorageMessage::Priority> pris{200, 0, 255, 128};
for (auto pri : pris) {
@@ -200,7 +200,6 @@ CommunicationManagerTest::testCommandsAreDequeuedInPriorityOrder()
storage.open();
storageLink->waitForMessages(pris.size(), MESSAGE_WAIT_TIME_SEC);
- std::sort(pris.begin(), pris.end());
for (size_t i = 0; i < pris.size(); ++i) {
// Casting is just to avoid getting mismatched values printed to the
// output verbatim as chars.
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 4cf4839319f..b61717e5b67 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -26,23 +26,16 @@ using vespalib::make_string;
namespace storage {
-PriorityQueue::PriorityQueue() :
- _queue(),
- _queueMonitor(),
- _msgCounter(0)
-{ }
-
-PriorityQueue::~PriorityQueue()
-{ }
+Queue::Queue() = default;
+Queue::~Queue() = default;
-bool PriorityQueue::getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout)
-{
+bool Queue::getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout) {
vespalib::MonitorGuard sync(_queueMonitor);
bool first = true;
while (true) { // Max twice
if (!_queue.empty()) {
LOG(spam, "Picking message from queue");
- msg = _queue.top().second;
+ msg = _queue.front();
_queue.pop();
return true;
}
@@ -56,31 +49,18 @@ bool PriorityQueue::getNext(std::shared_ptr<api::StorageMessage>& msg, int timeo
return false;
}
-void
-PriorityQueue::enqueue(const std::shared_ptr<api::StorageMessage>& msg)
-{
+void Queue::enqueue(const std::shared_ptr<api::StorageMessage>& msg) {
vespalib::MonitorGuard sync(_queueMonitor);
- const uint8_t priority(msg->getType().isReply()
- ? FIXED_REPLY_PRIORITY
- : msg->getPriority());
- Key key(priority, _msgCounter);
- // We make a simplifying--though reasonable--assumption that we'll never
- // process more than UINT64_MAX replies before process restart.
- ++_msgCounter;
- _queue.push(std::make_pair(key, msg));
+ _queue.emplace(msg);
sync.unsafeSignalUnlock();
}
-void
-PriorityQueue::signal()
-{
+void Queue::signal() {
vespalib::MonitorGuard sync(_queueMonitor);
sync.unsafeSignalUnlock();
}
-int
-PriorityQueue::size()
-{
+size_t Queue::size() const {
vespalib::MonitorGuard sync(_queueMonitor);
return _queue.size();
}
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index e2c890bfc9b..ff41e59846a 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -43,49 +43,15 @@ class VisitorThread;
class FNetListener;
class RPCRequestWrapper;
-class PriorityQueue {
+class Queue {
private:
- struct Key {
- uint8_t priority {255};
- uint64_t seqNum {0};
-
- Key(uint8_t pri, uint64_t seq)
- : priority(pri), seqNum(seq)
- {
- }
- };
- using ValueType = std::pair<Key, api::StorageMessage::SP>;
-
- struct PriorityThenFifoCmp {
- bool operator()(const ValueType& lhs,
- const ValueType& rhs) const noexcept
- {
- // priority_queue has largest element on top, so reverse order
- // since our semantics have 0 as the highest priority.
- if (lhs.first.priority != rhs.first.priority) {
- return (lhs.first.priority > rhs.first.priority);
- }
- return (lhs.first.seqNum > rhs.first.seqNum);
- }
- };
-
- using QueueType = std::priority_queue<
- ValueType,
- std::vector<ValueType>,
- PriorityThenFifoCmp>;
-
- // Sneakily chosen priority such that effectively only RPC commands are
- // allowed in front of replies. Replies must have the same effective
- // priority or they will get reordered and all hell breaks loose.
- static constexpr uint8_t FIXED_REPLY_PRIORITY = 1;
-
+ using QueueType = std::queue<api::StorageMessage::SP>;
QueueType _queue;
vespalib::Monitor _queueMonitor;
- uint64_t _msgCounter;
public:
- PriorityQueue();
- virtual ~PriorityQueue();
+ Queue();
+ ~Queue();
/**
* Returns the next event from the event queue
@@ -97,17 +63,14 @@ public:
bool getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout);
/**
- * If `msg` is a StorageCommand, enqueues it using the priority stored in
- * the command. If it's a reply, enqueues it using a fixed but very high
- * priority that ensure replies are processed before commands but also
- * ensures that replies are FIFO-ordered relative to each other.
+ * Enqueue msg in FIFO order.
*/
void enqueue(const std::shared_ptr<api::StorageMessage>& msg);
/** Signal queue monitor. */
void signal();
- int size();
+ size_t size() const;
};
class StorageTransportContext : public api::TransportContext {
@@ -137,7 +100,7 @@ private:
CommunicationManagerMetrics _metrics;
std::unique_ptr<FNetListener> _listener;
- PriorityQueue _eventQueue;
+ Queue _eventQueue;
// XXX: Should perhaps use a configsubscriber and poll from StorageComponent ?
std::unique_ptr<config::ConfigFetcher> _configFetcher;
using EarlierProtocol = std::pair<framework::SecondTime, mbus::IProtocol::SP>;