From 61131edef7ccaeb39c7aa6b09f8ce1ebbd8b92ad Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Wed, 4 May 2022 13:40:33 +0000 Subject: Expose a relaxed atomic view of visitor queue size and use it for metric callback Avoids having to take a mutex that may potentially trigger lock order inversion --- storage/src/tests/visiting/commandqueuetest.cpp | 11 +++++++++++ storage/src/vespa/storage/visiting/commandqueue.h | 23 +++++++++++++++++++--- .../src/vespa/storage/visiting/visitormanager.cpp | 2 +- 3 files changed, 32 insertions(+), 4 deletions(-) (limited to 'storage') diff --git a/storage/src/tests/visiting/commandqueuetest.cpp b/storage/src/tests/visiting/commandqueuetest.cpp index 1e24fafed4f..97331774644 100644 --- a/storage/src/tests/visiting/commandqueuetest.cpp +++ b/storage/src/tests/visiting/commandqueuetest.cpp @@ -51,6 +51,8 @@ TEST(CommandQueueTest, fifo) { queue.add(getCommand("seventh", 7ms)); ASSERT_FALSE(queue.empty()); + EXPECT_EQ(7, queue.size()); + EXPECT_EQ(7, queue.relaxed_atomic_size()); std::vector> commands; for (;;) { auto cmd = queue.releaseNextCommand().first; @@ -60,6 +62,8 @@ TEST(CommandQueueTest, fifo) { commands.emplace_back(std::move(cmd)); } ASSERT_EQ(7, commands.size()); + EXPECT_EQ(0, queue.size()); + EXPECT_EQ(0, queue.relaxed_atomic_size()); EXPECT_EQ("first t=1 p=0", getCommandString(commands[0])); EXPECT_EQ("second t=10 p=0", getCommandString(commands[1])); EXPECT_EQ("third t=5 p=0", getCommandString(commands[2])); @@ -88,6 +92,8 @@ TEST(CommandQueueTest, fifo_with_priorities) { EXPECT_EQ("sixth t=14 p=50", getCommandString(queue.peekLowestPriorityCommand())); ASSERT_FALSE(queue.empty()); + EXPECT_EQ(7, queue.size()); + EXPECT_EQ(7, queue.relaxed_atomic_size()); std::vector> commands; for (;;) { auto cmdPeek = queue.peekNextCommand(); @@ -98,6 +104,8 @@ TEST(CommandQueueTest, fifo_with_priorities) { commands.emplace_back(std::move(cmd)); } ASSERT_EQ(7, commands.size()); + EXPECT_EQ(0, queue.size()); + EXPECT_EQ(0, queue.relaxed_atomic_size()); EXPECT_EQ("seventh t=7 p=0", getCommandString(commands[0])); EXPECT_EQ("third t=5 p=9", getCommandString(commands[1])); EXPECT_EQ("first t=1 p=10", getCommandString(commands[2])); @@ -134,6 +142,7 @@ TEST(CommandQueueTest, release_oldest) { "second t=100 p=0\n" "sixth t=400 p=0\n", ost.str()); EXPECT_EQ(3u, queue.size()); + EXPECT_EQ(3u, queue.relaxed_atomic_size()); } TEST(CommandQueueTest, release_lowest_priority) { @@ -149,6 +158,7 @@ TEST(CommandQueueTest, release_lowest_priority) { queue.add(getCommand("sixth", 14ms, 50)); queue.add(getCommand("seventh", 7ms, 0)); ASSERT_EQ(7u, queue.size()); + EXPECT_EQ(7u, queue.relaxed_atomic_size()); std::vector> commands; for (;;) { @@ -186,6 +196,7 @@ TEST(CommandQueueTest, delete_iterator) { ++it; ++it; queue.erase(it); ASSERT_EQ(6u, queue.size()); + ASSERT_EQ(6u, queue.relaxed_atomic_size()); std::vector> cmds; for (;;) { diff --git a/storage/src/vespa/storage/visiting/commandqueue.h b/storage/src/vespa/storage/visiting/commandqueue.h index 3232baf50de..4d948051b32 100644 --- a/storage/src/vespa/storage/visiting/commandqueue.h +++ b/storage/src/vespa/storage/visiting/commandqueue.h @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -71,6 +72,7 @@ private: const framework::Clock& _clock; mutable CommandList _commands; uint64_t _sequenceId; + std::atomic _cached_size; public: typedef typename CommandList::iterator iterator; @@ -81,7 +83,9 @@ public: explicit CommandQueue(const framework::Clock& clock) : _clock(clock), - _sequenceId(0) {} + _sequenceId(0), + _cached_size(0) + {} const framework::Clock& getTimer() const { return _clock; } @@ -101,17 +105,26 @@ public: } bool empty() const { return _commands.empty(); } - uint32_t size() const { return _commands.size(); } + size_t size() const { return _commands.size(); } + size_t relaxed_atomic_size() const noexcept { return _cached_size.load(std::memory_order_relaxed); } std::pair, vespalib::steady_time> releaseNextCommand(); std::shared_ptr peekNextCommand() const; void add(const std::shared_ptr& msg); - void erase(iterator it) { _commands.erase(it); } + void erase(iterator it) { + _commands.erase(it); + update_cached_size(); + } std::vector releaseTimedOut(); std::pair, vespalib::steady_time> releaseLowestPriorityCommand(); std::shared_ptr peekLowestPriorityCommand() const; void clear() { return _commands.clear(); } void print(std::ostream& out, bool verbose, const std::string& indent) const override; + +private: + void update_cached_size() noexcept { + _cached_size.store(_commands.size(), std::memory_order_relaxed); + } }; @@ -125,6 +138,7 @@ CommandQueue::releaseNextCommand() retVal.first = first->_command; retVal.second = first->_deadline; _commands.erase(first); + update_cached_size(); } return retVal; } @@ -147,6 +161,7 @@ CommandQueue::add(const std::shared_ptr& cmd) { auto deadline = _clock.getMonotonicTime() + cmd->getQueueTimeout(); _commands.insert(CommandEntry(cmd, deadline, ++_sequenceId, cmd->getPriority())); + update_cached_size(); } template @@ -160,6 +175,7 @@ CommandQueue::releaseTimedOut() timelist& tl = boost::multi_index::get<1>(_commands); tl.erase(tbegin()); } + update_cached_size(); return timed_out; } @@ -172,6 +188,7 @@ CommandQueue::releaseLowestPriorityCommand() auto deadline = last->_deadline; std::shared_ptr cmd(last->_command); _commands.erase(last); + update_cached_size(); return {cmd, deadline}; } else { return {}; diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp index 282299ebbc1..c548ef9e20c 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.cpp +++ b/storage/src/vespa/storage/visiting/visitormanager.cpp @@ -86,7 +86,7 @@ VisitorManager::create_and_start_manager_thread() void VisitorManager::updateMetrics(const MetricLockGuard &) { - _metrics->queueSize.addValue(_visitorQueue.size()); + _metrics->queueSize.addValue(static_cast(_visitorQueue.relaxed_atomic_size())); } void -- cgit v1.2.3