diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-03-18 16:37:19 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2022-03-18 16:37:19 +0000 |
commit | 9aeff921406eb238bedf0210803b1bea441030aa (patch) | |
tree | 1901bd8359ca031ffdd3bd9ec380c74f3a067151 /storage/src | |
parent | 9be87dcfc81d88b7aa5df918047ba3ab977b6c3f (diff) |
Clean up and refactor visitor management code
Diffstat (limited to 'storage/src')
-rw-r--r-- | storage/src/tests/visiting/commandqueuetest.cpp | 44 | ||||
-rw-r--r-- | storage/src/vespa/storage/visiting/commandqueue.h | 46 | ||||
-rw-r--r-- | storage/src/vespa/storage/visiting/visitormanager.cpp | 79 | ||||
-rw-r--r-- | storage/src/vespa/storage/visiting/visitorthread.cpp | 28 | ||||
-rw-r--r-- | storage/src/vespa/storage/visiting/visitorthread.h | 22 |
5 files changed, 101 insertions, 118 deletions
diff --git a/storage/src/tests/visiting/commandqueuetest.cpp b/storage/src/tests/visiting/commandqueuetest.cpp index 4d528ce4971..1e24fafed4f 100644 --- a/storage/src/tests/visiting/commandqueuetest.cpp +++ b/storage/src/tests/visiting/commandqueuetest.cpp @@ -53,10 +53,11 @@ TEST(CommandQueueTest, fifo) { ASSERT_FALSE(queue.empty()); std::vector<std::shared_ptr<api::CreateVisitorCommand>> commands; for (;;) { - std::shared_ptr<api::CreateVisitorCommand> cmd( - queue.releaseNextCommand().first); - if (cmd.get() == 0) break; - commands.push_back(cmd); + auto cmd = queue.releaseNextCommand().first; + if (!cmd) { + break; + } + commands.emplace_back(std::move(cmd)); } ASSERT_EQ(7, commands.size()); EXPECT_EQ("first t=1 p=0", getCommandString(commands[0])); @@ -89,12 +90,12 @@ TEST(CommandQueueTest, fifo_with_priorities) { ASSERT_FALSE(queue.empty()); std::vector<std::shared_ptr<api::CreateVisitorCommand>> commands; for (;;) { - std::shared_ptr<api::CreateVisitorCommand> cmdPeek(queue.peekNextCommand()); - std::shared_ptr<api::CreateVisitorCommand> cmd(queue.releaseNextCommand().first); - if (cmd.get() == 0 || cmdPeek != cmd) { + auto cmdPeek = queue.peekNextCommand(); + auto cmd = queue.releaseNextCommand().first; + if (!cmd || cmdPeek != cmd) { break; } - commands.push_back(cmd); + commands.emplace_back(std::move(cmd)); } ASSERT_EQ(7, commands.size()); EXPECT_EQ("seventh t=7 p=0", getCommandString(commands[0])); @@ -119,17 +120,14 @@ TEST(CommandQueueTest, release_oldest) { queue.add(getCommand("seventh", 700ms)); ASSERT_EQ(7u, queue.size()); - using CommandEntry = CommandQueue<api::CreateVisitorCommand>::CommandEntry; - std::list<CommandEntry> timedOut(queue.releaseTimedOut()); + auto timedOut = queue.releaseTimedOut(); ASSERT_TRUE(timedOut.empty()); clock.addMilliSecondsToTime(400); timedOut = queue.releaseTimedOut(); ASSERT_EQ(4, timedOut.size()); std::ostringstream ost; - for (std::list<CommandEntry>::const_iterator it = timedOut.begin(); - it != timedOut.end(); ++it) - { - ost << getCommandString(it->_command) << "\n"; + for (const auto& timed_out_entry : timedOut) { + ost << getCommandString(timed_out_entry._command) << "\n"; } EXPECT_EQ("fourth t=5 p=0\n" "first t=10 p=0\n" @@ -154,12 +152,12 @@ TEST(CommandQueueTest, release_lowest_priority) { std::vector<std::shared_ptr<api::CreateVisitorCommand>> commands; for (;;) { - std::shared_ptr<api::CreateVisitorCommand> cmdPeek(queue.peekLowestPriorityCommand()); - auto cmd_and_deadline = queue.releaseLowestPriorityCommand(); - if (!cmd_and_deadline.first || cmdPeek != cmd_and_deadline.first) { + auto cmdPeek = queue.peekLowestPriorityCommand(); + auto cmd = queue.releaseLowestPriorityCommand().first; + if (!cmd || cmdPeek != cmd) { break; } - commands.push_back(cmd_and_deadline.first); + commands.emplace_back(std::move(cmd)); } ASSERT_EQ(7, commands.size()); EXPECT_EQ("sixth t=14 p=50", getCommandString(commands[0])); @@ -184,20 +182,18 @@ TEST(CommandQueueTest, delete_iterator) { queue.add(getCommand("seventh", 700ms)); ASSERT_EQ(7u, queue.size()); - CommandQueue<api::CreateVisitorCommand>::iterator it = queue.begin(); + auto it = queue.begin(); ++it; ++it; queue.erase(it); ASSERT_EQ(6u, queue.size()); std::vector<std::shared_ptr<api::CreateVisitorCommand>> cmds; for (;;) { - std::shared_ptr<api::CreateVisitorCommand> cmd( - std::dynamic_pointer_cast<api::CreateVisitorCommand>( - queue.releaseNextCommand().first)); - if (cmd.get() == 0) { + auto cmd = queue.releaseNextCommand().first; + if (!cmd) { break; } - cmds.push_back(cmd); + cmds.emplace_back(std::move(cmd)); } ASSERT_EQ(6, cmds.size()); EXPECT_EQ("first t=10 p=0", getCommandString(cmds[0])); diff --git a/storage/src/vespa/storage/visiting/commandqueue.h b/storage/src/vespa/storage/visiting/commandqueue.h index 0f345aa0d94..3232baf50de 100644 --- a/storage/src/vespa/storage/visiting/commandqueue.h +++ b/storage/src/vespa/storage/visiting/commandqueue.h @@ -18,7 +18,7 @@ #include <vespa/vespalib/util/printable.h> #include <vespa/vespalib/util/time.h> #include <vespa/storageframework/generic/clock/clock.h> -#include <list> +#include <vector> #include <ostream> namespace storage { @@ -28,17 +28,21 @@ class CommandQueue : public vespalib::Printable { public: struct CommandEntry { - typedef typename Command::Priority PriorityType; + using PriorityType = typename Command::Priority; + std::shared_ptr<Command> _command; - vespalib::steady_time _time; - uint64_t _sequenceId; - PriorityType _priority; + vespalib::steady_time _deadline; + uint64_t _sequenceId; + PriorityType _priority; CommandEntry(const std::shared_ptr<Command>& cmd, - vespalib::steady_time time, + vespalib::steady_time deadline, uint64_t sequenceId, PriorityType priority) - : _command(cmd), _time(time), _sequenceId(sequenceId), _priority(priority) + : _command(cmd), + _deadline(deadline), + _sequenceId(sequenceId), + _priority(priority) {} // Sort on both priority and sequence ID @@ -58,15 +62,15 @@ private: boost::multi_index::identity<CommandEntry> >, boost::multi_index::ordered_non_unique< - boost::multi_index::member<CommandEntry, vespalib::steady_time, &CommandEntry::_time> + boost::multi_index::member<CommandEntry, vespalib::steady_time, &CommandEntry::_deadline> > > >; using timelist = typename boost::multi_index::nth_index<CommandList, 1>::type; const framework::Clock& _clock; - mutable CommandList _commands; - uint64_t _sequenceId; + mutable CommandList _commands; + uint64_t _sequenceId; public: typedef typename CommandList::iterator iterator; @@ -102,7 +106,7 @@ public: std::shared_ptr<Command> peekNextCommand() const; void add(const std::shared_ptr<Command>& msg); void erase(iterator it) { _commands.erase(it); } - std::list<CommandEntry> releaseTimedOut(); + std::vector<CommandEntry> releaseTimedOut(); std::pair<std::shared_ptr<Command>, vespalib::steady_time> releaseLowestPriorityCommand(); std::shared_ptr<Command> peekLowestPriorityCommand() const; @@ -119,7 +123,7 @@ CommandQueue<Command>::releaseNextCommand() if (!_commands.empty()) { iterator first = _commands.begin(); retVal.first = first->_command; - retVal.second = first->_time; + retVal.second = first->_deadline; _commands.erase(first); } return retVal; @@ -146,17 +150,17 @@ CommandQueue<Command>::add(const std::shared_ptr<Command>& cmd) } template<class Command> -std::list<typename CommandQueue<Command>::CommandEntry> +std::vector<typename CommandQueue<Command>::CommandEntry> CommandQueue<Command>::releaseTimedOut() { - std::list<CommandEntry> mylist; + std::vector<CommandEntry> timed_out; auto now = _clock.getMonotonicTime(); - while (!empty() && tbegin()->_time <= now) { - mylist.push_back(*tbegin()); + while (!empty() && (tbegin()->_deadline <= now)) { + timed_out.emplace_back(*tbegin()); timelist& tl = boost::multi_index::get<1>(_commands); tl.erase(tbegin()); } - return mylist; + return timed_out; } template <class Command> @@ -165,10 +169,10 @@ CommandQueue<Command>::releaseLowestPriorityCommand() { if (!_commands.empty()) { iterator last = (++_commands.rbegin()).base(); - auto time = last->_time; + auto deadline = last->_deadline; std::shared_ptr<Command> cmd(last->_command); _commands.erase(last); - return {cmd, time}; + return {cmd, deadline}; } else { return {}; } @@ -194,12 +198,12 @@ CommandQueue<Command>::print(std::ostream& out, bool verbose, const std::string& out << "Insert order:\n"; for (const_iterator it = begin(); it != end(); ++it) { out << indent << *it->_command << ", priority " << it->_priority - << ", time " << vespalib::count_ms(it->_time.time_since_epoch()) << "\n"; + << ", time " << vespalib::count_ms(it->_deadline.time_since_epoch()) << "\n"; } out << indent << "Time order:"; for (const_titerator it = tbegin(); it != tend(); ++it) { out << "\n" << indent << *it->_command << ", priority " << it->_priority - << ", time " << vespalib::count_ms(it->_time.time_since_epoch()); + << ", time " << vespalib::count_ms(it->_deadline.time_since_epoch()); } } diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp index de418fe61db..282299ebbc1 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.cpp +++ b/storage/src/vespa/storage/visiting/visitormanager.cpp @@ -56,11 +56,11 @@ VisitorManager::VisitorManager(const config::ConfigUri & configUri, create_and_start_manager_thread(); } _component.registerMetricUpdateHook(*this, framework::SecondTime(5)); - _visitorFactories["dumpvisitor"] = std::make_shared<DumpVisitorSingleFactory>(); + _visitorFactories["dumpvisitor"] = std::make_shared<DumpVisitorSingleFactory>(); _visitorFactories["dumpvisitorsingle"] = std::make_shared<DumpVisitorSingleFactory>(); - _visitorFactories["testvisitor"] = std::make_shared<TestVisitorFactory>(); - _visitorFactories["countvisitor"] = std::make_shared<CountVisitorFactory>(); - _visitorFactories["recoveryvisitor"] = std::make_shared<RecoveryVisitorFactory>(); + _visitorFactories["testvisitor"] = std::make_shared<TestVisitorFactory>(); + _visitorFactories["countvisitor"] = std::make_shared<CountVisitorFactory>(); + _visitorFactories["recoveryvisitor"] = std::make_shared<RecoveryVisitorFactory>(); _visitorFactories["reindexingvisitor"] = std::make_shared<ReindexingVisitorFactory>(); _component.registerStatusPage(*this); } @@ -96,17 +96,15 @@ VisitorManager::onClose() _configFetcher->close(); { std::lock_guard sync(_visitorLock); - for (CommandQueue<api::CreateVisitorCommand>::iterator it - = _visitorQueue.begin(); it != _visitorQueue.end(); ++it) - { - auto reply = std::make_shared<api::CreateVisitorReply>(*it->_command); + for (auto& enqueued : _visitorQueue) { + auto reply = std::make_shared<api::CreateVisitorReply>(*enqueued._command); reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node.")); sendUp(reply); } _visitorQueue.clear(); } - for (uint32_t i=0; i<_visitorThread.size(); ++i) { - _visitorThread[i].first->shutdown(); + for (auto& visitor_thread : _visitorThread) { + visitor_thread.first->shutdown(); } } @@ -155,7 +153,7 @@ VisitorManager::configure(std::unique_ptr<vespa::config::content::core::StorVisi maxConcurrentVisitorsVariable = 0; } - bool liveUpdate = (_visitorThread.size() > 0); + bool liveUpdate = !_visitorThread.empty(); if (liveUpdate) { if (_visitorThread.size() != static_cast<uint32_t>(config->visitorthreads)) { LOG(warning, "Ignoring config change requesting %u visitor " @@ -187,7 +185,10 @@ VisitorManager::configure(std::unique_ptr<vespa::config::content::core::StorVisi _metrics->initThreads(config->visitorthreads); for (int32_t i=0; i<config->visitorthreads; ++i) { _visitorThread.emplace_back( - new VisitorThread(i, _componentRegister, _messageSessionFactory, _visitorFactories, *_metrics->threads[i], *this), + // Naked new due to a lot of private inheritance in VisitorThread and VisitorManager + std::shared_ptr<VisitorThread>( + new VisitorThread(i, _componentRegister, _messageSessionFactory, + _visitorFactories, *_metrics->threads[i], *this)), std::map<api::VisitorId, std::string>()); } } @@ -206,8 +207,8 @@ VisitorManager::run(framework::ThreadHandle& thread) { LOG(debug, "Started visitor manager thread with pid %d.", getpid()); typedef CommandQueue<api::CreateVisitorCommand> CQ; - std::list<CQ::CommandEntry> timedOut; - // Run forever, dump messages in the visitor queue that times out. + std::vector<CQ::CommandEntry> timedOut; + // Run forever, dump messages in the visitor queue that times out. while (true) { thread.registerTick(framework::PROCESS_CYCLE); { @@ -218,12 +219,10 @@ VisitorManager::run(framework::ThreadHandle& thread) timedOut = _visitorQueue.releaseTimedOut(); } const auto currentTime = _component.getClock().getMonotonicTime(); - for (std::list<CQ::CommandEntry>::iterator it = timedOut.begin(); - it != timedOut.end(); ++it) - { + for (auto& timed_out_entry : timedOut) { // TODO is this really tracking what the metric description implies it's tracking...? - _metrics->queueTimeoutWaitTime.addValue(vespalib::to_s(currentTime - it->_time) * 1000.0); // Double metric in millis - std::shared_ptr<api::StorageReply> reply(it->_command->makeReply()); + _metrics->queueTimeoutWaitTime.addValue(vespalib::to_s(currentTime - timed_out_entry._deadline) * 1000.0); // Double metric in millis + std::shared_ptr<api::StorageReply> reply(timed_out_entry._command->makeReply()); reply->setResult(api::ReturnCode(api::ReturnCode::BUSY, "Visitor timed out in visitor queue")); sendUp(reply); } @@ -235,7 +234,7 @@ VisitorManager::run(framework::ThreadHandle& thread) _visitorCond.wait_for(waiter, 1000ms); thread.registerTick(framework::WAIT_CYCLE); } else { - auto time_diff = _visitorQueue.tbegin()->_time - currentTime; + auto time_diff = _visitorQueue.tbegin()->_deadline - currentTime; time_diff = (time_diff < 1000ms) ? time_diff : 1000ms; if (time_diff.count() > 0) { _visitorCond.wait_for(waiter, time_diff); @@ -267,8 +266,8 @@ VisitorManager::getActiveVisitorCount() const { std::lock_guard sync(_visitorLock); uint32_t totalCount = 0; - for (uint32_t i=0; i<_visitorThread.size(); ++i) { - totalCount += _visitorThread[i].second.size(); + for (const auto& visitor_thread : _visitorThread) { + totalCount += visitor_thread.second.size(); } return totalCount; } @@ -285,8 +284,8 @@ void VisitorManager::setTimeBetweenTicks(uint32_t time) { std::lock_guard sync(_visitorLock); - for (uint32_t i=0; i<_visitorThread.size(); ++i) { - _visitorThread[i].first->setTimeBetweenTicks(time); + for (auto& visitor_thread : _visitorThread) { + visitor_thread.first->setTimeBetweenTicks(time); } } @@ -296,8 +295,8 @@ VisitorManager::scheduleVisitor( MonitorGuard& visitorLock) { api::VisitorId id; - typedef std::map<std::string, api::VisitorId> NameToIdMap; - typedef std::pair<std::string, api::VisitorId> NameIdPair; + using NameToIdMap = std::map<std::string, api::VisitorId>; + using NameIdPair = std::pair<std::string, api::VisitorId>; std::pair<NameToIdMap::iterator, bool> newEntry; { uint32_t totCount; @@ -457,8 +456,7 @@ VisitorManager::send(const std::shared_ptr<api::StorageCommand>& cmd, // Only add to internal state if not destroy iterator command, as // these are considered special-cased fire-and-forget commands // that don't have replies. - if (static_cast<const api::InternalCommand&>(*cmd).getType() != DestroyIteratorCommand::ID) - { + if (static_cast<const api::InternalCommand&>(*cmd).getType() != DestroyIteratorCommand::ID) { MessageInfo inf; inf.id = visitor.getVisitorId(); inf.timestamp = _component.getClock().getTimeInSeconds().getTime(); @@ -500,7 +498,7 @@ VisitorManager::attemptScheduleQueuedVisitor(MonitorGuard& visitorLock) uint32_t totCount; getLeastLoadedThread(_visitorThread, totCount); - std::shared_ptr<api::CreateVisitorCommand> cmd(_visitorQueue.peekNextCommand()); + auto cmd = _visitorQueue.peekNextCommand(); assert(cmd.get()); if (totCount < maximumConcurrent(*cmd)) { auto cmd2 = _visitorQueue.releaseNextCommand(); @@ -519,9 +517,9 @@ void VisitorManager::closed(api::VisitorId id) { std::unique_lock sync(_visitorLock); - std::map<api::VisitorId, std::string>& usedIds(_visitorThread[id % _visitorThread.size()].second); + auto& usedIds(_visitorThread[id % _visitorThread.size()].second); - std::map<api::VisitorId, std::string>::iterator it = usedIds.find(id); + auto it = usedIds.find(id); if (it == usedIds.end()) { LOG(warning, "VisitorManager::closed() called multiple times for the " "same visitor. This was not intended."); @@ -583,14 +581,11 @@ VisitorManager::reportHtmlStatus(std::ostream& out, for (uint32_t i=0; i<_visitorThread.size(); ++i) { visitorCount += _visitorThread[i].second.size(); out << "Thread " << i << ":"; - if (_visitorThread[i].second.size() == 0) { + if (_visitorThread[i].second.empty()) { out << " none"; } else { - for (std::map<api::VisitorId,std::string>::const_iterator it - = _visitorThread[i].second.begin(); - it != _visitorThread[i].second.end(); it++) - { - out << " " << it->second << " (" << it->first << ")"; + for (const auto& id_and_visitor : _visitorThread[i].second) { + out << " " << id_and_visitor.second << " (" << id_and_visitor.first << ")"; } } out << "<br>\n"; @@ -598,20 +593,18 @@ VisitorManager::reportHtmlStatus(std::ostream& out, out << "<h3>Queued visitors</h3>\n<ul>\n"; const auto now = _component.getClock().getMonotonicTime(); - for (CommandQueue<api::CreateVisitorCommand>::const_iterator it - = _visitorQueue.begin(); it != _visitorQueue.end(); ++it) - { - std::shared_ptr<api::CreateVisitorCommand> cmd(it->_command); + for (const auto& enqueued : _visitorQueue) { + auto& cmd = enqueued._command; assert(cmd); out << "<li>" << cmd->getInstanceId() << " - " << vespalib::count_ms(cmd->getQueueTimeout()) << ", remaining timeout " - << vespalib::count_ms(it->_time - now) << " ms\n"; + << vespalib::count_ms(enqueued._deadline - now) << " ms\n"; } if (_visitorQueue.empty()) { out << "None\n"; } out << "</ul>\n"; - if (_visitorMessages.size() > 0) { + if (!_visitorMessages.empty()) { out << "<h3>Waiting for the following visitor replies</h3>" << "\n<table><tr>" << "<th>Storage API message id</th>" diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp index ef01a684901..cc3e709a848 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.cpp +++ b/storage/src/vespa/storage/visiting/visitorthread.cpp @@ -24,7 +24,7 @@ namespace storage { VisitorThread::Event::Event(Event&& other) noexcept : _visitorId(other._visitorId), - _message(other._message), + _message(std::move(other._message)), _mbusReply(std::move(other._mbusReply)), _timer(other._timer), _type(other._type) @@ -37,18 +37,18 @@ VisitorThread::Event& VisitorThread::Event::operator= (Event&& other) noexcept { _visitorId = other._visitorId; - _message = other._message; + _message = std::move(other._message); _mbusReply = std::move(other._mbusReply); _timer = other._timer; _type = other._type; return *this; } -VisitorThread::Event::Event(api::VisitorId visitor, const std::shared_ptr<api::StorageMessage>& msg) +VisitorThread::Event::Event(api::VisitorId visitor, std::shared_ptr<api::StorageMessage> msg) : _visitorId(visitor), - _message(msg), + _message(std::move(msg)), _timer(), - _type(PERSISTENCE) + _type(Type::PERSISTENCE) { } @@ -56,7 +56,7 @@ VisitorThread::Event::Event(api::VisitorId visitor, mbus::Reply::UP reply) : _visitorId(visitor), _mbusReply(std::move(reply)), _timer(), - _type(MBUS) + _type(Type::MBUS) { } @@ -102,7 +102,7 @@ VisitorThread::VisitorThread(uint32_t threadIndex, VisitorThread::~VisitorThread() { - if (_thread.get() != 0) { + if (_thread) { _thread->interruptAndJoin(_cond); } } @@ -125,12 +125,11 @@ VisitorThread::shutdown() // Answer all queued up commands and clear queue { std::lock_guard sync(_lock); - for (const Event & event : _queue) - { + for (const Event & event : _queue) { if (event._message.get()) { if (!event._message->getType().isReply() - && (event._message->getType() != api::MessageType::INTERNAL - || static_cast<const api::InternalCommand&>(*event._message).getType() != PropagateVisitorConfig::ID)) + && (event._message->getType() != api::MessageType::INTERNAL + || static_cast<const api::InternalCommand&>(*event._message).getType() != PropagateVisitorConfig::ID)) { std::shared_ptr<api::StorageReply> reply( static_cast<api::StorageCommand&>(*event._message).makeReply()); @@ -142,9 +141,7 @@ VisitorThread::shutdown() _queue.clear(); } // Close all visitors. Send create visitor replies - for (VisitorMap::iterator it = _visitors.begin(); - it != _visitors.end();) - { + for (auto it = _visitors.begin(); it != _visitors.end();) { LOG(debug, "Force-closing visitor %s as we're shutting down.", it->second->getVisitorName().c_str()); _currentlyRunningVisitor = it++; @@ -158,9 +155,8 @@ VisitorThread::processMessage(api::VisitorId id, const std::shared_ptr<api::StorageMessage>& msg) { { - Event m(id, msg); std::unique_lock sync(_lock); - _queue.push_back(Event(id, msg)); + _queue.emplace_back(id, msg); } _cond.notify_one(); } diff --git a/storage/src/vespa/storage/visiting/visitorthread.h b/storage/src/vespa/storage/visiting/visitorthread.h index ebae51ed2b2..226e7c0631b 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.h +++ b/storage/src/vespa/storage/visiting/visitorthread.h @@ -33,15 +33,15 @@ class VisitorThread : public framework::Runnable, private api::MessageHandler, private framework::MetricUpdateHook { - typedef std::map<std::string, std::shared_ptr<VisitorEnvironment> > LibMap; + using LibMap = std::map<std::string, std::shared_ptr<VisitorEnvironment>>; LibMap _libs; - typedef std::map<api::VisitorId, std::shared_ptr<Visitor> > VisitorMap; + using VisitorMap = std::map<api::VisitorId, std::shared_ptr<Visitor>>; VisitorMap _visitors; - std::deque<std::pair<api::VisitorId, framework::SecondTime> > _recentlyCompleted; + std::deque<std::pair<api::VisitorId, framework::SecondTime>> _recentlyCompleted; struct Event { - enum Type { + enum class Type { MBUS, PERSISTENCE, NONE @@ -50,21 +50,20 @@ class VisitorThread : public framework::Runnable, api::VisitorId _visitorId; std::shared_ptr<api::StorageMessage> _message; mbus::Reply::UP _mbusReply; - metrics::MetricTimer _timer; Type _type; - Event() noexcept : _visitorId(0), _message(), _timer(), _type(NONE) {} + Event() noexcept : _visitorId(0), _message(), _timer(), _type(Type::NONE) {} Event(Event&& other) noexcept; Event& operator= (Event&& other) noexcept; Event(const Event& other) = delete; Event& operator= (const Event& other) = delete; Event(api::VisitorId visitor, mbus::Reply::UP reply); - Event(api::VisitorId visitor, const std::shared_ptr<api::StorageMessage>& msg); + Event(api::VisitorId visitor, std::shared_ptr<api::StorageMessage> msg); ~Event(); - bool empty() const noexcept { - return (_type == NONE); + [[nodiscard]] bool empty() const noexcept { + return (_type == Type::NONE); } }; @@ -105,9 +104,6 @@ public: void setTimeBetweenTicks(uint32_t time) { _timeBetweenTicks.store(time, std::memory_order_relaxed); } void handleMessageBusReply(std::unique_ptr<mbus::Reply> reply, Visitor& visitor); - /** For unit tests needing to pause thread. */ - std::mutex & getQueueMonitor() { return _lock; } - const VisitorThreadMetrics& getMetrics() const noexcept { return _metrics; } @@ -130,8 +126,6 @@ private: vespalib::asciistream & error); bool onCreateVisitor(const std::shared_ptr<api::CreateVisitorCommand>&) override; - - bool onVisitorReply(const std::shared_ptr<api::StorageReply>& reply); bool onInternal(const std::shared_ptr<api::InternalCommand>&) override; bool onInternalReply(const std::shared_ptr<api::InternalReply>&) override; |