diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-03-18 14:05:08 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-03-18 14:05:08 +0000 |
commit | 9be87dcfc81d88b7aa5df918047ba3ab977b6c3f (patch) | |
tree | 069213d8e23e4db28450dd24191a7a17a7c7ed7f /storage | |
parent | 81dbfb5c6446bb2fa6c54923ff6d77b20815f611 (diff) |
Fix visitor manager test TSan mutex inversion warning and refactor deadline handling
Defer starting the main message dispatch thread until the test has enqueued a
message that will time out immediately. This avoids having to take an internal
mutex in the test to prevent racing with queue handoffs. Taking said mutex
triggered a mutex order inversion warning in TSan.
Also refactor visitor queue deadline handling by moving to a strongly typed
time point. This removes some manual unit scaling arithmetic that did not
appear to be entirely correct in the first place.
Diffstat (limited to 'storage')
-rw-r--r-- | storage/src/tests/visiting/commandqueuetest.cpp | 7 | ||||
-rw-r--r-- | storage/src/tests/visiting/visitormanagertest.cpp | 20 | ||||
-rw-r--r-- | storage/src/vespa/storage/visiting/commandqueue.h | 50 | ||||
-rw-r--r-- | storage/src/vespa/storage/visiting/visitor.h | 2 | ||||
-rw-r--r-- | storage/src/vespa/storage/visiting/visitormanager.cpp | 49 | ||||
-rw-r--r-- | storage/src/vespa/storage/visiting/visitormanager.h | 8 |
6 files changed, 76 insertions, 60 deletions
diff --git a/storage/src/tests/visiting/commandqueuetest.cpp b/storage/src/tests/visiting/commandqueuetest.cpp index ac4251ba21e..4d528ce4971 100644 --- a/storage/src/tests/visiting/commandqueuetest.cpp +++ b/storage/src/tests/visiting/commandqueuetest.cpp @@ -155,12 +155,11 @@ TEST(CommandQueueTest, release_lowest_priority) { std::vector<std::shared_ptr<api::CreateVisitorCommand>> commands; for (;;) { std::shared_ptr<api::CreateVisitorCommand> cmdPeek(queue.peekLowestPriorityCommand()); - std::pair<std::shared_ptr<api::CreateVisitorCommand>, uint64_t> cmd( - queue.releaseLowestPriorityCommand()); - if (cmd.first.get() == 0 || cmdPeek != cmd.first) { + auto cmd_and_deadline = queue.releaseLowestPriorityCommand(); + if (!cmd_and_deadline.first || cmdPeek != cmd_and_deadline.first) { break; } - commands.push_back(cmd.first); + commands.push_back(cmd_and_deadline.first); } ASSERT_EQ(7, commands.size()); EXPECT_EQ("sixth t=14 p=50", getCommandString(commands[0])); diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp index 99172264d47..be4e7270c69 100644 --- a/storage/src/tests/visiting/visitormanagertest.cpp +++ b/storage/src/tests/visiting/visitormanagertest.cpp @@ -53,7 +53,7 @@ protected: ~VisitorManagerTest(); // Not using setUp since can't throw exception out of it. 
- void initializeTest(); + void initializeTest(bool defer_manager_thread_start = false); void addSomeRemoves(bool removeAll = false); void TearDown() override; TestVisitorMessageSession& getSession(uint32_t n); @@ -78,7 +78,7 @@ VisitorManagerTest::~VisitorManagerTest() = default; uint32_t VisitorManagerTest::docCount = 10; void -VisitorManagerTest::initializeTest() +VisitorManagerTest::initializeTest(bool defer_manager_thread_start) { vdstestlib::DirConfig config(getStandardConfig(true)); config.getConfig("stor-visitor").set("visitorthreads", "1"); @@ -88,7 +88,11 @@ VisitorManagerTest::initializeTest() _node->setupDummyPersistence(); _node->getStateUpdater().setClusterState(std::make_shared<lib::ClusterState>("storage:1 distributor:1")); _top = std::make_unique<DummyStorageLink>(); - auto vm = std::make_unique<VisitorManager>(config::ConfigUri(config.getConfigId()), _node->getComponentRegister(), *_messageSessionFactory); + auto vm = std::make_unique<VisitorManager>(config::ConfigUri(config.getConfigId()), + _node->getComponentRegister(), + *_messageSessionFactory, + VisitorFactory::Map(), + defer_manager_thread_start); _manager = vm.get(); _top->push_back(std::move(vm)); _top->push_back(std::make_unique<FileStorManager>(config::ConfigUri(config.getConfigId()), _node->getPersistenceProvider(), @@ -753,22 +757,22 @@ TEST_F(VisitorManagerTest, abort_on_field_path_error) { } TEST_F(VisitorManagerTest, visitor_queue_timeout) { - ASSERT_NO_FATAL_FAILURE(initializeTest()); + ASSERT_NO_FATAL_FAILURE(initializeTest(true)); _manager->enforceQueueUsage(); { - std::lock_guard guard(_manager->getThread(0).getQueueMonitor()); - auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", ""); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(_Address); cmd->setQueueTimeout(1ms); cmd->setTimeout(100 * 1000 * 1000ms); + // The manager thread isn't running yet so the visitor stays on the queue _top->sendDown(cmd); - - 
_node->getClock().addSecondsToTime(1000); } + _node->getClock().addSecondsToTime(1000); + _manager->create_and_start_manager_thread(); + // Don't answer any messages. Make sure we timeout anyways. _top->waitForMessages(1, 60); const msg_ptr_vector replies = _top->getRepliesOnce(); diff --git a/storage/src/vespa/storage/visiting/commandqueue.h b/storage/src/vespa/storage/visiting/commandqueue.h index 70f5b2421c3..0f345aa0d94 100644 --- a/storage/src/vespa/storage/visiting/commandqueue.h +++ b/storage/src/vespa/storage/visiting/commandqueue.h @@ -30,12 +30,12 @@ public: struct CommandEntry { typedef typename Command::Priority PriorityType; std::shared_ptr<Command> _command; - uint64_t _time; + vespalib::steady_time _time; uint64_t _sequenceId; PriorityType _priority; CommandEntry(const std::shared_ptr<Command>& cmd, - uint64_t time, + vespalib::steady_time time, uint64_t sequenceId, PriorityType priority) : _command(cmd), _time(time), _sequenceId(sequenceId), _priority(priority) @@ -51,19 +51,18 @@ public: }; private: - typedef boost::multi_index::multi_index_container< + using CommandList = boost::multi_index::multi_index_container< CommandEntry, boost::multi_index::indexed_by< boost::multi_index::ordered_unique< boost::multi_index::identity<CommandEntry> >, boost::multi_index::ordered_non_unique< - boost::multi_index::member<CommandEntry, uint64_t, &CommandEntry::_time> + boost::multi_index::member<CommandEntry, vespalib::steady_time, &CommandEntry::_time> > > - > CommandList; - typedef typename boost::multi_index - ::nth_index<CommandList, 1>::type timelist; + >; + using timelist = typename boost::multi_index::nth_index<CommandList, 1>::type; const framework::Clock& _clock; mutable CommandList _commands; @@ -76,7 +75,7 @@ public: typedef typename CommandList::const_reverse_iterator const_reverse_iterator; typedef typename timelist::const_iterator const_titerator; - CommandQueue(const framework::Clock& clock) + explicit CommandQueue(const framework::Clock& clock) : 
_clock(clock), _sequenceId(0) {} @@ -89,22 +88,22 @@ public: const_iterator end() const { return _commands.end(); } const_titerator tbegin() const { - timelist& tl = boost::multi_index::get<1>(_commands); + auto& tl = boost::multi_index::get<1>(_commands); return tl.begin(); } const_titerator tend() const { - timelist& tl = boost::multi_index::get<1>(_commands); + auto& tl = boost::multi_index::get<1>(_commands); return tl.end(); } bool empty() const { return _commands.empty(); } uint32_t size() const { return _commands.size(); } - std::pair<std::shared_ptr<Command>, time_t> releaseNextCommand(); + std::pair<std::shared_ptr<Command>, vespalib::steady_time> releaseNextCommand(); std::shared_ptr<Command> peekNextCommand() const; void add(const std::shared_ptr<Command>& msg); void erase(iterator it) { _commands.erase(it); } std::list<CommandEntry> releaseTimedOut(); - std::pair<std::shared_ptr<Command>, time_t> releaseLowestPriorityCommand(); + std::pair<std::shared_ptr<Command>, vespalib::steady_time> releaseLowestPriorityCommand(); std::shared_ptr<Command> peekLowestPriorityCommand() const; void clear() { return _commands.clear(); } @@ -113,11 +112,10 @@ public: template<class Command> -std::pair<std::shared_ptr<Command>, time_t> +std::pair<std::shared_ptr<Command>, vespalib::steady_time> CommandQueue<Command>::releaseNextCommand() { - std::pair<std::shared_ptr<Command>, time_t> retVal( - std::shared_ptr<Command>(), 0); + std::pair<std::shared_ptr<Command>, vespalib::steady_time> retVal; if (!_commands.empty()) { iterator first = _commands.begin(); retVal.first = first->_command; @@ -143,9 +141,8 @@ template<class Command> void CommandQueue<Command>::add(const std::shared_ptr<Command>& cmd) { - framework::MicroSecTime time(_clock.getTimeInMicros() - + framework::MicroSecTime(vespalib::count_us(cmd->getQueueTimeout()))); - _commands.insert(CommandEntry(cmd, time.getTime(), ++_sequenceId, cmd->getPriority())); + auto deadline = _clock.getMonotonicTime() + 
cmd->getQueueTimeout(); + _commands.insert(CommandEntry(cmd, deadline, ++_sequenceId, cmd->getPriority())); } template<class Command> @@ -153,8 +150,8 @@ std::list<typename CommandQueue<Command>::CommandEntry> CommandQueue<Command>::releaseTimedOut() { std::list<CommandEntry> mylist; - framework::MicroSecTime time(_clock.getTimeInMicros()); - while (!empty() && tbegin()->_time <= time.getTime()) { + auto now = _clock.getMonotonicTime(); + while (!empty() && tbegin()->_time <= now) { mylist.push_back(*tbegin()); timelist& tl = boost::multi_index::get<1>(_commands); tl.erase(tbegin()); @@ -163,18 +160,17 @@ CommandQueue<Command>::releaseTimedOut() } template <class Command> -std::pair<std::shared_ptr<Command>, time_t> +std::pair<std::shared_ptr<Command>, vespalib::steady_time> CommandQueue<Command>::releaseLowestPriorityCommand() { if (!_commands.empty()) { iterator last = (++_commands.rbegin()).base(); - time_t time = last->_time; + auto time = last->_time; std::shared_ptr<Command> cmd(last->_command); _commands.erase(last); - return std::pair<std::shared_ptr<Command>, time_t>(cmd, time); + return {cmd, time}; } else { - return std::pair<std::shared_ptr<Command>, time_t>( - std::shared_ptr<Command>(), 0); + return {}; } } @@ -198,12 +194,12 @@ CommandQueue<Command>::print(std::ostream& out, bool verbose, const std::string& out << "Insert order:\n"; for (const_iterator it = begin(); it != end(); ++it) { out << indent << *it->_command << ", priority " << it->_priority - << ", time " << it->_time << "\n"; + << ", time " << vespalib::count_ms(it->_time.time_since_epoch()) << "\n"; } out << indent << "Time order:"; for (const_titerator it = tbegin(); it != tend(); ++it) { out << "\n" << indent << *it->_command << ", priority " << it->_priority - << ", time " << it->_time; + << ", time " << vespalib::count_ms(it->_time.time_since_epoch()); } } diff --git a/storage/src/vespa/storage/visiting/visitor.h b/storage/src/vespa/storage/visiting/visitor.h index 
11192021577..aa1c34d10b7 100644 --- a/storage/src/vespa/storage/visiting/visitor.h +++ b/storage/src/vespa/storage/visiting/visitor.h @@ -70,7 +70,7 @@ public: */ virtual void closed(api::VisitorId id) = 0; - virtual ~VisitorMessageHandler() {} + virtual ~VisitorMessageHandler() = default; }; /** diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp index eb5c927873c..de418fe61db 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.cpp +++ b/storage/src/vespa/storage/visiting/visitormanager.cpp @@ -22,7 +22,8 @@ namespace storage { VisitorManager::VisitorManager(const config::ConfigUri & configUri, StorageComponentRegister& componentRegister, VisitorMessageSessionFactory& messageSF, - const VisitorFactory::Map& externalFactories) + const VisitorFactory::Map& externalFactories, + bool defer_manager_thread_start) : StorageLink("Visitor Manager"), framework::HtmlStatusReporter("visitorman", "Visitor Manager"), _componentRegister(componentRegister), @@ -51,7 +52,9 @@ VisitorManager::VisitorManager(const config::ConfigUri & configUri, _configFetcher->subscribe<vespa::config::content::core::StorVisitorConfig>(configUri.getConfigId(), this); _configFetcher->start(); _component.registerMetric(*_metrics); - _thread = _component.startThread(*this, 30s, 1s, 1, vespalib::CpuUsage::Category::READ); + if (!defer_manager_thread_start) { + create_and_start_manager_thread(); + } _component.registerMetricUpdateHook(*this, framework::SecondTime(5)); _visitorFactories["dumpvisitor"] = std::make_shared<DumpVisitorSingleFactory>(); _visitorFactories["dumpvisitorsingle"] = std::make_shared<DumpVisitorSingleFactory>(); @@ -65,7 +68,7 @@ VisitorManager::VisitorManager(const config::ConfigUri & configUri, VisitorManager::~VisitorManager() { closeNextLink(); LOG(debug, "Deleting link %s.", toString().c_str()); - if (_thread.get() != 0) { + if (_thread) { _thread->interrupt(); _visitorCond.notify_all(); 
_thread->join(); @@ -74,6 +77,13 @@ VisitorManager::~VisitorManager() { } void +VisitorManager::create_and_start_manager_thread() +{ + assert(!_thread); + _thread = _component.startThread(*this, 30s, 1s, 1, vespalib::CpuUsage::Category::READ); +} + +void VisitorManager::updateMetrics(const MetricLockGuard &) { _metrics->queueSize.addValue(_visitorQueue.size()); @@ -207,13 +217,14 @@ VisitorManager::run(framework::ThreadHandle& thread) } timedOut = _visitorQueue.releaseTimedOut(); } - framework::MicroSecTime currentTime(_component.getClock().getTimeInMicros()); + const auto currentTime = _component.getClock().getMonotonicTime(); for (std::list<CQ::CommandEntry>::iterator it = timedOut.begin(); it != timedOut.end(); ++it) { - _metrics->queueTimeoutWaitTime.addValue(currentTime.getTime() - it->_time); + // TODO is this really tracking what the metric description implies it's tracking...? + _metrics->queueTimeoutWaitTime.addValue(vespalib::to_s(currentTime - it->_time) * 1000.0); // Double metric in millis std::shared_ptr<api::StorageReply> reply(it->_command->makeReply()); - reply->setResult(api::ReturnCode(api::ReturnCode::BUSY,"Visitor timed out in visitor queue")); + reply->setResult(api::ReturnCode(api::ReturnCode::BUSY, "Visitor timed out in visitor queue")); sendUp(reply); } { @@ -224,10 +235,10 @@ VisitorManager::run(framework::ThreadHandle& thread) _visitorCond.wait_for(waiter, 1000ms); thread.registerTick(framework::WAIT_CYCLE); } else { - uint64_t timediff = (_visitorQueue.tbegin()->_time- currentTime.getTime()) / 1000000; - timediff = std::min(timediff, uint64_t(1000)); - if (timediff > 0) { - _visitorCond.wait_for(waiter, std::chrono::milliseconds(timediff)); + auto time_diff = _visitorQueue.tbegin()->_time - currentTime; + time_diff = (time_diff < 1000ms) ? 
time_diff : 1000ms; + if (time_diff.count() > 0) { + _visitorCond.wait_for(waiter, time_diff); thread.registerTick(framework::WAIT_CYCLE); } } @@ -307,12 +318,13 @@ VisitorManager::scheduleVisitor( std::shared_ptr<api::CreateVisitorCommand> tail(_visitorQueue.peekLowestPriorityCommand()); // Lower int ==> higher pri if (cmd->getPriority() < tail->getPriority()) { - std::pair<api::CreateVisitorCommand::SP, time_t> evictCommand(_visitorQueue.releaseLowestPriorityCommand()); + auto evictCommand = _visitorQueue.releaseLowestPriorityCommand(); assert(tail == evictCommand.first); _visitorQueue.add(cmd); _visitorCond.notify_one(); - framework::MicroSecTime t(_component.getClock().getTimeInMicros()); - _metrics->queueEvictedWaitTime.addValue(t.getTime() - evictCommand.second); + auto now = _component.getClock().getMonotonicTime(); + // TODO is this really tracking what the metric description implies it's tracking...? + _metrics->queueEvictedWaitTime.addValue(vespalib::to_s(now - evictCommand.second) * 1000.0); // Double metric in millis failCommand = evictCommand.first; } else { failCommand = cmd; @@ -491,11 +503,12 @@ VisitorManager::attemptScheduleQueuedVisitor(MonitorGuard& visitorLock) std::shared_ptr<api::CreateVisitorCommand> cmd(_visitorQueue.peekNextCommand()); assert(cmd.get()); if (totCount < maximumConcurrent(*cmd)) { - std::pair<api::CreateVisitorCommand::SP, time_t> cmd2(_visitorQueue.releaseNextCommand()); + auto cmd2 = _visitorQueue.releaseNextCommand(); assert(cmd == cmd2.first); scheduleVisitor(cmd, true, visitorLock); - framework::MicroSecTime time(_component.getClock().getTimeInMicros()); - _metrics->queueWaitTime.addValue(time.getTime() - cmd2.second); + auto now = _component.getClock().getMonotonicTime(); + // TODO is this really tracking what the metric description implies it's tracking...? 
+ _metrics->queueWaitTime.addValue(vespalib::to_s(now - cmd2.second) * 1000.0); // Double metric in millis // visitorLock is unlocked at this point return true; } @@ -584,7 +597,7 @@ VisitorManager::reportHtmlStatus(std::ostream& out, } out << "<h3>Queued visitors</h3>\n<ul>\n"; - framework::MicroSecTime time(_component.getClock().getTimeInMicros()); + const auto now = _component.getClock().getMonotonicTime(); for (CommandQueue<api::CreateVisitorCommand>::const_iterator it = _visitorQueue.begin(); it != _visitorQueue.end(); ++it) { @@ -592,7 +605,7 @@ VisitorManager::reportHtmlStatus(std::ostream& out, assert(cmd); out << "<li>" << cmd->getInstanceId() << " - " << vespalib::count_ms(cmd->getQueueTimeout()) << ", remaining timeout " - << (it->_time - time.getTime()) / 1000000 << " ms\n"; + << vespalib::count_ms(it->_time - now) << " ms\n"; } if (_visitorQueue.empty()) { out << "None\n"; diff --git a/storage/src/vespa/storage/visiting/visitormanager.h b/storage/src/vespa/storage/visiting/visitormanager.h index 888c0873e9c..33703b392bc 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.h +++ b/storage/src/vespa/storage/visiting/visitormanager.h @@ -84,9 +84,11 @@ private: bool _enforceQueueUse; VisitorFactory::Map _visitorFactories; public: - VisitorManager(const config::ConfigUri & configUri, StorageComponentRegister&, + VisitorManager(const config::ConfigUri & configUri, + StorageComponentRegister&, VisitorMessageSessionFactory&, - const VisitorFactory::Map& external = VisitorFactory::Map()); + const VisitorFactory::Map& external = VisitorFactory::Map(), + bool defer_manager_thread_start = false); ~VisitorManager() override; void onClose() override; @@ -115,6 +117,8 @@ public: } /** For unit testing */ bool hasPendingMessageState() const; + // Must be called exactly once iff manager was created with defer_manager_thread_start == true + void create_and_start_manager_thread(); void enforceQueueUsage() { _enforceQueueUse = true; } |