summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2022-03-18 14:05:08 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2022-03-18 14:05:08 +0000
commit9be87dcfc81d88b7aa5df918047ba3ab977b6c3f (patch)
tree069213d8e23e4db28450dd24191a7a17a7c7ed7f /storage
parent81dbfb5c6446bb2fa6c54923ff6d77b20815f611 (diff)
Fix visitor manager test TSan mutex inversion warning and refactor deadline handling
Defer starting main message dispatch thread until test has enqueued a message that will be immediately timed out. Avoids having to depend on taking an internal mutex in the test to prevent racing with queue handoffs. Taking said mutex triggered a mutex order inversion warning in TSan. Also refactor visitor queue deadline handling by moving to a strongly typed time point. This removes some manual unit scaling arithmetic that did not appear to be entirely correct in the first place.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/visiting/commandqueuetest.cpp7
-rw-r--r--storage/src/tests/visiting/visitormanagertest.cpp20
-rw-r--r--storage/src/vespa/storage/visiting/commandqueue.h50
-rw-r--r--storage/src/vespa/storage/visiting/visitor.h2
-rw-r--r--storage/src/vespa/storage/visiting/visitormanager.cpp49
-rw-r--r--storage/src/vespa/storage/visiting/visitormanager.h8
6 files changed, 76 insertions, 60 deletions
diff --git a/storage/src/tests/visiting/commandqueuetest.cpp b/storage/src/tests/visiting/commandqueuetest.cpp
index ac4251ba21e..4d528ce4971 100644
--- a/storage/src/tests/visiting/commandqueuetest.cpp
+++ b/storage/src/tests/visiting/commandqueuetest.cpp
@@ -155,12 +155,11 @@ TEST(CommandQueueTest, release_lowest_priority) {
std::vector<std::shared_ptr<api::CreateVisitorCommand>> commands;
for (;;) {
std::shared_ptr<api::CreateVisitorCommand> cmdPeek(queue.peekLowestPriorityCommand());
- std::pair<std::shared_ptr<api::CreateVisitorCommand>, uint64_t> cmd(
- queue.releaseLowestPriorityCommand());
- if (cmd.first.get() == 0 || cmdPeek != cmd.first) {
+ auto cmd_and_deadline = queue.releaseLowestPriorityCommand();
+ if (!cmd_and_deadline.first || cmdPeek != cmd_and_deadline.first) {
break;
}
- commands.push_back(cmd.first);
+ commands.push_back(cmd_and_deadline.first);
}
ASSERT_EQ(7, commands.size());
EXPECT_EQ("sixth t=14 p=50", getCommandString(commands[0]));
diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp
index 99172264d47..be4e7270c69 100644
--- a/storage/src/tests/visiting/visitormanagertest.cpp
+++ b/storage/src/tests/visiting/visitormanagertest.cpp
@@ -53,7 +53,7 @@ protected:
~VisitorManagerTest();
// Not using setUp since can't throw exception out of it.
- void initializeTest();
+ void initializeTest(bool defer_manager_thread_start = false);
void addSomeRemoves(bool removeAll = false);
void TearDown() override;
TestVisitorMessageSession& getSession(uint32_t n);
@@ -78,7 +78,7 @@ VisitorManagerTest::~VisitorManagerTest() = default;
uint32_t VisitorManagerTest::docCount = 10;
void
-VisitorManagerTest::initializeTest()
+VisitorManagerTest::initializeTest(bool defer_manager_thread_start)
{
vdstestlib::DirConfig config(getStandardConfig(true));
config.getConfig("stor-visitor").set("visitorthreads", "1");
@@ -88,7 +88,11 @@ VisitorManagerTest::initializeTest()
_node->setupDummyPersistence();
_node->getStateUpdater().setClusterState(std::make_shared<lib::ClusterState>("storage:1 distributor:1"));
_top = std::make_unique<DummyStorageLink>();
- auto vm = std::make_unique<VisitorManager>(config::ConfigUri(config.getConfigId()), _node->getComponentRegister(), *_messageSessionFactory);
+ auto vm = std::make_unique<VisitorManager>(config::ConfigUri(config.getConfigId()),
+ _node->getComponentRegister(),
+ *_messageSessionFactory,
+ VisitorFactory::Map(),
+ defer_manager_thread_start);
_manager = vm.get();
_top->push_back(std::move(vm));
_top->push_back(std::make_unique<FileStorManager>(config::ConfigUri(config.getConfigId()), _node->getPersistenceProvider(),
@@ -753,22 +757,22 @@ TEST_F(VisitorManagerTest, abort_on_field_path_error) {
}
TEST_F(VisitorManagerTest, visitor_queue_timeout) {
- ASSERT_NO_FATAL_FAILURE(initializeTest());
+ ASSERT_NO_FATAL_FAILURE(initializeTest(true));
_manager->enforceQueueUsage();
{
- std::lock_guard guard(_manager->getThread(0).getQueueMonitor());
-
auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(_Address);
cmd->setQueueTimeout(1ms);
cmd->setTimeout(100 * 1000 * 1000ms);
+ // The manager thread isn't running yet so the visitor stays on the queue
_top->sendDown(cmd);
-
- _node->getClock().addSecondsToTime(1000);
}
+ _node->getClock().addSecondsToTime(1000);
+ _manager->create_and_start_manager_thread();
+
// Don't answer any messages. Make sure we timeout anyways.
_top->waitForMessages(1, 60);
const msg_ptr_vector replies = _top->getRepliesOnce();
diff --git a/storage/src/vespa/storage/visiting/commandqueue.h b/storage/src/vespa/storage/visiting/commandqueue.h
index 70f5b2421c3..0f345aa0d94 100644
--- a/storage/src/vespa/storage/visiting/commandqueue.h
+++ b/storage/src/vespa/storage/visiting/commandqueue.h
@@ -30,12 +30,12 @@ public:
struct CommandEntry {
typedef typename Command::Priority PriorityType;
std::shared_ptr<Command> _command;
- uint64_t _time;
+ vespalib::steady_time _time;
uint64_t _sequenceId;
PriorityType _priority;
CommandEntry(const std::shared_ptr<Command>& cmd,
- uint64_t time,
+ vespalib::steady_time time,
uint64_t sequenceId,
PriorityType priority)
: _command(cmd), _time(time), _sequenceId(sequenceId), _priority(priority)
@@ -51,19 +51,18 @@ public:
};
private:
- typedef boost::multi_index::multi_index_container<
+ using CommandList = boost::multi_index::multi_index_container<
CommandEntry,
boost::multi_index::indexed_by<
boost::multi_index::ordered_unique<
boost::multi_index::identity<CommandEntry>
>,
boost::multi_index::ordered_non_unique<
- boost::multi_index::member<CommandEntry, uint64_t, &CommandEntry::_time>
+ boost::multi_index::member<CommandEntry, vespalib::steady_time, &CommandEntry::_time>
>
>
- > CommandList;
- typedef typename boost::multi_index
- ::nth_index<CommandList, 1>::type timelist;
+ >;
+ using timelist = typename boost::multi_index::nth_index<CommandList, 1>::type;
const framework::Clock& _clock;
mutable CommandList _commands;
@@ -76,7 +75,7 @@ public:
typedef typename CommandList::const_reverse_iterator const_reverse_iterator;
typedef typename timelist::const_iterator const_titerator;
- CommandQueue(const framework::Clock& clock)
+ explicit CommandQueue(const framework::Clock& clock)
: _clock(clock),
_sequenceId(0) {}
@@ -89,22 +88,22 @@ public:
const_iterator end() const { return _commands.end(); }
const_titerator tbegin() const {
- timelist& tl = boost::multi_index::get<1>(_commands);
+ auto& tl = boost::multi_index::get<1>(_commands);
return tl.begin();
}
const_titerator tend() const {
- timelist& tl = boost::multi_index::get<1>(_commands);
+ auto& tl = boost::multi_index::get<1>(_commands);
return tl.end();
}
bool empty() const { return _commands.empty(); }
uint32_t size() const { return _commands.size(); }
- std::pair<std::shared_ptr<Command>, time_t> releaseNextCommand();
+ std::pair<std::shared_ptr<Command>, vespalib::steady_time> releaseNextCommand();
std::shared_ptr<Command> peekNextCommand() const;
void add(const std::shared_ptr<Command>& msg);
void erase(iterator it) { _commands.erase(it); }
std::list<CommandEntry> releaseTimedOut();
- std::pair<std::shared_ptr<Command>, time_t> releaseLowestPriorityCommand();
+ std::pair<std::shared_ptr<Command>, vespalib::steady_time> releaseLowestPriorityCommand();
std::shared_ptr<Command> peekLowestPriorityCommand() const;
void clear() { return _commands.clear(); }
@@ -113,11 +112,10 @@ public:
template<class Command>
-std::pair<std::shared_ptr<Command>, time_t>
+std::pair<std::shared_ptr<Command>, vespalib::steady_time>
CommandQueue<Command>::releaseNextCommand()
{
- std::pair<std::shared_ptr<Command>, time_t> retVal(
- std::shared_ptr<Command>(), 0);
+ std::pair<std::shared_ptr<Command>, vespalib::steady_time> retVal;
if (!_commands.empty()) {
iterator first = _commands.begin();
retVal.first = first->_command;
@@ -143,9 +141,8 @@ template<class Command>
void
CommandQueue<Command>::add(const std::shared_ptr<Command>& cmd)
{
- framework::MicroSecTime time(_clock.getTimeInMicros()
- + framework::MicroSecTime(vespalib::count_us(cmd->getQueueTimeout())));
- _commands.insert(CommandEntry(cmd, time.getTime(), ++_sequenceId, cmd->getPriority()));
+ auto deadline = _clock.getMonotonicTime() + cmd->getQueueTimeout();
+ _commands.insert(CommandEntry(cmd, deadline, ++_sequenceId, cmd->getPriority()));
}
template<class Command>
@@ -153,8 +150,8 @@ std::list<typename CommandQueue<Command>::CommandEntry>
CommandQueue<Command>::releaseTimedOut()
{
std::list<CommandEntry> mylist;
- framework::MicroSecTime time(_clock.getTimeInMicros());
- while (!empty() && tbegin()->_time <= time.getTime()) {
+ auto now = _clock.getMonotonicTime();
+ while (!empty() && tbegin()->_time <= now) {
mylist.push_back(*tbegin());
timelist& tl = boost::multi_index::get<1>(_commands);
tl.erase(tbegin());
@@ -163,18 +160,17 @@ CommandQueue<Command>::releaseTimedOut()
}
template <class Command>
-std::pair<std::shared_ptr<Command>, time_t>
+std::pair<std::shared_ptr<Command>, vespalib::steady_time>
CommandQueue<Command>::releaseLowestPriorityCommand()
{
if (!_commands.empty()) {
iterator last = (++_commands.rbegin()).base();
- time_t time = last->_time;
+ auto time = last->_time;
std::shared_ptr<Command> cmd(last->_command);
_commands.erase(last);
- return std::pair<std::shared_ptr<Command>, time_t>(cmd, time);
+ return {cmd, time};
} else {
- return std::pair<std::shared_ptr<Command>, time_t>(
- std::shared_ptr<Command>(), 0);
+ return {};
}
}
@@ -198,12 +194,12 @@ CommandQueue<Command>::print(std::ostream& out, bool verbose, const std::string&
out << "Insert order:\n";
for (const_iterator it = begin(); it != end(); ++it) {
out << indent << *it->_command << ", priority " << it->_priority
- << ", time " << it->_time << "\n";
+ << ", time " << vespalib::count_ms(it->_time.time_since_epoch()) << "\n";
}
out << indent << "Time order:";
for (const_titerator it = tbegin(); it != tend(); ++it) {
out << "\n" << indent << *it->_command << ", priority " << it->_priority
- << ", time " << it->_time;
+ << ", time " << vespalib::count_ms(it->_time.time_since_epoch());
}
}
diff --git a/storage/src/vespa/storage/visiting/visitor.h b/storage/src/vespa/storage/visiting/visitor.h
index 11192021577..aa1c34d10b7 100644
--- a/storage/src/vespa/storage/visiting/visitor.h
+++ b/storage/src/vespa/storage/visiting/visitor.h
@@ -70,7 +70,7 @@ public:
*/
virtual void closed(api::VisitorId id) = 0;
- virtual ~VisitorMessageHandler() {}
+ virtual ~VisitorMessageHandler() = default;
};
/**
diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp
index eb5c927873c..de418fe61db 100644
--- a/storage/src/vespa/storage/visiting/visitormanager.cpp
+++ b/storage/src/vespa/storage/visiting/visitormanager.cpp
@@ -22,7 +22,8 @@ namespace storage {
VisitorManager::VisitorManager(const config::ConfigUri & configUri,
StorageComponentRegister& componentRegister,
VisitorMessageSessionFactory& messageSF,
- const VisitorFactory::Map& externalFactories)
+ const VisitorFactory::Map& externalFactories,
+ bool defer_manager_thread_start)
: StorageLink("Visitor Manager"),
framework::HtmlStatusReporter("visitorman", "Visitor Manager"),
_componentRegister(componentRegister),
@@ -51,7 +52,9 @@ VisitorManager::VisitorManager(const config::ConfigUri & configUri,
_configFetcher->subscribe<vespa::config::content::core::StorVisitorConfig>(configUri.getConfigId(), this);
_configFetcher->start();
_component.registerMetric(*_metrics);
- _thread = _component.startThread(*this, 30s, 1s, 1, vespalib::CpuUsage::Category::READ);
+ if (!defer_manager_thread_start) {
+ create_and_start_manager_thread();
+ }
_component.registerMetricUpdateHook(*this, framework::SecondTime(5));
_visitorFactories["dumpvisitor"] = std::make_shared<DumpVisitorSingleFactory>();
_visitorFactories["dumpvisitorsingle"] = std::make_shared<DumpVisitorSingleFactory>();
@@ -65,7 +68,7 @@ VisitorManager::VisitorManager(const config::ConfigUri & configUri,
VisitorManager::~VisitorManager() {
closeNextLink();
LOG(debug, "Deleting link %s.", toString().c_str());
- if (_thread.get() != 0) {
+ if (_thread) {
_thread->interrupt();
_visitorCond.notify_all();
_thread->join();
@@ -74,6 +77,13 @@ VisitorManager::~VisitorManager() {
}
void
+VisitorManager::create_and_start_manager_thread()
+{
+ assert(!_thread);
+ _thread = _component.startThread(*this, 30s, 1s, 1, vespalib::CpuUsage::Category::READ);
+}
+
+void
VisitorManager::updateMetrics(const MetricLockGuard &)
{
_metrics->queueSize.addValue(_visitorQueue.size());
@@ -207,13 +217,14 @@ VisitorManager::run(framework::ThreadHandle& thread)
}
timedOut = _visitorQueue.releaseTimedOut();
}
- framework::MicroSecTime currentTime(_component.getClock().getTimeInMicros());
+ const auto currentTime = _component.getClock().getMonotonicTime();
for (std::list<CQ::CommandEntry>::iterator it = timedOut.begin();
it != timedOut.end(); ++it)
{
- _metrics->queueTimeoutWaitTime.addValue(currentTime.getTime() - it->_time);
+ // TODO is this really tracking what the metric description implies it's tracking...?
+ _metrics->queueTimeoutWaitTime.addValue(vespalib::to_s(currentTime - it->_time) * 1000.0); // Double metric in millis
std::shared_ptr<api::StorageReply> reply(it->_command->makeReply());
- reply->setResult(api::ReturnCode(api::ReturnCode::BUSY,"Visitor timed out in visitor queue"));
+ reply->setResult(api::ReturnCode(api::ReturnCode::BUSY, "Visitor timed out in visitor queue"));
sendUp(reply);
}
{
@@ -224,10 +235,10 @@ VisitorManager::run(framework::ThreadHandle& thread)
_visitorCond.wait_for(waiter, 1000ms);
thread.registerTick(framework::WAIT_CYCLE);
} else {
- uint64_t timediff = (_visitorQueue.tbegin()->_time- currentTime.getTime()) / 1000000;
- timediff = std::min(timediff, uint64_t(1000));
- if (timediff > 0) {
- _visitorCond.wait_for(waiter, std::chrono::milliseconds(timediff));
+ auto time_diff = _visitorQueue.tbegin()->_time - currentTime;
+ time_diff = (time_diff < 1000ms) ? time_diff : 1000ms;
+ if (time_diff.count() > 0) {
+ _visitorCond.wait_for(waiter, time_diff);
thread.registerTick(framework::WAIT_CYCLE);
}
}
@@ -307,12 +318,13 @@ VisitorManager::scheduleVisitor(
std::shared_ptr<api::CreateVisitorCommand> tail(_visitorQueue.peekLowestPriorityCommand());
// Lower int ==> higher pri
if (cmd->getPriority() < tail->getPriority()) {
- std::pair<api::CreateVisitorCommand::SP, time_t> evictCommand(_visitorQueue.releaseLowestPriorityCommand());
+ auto evictCommand = _visitorQueue.releaseLowestPriorityCommand();
assert(tail == evictCommand.first);
_visitorQueue.add(cmd);
_visitorCond.notify_one();
- framework::MicroSecTime t(_component.getClock().getTimeInMicros());
- _metrics->queueEvictedWaitTime.addValue(t.getTime() - evictCommand.second);
+ auto now = _component.getClock().getMonotonicTime();
+ // TODO is this really tracking what the metric description implies it's tracking...?
+ _metrics->queueEvictedWaitTime.addValue(vespalib::to_s(now - evictCommand.second) * 1000.0); // Double metric in millis
failCommand = evictCommand.first;
} else {
failCommand = cmd;
@@ -491,11 +503,12 @@ VisitorManager::attemptScheduleQueuedVisitor(MonitorGuard& visitorLock)
std::shared_ptr<api::CreateVisitorCommand> cmd(_visitorQueue.peekNextCommand());
assert(cmd.get());
if (totCount < maximumConcurrent(*cmd)) {
- std::pair<api::CreateVisitorCommand::SP, time_t> cmd2(_visitorQueue.releaseNextCommand());
+ auto cmd2 = _visitorQueue.releaseNextCommand();
assert(cmd == cmd2.first);
scheduleVisitor(cmd, true, visitorLock);
- framework::MicroSecTime time(_component.getClock().getTimeInMicros());
- _metrics->queueWaitTime.addValue(time.getTime() - cmd2.second);
+ auto now = _component.getClock().getMonotonicTime();
+ // TODO is this really tracking what the metric description implies it's tracking...?
+ _metrics->queueWaitTime.addValue(vespalib::to_s(now - cmd2.second) * 1000.0); // Double metric in millis
// visitorLock is unlocked at this point
return true;
}
@@ -584,7 +597,7 @@ VisitorManager::reportHtmlStatus(std::ostream& out,
}
out << "<h3>Queued visitors</h3>\n<ul>\n";
- framework::MicroSecTime time(_component.getClock().getTimeInMicros());
+ const auto now = _component.getClock().getMonotonicTime();
for (CommandQueue<api::CreateVisitorCommand>::const_iterator it
= _visitorQueue.begin(); it != _visitorQueue.end(); ++it)
{
@@ -592,7 +605,7 @@ VisitorManager::reportHtmlStatus(std::ostream& out,
assert(cmd);
out << "<li>" << cmd->getInstanceId() << " - "
<< vespalib::count_ms(cmd->getQueueTimeout()) << ", remaining timeout "
- << (it->_time - time.getTime()) / 1000000 << " ms\n";
+ << vespalib::count_ms(it->_time - now) << " ms\n";
}
if (_visitorQueue.empty()) {
out << "None\n";
diff --git a/storage/src/vespa/storage/visiting/visitormanager.h b/storage/src/vespa/storage/visiting/visitormanager.h
index 888c0873e9c..33703b392bc 100644
--- a/storage/src/vespa/storage/visiting/visitormanager.h
+++ b/storage/src/vespa/storage/visiting/visitormanager.h
@@ -84,9 +84,11 @@ private:
bool _enforceQueueUse;
VisitorFactory::Map _visitorFactories;
public:
- VisitorManager(const config::ConfigUri & configUri, StorageComponentRegister&,
+ VisitorManager(const config::ConfigUri & configUri,
+ StorageComponentRegister&,
VisitorMessageSessionFactory&,
- const VisitorFactory::Map& external = VisitorFactory::Map());
+ const VisitorFactory::Map& external = VisitorFactory::Map(),
+ bool defer_manager_thread_start = false);
~VisitorManager() override;
void onClose() override;
@@ -115,6 +117,8 @@ public:
}
/** For unit testing */
bool hasPendingMessageState() const;
+ // Must be called exactly once iff manager was created with defer_manager_thread_start == true
+ void create_and_start_manager_thread();
void enforceQueueUsage() { _enforceQueueUse = true; }