diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-05-04 13:40:33 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-05-04 13:40:33 +0000 |
commit | 61131edef7ccaeb39c7aa6b09f8ce1ebbd8b92ad (patch) | |
tree | 92bdacf158b280e48133479ededa34d65b53efe8 /storage | |
parent | d9ea4c99b13fa83a75bd8612d5f6c4d50510e3e9 (diff) |
Expose a relaxed atomic view of visitor queue size and use it for metric callback
Avoids having to take a mutex that may potentially trigger lock order inversion
Diffstat (limited to 'storage')
-rw-r--r-- | storage/src/tests/visiting/commandqueuetest.cpp | 11 | ||||
-rw-r--r-- | storage/src/vespa/storage/visiting/commandqueue.h | 23 | ||||
-rw-r--r-- | storage/src/vespa/storage/visiting/visitormanager.cpp | 2 |
3 files changed, 32 insertions, 4 deletions
diff --git a/storage/src/tests/visiting/commandqueuetest.cpp b/storage/src/tests/visiting/commandqueuetest.cpp index 1e24fafed4f..97331774644 100644 --- a/storage/src/tests/visiting/commandqueuetest.cpp +++ b/storage/src/tests/visiting/commandqueuetest.cpp @@ -51,6 +51,8 @@ TEST(CommandQueueTest, fifo) { queue.add(getCommand("seventh", 7ms)); ASSERT_FALSE(queue.empty()); + EXPECT_EQ(7, queue.size()); + EXPECT_EQ(7, queue.relaxed_atomic_size()); std::vector<std::shared_ptr<api::CreateVisitorCommand>> commands; for (;;) { auto cmd = queue.releaseNextCommand().first; @@ -60,6 +62,8 @@ TEST(CommandQueueTest, fifo) { commands.emplace_back(std::move(cmd)); } ASSERT_EQ(7, commands.size()); + EXPECT_EQ(0, queue.size()); + EXPECT_EQ(0, queue.relaxed_atomic_size()); EXPECT_EQ("first t=1 p=0", getCommandString(commands[0])); EXPECT_EQ("second t=10 p=0", getCommandString(commands[1])); EXPECT_EQ("third t=5 p=0", getCommandString(commands[2])); @@ -88,6 +92,8 @@ TEST(CommandQueueTest, fifo_with_priorities) { EXPECT_EQ("sixth t=14 p=50", getCommandString(queue.peekLowestPriorityCommand())); ASSERT_FALSE(queue.empty()); + EXPECT_EQ(7, queue.size()); + EXPECT_EQ(7, queue.relaxed_atomic_size()); std::vector<std::shared_ptr<api::CreateVisitorCommand>> commands; for (;;) { auto cmdPeek = queue.peekNextCommand(); @@ -98,6 +104,8 @@ TEST(CommandQueueTest, fifo_with_priorities) { commands.emplace_back(std::move(cmd)); } ASSERT_EQ(7, commands.size()); + EXPECT_EQ(0, queue.size()); + EXPECT_EQ(0, queue.relaxed_atomic_size()); EXPECT_EQ("seventh t=7 p=0", getCommandString(commands[0])); EXPECT_EQ("third t=5 p=9", getCommandString(commands[1])); EXPECT_EQ("first t=1 p=10", getCommandString(commands[2])); @@ -134,6 +142,7 @@ TEST(CommandQueueTest, release_oldest) { "second t=100 p=0\n" "sixth t=400 p=0\n", ost.str()); EXPECT_EQ(3u, queue.size()); + EXPECT_EQ(3u, queue.relaxed_atomic_size()); } TEST(CommandQueueTest, release_lowest_priority) { @@ -149,6 +158,7 @@ TEST(CommandQueueTest, release_lowest_priority) { queue.add(getCommand("sixth", 14ms, 50)); queue.add(getCommand("seventh", 7ms, 0)); ASSERT_EQ(7u, queue.size()); + EXPECT_EQ(7u, queue.relaxed_atomic_size()); std::vector<std::shared_ptr<api::CreateVisitorCommand>> commands; for (;;) { @@ -186,6 +196,7 @@ TEST(CommandQueueTest, delete_iterator) { ++it; ++it; queue.erase(it); ASSERT_EQ(6u, queue.size()); + ASSERT_EQ(6u, queue.relaxed_atomic_size()); std::vector<std::shared_ptr<api::CreateVisitorCommand>> cmds; for (;;) { diff --git a/storage/src/vespa/storage/visiting/commandqueue.h b/storage/src/vespa/storage/visiting/commandqueue.h index 3232baf50de..4d948051b32 100644 --- a/storage/src/vespa/storage/visiting/commandqueue.h +++ b/storage/src/vespa/storage/visiting/commandqueue.h @@ -18,6 +18,7 @@ #include <vespa/vespalib/util/printable.h> #include <vespa/vespalib/util/time.h> #include <vespa/storageframework/generic/clock/clock.h> +#include <atomic> #include <vector> #include <ostream> @@ -71,6 +72,7 @@ private: const framework::Clock& _clock; mutable CommandList _commands; uint64_t _sequenceId; + std::atomic<size_t> _cached_size; public: typedef typename CommandList::iterator iterator; @@ -81,7 +83,9 @@ public: explicit CommandQueue(const framework::Clock& clock) : _clock(clock), - _sequenceId(0) {} + _sequenceId(0), + _cached_size(0) + {} const framework::Clock& getTimer() const { return _clock; } @@ -101,17 +105,26 @@ public: } bool empty() const { return _commands.empty(); } - uint32_t size() const { return _commands.size(); } + size_t size() const { return _commands.size(); } + size_t relaxed_atomic_size() const noexcept { return _cached_size.load(std::memory_order_relaxed); } std::pair<std::shared_ptr<Command>, vespalib::steady_time> releaseNextCommand(); std::shared_ptr<Command> peekNextCommand() const; void add(const std::shared_ptr<Command>& msg); - void erase(iterator it) { _commands.erase(it); } + void erase(iterator it) { + _commands.erase(it); + update_cached_size(); + } std::vector<CommandEntry> releaseTimedOut(); std::pair<std::shared_ptr<Command>, vespalib::steady_time> releaseLowestPriorityCommand(); std::shared_ptr<Command> peekLowestPriorityCommand() const; void clear() { return _commands.clear(); } void print(std::ostream& out, bool verbose, const std::string& indent) const override; + +private: + void update_cached_size() noexcept { + _cached_size.store(_commands.size(), std::memory_order_relaxed); + } }; @@ -125,6 +138,7 @@ CommandQueue<Command>::releaseNextCommand() retVal.first = first->_command; retVal.second = first->_deadline; _commands.erase(first); + update_cached_size(); } return retVal; } @@ -147,6 +161,7 @@ CommandQueue<Command>::add(const std::shared_ptr<Command>& cmd) { auto deadline = _clock.getMonotonicTime() + cmd->getQueueTimeout(); _commands.insert(CommandEntry(cmd, deadline, ++_sequenceId, cmd->getPriority())); + update_cached_size(); } template<class Command> @@ -160,6 +175,7 @@ CommandQueue<Command>::releaseTimedOut() timelist& tl = boost::multi_index::get<1>(_commands); tl.erase(tbegin()); } + update_cached_size(); return timed_out; } @@ -172,6 +188,7 @@ CommandQueue<Command>::releaseLowestPriorityCommand() auto deadline = last->_deadline; std::shared_ptr<Command> cmd(last->_command); _commands.erase(last); + update_cached_size(); return {cmd, deadline}; } else { return {}; diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp index 282299ebbc1..c548ef9e20c 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.cpp +++ b/storage/src/vespa/storage/visiting/visitormanager.cpp @@ -86,7 +86,7 @@ VisitorManager::create_and_start_manager_thread() void VisitorManager::updateMetrics(const MetricLockGuard &) { - _metrics->queueSize.addValue(_visitorQueue.size()); + _metrics->queueSize.addValue(static_cast<int64_t>(_visitorQueue.relaxed_atomic_size())); } void |