summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-05-04 13:40:33 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2022-05-04 13:40:33 +0000
commit61131edef7ccaeb39c7aa6b09f8ce1ebbd8b92ad (patch)
tree92bdacf158b280e48133479ededa34d65b53efe8 /storage
parentd9ea4c99b13fa83a75bd8612d5f6c4d50510e3e9 (diff)
Expose a relaxed atomic view of visitor queue size and use it for metric callback
Avoids having to take a mutex that may potentially trigger lock order inversion
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/visiting/commandqueuetest.cpp11
-rw-r--r--storage/src/vespa/storage/visiting/commandqueue.h23
-rw-r--r--storage/src/vespa/storage/visiting/visitormanager.cpp2
3 files changed, 32 insertions, 4 deletions
diff --git a/storage/src/tests/visiting/commandqueuetest.cpp b/storage/src/tests/visiting/commandqueuetest.cpp
index 1e24fafed4f..97331774644 100644
--- a/storage/src/tests/visiting/commandqueuetest.cpp
+++ b/storage/src/tests/visiting/commandqueuetest.cpp
@@ -51,6 +51,8 @@ TEST(CommandQueueTest, fifo) {
queue.add(getCommand("seventh", 7ms));
ASSERT_FALSE(queue.empty());
+ EXPECT_EQ(7, queue.size());
+ EXPECT_EQ(7, queue.relaxed_atomic_size());
std::vector<std::shared_ptr<api::CreateVisitorCommand>> commands;
for (;;) {
auto cmd = queue.releaseNextCommand().first;
@@ -60,6 +62,8 @@ TEST(CommandQueueTest, fifo) {
commands.emplace_back(std::move(cmd));
}
ASSERT_EQ(7, commands.size());
+ EXPECT_EQ(0, queue.size());
+ EXPECT_EQ(0, queue.relaxed_atomic_size());
EXPECT_EQ("first t=1 p=0", getCommandString(commands[0]));
EXPECT_EQ("second t=10 p=0", getCommandString(commands[1]));
EXPECT_EQ("third t=5 p=0", getCommandString(commands[2]));
@@ -88,6 +92,8 @@ TEST(CommandQueueTest, fifo_with_priorities) {
EXPECT_EQ("sixth t=14 p=50", getCommandString(queue.peekLowestPriorityCommand()));
ASSERT_FALSE(queue.empty());
+ EXPECT_EQ(7, queue.size());
+ EXPECT_EQ(7, queue.relaxed_atomic_size());
std::vector<std::shared_ptr<api::CreateVisitorCommand>> commands;
for (;;) {
auto cmdPeek = queue.peekNextCommand();
@@ -98,6 +104,8 @@ TEST(CommandQueueTest, fifo_with_priorities) {
commands.emplace_back(std::move(cmd));
}
ASSERT_EQ(7, commands.size());
+ EXPECT_EQ(0, queue.size());
+ EXPECT_EQ(0, queue.relaxed_atomic_size());
EXPECT_EQ("seventh t=7 p=0", getCommandString(commands[0]));
EXPECT_EQ("third t=5 p=9", getCommandString(commands[1]));
EXPECT_EQ("first t=1 p=10", getCommandString(commands[2]));
@@ -134,6 +142,7 @@ TEST(CommandQueueTest, release_oldest) {
"second t=100 p=0\n"
"sixth t=400 p=0\n", ost.str());
EXPECT_EQ(3u, queue.size());
+ EXPECT_EQ(3u, queue.relaxed_atomic_size());
}
TEST(CommandQueueTest, release_lowest_priority) {
@@ -149,6 +158,7 @@ TEST(CommandQueueTest, release_lowest_priority) {
queue.add(getCommand("sixth", 14ms, 50));
queue.add(getCommand("seventh", 7ms, 0));
ASSERT_EQ(7u, queue.size());
+ EXPECT_EQ(7u, queue.relaxed_atomic_size());
std::vector<std::shared_ptr<api::CreateVisitorCommand>> commands;
for (;;) {
@@ -186,6 +196,7 @@ TEST(CommandQueueTest, delete_iterator) {
++it; ++it;
queue.erase(it);
ASSERT_EQ(6u, queue.size());
+ ASSERT_EQ(6u, queue.relaxed_atomic_size());
std::vector<std::shared_ptr<api::CreateVisitorCommand>> cmds;
for (;;) {
diff --git a/storage/src/vespa/storage/visiting/commandqueue.h b/storage/src/vespa/storage/visiting/commandqueue.h
index 3232baf50de..4d948051b32 100644
--- a/storage/src/vespa/storage/visiting/commandqueue.h
+++ b/storage/src/vespa/storage/visiting/commandqueue.h
@@ -18,6 +18,7 @@
#include <vespa/vespalib/util/printable.h>
#include <vespa/vespalib/util/time.h>
#include <vespa/storageframework/generic/clock/clock.h>
+#include <atomic>
#include <vector>
#include <ostream>
@@ -71,6 +72,7 @@ private:
const framework::Clock& _clock;
mutable CommandList _commands;
uint64_t _sequenceId;
+ std::atomic<size_t> _cached_size;
public:
typedef typename CommandList::iterator iterator;
@@ -81,7 +83,9 @@ public:
explicit CommandQueue(const framework::Clock& clock)
: _clock(clock),
- _sequenceId(0) {}
+ _sequenceId(0),
+ _cached_size(0)
+ {}
const framework::Clock& getTimer() const { return _clock; }
@@ -101,17 +105,26 @@ public:
}
bool empty() const { return _commands.empty(); }
- uint32_t size() const { return _commands.size(); }
+ size_t size() const { return _commands.size(); }
+ size_t relaxed_atomic_size() const noexcept { return _cached_size.load(std::memory_order_relaxed); }
std::pair<std::shared_ptr<Command>, vespalib::steady_time> releaseNextCommand();
std::shared_ptr<Command> peekNextCommand() const;
void add(const std::shared_ptr<Command>& msg);
- void erase(iterator it) { _commands.erase(it); }
+ void erase(iterator it) {
+ _commands.erase(it);
+ update_cached_size();
+ }
std::vector<CommandEntry> releaseTimedOut();
std::pair<std::shared_ptr<Command>, vespalib::steady_time> releaseLowestPriorityCommand();
std::shared_ptr<Command> peekLowestPriorityCommand() const;
void clear() { return _commands.clear(); }
void print(std::ostream& out, bool verbose, const std::string& indent) const override;
+
+private:
+ void update_cached_size() noexcept {
+ _cached_size.store(_commands.size(), std::memory_order_relaxed);
+ }
};
@@ -125,6 +138,7 @@ CommandQueue<Command>::releaseNextCommand()
retVal.first = first->_command;
retVal.second = first->_deadline;
_commands.erase(first);
+ update_cached_size();
}
return retVal;
}
@@ -147,6 +161,7 @@ CommandQueue<Command>::add(const std::shared_ptr<Command>& cmd)
{
auto deadline = _clock.getMonotonicTime() + cmd->getQueueTimeout();
_commands.insert(CommandEntry(cmd, deadline, ++_sequenceId, cmd->getPriority()));
+ update_cached_size();
}
template<class Command>
@@ -160,6 +175,7 @@ CommandQueue<Command>::releaseTimedOut()
timelist& tl = boost::multi_index::get<1>(_commands);
tl.erase(tbegin());
}
+ update_cached_size();
return timed_out;
}
@@ -172,6 +188,7 @@ CommandQueue<Command>::releaseLowestPriorityCommand()
auto deadline = last->_deadline;
std::shared_ptr<Command> cmd(last->_command);
_commands.erase(last);
+ update_cached_size();
return {cmd, deadline};
} else {
return {};
diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp
index 282299ebbc1..c548ef9e20c 100644
--- a/storage/src/vespa/storage/visiting/visitormanager.cpp
+++ b/storage/src/vespa/storage/visiting/visitormanager.cpp
@@ -86,7 +86,7 @@ VisitorManager::create_and_start_manager_thread()
void
VisitorManager::updateMetrics(const MetricLockGuard &)
{
- _metrics->queueSize.addValue(_visitorQueue.size());
+ _metrics->queueSize.addValue(static_cast<int64_t>(_visitorQueue.relaxed_atomic_size()));
}
void