summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@oath.com>2018-01-30 12:06:39 +0000
committerTor Brede Vekterli <vekterli@oath.com>2018-01-30 12:06:39 +0000
commit6a1d08777d0ce8fd4ee7baed9fa73199c9ccb5ca (patch)
tree4914d43c2a1a7f73eb47029a78a4ddc234dbe752
parentdc0a442fdf1d775657d78c5d338606ad1249dd08 (diff)
Use FIFO for incoming messages instead of a priority queue
Priority ordering in the CommunicationManager inherently runs the risk of reordering differently prioritized client operations before they reach the "timestamps are strictly increasing" check further down the message chain. We already have priority queues for the persistence and visitor processing threads, so not likely to be any real benefit in having one in the communication manager as well. This should address the edge case where differently prioritized feed may cause transient out of sync issues due to timestamp-bounced operations.
-rw-r--r--storage/src/tests/storageserver/communicationmanagertest.cpp11
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.cpp36
-rw-r--r--storage/src/vespa/storage/storageserver/communicationmanager.h51
3 files changed, 20 insertions, 78 deletions
diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp
index e7e08f28ce3..aa20f32450d 100644
--- a/storage/src/tests/storageserver/communicationmanagertest.cpp
+++ b/storage/src/tests/storageserver/communicationmanagertest.cpp
@@ -23,7 +23,7 @@ struct CommunicationManagerTest : public CppUnit::TestFixture {
void testSimple();
void testDistPendingLimitConfigsArePropagatedToMessageBus();
void testStorPendingLimitConfigsArePropagatedToMessageBus();
- void testCommandsAreDequeuedInPriorityOrder();
+ void testCommandsAreDequeuedInFifoOrder();
void testRepliesAreDequeuedInFifoOrder();
void bucket_space_config_can_be_updated_live();
void unmapped_bucket_space_documentapi_request_returns_error_reply();
@@ -47,7 +47,7 @@ struct CommunicationManagerTest : public CppUnit::TestFixture {
CPPUNIT_TEST(testSimple);
CPPUNIT_TEST(testDistPendingLimitConfigsArePropagatedToMessageBus);
CPPUNIT_TEST(testStorPendingLimitConfigsArePropagatedToMessageBus);
- CPPUNIT_TEST(testCommandsAreDequeuedInPriorityOrder);
+ CPPUNIT_TEST(testCommandsAreDequeuedInFifoOrder);
CPPUNIT_TEST(testRepliesAreDequeuedInFifoOrder);
CPPUNIT_TEST(bucket_space_config_can_be_updated_live);
CPPUNIT_TEST(unmapped_bucket_space_documentapi_request_returns_error_reply);
@@ -175,7 +175,7 @@ CommunicationManagerTest::testStorPendingLimitConfigsArePropagatedToMessageBus()
}
void
-CommunicationManagerTest::testCommandsAreDequeuedInPriorityOrder()
+CommunicationManagerTest::testCommandsAreDequeuedInFifoOrder()
{
mbus::Slobrok slobrok;
vdstestlib::DirConfig storConfig(getStandardConfig(true));
@@ -190,8 +190,8 @@ CommunicationManagerTest::testCommandsAreDequeuedInPriorityOrder()
// Message dequeing does not start before we invoke `open` on the storage
// link chain, so we enqueue messages in randomized priority order before
- // doing so. After starting the thread, we should then get messages down
- // the chain in a deterministic, prioritized order.
+ // doing so. After starting the thread, we should get messages down
+ // the chain in a deterministic FIFO order and _not_ priority-order.
// Lower number == higher priority.
std::vector<api::StorageMessage::Priority> pris{200, 0, 255, 128};
for (auto pri : pris) {
@@ -200,7 +200,6 @@ CommunicationManagerTest::testCommandsAreDequeuedInPriorityOrder()
storage.open();
storageLink->waitForMessages(pris.size(), MESSAGE_WAIT_TIME_SEC);
- std::sort(pris.begin(), pris.end());
for (size_t i = 0; i < pris.size(); ++i) {
// Casting is just to avoid getting mismatched values printed to the
// output verbatim as chars.
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
index 4cf4839319f..b61717e5b67 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp
@@ -26,23 +26,16 @@ using vespalib::make_string;
namespace storage {
-PriorityQueue::PriorityQueue() :
- _queue(),
- _queueMonitor(),
- _msgCounter(0)
-{ }
-
-PriorityQueue::~PriorityQueue()
-{ }
+Queue::Queue() = default;
+Queue::~Queue() = default;
-bool PriorityQueue::getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout)
-{
+bool Queue::getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout) {
vespalib::MonitorGuard sync(_queueMonitor);
bool first = true;
while (true) { // Max twice
if (!_queue.empty()) {
LOG(spam, "Picking message from queue");
- msg = _queue.top().second;
+ msg = _queue.front();
_queue.pop();
return true;
}
@@ -56,31 +49,18 @@ bool PriorityQueue::getNext(std::shared_ptr<api::StorageMessage>& msg, int timeo
return false;
}
-void
-PriorityQueue::enqueue(const std::shared_ptr<api::StorageMessage>& msg)
-{
+void Queue::enqueue(const std::shared_ptr<api::StorageMessage>& msg) {
vespalib::MonitorGuard sync(_queueMonitor);
- const uint8_t priority(msg->getType().isReply()
- ? FIXED_REPLY_PRIORITY
- : msg->getPriority());
- Key key(priority, _msgCounter);
- // We make a simplifying--though reasonable--assumption that we'll never
- // process more than UINT64_MAX replies before process restart.
- ++_msgCounter;
- _queue.push(std::make_pair(key, msg));
+ _queue.emplace(msg);
sync.unsafeSignalUnlock();
}
-void
-PriorityQueue::signal()
-{
+void Queue::signal() {
vespalib::MonitorGuard sync(_queueMonitor);
sync.unsafeSignalUnlock();
}
-int
-PriorityQueue::size()
-{
+size_t Queue::size() const {
vespalib::MonitorGuard sync(_queueMonitor);
return _queue.size();
}
diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.h b/storage/src/vespa/storage/storageserver/communicationmanager.h
index e2c890bfc9b..ff41e59846a 100644
--- a/storage/src/vespa/storage/storageserver/communicationmanager.h
+++ b/storage/src/vespa/storage/storageserver/communicationmanager.h
@@ -43,49 +43,15 @@ class VisitorThread;
class FNetListener;
class RPCRequestWrapper;
-class PriorityQueue {
+class Queue {
private:
- struct Key {
- uint8_t priority {255};
- uint64_t seqNum {0};
-
- Key(uint8_t pri, uint64_t seq)
- : priority(pri), seqNum(seq)
- {
- }
- };
- using ValueType = std::pair<Key, api::StorageMessage::SP>;
-
- struct PriorityThenFifoCmp {
- bool operator()(const ValueType& lhs,
- const ValueType& rhs) const noexcept
- {
- // priority_queue has largest element on top, so reverse order
- // since our semantics have 0 as the highest priority.
- if (lhs.first.priority != rhs.first.priority) {
- return (lhs.first.priority > rhs.first.priority);
- }
- return (lhs.first.seqNum > rhs.first.seqNum);
- }
- };
-
- using QueueType = std::priority_queue<
- ValueType,
- std::vector<ValueType>,
- PriorityThenFifoCmp>;
-
- // Sneakily chosen priority such that effectively only RPC commands are
- // allowed in front of replies. Replies must have the same effective
- // priority or they will get reordered and all hell breaks loose.
- static constexpr uint8_t FIXED_REPLY_PRIORITY = 1;
-
+ using QueueType = std::queue<api::StorageMessage::SP>;
QueueType _queue;
vespalib::Monitor _queueMonitor;
- uint64_t _msgCounter;
public:
- PriorityQueue();
- virtual ~PriorityQueue();
+ Queue();
+ ~Queue();
/**
* Returns the next event from the event queue
@@ -97,17 +63,14 @@ public:
bool getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout);
/**
- * If `msg` is a StorageCommand, enqueues it using the priority stored in
- * the command. If it's a reply, enqueues it using a fixed but very high
- * priority that ensure replies are processed before commands but also
- * ensures that replies are FIFO-ordered relative to each other.
+ * Enqueue msg in FIFO order.
*/
void enqueue(const std::shared_ptr<api::StorageMessage>& msg);
/** Signal queue monitor. */
void signal();
- int size();
+ size_t size() const;
};
class StorageTransportContext : public api::TransportContext {
@@ -137,7 +100,7 @@ private:
CommunicationManagerMetrics _metrics;
std::unique_ptr<FNetListener> _listener;
- PriorityQueue _eventQueue;
+ Queue _eventQueue;
// XXX: Should perhaps use a configsubscriber and poll from StorageComponent ?
std::unique_ptr<config::ConfigFetcher> _configFetcher;
using EarlierProtocol = std::pair<framework::SecondTime, mbus::IProtocol::SP>;