diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-03-21 14:42:11 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-03-21 14:42:11 +0100 |
commit | 34cf5db6ceeb838f1850558984cbe5e06d63870f (patch) | |
tree | 4ca69b11d7ad4acc9946d60d944b65d7144e59d5 | |
parent | 90db7b1ee93c737e3ab26530c321dfe66f558f53 (diff) | |
parent | 9aeff921406eb238bedf0210803b1bea441030aa (diff) |
Merge pull request #21757 from vespa-engine/vekterli/fix-visitor-mgr-test-tsan-deadlock-warning
Fix visitor manager test TSan mutex inversion warning and refactor deadline handling [run-systemtest]
10 files changed, 165 insertions, 168 deletions
diff --git a/storage/src/tests/visiting/commandqueuetest.cpp b/storage/src/tests/visiting/commandqueuetest.cpp index ac4251ba21e..1e24fafed4f 100644 --- a/storage/src/tests/visiting/commandqueuetest.cpp +++ b/storage/src/tests/visiting/commandqueuetest.cpp @@ -53,10 +53,11 @@ TEST(CommandQueueTest, fifo) { ASSERT_FALSE(queue.empty()); std::vector<std::shared_ptr<api::CreateVisitorCommand>> commands; for (;;) { - std::shared_ptr<api::CreateVisitorCommand> cmd( - queue.releaseNextCommand().first); - if (cmd.get() == 0) break; - commands.push_back(cmd); + auto cmd = queue.releaseNextCommand().first; + if (!cmd) { + break; + } + commands.emplace_back(std::move(cmd)); } ASSERT_EQ(7, commands.size()); EXPECT_EQ("first t=1 p=0", getCommandString(commands[0])); @@ -89,12 +90,12 @@ TEST(CommandQueueTest, fifo_with_priorities) { ASSERT_FALSE(queue.empty()); std::vector<std::shared_ptr<api::CreateVisitorCommand>> commands; for (;;) { - std::shared_ptr<api::CreateVisitorCommand> cmdPeek(queue.peekNextCommand()); - std::shared_ptr<api::CreateVisitorCommand> cmd(queue.releaseNextCommand().first); - if (cmd.get() == 0 || cmdPeek != cmd) { + auto cmdPeek = queue.peekNextCommand(); + auto cmd = queue.releaseNextCommand().first; + if (!cmd || cmdPeek != cmd) { break; } - commands.push_back(cmd); + commands.emplace_back(std::move(cmd)); } ASSERT_EQ(7, commands.size()); EXPECT_EQ("seventh t=7 p=0", getCommandString(commands[0])); @@ -119,17 +120,14 @@ TEST(CommandQueueTest, release_oldest) { queue.add(getCommand("seventh", 700ms)); ASSERT_EQ(7u, queue.size()); - using CommandEntry = CommandQueue<api::CreateVisitorCommand>::CommandEntry; - std::list<CommandEntry> timedOut(queue.releaseTimedOut()); + auto timedOut = queue.releaseTimedOut(); ASSERT_TRUE(timedOut.empty()); clock.addMilliSecondsToTime(400); timedOut = queue.releaseTimedOut(); ASSERT_EQ(4, timedOut.size()); std::ostringstream ost; - for (std::list<CommandEntry>::const_iterator it = timedOut.begin(); - it != timedOut.end(); ++it) - { - ost << getCommandString(it->_command) << "\n"; + for (const auto& timed_out_entry : timedOut) { + ost << getCommandString(timed_out_entry._command) << "\n"; } EXPECT_EQ("fourth t=5 p=0\n" "first t=10 p=0\n" @@ -154,13 +152,12 @@ TEST(CommandQueueTest, release_lowest_priority) { std::vector<std::shared_ptr<api::CreateVisitorCommand>> commands; for (;;) { - std::shared_ptr<api::CreateVisitorCommand> cmdPeek(queue.peekLowestPriorityCommand()); - std::pair<std::shared_ptr<api::CreateVisitorCommand>, uint64_t> cmd( - queue.releaseLowestPriorityCommand()); - if (cmd.first.get() == 0 || cmdPeek != cmd.first) { + auto cmdPeek = queue.peekLowestPriorityCommand(); + auto cmd = queue.releaseLowestPriorityCommand().first; + if (!cmd || cmdPeek != cmd) { break; } - commands.push_back(cmd.first); + commands.emplace_back(std::move(cmd)); } ASSERT_EQ(7, commands.size()); EXPECT_EQ("sixth t=14 p=50", getCommandString(commands[0])); @@ -185,20 +182,18 @@ TEST(CommandQueueTest, delete_iterator) { queue.add(getCommand("seventh", 700ms)); ASSERT_EQ(7u, queue.size()); - CommandQueue<api::CreateVisitorCommand>::iterator it = queue.begin(); + auto it = queue.begin(); ++it; ++it; queue.erase(it); ASSERT_EQ(6u, queue.size()); std::vector<std::shared_ptr<api::CreateVisitorCommand>> cmds; for (;;) { - std::shared_ptr<api::CreateVisitorCommand> cmd( - std::dynamic_pointer_cast<api::CreateVisitorCommand>( - queue.releaseNextCommand().first)); - if (cmd.get() == 0) { + auto cmd = queue.releaseNextCommand().first; + if (!cmd) { break; } - cmds.push_back(cmd); + cmds.emplace_back(std::move(cmd)); } ASSERT_EQ(6, cmds.size()); EXPECT_EQ("first t=10 p=0", getCommandString(cmds[0])); diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp index 99172264d47..be4e7270c69 100644 --- a/storage/src/tests/visiting/visitormanagertest.cpp +++ b/storage/src/tests/visiting/visitormanagertest.cpp @@ -53,7 +53,7 @@ protected: ~VisitorManagerTest(); // Not using setUp since can't throw exception out of it. - void initializeTest(); + void initializeTest(bool defer_manager_thread_start = false); void addSomeRemoves(bool removeAll = false); void TearDown() override; TestVisitorMessageSession& getSession(uint32_t n); @@ -78,7 +78,7 @@ VisitorManagerTest::~VisitorManagerTest() = default; uint32_t VisitorManagerTest::docCount = 10; void -VisitorManagerTest::initializeTest() +VisitorManagerTest::initializeTest(bool defer_manager_thread_start) { vdstestlib::DirConfig config(getStandardConfig(true)); config.getConfig("stor-visitor").set("visitorthreads", "1"); @@ -88,7 +88,11 @@ VisitorManagerTest::initializeTest() _node->setupDummyPersistence(); _node->getStateUpdater().setClusterState(std::make_shared<lib::ClusterState>("storage:1 distributor:1")); _top = std::make_unique<DummyStorageLink>(); - auto vm = std::make_unique<VisitorManager>(config::ConfigUri(config.getConfigId()), _node->getComponentRegister(), *_messageSessionFactory); + auto vm = std::make_unique<VisitorManager>(config::ConfigUri(config.getConfigId()), + _node->getComponentRegister(), + *_messageSessionFactory, + VisitorFactory::Map(), + defer_manager_thread_start); _manager = vm.get(); _top->push_back(std::move(vm)); _top->push_back(std::make_unique<FileStorManager>(config::ConfigUri(config.getConfigId()), _node->getPersistenceProvider(), @@ -753,22 +757,22 @@ TEST_F(VisitorManagerTest, abort_on_field_path_error) { } TEST_F(VisitorManagerTest, visitor_queue_timeout) { - ASSERT_NO_FATAL_FAILURE(initializeTest()); + ASSERT_NO_FATAL_FAILURE(initializeTest(true)); _manager->enforceQueueUsage(); { - std::lock_guard guard(_manager->getThread(0).getQueueMonitor()); - auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", ""); cmd->addBucketToBeVisited(document::BucketId(16, 3)); cmd->setAddress(_Address); cmd->setQueueTimeout(1ms); cmd->setTimeout(100 * 1000 * 1000ms); + // The manager thread isn't running yet so the visitor stays on the queue _top->sendDown(cmd); - - _node->getClock().addSecondsToTime(1000); } + _node->getClock().addSecondsToTime(1000); + _manager->create_and_start_manager_thread(); + // Don't answer any messages. Make sure we timeout anyways. _top->waitForMessages(1, 60); const msg_ptr_vector replies = _top->getRepliesOnce(); diff --git a/storage/src/vespa/storage/visiting/commandqueue.h b/storage/src/vespa/storage/visiting/commandqueue.h index 70f5b2421c3..3232baf50de 100644 --- a/storage/src/vespa/storage/visiting/commandqueue.h +++ b/storage/src/vespa/storage/visiting/commandqueue.h @@ -18,7 +18,7 @@ #include <vespa/vespalib/util/printable.h> #include <vespa/vespalib/util/time.h> #include <vespa/storageframework/generic/clock/clock.h> -#include <list> +#include <vector> #include <ostream> namespace storage { @@ -28,17 +28,21 @@ class CommandQueue : public vespalib::Printable { public: struct CommandEntry { - typedef typename Command::Priority PriorityType; + using PriorityType = typename Command::Priority; + std::shared_ptr<Command> _command; - uint64_t _time; - uint64_t _sequenceId; - PriorityType _priority; + vespalib::steady_time _deadline; + uint64_t _sequenceId; + PriorityType _priority; CommandEntry(const std::shared_ptr<Command>& cmd, - uint64_t time, + vespalib::steady_time deadline, uint64_t sequenceId, PriorityType priority) - : _command(cmd), _time(time), _sequenceId(sequenceId), _priority(priority) + : _command(cmd), + _deadline(deadline), + _sequenceId(sequenceId), + _priority(priority) {} // Sort on both priority and sequence ID @@ -51,23 +55,22 @@ public: }; private: - typedef boost::multi_index::multi_index_container< + using CommandList = boost::multi_index::multi_index_container< CommandEntry, boost::multi_index::indexed_by< boost::multi_index::ordered_unique< boost::multi_index::identity<CommandEntry> >, boost::multi_index::ordered_non_unique< - boost::multi_index::member<CommandEntry, uint64_t, &CommandEntry::_time> + boost::multi_index::member<CommandEntry, vespalib::steady_time, &CommandEntry::_deadline> > > - > CommandList; - typedef typename boost::multi_index - ::nth_index<CommandList, 1>::type timelist; + >; + using timelist = typename boost::multi_index::nth_index<CommandList, 1>::type; const framework::Clock& _clock; - mutable CommandList _commands; - uint64_t _sequenceId; + mutable CommandList _commands; + uint64_t _sequenceId; public: typedef typename CommandList::iterator iterator; @@ -76,7 +79,7 @@ public: typedef typename CommandList::const_reverse_iterator const_reverse_iterator; typedef typename timelist::const_iterator const_titerator; - CommandQueue(const framework::Clock& clock) + explicit CommandQueue(const framework::Clock& clock) : _clock(clock), _sequenceId(0) {} @@ -89,22 +92,22 @@ public: const_iterator end() const { return _commands.end(); } const_titerator tbegin() const { - timelist& tl = boost::multi_index::get<1>(_commands); + auto& tl = boost::multi_index::get<1>(_commands); return tl.begin(); } const_titerator tend() const { - timelist& tl = boost::multi_index::get<1>(_commands); + auto& tl = boost::multi_index::get<1>(_commands); return tl.end(); } bool empty() const { return _commands.empty(); } uint32_t size() const { return _commands.size(); } - std::pair<std::shared_ptr<Command>, time_t> releaseNextCommand(); + std::pair<std::shared_ptr<Command>, vespalib::steady_time> releaseNextCommand(); std::shared_ptr<Command> peekNextCommand() const; void add(const std::shared_ptr<Command>& msg); void erase(iterator it) { _commands.erase(it); } - std::list<CommandEntry> releaseTimedOut(); - std::pair<std::shared_ptr<Command>, time_t> releaseLowestPriorityCommand(); + std::vector<CommandEntry> releaseTimedOut(); + std::pair<std::shared_ptr<Command>, vespalib::steady_time> releaseLowestPriorityCommand(); std::shared_ptr<Command> peekLowestPriorityCommand() const; void clear() { return _commands.clear(); } @@ -113,15 +116,14 @@ public: template<class Command> -std::pair<std::shared_ptr<Command>, time_t> +std::pair<std::shared_ptr<Command>, vespalib::steady_time> CommandQueue<Command>::releaseNextCommand() { - std::pair<std::shared_ptr<Command>, time_t> retVal( - std::shared_ptr<Command>(), 0); + std::pair<std::shared_ptr<Command>, vespalib::steady_time> retVal; if (!_commands.empty()) { iterator first = _commands.begin(); retVal.first = first->_command; - retVal.second = first->_time; + retVal.second = first->_deadline; _commands.erase(first); } return retVal; @@ -143,38 +145,36 @@ template<class Command> void CommandQueue<Command>::add(const std::shared_ptr<Command>& cmd) { - framework::MicroSecTime time(_clock.getTimeInMicros() - + framework::MicroSecTime(vespalib::count_us(cmd->getQueueTimeout()))); - _commands.insert(CommandEntry(cmd, time.getTime(), ++_sequenceId, cmd->getPriority())); + auto deadline = _clock.getMonotonicTime() + cmd->getQueueTimeout(); + _commands.insert(CommandEntry(cmd, deadline, ++_sequenceId, cmd->getPriority())); } template<class Command> -std::list<typename CommandQueue<Command>::CommandEntry> +std::vector<typename CommandQueue<Command>::CommandEntry> CommandQueue<Command>::releaseTimedOut() { - std::list<CommandEntry> mylist; - framework::MicroSecTime time(_clock.getTimeInMicros()); - while (!empty() && tbegin()->_time <= time.getTime()) { - mylist.push_back(*tbegin()); + std::vector<CommandEntry> timed_out; + auto now = _clock.getMonotonicTime(); + while (!empty() && (tbegin()->_deadline <= now)) { + timed_out.emplace_back(*tbegin()); timelist& tl = boost::multi_index::get<1>(_commands); tl.erase(tbegin()); } - return mylist; + return timed_out; } template <class Command> -std::pair<std::shared_ptr<Command>, time_t> +std::pair<std::shared_ptr<Command>, vespalib::steady_time> CommandQueue<Command>::releaseLowestPriorityCommand() { if (!_commands.empty()) { iterator last = (++_commands.rbegin()).base(); - time_t time = last->_time; + auto deadline = last->_deadline; std::shared_ptr<Command> cmd(last->_command); _commands.erase(last); - return std::pair<std::shared_ptr<Command>, time_t>(cmd, time); + return {cmd, deadline}; } else { - return std::pair<std::shared_ptr<Command>, time_t>( - std::shared_ptr<Command>(), 0); + return {}; } } @@ -198,12 +198,12 @@ CommandQueue<Command>::print(std::ostream& out, bool verbose, const std::string& out << "Insert order:\n"; for (const_iterator it = begin(); it != end(); ++it) { out << indent << *it->_command << ", priority " << it->_priority - << ", time " << it->_time << "\n"; + << ", time " << vespalib::count_ms(it->_deadline.time_since_epoch()) << "\n"; } out << indent << "Time order:"; for (const_titerator it = tbegin(); it != tend(); ++it) { out << "\n" << indent << *it->_command << ", priority " << it->_priority - << ", time " << it->_time; + << ", time " << vespalib::count_ms(it->_deadline.time_since_epoch()); } } diff --git a/storage/src/vespa/storage/visiting/visitor.h b/storage/src/vespa/storage/visiting/visitor.h index 11192021577..aa1c34d10b7 100644 --- a/storage/src/vespa/storage/visiting/visitor.h +++ b/storage/src/vespa/storage/visiting/visitor.h @@ -70,7 +70,7 @@ public: */ virtual void closed(api::VisitorId id) = 0; - virtual ~VisitorMessageHandler() {} + virtual ~VisitorMessageHandler() = default; }; /** diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp index eb5c927873c..282299ebbc1 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.cpp +++ b/storage/src/vespa/storage/visiting/visitormanager.cpp @@ -22,7 +22,8 @@ namespace storage { VisitorManager::VisitorManager(const config::ConfigUri & configUri, StorageComponentRegister& componentRegister, VisitorMessageSessionFactory& messageSF, - const VisitorFactory::Map& externalFactories) + const VisitorFactory::Map& externalFactories, + bool defer_manager_thread_start) : StorageLink("Visitor Manager"), framework::HtmlStatusReporter("visitorman", "Visitor Manager"), _componentRegister(componentRegister), @@ -51,13 +52,15 @@ VisitorManager::VisitorManager(const config::ConfigUri & configUri, _configFetcher->subscribe<vespa::config::content::core::StorVisitorConfig>(configUri.getConfigId(), this); _configFetcher->start(); _component.registerMetric(*_metrics); - _thread = _component.startThread(*this, 30s, 1s, 1, vespalib::CpuUsage::Category::READ); + if (!defer_manager_thread_start) { + create_and_start_manager_thread(); + } _component.registerMetricUpdateHook(*this, framework::SecondTime(5)); - _visitorFactories["dumpvisitor"] = std::make_shared<DumpVisitorSingleFactory>(); + _visitorFactories["dumpvisitor"] = std::make_shared<DumpVisitorSingleFactory>(); _visitorFactories["dumpvisitorsingle"] = std::make_shared<DumpVisitorSingleFactory>(); - _visitorFactories["testvisitor"] = std::make_shared<TestVisitorFactory>(); - _visitorFactories["countvisitor"] = std::make_shared<CountVisitorFactory>(); - _visitorFactories["recoveryvisitor"] = std::make_shared<RecoveryVisitorFactory>(); + _visitorFactories["testvisitor"] = std::make_shared<TestVisitorFactory>(); + _visitorFactories["countvisitor"] = std::make_shared<CountVisitorFactory>(); + _visitorFactories["recoveryvisitor"] = std::make_shared<RecoveryVisitorFactory>(); _visitorFactories["reindexingvisitor"] = std::make_shared<ReindexingVisitorFactory>(); _component.registerStatusPage(*this); } @@ -65,7 +68,7 @@ VisitorManager::VisitorManager(const config::ConfigUri & configUri, VisitorManager::~VisitorManager() { closeNextLink(); LOG(debug, "Deleting link %s.", toString().c_str()); - if (_thread.get() != 0) { + if (_thread) { _thread->interrupt(); _visitorCond.notify_all(); _thread->join(); @@ -74,6 +77,13 @@ VisitorManager::~VisitorManager() { } void +VisitorManager::create_and_start_manager_thread() +{ + assert(!_thread); + _thread = _component.startThread(*this, 30s, 1s, 1, vespalib::CpuUsage::Category::READ); +} + +void VisitorManager::updateMetrics(const MetricLockGuard &) { _metrics->queueSize.addValue(_visitorQueue.size()); @@ -86,17 +96,15 @@ VisitorManager::onClose() _configFetcher->close(); { std::lock_guard sync(_visitorLock); - for (CommandQueue<api::CreateVisitorCommand>::iterator it - = _visitorQueue.begin(); it != _visitorQueue.end(); ++it) - { - auto reply = std::make_shared<api::CreateVisitorReply>(*it->_command); + for (auto& enqueued : _visitorQueue) { + auto reply = std::make_shared<api::CreateVisitorReply>(*enqueued._command); reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node.")); sendUp(reply); } _visitorQueue.clear(); } - for (uint32_t i=0; i<_visitorThread.size(); ++i) { - _visitorThread[i].first->shutdown(); + for (auto& visitor_thread : _visitorThread) { + visitor_thread.first->shutdown(); } } @@ -145,7 +153,7 @@ VisitorManager::configure(std::unique_ptr<vespa::config::content::core::StorVisi maxConcurrentVisitorsVariable = 0; } - bool liveUpdate = (_visitorThread.size() > 0); + bool liveUpdate = !_visitorThread.empty(); if (liveUpdate) { if (_visitorThread.size() != static_cast<uint32_t>(config->visitorthreads)) { LOG(warning, "Ignoring config change requesting %u visitor " @@ -177,7 +185,10 @@ VisitorManager::configure(std::unique_ptr<vespa::config::content::core::StorVisi _metrics->initThreads(config->visitorthreads); for (int32_t i=0; i<config->visitorthreads; ++i) { _visitorThread.emplace_back( - new VisitorThread(i, _componentRegister, _messageSessionFactory, _visitorFactories, *_metrics->threads[i], *this), + // Naked new due to a lot of private inheritance in VisitorThread and VisitorManager + std::shared_ptr<VisitorThread>( + new VisitorThread(i, _componentRegister, _messageSessionFactory, + _visitorFactories, *_metrics->threads[i], *this)), std::map<api::VisitorId, std::string>()); } } @@ -196,8 +207,8 @@ VisitorManager::run(framework::ThreadHandle& thread) { LOG(debug, "Started visitor manager thread with pid %d.", getpid()); typedef CommandQueue<api::CreateVisitorCommand> CQ; - std::list<CQ::CommandEntry> timedOut; - // Run forever, dump messages in the visitor queue that times out. + std::vector<CQ::CommandEntry> timedOut; + // Run forever, dump messages in the visitor queue that times out. while (true) { thread.registerTick(framework::PROCESS_CYCLE); { @@ -207,13 +218,12 @@ VisitorManager::run(framework::ThreadHandle& thread) } timedOut = _visitorQueue.releaseTimedOut(); } - framework::MicroSecTime currentTime(_component.getClock().getTimeInMicros()); - for (std::list<CQ::CommandEntry>::iterator it = timedOut.begin(); - it != timedOut.end(); ++it) - { - _metrics->queueTimeoutWaitTime.addValue(currentTime.getTime() - it->_time); - std::shared_ptr<api::StorageReply> reply(it->_command->makeReply()); - reply->setResult(api::ReturnCode(api::ReturnCode::BUSY,"Visitor timed out in visitor queue")); + const auto currentTime = _component.getClock().getMonotonicTime(); + for (auto& timed_out_entry : timedOut) { + // TODO is this really tracking what the metric description implies it's tracking...? + _metrics->queueTimeoutWaitTime.addValue(vespalib::to_s(currentTime - timed_out_entry._deadline) * 1000.0); // Double metric in millis + std::shared_ptr<api::StorageReply> reply(timed_out_entry._command->makeReply()); + reply->setResult(api::ReturnCode(api::ReturnCode::BUSY, "Visitor timed out in visitor queue")); sendUp(reply); } { @@ -224,10 +234,10 @@ VisitorManager::run(framework::ThreadHandle& thread) _visitorCond.wait_for(waiter, 1000ms); thread.registerTick(framework::WAIT_CYCLE); } else { - uint64_t timediff = (_visitorQueue.tbegin()->_time- currentTime.getTime()) / 1000000; - timediff = std::min(timediff, uint64_t(1000)); - if (timediff > 0) { - _visitorCond.wait_for(waiter, std::chrono::milliseconds(timediff)); + auto time_diff = _visitorQueue.tbegin()->_deadline - currentTime; + time_diff = (time_diff < 1000ms) ? time_diff : 1000ms; + if (time_diff.count() > 0) { + _visitorCond.wait_for(waiter, time_diff); thread.registerTick(framework::WAIT_CYCLE); } } @@ -256,8 +266,8 @@ VisitorManager::getActiveVisitorCount() const { std::lock_guard sync(_visitorLock); uint32_t totalCount = 0; - for (uint32_t i=0; i<_visitorThread.size(); ++i) { - totalCount += _visitorThread[i].second.size(); + for (const auto& visitor_thread : _visitorThread) { + totalCount += visitor_thread.second.size(); } return totalCount; } @@ -274,8 +284,8 @@ void VisitorManager::setTimeBetweenTicks(uint32_t time) { std::lock_guard sync(_visitorLock); - for (uint32_t i=0; i<_visitorThread.size(); ++i) { - _visitorThread[i].first->setTimeBetweenTicks(time); + for (auto& visitor_thread : _visitorThread) { + visitor_thread.first->setTimeBetweenTicks(time); } } @@ -285,8 +295,8 @@ VisitorManager::scheduleVisitor( MonitorGuard& visitorLock) { api::VisitorId id; - typedef std::map<std::string, api::VisitorId> NameToIdMap; - typedef std::pair<std::string, api::VisitorId> NameIdPair; + using NameToIdMap = std::map<std::string, api::VisitorId>; + using NameIdPair = std::pair<std::string, api::VisitorId>; std::pair<NameToIdMap::iterator, bool> newEntry; { uint32_t totCount; @@ -307,12 +317,13 @@ VisitorManager::scheduleVisitor( std::shared_ptr<api::CreateVisitorCommand> tail(_visitorQueue.peekLowestPriorityCommand()); // Lower int ==> higher pri if (cmd->getPriority() < tail->getPriority()) { - std::pair<api::CreateVisitorCommand::SP, time_t> evictCommand(_visitorQueue.releaseLowestPriorityCommand()); + auto evictCommand = _visitorQueue.releaseLowestPriorityCommand(); assert(tail == evictCommand.first); _visitorQueue.add(cmd); _visitorCond.notify_one(); - framework::MicroSecTime t(_component.getClock().getTimeInMicros()); - _metrics->queueEvictedWaitTime.addValue(t.getTime() - evictCommand.second); + auto now = _component.getClock().getMonotonicTime(); + // TODO is this really tracking what the metric description implies it's tracking...? + _metrics->queueEvictedWaitTime.addValue(vespalib::to_s(now - evictCommand.second) * 1000.0); // Double metric in millis failCommand = evictCommand.first; } else { failCommand = cmd; @@ -445,8 +456,7 @@ VisitorManager::send(const std::shared_ptr<api::StorageCommand>& cmd, // Only add to internal state if not destroy iterator command, as // these are considered special-cased fire-and-forget commands // that don't have replies. - if (static_cast<const api::InternalCommand&>(*cmd).getType() != DestroyIteratorCommand::ID) - { + if (static_cast<const api::InternalCommand&>(*cmd).getType() != DestroyIteratorCommand::ID) { MessageInfo inf; inf.id = visitor.getVisitorId(); inf.timestamp = _component.getClock().getTimeInSeconds().getTime(); @@ -488,14 +498,15 @@ VisitorManager::attemptScheduleQueuedVisitor(MonitorGuard& visitorLock) uint32_t totCount; getLeastLoadedThread(_visitorThread, totCount); - std::shared_ptr<api::CreateVisitorCommand> cmd(_visitorQueue.peekNextCommand()); + auto cmd = _visitorQueue.peekNextCommand(); assert(cmd.get()); if (totCount < maximumConcurrent(*cmd)) { - std::pair<api::CreateVisitorCommand::SP, time_t> cmd2(_visitorQueue.releaseNextCommand()); + auto cmd2 = _visitorQueue.releaseNextCommand(); assert(cmd == cmd2.first); scheduleVisitor(cmd, true, visitorLock); - framework::MicroSecTime time(_component.getClock().getTimeInMicros()); - _metrics->queueWaitTime.addValue(time.getTime() - cmd2.second); + auto now = _component.getClock().getMonotonicTime(); + // TODO is this really tracking what the metric description implies it's tracking...? + _metrics->queueWaitTime.addValue(vespalib::to_s(now - cmd2.second) * 1000.0); // Double metric in millis // visitorLock is unlocked at this point return true; } @@ -506,9 +517,9 @@ void VisitorManager::closed(api::VisitorId id) { std::unique_lock sync(_visitorLock); - std::map<api::VisitorId, std::string>& usedIds(_visitorThread[id % _visitorThread.size()].second); + auto& usedIds(_visitorThread[id % _visitorThread.size()].second); - std::map<api::VisitorId, std::string>::iterator it = usedIds.find(id); + auto it = usedIds.find(id); if (it == usedIds.end()) { LOG(warning, "VisitorManager::closed() called multiple times for the " "same visitor. This was not intended."); @@ -570,35 +581,30 @@ VisitorManager::reportHtmlStatus(std::ostream& out, for (uint32_t i=0; i<_visitorThread.size(); ++i) { visitorCount += _visitorThread[i].second.size(); out << "Thread " << i << ":"; - if (_visitorThread[i].second.size() == 0) { + if (_visitorThread[i].second.empty()) { out << " none"; } else { - for (std::map<api::VisitorId,std::string>::const_iterator it - = _visitorThread[i].second.begin(); - it != _visitorThread[i].second.end(); it++) - { - out << " " << it->second << " (" << it->first << ")"; + for (const auto& id_and_visitor : _visitorThread[i].second) { + out << " " << id_and_visitor.second << " (" << id_and_visitor.first << ")"; } } out << "<br>\n"; } out << "<h3>Queued visitors</h3>\n<ul>\n"; - framework::MicroSecTime time(_component.getClock().getTimeInMicros()); - for (CommandQueue<api::CreateVisitorCommand>::const_iterator it - = _visitorQueue.begin(); it != _visitorQueue.end(); ++it) - { - std::shared_ptr<api::CreateVisitorCommand> cmd(it->_command); + const auto now = _component.getClock().getMonotonicTime(); + for (const auto& enqueued : _visitorQueue) { + auto& cmd = enqueued._command; assert(cmd); out << "<li>" << cmd->getInstanceId() << " - " << vespalib::count_ms(cmd->getQueueTimeout()) << ", remaining timeout " - << (it->_time - time.getTime()) / 1000000 << " ms\n"; + << vespalib::count_ms(enqueued._deadline - now) << " ms\n"; } if (_visitorQueue.empty()) { out << "None\n"; } out << "</ul>\n"; - if (_visitorMessages.size() > 0) { + if (!_visitorMessages.empty()) { out << "<h3>Waiting for the following visitor replies</h3>" << "\n<table><tr>" << "<th>Storage API message id</th>" diff --git a/storage/src/vespa/storage/visiting/visitormanager.h b/storage/src/vespa/storage/visiting/visitormanager.h index 888c0873e9c..33703b392bc 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.h +++ b/storage/src/vespa/storage/visiting/visitormanager.h @@ -84,9 +84,11 @@ private: bool _enforceQueueUse; VisitorFactory::Map _visitorFactories; public: - VisitorManager(const config::ConfigUri & configUri, StorageComponentRegister&, + VisitorManager(const config::ConfigUri & configUri, + StorageComponentRegister&, VisitorMessageSessionFactory&, - const VisitorFactory::Map& external = VisitorFactory::Map()); + const VisitorFactory::Map& external = VisitorFactory::Map(), + bool defer_manager_thread_start = false); ~VisitorManager() override; void onClose() override; @@ -115,6 +117,8 @@ public: } /** For unit testing */ bool hasPendingMessageState() const; + // Must be called exactly once iff manager was created with defer_manager_thread_start == true + void create_and_start_manager_thread(); void enforceQueueUsage() { _enforceQueueUse = true; } diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp index ef01a684901..cc3e709a848 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.cpp +++ b/storage/src/vespa/storage/visiting/visitorthread.cpp @@ -24,7 +24,7 @@ namespace storage { VisitorThread::Event::Event(Event&& other) noexcept : _visitorId(other._visitorId), - _message(other._message), + _message(std::move(other._message)), _mbusReply(std::move(other._mbusReply)), _timer(other._timer), _type(other._type) @@ -37,18 +37,18 @@ VisitorThread::Event& VisitorThread::Event::operator= (Event&& other) noexcept { _visitorId = other._visitorId; - _message = other._message; + _message = std::move(other._message); _mbusReply = std::move(other._mbusReply); _timer = other._timer; _type = other._type; return *this; } -VisitorThread::Event::Event(api::VisitorId visitor, const std::shared_ptr<api::StorageMessage>& msg) +VisitorThread::Event::Event(api::VisitorId visitor, std::shared_ptr<api::StorageMessage> msg) : _visitorId(visitor), - _message(msg), + _message(std::move(msg)), _timer(), - _type(PERSISTENCE) + _type(Type::PERSISTENCE) { } @@ -56,7 +56,7 @@ VisitorThread::Event::Event(api::VisitorId visitor, mbus::Reply::UP reply) : _visitorId(visitor), _mbusReply(std::move(reply)), _timer(), - _type(MBUS) + _type(Type::MBUS) { } @@ -102,7 +102,7 @@ VisitorThread::VisitorThread(uint32_t threadIndex, VisitorThread::~VisitorThread() { - if (_thread.get() != 0) { + if (_thread) { _thread->interruptAndJoin(_cond); } } @@ -125,12 +125,11 @@ VisitorThread::shutdown() // Answer all queued up commands and clear queue { std::lock_guard sync(_lock); - for (const Event & event : _queue) - { + for (const Event & event : _queue) { if (event._message.get()) { if (!event._message->getType().isReply() - && (event._message->getType() != api::MessageType::INTERNAL - || static_cast<const api::InternalCommand&>(*event._message).getType() != PropagateVisitorConfig::ID)) + && (event._message->getType() != api::MessageType::INTERNAL + || static_cast<const api::InternalCommand&>(*event._message).getType() != PropagateVisitorConfig::ID)) { std::shared_ptr<api::StorageReply> reply( static_cast<api::StorageCommand&>(*event._message).makeReply()); @@ -142,9 +141,7 @@ VisitorThread::shutdown() _queue.clear(); } // Close all visitors. Send create visitor replies - for (VisitorMap::iterator it = _visitors.begin(); - it != _visitors.end();) - { + for (auto it = _visitors.begin(); it != _visitors.end();) { LOG(debug, "Force-closing visitor %s as we're shutting down.", it->second->getVisitorName().c_str()); _currentlyRunningVisitor = it++; @@ -158,9 +155,8 @@ VisitorThread::processMessage(api::VisitorId id, const std::shared_ptr<api::StorageMessage>& msg) { { - Event m(id, msg); std::unique_lock sync(_lock); - _queue.push_back(Event(id, msg)); + _queue.emplace_back(id, msg); } _cond.notify_one(); } diff --git a/storage/src/vespa/storage/visiting/visitorthread.h b/storage/src/vespa/storage/visiting/visitorthread.h index ebae51ed2b2..226e7c0631b 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.h +++ b/storage/src/vespa/storage/visiting/visitorthread.h @@ -33,15 +33,15 @@ class VisitorThread : public framework::Runnable, private api::MessageHandler, private framework::MetricUpdateHook { - typedef std::map<std::string, std::shared_ptr<VisitorEnvironment> > LibMap; + using LibMap = std::map<std::string, std::shared_ptr<VisitorEnvironment>>; LibMap _libs; - typedef std::map<api::VisitorId, std::shared_ptr<Visitor> > VisitorMap; + using VisitorMap = std::map<api::VisitorId, std::shared_ptr<Visitor>>; VisitorMap _visitors; - std::deque<std::pair<api::VisitorId, framework::SecondTime> > _recentlyCompleted; + std::deque<std::pair<api::VisitorId, framework::SecondTime>> _recentlyCompleted; struct Event { - enum Type { + enum class Type { MBUS, PERSISTENCE, NONE @@ -50,21 +50,20 @@ class VisitorThread : public framework::Runnable, api::VisitorId _visitorId; std::shared_ptr<api::StorageMessage> _message; mbus::Reply::UP _mbusReply; - metrics::MetricTimer _timer; Type _type; - Event() noexcept : _visitorId(0), _message(), _timer(), _type(NONE) {} + Event() noexcept : _visitorId(0), _message(), _timer(), _type(Type::NONE) {} Event(Event&& other) noexcept; Event& operator= (Event&& other) noexcept; Event(const Event& other) = delete; Event& operator= (const Event& other) = delete; Event(api::VisitorId visitor, mbus::Reply::UP reply); - Event(api::VisitorId visitor, const std::shared_ptr<api::StorageMessage>& msg); + Event(api::VisitorId visitor, std::shared_ptr<api::StorageMessage> msg); ~Event(); - bool empty() const noexcept { - return (_type == NONE); + [[nodiscard]] bool empty() const noexcept { + return (_type == Type::NONE); } }; @@ -105,9 +104,6 @@ public: void setTimeBetweenTicks(uint32_t time) { _timeBetweenTicks.store(time, std::memory_order_relaxed); } void handleMessageBusReply(std::unique_ptr<mbus::Reply> reply, Visitor& visitor); - /** For unit tests needing to pause thread. */ - std::mutex & getQueueMonitor() { return _lock; } - const VisitorThreadMetrics& getMetrics() const noexcept { return _metrics; } @@ -130,8 +126,6 @@ private: vespalib::asciistream & error); bool onCreateVisitor(const std::shared_ptr<api::CreateVisitorCommand>&) override; - - bool onVisitorReply(const std::shared_ptr<api::StorageReply>& reply); bool onInternal(const std::shared_ptr<api::InternalCommand>&) override; bool onInternalReply(const std::shared_ptr<api::InternalReply>&) override; diff --git a/storageapi/src/vespa/storageapi/messageapi/storagecommand.h b/storageapi/src/vespa/storageapi/messageapi/storagecommand.h index b4157c7860a..30d59e5fe4b 100644 --- a/storageapi/src/vespa/storageapi/messageapi/storagecommand.h +++ b/storageapi/src/vespa/storageapi/messageapi/storagecommand.h @@ -36,9 +36,7 @@ public: void setSourceIndex(uint16_t sourceIndex) { _sourceIndex = sourceIndex; } uint16_t getSourceIndex() const { return _sourceIndex; } - /** Set timeout in milliseconds. */ - void setTimeout(duration milliseconds) { _timeout = milliseconds; } - /** Get timeout in milliseconds. */ + void setTimeout(duration timeout) { _timeout = timeout; } duration getTimeout() const { return _timeout; } /** Used to set a new id so the message can be resent. */ diff --git a/storageframework/src/vespa/storageframework/generic/clock/clock.h b/storageframework/src/vespa/storageframework/generic/clock/clock.h index becd17da8a8..c9b8f652bfe 100644 --- a/storageframework/src/vespa/storageframework/generic/clock/clock.h +++ b/storageframework/src/vespa/storageframework/generic/clock/clock.h @@ -21,7 +21,7 @@ namespace storage::framework { struct Clock { using UP = std::unique_ptr<Clock>; - virtual ~Clock() {} + virtual ~Clock() = default; virtual MicroSecTime getTimeInMicros() const = 0; virtual MilliSecTime getTimeInMillis() const = 0; |