From 9aeff921406eb238bedf0210803b1bea441030aa Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Fri, 18 Mar 2022 16:37:19 +0000 Subject: Clean up and refactor visitor management code --- storage/src/tests/visiting/commandqueuetest.cpp | 44 ++++++------ storage/src/vespa/storage/visiting/commandqueue.h | 46 +++++++------ .../src/vespa/storage/visiting/visitormanager.cpp | 79 ++++++++++------------ .../src/vespa/storage/visiting/visitorthread.cpp | 28 ++++---- storage/src/vespa/storage/visiting/visitorthread.h | 22 +++--- 5 files changed, 101 insertions(+), 118 deletions(-) (limited to 'storage') diff --git a/storage/src/tests/visiting/commandqueuetest.cpp b/storage/src/tests/visiting/commandqueuetest.cpp index 4d528ce4971..1e24fafed4f 100644 --- a/storage/src/tests/visiting/commandqueuetest.cpp +++ b/storage/src/tests/visiting/commandqueuetest.cpp @@ -53,10 +53,11 @@ TEST(CommandQueueTest, fifo) { ASSERT_FALSE(queue.empty()); std::vector> commands; for (;;) { - std::shared_ptr cmd( - queue.releaseNextCommand().first); - if (cmd.get() == 0) break; - commands.push_back(cmd); + auto cmd = queue.releaseNextCommand().first; + if (!cmd) { + break; + } + commands.emplace_back(std::move(cmd)); } ASSERT_EQ(7, commands.size()); EXPECT_EQ("first t=1 p=0", getCommandString(commands[0])); @@ -89,12 +90,12 @@ TEST(CommandQueueTest, fifo_with_priorities) { ASSERT_FALSE(queue.empty()); std::vector> commands; for (;;) { - std::shared_ptr cmdPeek(queue.peekNextCommand()); - std::shared_ptr cmd(queue.releaseNextCommand().first); - if (cmd.get() == 0 || cmdPeek != cmd) { + auto cmdPeek = queue.peekNextCommand(); + auto cmd = queue.releaseNextCommand().first; + if (!cmd || cmdPeek != cmd) { break; } - commands.push_back(cmd); + commands.emplace_back(std::move(cmd)); } ASSERT_EQ(7, commands.size()); EXPECT_EQ("seventh t=7 p=0", getCommandString(commands[0])); @@ -119,17 +120,14 @@ TEST(CommandQueueTest, release_oldest) { queue.add(getCommand("seventh", 700ms)); ASSERT_EQ(7u, queue.size()); - using CommandEntry = CommandQueue::CommandEntry; - std::list timedOut(queue.releaseTimedOut()); + auto timedOut = queue.releaseTimedOut(); ASSERT_TRUE(timedOut.empty()); clock.addMilliSecondsToTime(400); timedOut = queue.releaseTimedOut(); ASSERT_EQ(4, timedOut.size()); std::ostringstream ost; - for (std::list::const_iterator it = timedOut.begin(); - it != timedOut.end(); ++it) - { - ost << getCommandString(it->_command) << "\n"; + for (const auto& timed_out_entry : timedOut) { + ost << getCommandString(timed_out_entry._command) << "\n"; } EXPECT_EQ("fourth t=5 p=0\n" "first t=10 p=0\n" @@ -154,12 +152,12 @@ TEST(CommandQueueTest, release_lowest_priority) { std::vector> commands; for (;;) { - std::shared_ptr cmdPeek(queue.peekLowestPriorityCommand()); - auto cmd_and_deadline = queue.releaseLowestPriorityCommand(); - if (!cmd_and_deadline.first || cmdPeek != cmd_and_deadline.first) { + auto cmdPeek = queue.peekLowestPriorityCommand(); + auto cmd = queue.releaseLowestPriorityCommand().first; + if (!cmd || cmdPeek != cmd) { break; } - commands.push_back(cmd_and_deadline.first); + commands.emplace_back(std::move(cmd)); } ASSERT_EQ(7, commands.size()); EXPECT_EQ("sixth t=14 p=50", getCommandString(commands[0])); @@ -184,20 +182,18 @@ TEST(CommandQueueTest, delete_iterator) { queue.add(getCommand("seventh", 700ms)); ASSERT_EQ(7u, queue.size()); - CommandQueue::iterator it = queue.begin(); + auto it = queue.begin(); ++it; ++it; queue.erase(it); ASSERT_EQ(6u, queue.size()); std::vector> cmds; for (;;) { - std::shared_ptr cmd( 
- std::dynamic_pointer_cast( - queue.releaseNextCommand().first)); - if (cmd.get() == 0) { + auto cmd = queue.releaseNextCommand().first; + if (!cmd) { break; } - cmds.push_back(cmd); + cmds.emplace_back(std::move(cmd)); } ASSERT_EQ(6, cmds.size()); EXPECT_EQ("first t=10 p=0", getCommandString(cmds[0])); diff --git a/storage/src/vespa/storage/visiting/commandqueue.h b/storage/src/vespa/storage/visiting/commandqueue.h index 0f345aa0d94..3232baf50de 100644 --- a/storage/src/vespa/storage/visiting/commandqueue.h +++ b/storage/src/vespa/storage/visiting/commandqueue.h @@ -18,7 +18,7 @@ #include #include #include -#include +#include #include namespace storage { @@ -28,17 +28,21 @@ class CommandQueue : public vespalib::Printable { public: struct CommandEntry { - typedef typename Command::Priority PriorityType; + using PriorityType = typename Command::Priority; + std::shared_ptr _command; - vespalib::steady_time _time; - uint64_t _sequenceId; - PriorityType _priority; + vespalib::steady_time _deadline; + uint64_t _sequenceId; + PriorityType _priority; CommandEntry(const std::shared_ptr& cmd, - vespalib::steady_time time, + vespalib::steady_time deadline, uint64_t sequenceId, PriorityType priority) - : _command(cmd), _time(time), _sequenceId(sequenceId), _priority(priority) + : _command(cmd), + _deadline(deadline), + _sequenceId(sequenceId), + _priority(priority) {} // Sort on both priority and sequence ID @@ -58,15 +62,15 @@ private: boost::multi_index::identity >, boost::multi_index::ordered_non_unique< - boost::multi_index::member + boost::multi_index::member > > >; using timelist = typename boost::multi_index::nth_index::type; const framework::Clock& _clock; - mutable CommandList _commands; - uint64_t _sequenceId; + mutable CommandList _commands; + uint64_t _sequenceId; public: typedef typename CommandList::iterator iterator; @@ -102,7 +106,7 @@ public: std::shared_ptr peekNextCommand() const; void add(const std::shared_ptr& msg); void erase(iterator it) { _commands.erase(it); } - std::list releaseTimedOut(); + std::vector releaseTimedOut(); std::pair, vespalib::steady_time> releaseLowestPriorityCommand(); std::shared_ptr peekLowestPriorityCommand() const; @@ -119,7 +123,7 @@ CommandQueue::releaseNextCommand() if (!_commands.empty()) { iterator first = _commands.begin(); retVal.first = first->_command; - retVal.second = first->_time; + retVal.second = first->_deadline; _commands.erase(first); } return retVal; @@ -146,17 +150,17 @@ CommandQueue::add(const std::shared_ptr& cmd) } template -std::list::CommandEntry> +std::vector::CommandEntry> CommandQueue::releaseTimedOut() { - std::list mylist; + std::vector timed_out; auto now = _clock.getMonotonicTime(); - while (!empty() && tbegin()->_time <= now) { - mylist.push_back(*tbegin()); + while (!empty() && (tbegin()->_deadline <= now)) { + timed_out.emplace_back(*tbegin()); timelist& tl = boost::multi_index::get<1>(_commands); tl.erase(tbegin()); } - return mylist; + return timed_out; } template @@ -165,10 +169,10 @@ CommandQueue::releaseLowestPriorityCommand() { if (!_commands.empty()) { iterator last = (++_commands.rbegin()).base(); - auto time = last->_time; + auto deadline = last->_deadline; std::shared_ptr cmd(last->_command); _commands.erase(last); - return {cmd, time}; + return {cmd, deadline}; } else { return {}; } @@ -194,12 +198,12 @@ CommandQueue::print(std::ostream& out, bool verbose, const std::string& out << "Insert order:\n"; for (const_iterator it = begin(); it != end(); ++it) { out << indent << *it->_command << ", priority " << 
it->_priority - << ", time " << vespalib::count_ms(it->_time.time_since_epoch()) << "\n"; + << ", time " << vespalib::count_ms(it->_deadline.time_since_epoch()) << "\n"; } out << indent << "Time order:"; for (const_titerator it = tbegin(); it != tend(); ++it) { out << "\n" << indent << *it->_command << ", priority " << it->_priority - << ", time " << vespalib::count_ms(it->_time.time_since_epoch()); + << ", time " << vespalib::count_ms(it->_deadline.time_since_epoch()); } } diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp index de418fe61db..282299ebbc1 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.cpp +++ b/storage/src/vespa/storage/visiting/visitormanager.cpp @@ -56,11 +56,11 @@ VisitorManager::VisitorManager(const config::ConfigUri & configUri, create_and_start_manager_thread(); } _component.registerMetricUpdateHook(*this, framework::SecondTime(5)); - _visitorFactories["dumpvisitor"] = std::make_shared(); + _visitorFactories["dumpvisitor"] = std::make_shared(); _visitorFactories["dumpvisitorsingle"] = std::make_shared(); - _visitorFactories["testvisitor"] = std::make_shared(); - _visitorFactories["countvisitor"] = std::make_shared(); - _visitorFactories["recoveryvisitor"] = std::make_shared(); + _visitorFactories["testvisitor"] = std::make_shared(); + _visitorFactories["countvisitor"] = std::make_shared(); + _visitorFactories["recoveryvisitor"] = std::make_shared(); _visitorFactories["reindexingvisitor"] = std::make_shared(); _component.registerStatusPage(*this); } @@ -96,17 +96,15 @@ VisitorManager::onClose() _configFetcher->close(); { std::lock_guard sync(_visitorLock); - for (CommandQueue::iterator it - = _visitorQueue.begin(); it != _visitorQueue.end(); ++it) - { - auto reply = std::make_shared(*it->_command); + for (auto& enqueued : _visitorQueue) { + auto reply = std::make_shared(*enqueued._command); reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node.")); sendUp(reply); } _visitorQueue.clear(); } - for (uint32_t i=0; i<_visitorThread.size(); ++i) { - _visitorThread[i].first->shutdown(); + for (auto& visitor_thread : _visitorThread) { + visitor_thread.first->shutdown(); } } @@ -155,7 +153,7 @@ VisitorManager::configure(std::unique_ptr 0); + bool liveUpdate = !_visitorThread.empty(); if (liveUpdate) { if (_visitorThread.size() != static_cast(config->visitorthreads)) { LOG(warning, "Ignoring config change requesting %u visitor " @@ -187,7 +185,10 @@ VisitorManager::configure(std::unique_ptrinitThreads(config->visitorthreads); for (int32_t i=0; ivisitorthreads; ++i) { _visitorThread.emplace_back( - new VisitorThread(i, _componentRegister, _messageSessionFactory, _visitorFactories, *_metrics->threads[i], *this), + // Naked new due to a lot of private inheritance in VisitorThread and VisitorManager + std::shared_ptr( + new VisitorThread(i, _componentRegister, _messageSessionFactory, + _visitorFactories, *_metrics->threads[i], *this)), std::map()); } } @@ -206,8 +207,8 @@ VisitorManager::run(framework::ThreadHandle& thread) { LOG(debug, "Started visitor manager thread with pid %d.", getpid()); typedef CommandQueue CQ; - std::list timedOut; - // Run forever, dump messages in the visitor queue that times out. + std::vector timedOut; + // Run forever, dump messages in the visitor queue that times out. 
while (true) { thread.registerTick(framework::PROCESS_CYCLE); { @@ -218,12 +219,10 @@ VisitorManager::run(framework::ThreadHandle& thread) timedOut = _visitorQueue.releaseTimedOut(); } const auto currentTime = _component.getClock().getMonotonicTime(); - for (std::list::iterator it = timedOut.begin(); - it != timedOut.end(); ++it) - { + for (auto& timed_out_entry : timedOut) { // TODO is this really tracking what the metric description implies it's tracking...? - _metrics->queueTimeoutWaitTime.addValue(vespalib::to_s(currentTime - it->_time) * 1000.0); // Double metric in millis - std::shared_ptr reply(it->_command->makeReply()); + _metrics->queueTimeoutWaitTime.addValue(vespalib::to_s(currentTime - timed_out_entry._deadline) * 1000.0); // Double metric in millis + std::shared_ptr reply(timed_out_entry._command->makeReply()); reply->setResult(api::ReturnCode(api::ReturnCode::BUSY, "Visitor timed out in visitor queue")); sendUp(reply); } @@ -235,7 +234,7 @@ VisitorManager::run(framework::ThreadHandle& thread) _visitorCond.wait_for(waiter, 1000ms); thread.registerTick(framework::WAIT_CYCLE); } else { - auto time_diff = _visitorQueue.tbegin()->_time - currentTime; + auto time_diff = _visitorQueue.tbegin()->_deadline - currentTime; time_diff = (time_diff < 1000ms) ? time_diff : 1000ms; if (time_diff.count() > 0) { _visitorCond.wait_for(waiter, time_diff); @@ -267,8 +266,8 @@ VisitorManager::getActiveVisitorCount() const { std::lock_guard sync(_visitorLock); uint32_t totalCount = 0; - for (uint32_t i=0; i<_visitorThread.size(); ++i) { - totalCount += _visitorThread[i].second.size(); + for (const auto& visitor_thread : _visitorThread) { + totalCount += visitor_thread.second.size(); } return totalCount; } @@ -285,8 +284,8 @@ void VisitorManager::setTimeBetweenTicks(uint32_t time) { std::lock_guard sync(_visitorLock); - for (uint32_t i=0; i<_visitorThread.size(); ++i) { - _visitorThread[i].first->setTimeBetweenTicks(time); + for (auto& visitor_thread : _visitorThread) { + visitor_thread.first->setTimeBetweenTicks(time); } } @@ -296,8 +295,8 @@ VisitorManager::scheduleVisitor( MonitorGuard& visitorLock) { api::VisitorId id; - typedef std::map NameToIdMap; - typedef std::pair NameIdPair; + using NameToIdMap = std::map; + using NameIdPair = std::pair; std::pair newEntry; { uint32_t totCount; @@ -457,8 +456,7 @@ VisitorManager::send(const std::shared_ptr& cmd, // Only add to internal state if not destroy iterator command, as // these are considered special-cased fire-and-forget commands // that don't have replies. 
- if (static_cast(*cmd).getType() != DestroyIteratorCommand::ID) - { + if (static_cast(*cmd).getType() != DestroyIteratorCommand::ID) { MessageInfo inf; inf.id = visitor.getVisitorId(); inf.timestamp = _component.getClock().getTimeInSeconds().getTime(); @@ -500,7 +498,7 @@ VisitorManager::attemptScheduleQueuedVisitor(MonitorGuard& visitorLock) uint32_t totCount; getLeastLoadedThread(_visitorThread, totCount); - std::shared_ptr cmd(_visitorQueue.peekNextCommand()); + auto cmd = _visitorQueue.peekNextCommand(); assert(cmd.get()); if (totCount < maximumConcurrent(*cmd)) { auto cmd2 = _visitorQueue.releaseNextCommand(); @@ -519,9 +517,9 @@ void VisitorManager::closed(api::VisitorId id) { std::unique_lock sync(_visitorLock); - std::map& usedIds(_visitorThread[id % _visitorThread.size()].second); + auto& usedIds(_visitorThread[id % _visitorThread.size()].second); - std::map::iterator it = usedIds.find(id); + auto it = usedIds.find(id); if (it == usedIds.end()) { LOG(warning, "VisitorManager::closed() called multiple times for the " "same visitor. This was not intended."); @@ -583,14 +581,11 @@ VisitorManager::reportHtmlStatus(std::ostream& out, for (uint32_t i=0; i<_visitorThread.size(); ++i) { visitorCount += _visitorThread[i].second.size(); out << "Thread " << i << ":"; - if (_visitorThread[i].second.size() == 0) { + if (_visitorThread[i].second.empty()) { out << " none"; } else { - for (std::map::const_iterator it - = _visitorThread[i].second.begin(); - it != _visitorThread[i].second.end(); it++) - { - out << " " << it->second << " (" << it->first << ")"; + for (const auto& id_and_visitor : _visitorThread[i].second) { + out << " " << id_and_visitor.second << " (" << id_and_visitor.first << ")"; } } out << "
\n"; @@ -598,20 +593,18 @@ VisitorManager::reportHtmlStatus(std::ostream& out, out << "

Queued visitors

\n
    \n"; const auto now = _component.getClock().getMonotonicTime(); - for (CommandQueue::const_iterator it - = _visitorQueue.begin(); it != _visitorQueue.end(); ++it) - { - std::shared_ptr cmd(it->_command); + for (const auto& enqueued : _visitorQueue) { + auto& cmd = enqueued._command; assert(cmd); out << "
  • " << cmd->getInstanceId() << " - " << vespalib::count_ms(cmd->getQueueTimeout()) << ", remaining timeout " - << vespalib::count_ms(it->_time - now) << " ms\n"; + << vespalib::count_ms(enqueued._deadline - now) << " ms\n"; } if (_visitorQueue.empty()) { out << "None\n"; } out << "
\n"; - if (_visitorMessages.size() > 0) { + if (!_visitorMessages.empty()) { out << "

Waiting for the following visitor replies

" << "\n" << "" diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp index ef01a684901..cc3e709a848 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.cpp +++ b/storage/src/vespa/storage/visiting/visitorthread.cpp @@ -24,7 +24,7 @@ namespace storage { VisitorThread::Event::Event(Event&& other) noexcept : _visitorId(other._visitorId), - _message(other._message), + _message(std::move(other._message)), _mbusReply(std::move(other._mbusReply)), _timer(other._timer), _type(other._type) @@ -37,18 +37,18 @@ VisitorThread::Event& VisitorThread::Event::operator= (Event&& other) noexcept { _visitorId = other._visitorId; - _message = other._message; + _message = std::move(other._message); _mbusReply = std::move(other._mbusReply); _timer = other._timer; _type = other._type; return *this; } -VisitorThread::Event::Event(api::VisitorId visitor, const std::shared_ptr& msg) +VisitorThread::Event::Event(api::VisitorId visitor, std::shared_ptr msg) : _visitorId(visitor), - _message(msg), + _message(std::move(msg)), _timer(), - _type(PERSISTENCE) + _type(Type::PERSISTENCE) { } @@ -56,7 +56,7 @@ VisitorThread::Event::Event(api::VisitorId visitor, mbus::Reply::UP reply) : _visitorId(visitor), _mbusReply(std::move(reply)), _timer(), - _type(MBUS) + _type(Type::MBUS) { } @@ -102,7 +102,7 @@ VisitorThread::VisitorThread(uint32_t threadIndex, VisitorThread::~VisitorThread() { - if (_thread.get() != 0) { + if (_thread) { _thread->interruptAndJoin(_cond); } } @@ -125,12 +125,11 @@ VisitorThread::shutdown() // Answer all queued up commands and clear queue { std::lock_guard sync(_lock); - for (const Event & event : _queue) - { + for (const Event & event : _queue) { if (event._message.get()) { if (!event._message->getType().isReply() - && (event._message->getType() != api::MessageType::INTERNAL - || static_cast(*event._message).getType() != PropagateVisitorConfig::ID)) + && (event._message->getType() != api::MessageType::INTERNAL + || static_cast(*event._message).getType() != PropagateVisitorConfig::ID)) { std::shared_ptr reply( static_cast(*event._message).makeReply()); @@ -142,9 +141,7 @@ VisitorThread::shutdown() _queue.clear(); } // Close all visitors. 
Send create visitor replies - for (VisitorMap::iterator it = _visitors.begin(); - it != _visitors.end();) - { + for (auto it = _visitors.begin(); it != _visitors.end();) { LOG(debug, "Force-closing visitor %s as we're shutting down.", it->second->getVisitorName().c_str()); _currentlyRunningVisitor = it++; @@ -158,9 +155,8 @@ VisitorThread::processMessage(api::VisitorId id, const std::shared_ptr& msg) { { - Event m(id, msg); std::unique_lock sync(_lock); - _queue.push_back(Event(id, msg)); + _queue.emplace_back(id, msg); } _cond.notify_one(); } diff --git a/storage/src/vespa/storage/visiting/visitorthread.h b/storage/src/vespa/storage/visiting/visitorthread.h index ebae51ed2b2..226e7c0631b 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.h +++ b/storage/src/vespa/storage/visiting/visitorthread.h @@ -33,15 +33,15 @@ class VisitorThread : public framework::Runnable, private api::MessageHandler, private framework::MetricUpdateHook { - typedef std::map > LibMap; + using LibMap = std::map>; LibMap _libs; - typedef std::map > VisitorMap; + using VisitorMap = std::map>; VisitorMap _visitors; - std::deque > _recentlyCompleted; + std::deque> _recentlyCompleted; struct Event { - enum Type { + enum class Type { MBUS, PERSISTENCE, NONE @@ -50,21 +50,20 @@ class VisitorThread : public framework::Runnable, api::VisitorId _visitorId; std::shared_ptr _message; mbus::Reply::UP _mbusReply; - metrics::MetricTimer _timer; Type _type; - Event() noexcept : _visitorId(0), _message(), _timer(), _type(NONE) {} + Event() noexcept : _visitorId(0), _message(), _timer(), _type(Type::NONE) {} Event(Event&& other) noexcept; Event& operator= (Event&& other) noexcept; Event(const Event& other) = delete; Event& operator= (const Event& other) = delete; Event(api::VisitorId visitor, mbus::Reply::UP reply); - Event(api::VisitorId visitor, const std::shared_ptr& msg); + Event(api::VisitorId visitor, std::shared_ptr msg); ~Event(); - bool empty() const noexcept { - return (_type == NONE); + [[nodiscard]] bool empty() const noexcept { + return (_type == Type::NONE); } }; @@ -105,9 +104,6 @@ public: void setTimeBetweenTicks(uint32_t time) { _timeBetweenTicks.store(time, std::memory_order_relaxed); } void handleMessageBusReply(std::unique_ptr reply, Visitor& visitor); - /** For unit tests needing to pause thread. */ - std::mutex & getQueueMonitor() { return _lock; } - const VisitorThreadMetrics& getMetrics() const noexcept { return _metrics; } @@ -130,8 +126,6 @@ private: vespalib::asciistream & error); bool onCreateVisitor(const std::shared_ptr&) override; - - bool onVisitorReply(const std::shared_ptr& reply); bool onInternal(const std::shared_ptr&) override; bool onInternalReply(const std::shared_ptr&) override; -- cgit v1.2.3
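The behavioural core of this patch is that each queued CreateVisitorCommand carries an absolute _deadline (renamed from _time), and VisitorManager::run() periodically calls releaseTimedOut() — which now returns a std::vector rather than a std::list — and answers every expired entry with a BUSY reply. The sketch below models that flow in isolation as a simplified, hypothetical example: ToyCommand, ToyQueue, release_timed_out and the timeout values are invented stand-ins, not the real CommandQueue, which is built on boost::multi_index and the storage framework clock.

```cpp
// Simplified, standalone model of the deadline-draining pattern in this patch.
// All names and values here are illustrative only.
#include <algorithm>
#include <chrono>
#include <cstdio>
#include <memory>
#include <vector>

using steady_time = std::chrono::steady_clock::time_point;

struct ToyCommand {
    const char*               name;
    std::chrono::milliseconds queue_timeout;
};

struct ToyEntry {
    std::shared_ptr<ToyCommand> command;
    steady_time                 deadline;  // absolute point in time, roughly mirroring CommandEntry::_deadline
};

class ToyQueue {
    std::vector<ToyEntry> _entries;  // kept sorted by deadline for simplicity
public:
    void add(const std::shared_ptr<ToyCommand>& cmd, steady_time now) {
        _entries.push_back({cmd, now + cmd->queue_timeout});
        std::sort(_entries.begin(), _entries.end(),
                  [](const ToyEntry& a, const ToyEntry& b) { return a.deadline < b.deadline; });
    }
    // Mirrors the releaseTimedOut() idea: move out every entry whose deadline has passed.
    std::vector<ToyEntry> release_timed_out(steady_time now) {
        std::vector<ToyEntry> timed_out;
        auto it = _entries.begin();
        while (it != _entries.end() && it->deadline <= now) {
            timed_out.push_back(std::move(*it));
            ++it;
        }
        _entries.erase(_entries.begin(), it);
        return timed_out;
    }
};

int main() {
    using namespace std::chrono_literals;
    ToyQueue queue;
    auto t0 = std::chrono::steady_clock::now();
    queue.add(std::make_shared<ToyCommand>(ToyCommand{"fast", 100ms}), t0);
    queue.add(std::make_shared<ToyCommand>(ToyCommand{"slow", 800ms}), t0);

    // Pretend 400 ms have elapsed; only "fast" has exceeded its queueing timeout,
    // so it is the entry that would receive a BUSY reply in VisitorManager::run().
    for (const auto& entry : queue.release_timed_out(t0 + 400ms)) {
        std::printf("timed out: %s\n", entry.command->name);
    }
    return 0;
}
```

Storing an absolute deadline at enqueue time, rather than a relative timeout, lets the drain loop do a single deadline <= now comparison per entry, and matches how the status page computes the "remaining timeout" as _deadline - now.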
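CommandQueue::releaseLowestPriorityCommand() pops the element that sorts last in the priority-ordered index via (++_commands.rbegin()).base(). The snippet below is a standalone illustration of that reverse-iterator conversion using a plain std::set; the container and values are arbitrary and unrelated to the storage code.

```cpp
// Minimal illustration (not Vespa code) of the iterator arithmetic used in
// releaseLowestPriorityCommand(): for a non-empty ordered container,
// (++c.rbegin()).base() yields a forward iterator to the *last* element,
// which can then be erased directly.
#include <cassert>
#include <set>

int main() {
    std::set<int> ordered{10, 20, 30, 40};

    // rbegin() refers to the last element (40) and its base() is end().
    // Incrementing the reverse iterator steps one element back, so its
    // base() now points directly at the last element.
    auto last = (++ordered.rbegin()).base();
    assert(*last == 40);

    ordered.erase(last);              // removes the element that sorts last,
    assert(*ordered.rbegin() == 30);  // like popping the lowest-priority command
    return 0;
}
```

The offset built into reverse_iterator::base() is why the pre-increment is needed: rbegin().base() equals end(), while (++rbegin()).base() is a valid forward iterator to the final element, which the queue can hand straight to erase().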