author    Tor Brede Vekterli <vekterli@yahooinc.com>    2022-03-21 14:42:11 +0100
committer GitHub <noreply@github.com>                   2022-03-21 14:42:11 +0100
commit    34cf5db6ceeb838f1850558984cbe5e06d63870f (patch)
tree      4ca69b11d7ad4acc9946d60d944b65d7144e59d5
parent    90db7b1ee93c737e3ab26530c321dfe66f558f53 (diff)
parent    9aeff921406eb238bedf0210803b1bea441030aa (diff)
Merge pull request #21757 from vespa-engine/vekterli/fix-visitor-mgr-test-tsan-deadlock-warning
Fix visitor manager test TSan mutex inversion warning and refactor deadline handling [run-systemtest]
-rw-r--r--  storage/src/tests/visiting/commandqueuetest.cpp              |  45
-rw-r--r--  storage/src/tests/visiting/visitormanagertest.cpp            |  20
-rw-r--r--  storage/src/vespa/storage/visiting/commandqueue.h            |  80
-rw-r--r--  storage/src/vespa/storage/visiting/visitor.h                 |   2
-rw-r--r--  storage/src/vespa/storage/visiting/visitormanager.cpp        | 122
-rw-r--r--  storage/src/vespa/storage/visiting/visitormanager.h          |   8
-rw-r--r--  storage/src/vespa/storage/visiting/visitorthread.cpp         |  28
-rw-r--r--  storage/src/vespa/storage/visiting/visitorthread.h           |  22
-rw-r--r--  storageapi/src/vespa/storageapi/messageapi/storagecommand.h  |   4
-rw-r--r--  storageframework/src/vespa/storageframework/generic/clock/clock.h | 2
10 files changed, 165 insertions, 168 deletions
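
At a high level, the refactor swaps the visitor queue's raw microsecond timestamps for absolute monotonic-clock deadlines and lets tests construct the VisitorManager without starting its manager thread, so the test no longer has to freeze the visitor thread by holding its queue mutex (the source of the TSan lock-order-inversion warning). A minimal, self-contained sketch of the deadline idea, using plain std::chrono rather than Vespa's framework::Clock/vespalib::steady_time; the names are illustrative, not from the tree:

```cpp
// Store an absolute monotonic deadline at enqueue time and compare it against
// "now" when draining, instead of juggling raw microsecond counters.
#include <cassert>
#include <chrono>

struct QueuedItem {
    std::chrono::steady_clock::time_point deadline;
};

QueuedItem enqueue(std::chrono::milliseconds queue_timeout) {
    // The deadline is fixed once, relative to the monotonic clock.
    return {std::chrono::steady_clock::now() + queue_timeout};
}

bool is_expired(const QueuedItem& item,
                std::chrono::steady_clock::time_point now) {
    return item.deadline <= now;
}

int main() {
    auto item = enqueue(std::chrono::hours(1));
    assert(!is_expired(item, std::chrono::steady_clock::now()));
    // Once the queue timeout has elapsed, the same comparison reports expiry.
    auto later = std::chrono::steady_clock::now() + std::chrono::hours(2);
    assert(is_expired(item, later));
}
```
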
diff --git a/storage/src/tests/visiting/commandqueuetest.cpp b/storage/src/tests/visiting/commandqueuetest.cpp
index ac4251ba21e..1e24fafed4f 100644
--- a/storage/src/tests/visiting/commandqueuetest.cpp
+++ b/storage/src/tests/visiting/commandqueuetest.cpp
@@ -53,10 +53,11 @@ TEST(CommandQueueTest, fifo) {
ASSERT_FALSE(queue.empty());
std::vector<std::shared_ptr<api::CreateVisitorCommand>> commands;
for (;;) {
- std::shared_ptr<api::CreateVisitorCommand> cmd(
- queue.releaseNextCommand().first);
- if (cmd.get() == 0) break;
- commands.push_back(cmd);
+ auto cmd = queue.releaseNextCommand().first;
+ if (!cmd) {
+ break;
+ }
+ commands.emplace_back(std::move(cmd));
}
ASSERT_EQ(7, commands.size());
EXPECT_EQ("first t=1 p=0", getCommandString(commands[0]));
@@ -89,12 +90,12 @@ TEST(CommandQueueTest, fifo_with_priorities) {
ASSERT_FALSE(queue.empty());
std::vector<std::shared_ptr<api::CreateVisitorCommand>> commands;
for (;;) {
- std::shared_ptr<api::CreateVisitorCommand> cmdPeek(queue.peekNextCommand());
- std::shared_ptr<api::CreateVisitorCommand> cmd(queue.releaseNextCommand().first);
- if (cmd.get() == 0 || cmdPeek != cmd) {
+ auto cmdPeek = queue.peekNextCommand();
+ auto cmd = queue.releaseNextCommand().first;
+ if (!cmd || cmdPeek != cmd) {
break;
}
- commands.push_back(cmd);
+ commands.emplace_back(std::move(cmd));
}
ASSERT_EQ(7, commands.size());
EXPECT_EQ("seventh t=7 p=0", getCommandString(commands[0]));
@@ -119,17 +120,14 @@ TEST(CommandQueueTest, release_oldest) {
queue.add(getCommand("seventh", 700ms));
ASSERT_EQ(7u, queue.size());
- using CommandEntry = CommandQueue<api::CreateVisitorCommand>::CommandEntry;
- std::list<CommandEntry> timedOut(queue.releaseTimedOut());
+ auto timedOut = queue.releaseTimedOut();
ASSERT_TRUE(timedOut.empty());
clock.addMilliSecondsToTime(400);
timedOut = queue.releaseTimedOut();
ASSERT_EQ(4, timedOut.size());
std::ostringstream ost;
- for (std::list<CommandEntry>::const_iterator it = timedOut.begin();
- it != timedOut.end(); ++it)
- {
- ost << getCommandString(it->_command) << "\n";
+ for (const auto& timed_out_entry : timedOut) {
+ ost << getCommandString(timed_out_entry._command) << "\n";
}
EXPECT_EQ("fourth t=5 p=0\n"
"first t=10 p=0\n"
@@ -154,13 +152,12 @@ TEST(CommandQueueTest, release_lowest_priority) {
std::vector<std::shared_ptr<api::CreateVisitorCommand>> commands;
for (;;) {
- std::shared_ptr<api::CreateVisitorCommand> cmdPeek(queue.peekLowestPriorityCommand());
- std::pair<std::shared_ptr<api::CreateVisitorCommand>, uint64_t> cmd(
- queue.releaseLowestPriorityCommand());
- if (cmd.first.get() == 0 || cmdPeek != cmd.first) {
+ auto cmdPeek = queue.peekLowestPriorityCommand();
+ auto cmd = queue.releaseLowestPriorityCommand().first;
+ if (!cmd || cmdPeek != cmd) {
break;
}
- commands.push_back(cmd.first);
+ commands.emplace_back(std::move(cmd));
}
ASSERT_EQ(7, commands.size());
EXPECT_EQ("sixth t=14 p=50", getCommandString(commands[0]));
@@ -185,20 +182,18 @@ TEST(CommandQueueTest, delete_iterator) {
queue.add(getCommand("seventh", 700ms));
ASSERT_EQ(7u, queue.size());
- CommandQueue<api::CreateVisitorCommand>::iterator it = queue.begin();
+ auto it = queue.begin();
++it; ++it;
queue.erase(it);
ASSERT_EQ(6u, queue.size());
std::vector<std::shared_ptr<api::CreateVisitorCommand>> cmds;
for (;;) {
- std::shared_ptr<api::CreateVisitorCommand> cmd(
- std::dynamic_pointer_cast<api::CreateVisitorCommand>(
- queue.releaseNextCommand().first));
- if (cmd.get() == 0) {
+ auto cmd = queue.releaseNextCommand().first;
+ if (!cmd) {
break;
}
- cmds.push_back(cmd);
+ cmds.emplace_back(std::move(cmd));
}
ASSERT_EQ(6, cmds.size());
EXPECT_EQ("first t=10 p=0", getCommandString(cmds[0]));
diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp
index 99172264d47..be4e7270c69 100644
--- a/storage/src/tests/visiting/visitormanagertest.cpp
+++ b/storage/src/tests/visiting/visitormanagertest.cpp
@@ -53,7 +53,7 @@ protected:
~VisitorManagerTest();
// Not using setUp since can't throw exception out of it.
- void initializeTest();
+ void initializeTest(bool defer_manager_thread_start = false);
void addSomeRemoves(bool removeAll = false);
void TearDown() override;
TestVisitorMessageSession& getSession(uint32_t n);
@@ -78,7 +78,7 @@ VisitorManagerTest::~VisitorManagerTest() = default;
uint32_t VisitorManagerTest::docCount = 10;
void
-VisitorManagerTest::initializeTest()
+VisitorManagerTest::initializeTest(bool defer_manager_thread_start)
{
vdstestlib::DirConfig config(getStandardConfig(true));
config.getConfig("stor-visitor").set("visitorthreads", "1");
@@ -88,7 +88,11 @@ VisitorManagerTest::initializeTest()
_node->setupDummyPersistence();
_node->getStateUpdater().setClusterState(std::make_shared<lib::ClusterState>("storage:1 distributor:1"));
_top = std::make_unique<DummyStorageLink>();
- auto vm = std::make_unique<VisitorManager>(config::ConfigUri(config.getConfigId()), _node->getComponentRegister(), *_messageSessionFactory);
+ auto vm = std::make_unique<VisitorManager>(config::ConfigUri(config.getConfigId()),
+ _node->getComponentRegister(),
+ *_messageSessionFactory,
+ VisitorFactory::Map(),
+ defer_manager_thread_start);
_manager = vm.get();
_top->push_back(std::move(vm));
_top->push_back(std::make_unique<FileStorManager>(config::ConfigUri(config.getConfigId()), _node->getPersistenceProvider(),
@@ -753,22 +757,22 @@ TEST_F(VisitorManagerTest, abort_on_field_path_error) {
}
TEST_F(VisitorManagerTest, visitor_queue_timeout) {
- ASSERT_NO_FATAL_FAILURE(initializeTest());
+ ASSERT_NO_FATAL_FAILURE(initializeTest(true));
_manager->enforceQueueUsage();
{
- std::lock_guard guard(_manager->getThread(0).getQueueMonitor());
-
auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
cmd->setAddress(_Address);
cmd->setQueueTimeout(1ms);
cmd->setTimeout(100 * 1000 * 1000ms);
+ // The manager thread isn't running yet so the visitor stays on the queue
_top->sendDown(cmd);
-
- _node->getClock().addSecondsToTime(1000);
}
+ _node->getClock().addSecondsToTime(1000);
+ _manager->create_and_start_manager_thread();
+
// Don't answer any messages. Make sure we timeout anyways.
_top->waitForMessages(1, 60);
const msg_ptr_vector replies = _top->getRepliesOnce();
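
The previous version of this test paused the visitor thread by locking its queue monitor from the test body, which inverts lock order against the manager's own locking and is what TSan flagged. The replacement is a two-phase start: build the manager with its thread deferred, set up the timed-out state while nothing is consuming the queue, then start the thread. A stripped-down sketch of that pattern, with illustrative names rather than the real VisitorManager API:

```cpp
// Two-phase start: mutate setup state before the worker exists, then start it.
#include <atomic>
#include <thread>

class Manager {
    std::thread       _worker;
    std::atomic<bool> _expired_seen{false};
    bool              _deadline_passed{false};
public:
    explicit Manager(bool defer_thread_start) {
        if (!defer_thread_start) {
            start_worker();
        }
    }
    ~Manager() {
        if (_worker.joinable()) {
            _worker.join();
        }
    }
    // Test-only setup; only safe while the worker has not been started yet.
    void mark_deadline_passed() { _deadline_passed = true; }
    void start_worker() {
        // Thread creation happens-after the setup above, so there is no race.
        _worker = std::thread([this] { _expired_seen = _deadline_passed; });
    }
    bool expired_seen() const { return _expired_seen.load(); }
};

int main() {
    Manager m(/*defer_thread_start=*/true);
    m.mark_deadline_passed(); // no worker thread is racing with this write
    m.start_worker();         // exactly once, like create_and_start_manager_thread()
}
```
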
diff --git a/storage/src/vespa/storage/visiting/commandqueue.h b/storage/src/vespa/storage/visiting/commandqueue.h
index 70f5b2421c3..3232baf50de 100644
--- a/storage/src/vespa/storage/visiting/commandqueue.h
+++ b/storage/src/vespa/storage/visiting/commandqueue.h
@@ -18,7 +18,7 @@
#include <vespa/vespalib/util/printable.h>
#include <vespa/vespalib/util/time.h>
#include <vespa/storageframework/generic/clock/clock.h>
-#include <list>
+#include <vector>
#include <ostream>
namespace storage {
@@ -28,17 +28,21 @@ class CommandQueue : public vespalib::Printable
{
public:
struct CommandEntry {
- typedef typename Command::Priority PriorityType;
+ using PriorityType = typename Command::Priority;
+
std::shared_ptr<Command> _command;
- uint64_t _time;
- uint64_t _sequenceId;
- PriorityType _priority;
+ vespalib::steady_time _deadline;
+ uint64_t _sequenceId;
+ PriorityType _priority;
CommandEntry(const std::shared_ptr<Command>& cmd,
- uint64_t time,
+ vespalib::steady_time deadline,
uint64_t sequenceId,
PriorityType priority)
- : _command(cmd), _time(time), _sequenceId(sequenceId), _priority(priority)
+ : _command(cmd),
+ _deadline(deadline),
+ _sequenceId(sequenceId),
+ _priority(priority)
{}
// Sort on both priority and sequence ID
@@ -51,23 +55,22 @@ public:
};
private:
- typedef boost::multi_index::multi_index_container<
+ using CommandList = boost::multi_index::multi_index_container<
CommandEntry,
boost::multi_index::indexed_by<
boost::multi_index::ordered_unique<
boost::multi_index::identity<CommandEntry>
>,
boost::multi_index::ordered_non_unique<
- boost::multi_index::member<CommandEntry, uint64_t, &CommandEntry::_time>
+ boost::multi_index::member<CommandEntry, vespalib::steady_time, &CommandEntry::_deadline>
>
>
- > CommandList;
- typedef typename boost::multi_index
- ::nth_index<CommandList, 1>::type timelist;
+ >;
+ using timelist = typename boost::multi_index::nth_index<CommandList, 1>::type;
const framework::Clock& _clock;
- mutable CommandList _commands;
- uint64_t _sequenceId;
+ mutable CommandList _commands;
+ uint64_t _sequenceId;
public:
typedef typename CommandList::iterator iterator;
@@ -76,7 +79,7 @@ public:
typedef typename CommandList::const_reverse_iterator const_reverse_iterator;
typedef typename timelist::const_iterator const_titerator;
- CommandQueue(const framework::Clock& clock)
+ explicit CommandQueue(const framework::Clock& clock)
: _clock(clock),
_sequenceId(0) {}
@@ -89,22 +92,22 @@ public:
const_iterator end() const { return _commands.end(); }
const_titerator tbegin() const {
- timelist& tl = boost::multi_index::get<1>(_commands);
+ auto& tl = boost::multi_index::get<1>(_commands);
return tl.begin();
}
const_titerator tend() const {
- timelist& tl = boost::multi_index::get<1>(_commands);
+ auto& tl = boost::multi_index::get<1>(_commands);
return tl.end();
}
bool empty() const { return _commands.empty(); }
uint32_t size() const { return _commands.size(); }
- std::pair<std::shared_ptr<Command>, time_t> releaseNextCommand();
+ std::pair<std::shared_ptr<Command>, vespalib::steady_time> releaseNextCommand();
std::shared_ptr<Command> peekNextCommand() const;
void add(const std::shared_ptr<Command>& msg);
void erase(iterator it) { _commands.erase(it); }
- std::list<CommandEntry> releaseTimedOut();
- std::pair<std::shared_ptr<Command>, time_t> releaseLowestPriorityCommand();
+ std::vector<CommandEntry> releaseTimedOut();
+ std::pair<std::shared_ptr<Command>, vespalib::steady_time> releaseLowestPriorityCommand();
std::shared_ptr<Command> peekLowestPriorityCommand() const;
void clear() { return _commands.clear(); }
@@ -113,15 +116,14 @@ public:
template<class Command>
-std::pair<std::shared_ptr<Command>, time_t>
+std::pair<std::shared_ptr<Command>, vespalib::steady_time>
CommandQueue<Command>::releaseNextCommand()
{
- std::pair<std::shared_ptr<Command>, time_t> retVal(
- std::shared_ptr<Command>(), 0);
+ std::pair<std::shared_ptr<Command>, vespalib::steady_time> retVal;
if (!_commands.empty()) {
iterator first = _commands.begin();
retVal.first = first->_command;
- retVal.second = first->_time;
+ retVal.second = first->_deadline;
_commands.erase(first);
}
return retVal;
@@ -143,38 +145,36 @@ template<class Command>
void
CommandQueue<Command>::add(const std::shared_ptr<Command>& cmd)
{
- framework::MicroSecTime time(_clock.getTimeInMicros()
- + framework::MicroSecTime(vespalib::count_us(cmd->getQueueTimeout())));
- _commands.insert(CommandEntry(cmd, time.getTime(), ++_sequenceId, cmd->getPriority()));
+ auto deadline = _clock.getMonotonicTime() + cmd->getQueueTimeout();
+ _commands.insert(CommandEntry(cmd, deadline, ++_sequenceId, cmd->getPriority()));
}
template<class Command>
-std::list<typename CommandQueue<Command>::CommandEntry>
+std::vector<typename CommandQueue<Command>::CommandEntry>
CommandQueue<Command>::releaseTimedOut()
{
- std::list<CommandEntry> mylist;
- framework::MicroSecTime time(_clock.getTimeInMicros());
- while (!empty() && tbegin()->_time <= time.getTime()) {
- mylist.push_back(*tbegin());
+ std::vector<CommandEntry> timed_out;
+ auto now = _clock.getMonotonicTime();
+ while (!empty() && (tbegin()->_deadline <= now)) {
+ timed_out.emplace_back(*tbegin());
timelist& tl = boost::multi_index::get<1>(_commands);
tl.erase(tbegin());
}
- return mylist;
+ return timed_out;
}
template <class Command>
-std::pair<std::shared_ptr<Command>, time_t>
+std::pair<std::shared_ptr<Command>, vespalib::steady_time>
CommandQueue<Command>::releaseLowestPriorityCommand()
{
if (!_commands.empty()) {
iterator last = (++_commands.rbegin()).base();
- time_t time = last->_time;
+ auto deadline = last->_deadline;
std::shared_ptr<Command> cmd(last->_command);
_commands.erase(last);
- return std::pair<std::shared_ptr<Command>, time_t>(cmd, time);
+ return {cmd, deadline};
} else {
- return std::pair<std::shared_ptr<Command>, time_t>(
- std::shared_ptr<Command>(), 0);
+ return {};
}
}
@@ -198,12 +198,12 @@ CommandQueue<Command>::print(std::ostream& out, bool verbose, const std::string&
out << "Insert order:\n";
for (const_iterator it = begin(); it != end(); ++it) {
out << indent << *it->_command << ", priority " << it->_priority
- << ", time " << it->_time << "\n";
+ << ", time " << vespalib::count_ms(it->_deadline.time_since_epoch()) << "\n";
}
out << indent << "Time order:";
for (const_titerator it = tbegin(); it != tend(); ++it) {
out << "\n" << indent << *it->_command << ", priority " << it->_priority
- << ", time " << it->_time;
+ << ", time " << vespalib::count_ms(it->_deadline.time_since_epoch());
}
}
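
With this change, CommandQueue keeps its secondary multi_index ordered by vespalib::steady_time deadlines, so releaseTimedOut() simply pops entries from the front of that index until it reaches one whose deadline lies in the future. A simplified stand-in that uses std::multimap as the deadline index; Entry and release_timed_out are illustrative names only:

```cpp
// A multimap keyed by deadline plays the role of the boost::multi_index
// deadline index: front entries with deadline <= now are moved out in one pass.
#include <chrono>
#include <map>
#include <memory>
#include <string>
#include <vector>

using steady_time = std::chrono::steady_clock::time_point;

struct Entry {
    std::shared_ptr<std::string> command; // stands in for the storage command
    steady_time                  deadline;
};

std::vector<Entry> release_timed_out(std::multimap<steady_time, Entry>& by_deadline,
                                     steady_time now) {
    std::vector<Entry> timed_out;
    auto it = by_deadline.begin();
    while (it != by_deadline.end() && it->first <= now) {
        timed_out.push_back(std::move(it->second));
        it = by_deadline.erase(it);
    }
    return timed_out;
}
```
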
diff --git a/storage/src/vespa/storage/visiting/visitor.h b/storage/src/vespa/storage/visiting/visitor.h
index 11192021577..aa1c34d10b7 100644
--- a/storage/src/vespa/storage/visiting/visitor.h
+++ b/storage/src/vespa/storage/visiting/visitor.h
@@ -70,7 +70,7 @@ public:
*/
virtual void closed(api::VisitorId id) = 0;
- virtual ~VisitorMessageHandler() {}
+ virtual ~VisitorMessageHandler() = default;
};
/**
diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp
index eb5c927873c..282299ebbc1 100644
--- a/storage/src/vespa/storage/visiting/visitormanager.cpp
+++ b/storage/src/vespa/storage/visiting/visitormanager.cpp
@@ -22,7 +22,8 @@ namespace storage {
VisitorManager::VisitorManager(const config::ConfigUri & configUri,
StorageComponentRegister& componentRegister,
VisitorMessageSessionFactory& messageSF,
- const VisitorFactory::Map& externalFactories)
+ const VisitorFactory::Map& externalFactories,
+ bool defer_manager_thread_start)
: StorageLink("Visitor Manager"),
framework::HtmlStatusReporter("visitorman", "Visitor Manager"),
_componentRegister(componentRegister),
@@ -51,13 +52,15 @@ VisitorManager::VisitorManager(const config::ConfigUri & configUri,
_configFetcher->subscribe<vespa::config::content::core::StorVisitorConfig>(configUri.getConfigId(), this);
_configFetcher->start();
_component.registerMetric(*_metrics);
- _thread = _component.startThread(*this, 30s, 1s, 1, vespalib::CpuUsage::Category::READ);
+ if (!defer_manager_thread_start) {
+ create_and_start_manager_thread();
+ }
_component.registerMetricUpdateHook(*this, framework::SecondTime(5));
- _visitorFactories["dumpvisitor"] = std::make_shared<DumpVisitorSingleFactory>();
+ _visitorFactories["dumpvisitor"] = std::make_shared<DumpVisitorSingleFactory>();
_visitorFactories["dumpvisitorsingle"] = std::make_shared<DumpVisitorSingleFactory>();
- _visitorFactories["testvisitor"] = std::make_shared<TestVisitorFactory>();
- _visitorFactories["countvisitor"] = std::make_shared<CountVisitorFactory>();
- _visitorFactories["recoveryvisitor"] = std::make_shared<RecoveryVisitorFactory>();
+ _visitorFactories["testvisitor"] = std::make_shared<TestVisitorFactory>();
+ _visitorFactories["countvisitor"] = std::make_shared<CountVisitorFactory>();
+ _visitorFactories["recoveryvisitor"] = std::make_shared<RecoveryVisitorFactory>();
_visitorFactories["reindexingvisitor"] = std::make_shared<ReindexingVisitorFactory>();
_component.registerStatusPage(*this);
}
@@ -65,7 +68,7 @@ VisitorManager::VisitorManager(const config::ConfigUri & configUri,
VisitorManager::~VisitorManager() {
closeNextLink();
LOG(debug, "Deleting link %s.", toString().c_str());
- if (_thread.get() != 0) {
+ if (_thread) {
_thread->interrupt();
_visitorCond.notify_all();
_thread->join();
@@ -74,6 +77,13 @@ VisitorManager::~VisitorManager() {
}
void
+VisitorManager::create_and_start_manager_thread()
+{
+ assert(!_thread);
+ _thread = _component.startThread(*this, 30s, 1s, 1, vespalib::CpuUsage::Category::READ);
+}
+
+void
VisitorManager::updateMetrics(const MetricLockGuard &)
{
_metrics->queueSize.addValue(_visitorQueue.size());
@@ -86,17 +96,15 @@ VisitorManager::onClose()
_configFetcher->close();
{
std::lock_guard sync(_visitorLock);
- for (CommandQueue<api::CreateVisitorCommand>::iterator it
- = _visitorQueue.begin(); it != _visitorQueue.end(); ++it)
- {
- auto reply = std::make_shared<api::CreateVisitorReply>(*it->_command);
+ for (auto& enqueued : _visitorQueue) {
+ auto reply = std::make_shared<api::CreateVisitorReply>(*enqueued._command);
reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node."));
sendUp(reply);
}
_visitorQueue.clear();
}
- for (uint32_t i=0; i<_visitorThread.size(); ++i) {
- _visitorThread[i].first->shutdown();
+ for (auto& visitor_thread : _visitorThread) {
+ visitor_thread.first->shutdown();
}
}
@@ -145,7 +153,7 @@ VisitorManager::configure(std::unique_ptr<vespa::config::content::core::StorVisi
maxConcurrentVisitorsVariable = 0;
}
- bool liveUpdate = (_visitorThread.size() > 0);
+ bool liveUpdate = !_visitorThread.empty();
if (liveUpdate) {
if (_visitorThread.size() != static_cast<uint32_t>(config->visitorthreads)) {
LOG(warning, "Ignoring config change requesting %u visitor "
@@ -177,7 +185,10 @@ VisitorManager::configure(std::unique_ptr<vespa::config::content::core::StorVisi
_metrics->initThreads(config->visitorthreads);
for (int32_t i=0; i<config->visitorthreads; ++i) {
_visitorThread.emplace_back(
- new VisitorThread(i, _componentRegister, _messageSessionFactory, _visitorFactories, *_metrics->threads[i], *this),
+ // Naked new due to a lot of private inheritance in VisitorThread and VisitorManager
+ std::shared_ptr<VisitorThread>(
+ new VisitorThread(i, _componentRegister, _messageSessionFactory,
+ _visitorFactories, *_metrics->threads[i], *this)),
std::map<api::VisitorId, std::string>());
}
}
@@ -196,8 +207,8 @@ VisitorManager::run(framework::ThreadHandle& thread)
{
LOG(debug, "Started visitor manager thread with pid %d.", getpid());
typedef CommandQueue<api::CreateVisitorCommand> CQ;
- std::list<CQ::CommandEntry> timedOut;
- // Run forever, dump messages in the visitor queue that times out.
+ std::vector<CQ::CommandEntry> timedOut;
+ // Run forever, dump messages in the visitor queue that times out.
while (true) {
thread.registerTick(framework::PROCESS_CYCLE);
{
@@ -207,13 +218,12 @@ VisitorManager::run(framework::ThreadHandle& thread)
}
timedOut = _visitorQueue.releaseTimedOut();
}
- framework::MicroSecTime currentTime(_component.getClock().getTimeInMicros());
- for (std::list<CQ::CommandEntry>::iterator it = timedOut.begin();
- it != timedOut.end(); ++it)
- {
- _metrics->queueTimeoutWaitTime.addValue(currentTime.getTime() - it->_time);
- std::shared_ptr<api::StorageReply> reply(it->_command->makeReply());
- reply->setResult(api::ReturnCode(api::ReturnCode::BUSY,"Visitor timed out in visitor queue"));
+ const auto currentTime = _component.getClock().getMonotonicTime();
+ for (auto& timed_out_entry : timedOut) {
+ // TODO is this really tracking what the metric description implies it's tracking...?
+ _metrics->queueTimeoutWaitTime.addValue(vespalib::to_s(currentTime - timed_out_entry._deadline) * 1000.0); // Double metric in millis
+ std::shared_ptr<api::StorageReply> reply(timed_out_entry._command->makeReply());
+ reply->setResult(api::ReturnCode(api::ReturnCode::BUSY, "Visitor timed out in visitor queue"));
sendUp(reply);
}
{
@@ -224,10 +234,10 @@ VisitorManager::run(framework::ThreadHandle& thread)
_visitorCond.wait_for(waiter, 1000ms);
thread.registerTick(framework::WAIT_CYCLE);
} else {
- uint64_t timediff = (_visitorQueue.tbegin()->_time- currentTime.getTime()) / 1000000;
- timediff = std::min(timediff, uint64_t(1000));
- if (timediff > 0) {
- _visitorCond.wait_for(waiter, std::chrono::milliseconds(timediff));
+ auto time_diff = _visitorQueue.tbegin()->_deadline - currentTime;
+ time_diff = (time_diff < 1000ms) ? time_diff : 1000ms;
+ if (time_diff.count() > 0) {
+ _visitorCond.wait_for(waiter, time_diff);
thread.registerTick(framework::WAIT_CYCLE);
}
}
@@ -256,8 +266,8 @@ VisitorManager::getActiveVisitorCount() const
{
std::lock_guard sync(_visitorLock);
uint32_t totalCount = 0;
- for (uint32_t i=0; i<_visitorThread.size(); ++i) {
- totalCount += _visitorThread[i].second.size();
+ for (const auto& visitor_thread : _visitorThread) {
+ totalCount += visitor_thread.second.size();
}
return totalCount;
}
@@ -274,8 +284,8 @@ void
VisitorManager::setTimeBetweenTicks(uint32_t time)
{
std::lock_guard sync(_visitorLock);
- for (uint32_t i=0; i<_visitorThread.size(); ++i) {
- _visitorThread[i].first->setTimeBetweenTicks(time);
+ for (auto& visitor_thread : _visitorThread) {
+ visitor_thread.first->setTimeBetweenTicks(time);
}
}
@@ -285,8 +295,8 @@ VisitorManager::scheduleVisitor(
MonitorGuard& visitorLock)
{
api::VisitorId id;
- typedef std::map<std::string, api::VisitorId> NameToIdMap;
- typedef std::pair<std::string, api::VisitorId> NameIdPair;
+ using NameToIdMap = std::map<std::string, api::VisitorId>;
+ using NameIdPair = std::pair<std::string, api::VisitorId>;
std::pair<NameToIdMap::iterator, bool> newEntry;
{
uint32_t totCount;
@@ -307,12 +317,13 @@ VisitorManager::scheduleVisitor(
std::shared_ptr<api::CreateVisitorCommand> tail(_visitorQueue.peekLowestPriorityCommand());
// Lower int ==> higher pri
if (cmd->getPriority() < tail->getPriority()) {
- std::pair<api::CreateVisitorCommand::SP, time_t> evictCommand(_visitorQueue.releaseLowestPriorityCommand());
+ auto evictCommand = _visitorQueue.releaseLowestPriorityCommand();
assert(tail == evictCommand.first);
_visitorQueue.add(cmd);
_visitorCond.notify_one();
- framework::MicroSecTime t(_component.getClock().getTimeInMicros());
- _metrics->queueEvictedWaitTime.addValue(t.getTime() - evictCommand.second);
+ auto now = _component.getClock().getMonotonicTime();
+ // TODO is this really tracking what the metric description implies it's tracking...?
+ _metrics->queueEvictedWaitTime.addValue(vespalib::to_s(now - evictCommand.second) * 1000.0); // Double metric in millis
failCommand = evictCommand.first;
} else {
failCommand = cmd;
@@ -445,8 +456,7 @@ VisitorManager::send(const std::shared_ptr<api::StorageCommand>& cmd,
// Only add to internal state if not destroy iterator command, as
// these are considered special-cased fire-and-forget commands
// that don't have replies.
- if (static_cast<const api::InternalCommand&>(*cmd).getType() != DestroyIteratorCommand::ID)
- {
+ if (static_cast<const api::InternalCommand&>(*cmd).getType() != DestroyIteratorCommand::ID) {
MessageInfo inf;
inf.id = visitor.getVisitorId();
inf.timestamp = _component.getClock().getTimeInSeconds().getTime();
@@ -488,14 +498,15 @@ VisitorManager::attemptScheduleQueuedVisitor(MonitorGuard& visitorLock)
uint32_t totCount;
getLeastLoadedThread(_visitorThread, totCount);
- std::shared_ptr<api::CreateVisitorCommand> cmd(_visitorQueue.peekNextCommand());
+ auto cmd = _visitorQueue.peekNextCommand();
assert(cmd.get());
if (totCount < maximumConcurrent(*cmd)) {
- std::pair<api::CreateVisitorCommand::SP, time_t> cmd2(_visitorQueue.releaseNextCommand());
+ auto cmd2 = _visitorQueue.releaseNextCommand();
assert(cmd == cmd2.first);
scheduleVisitor(cmd, true, visitorLock);
- framework::MicroSecTime time(_component.getClock().getTimeInMicros());
- _metrics->queueWaitTime.addValue(time.getTime() - cmd2.second);
+ auto now = _component.getClock().getMonotonicTime();
+ // TODO is this really tracking what the metric description implies it's tracking...?
+ _metrics->queueWaitTime.addValue(vespalib::to_s(now - cmd2.second) * 1000.0); // Double metric in millis
// visitorLock is unlocked at this point
return true;
}
@@ -506,9 +517,9 @@ void
VisitorManager::closed(api::VisitorId id)
{
std::unique_lock sync(_visitorLock);
- std::map<api::VisitorId, std::string>& usedIds(_visitorThread[id % _visitorThread.size()].second);
+ auto& usedIds(_visitorThread[id % _visitorThread.size()].second);
- std::map<api::VisitorId, std::string>::iterator it = usedIds.find(id);
+ auto it = usedIds.find(id);
if (it == usedIds.end()) {
LOG(warning, "VisitorManager::closed() called multiple times for the "
"same visitor. This was not intended.");
@@ -570,35 +581,30 @@ VisitorManager::reportHtmlStatus(std::ostream& out,
for (uint32_t i=0; i<_visitorThread.size(); ++i) {
visitorCount += _visitorThread[i].second.size();
out << "Thread " << i << ":";
- if (_visitorThread[i].second.size() == 0) {
+ if (_visitorThread[i].second.empty()) {
out << " none";
} else {
- for (std::map<api::VisitorId,std::string>::const_iterator it
- = _visitorThread[i].second.begin();
- it != _visitorThread[i].second.end(); it++)
- {
- out << " " << it->second << " (" << it->first << ")";
+ for (const auto& id_and_visitor : _visitorThread[i].second) {
+ out << " " << id_and_visitor.second << " (" << id_and_visitor.first << ")";
}
}
out << "<br>\n";
}
out << "<h3>Queued visitors</h3>\n<ul>\n";
- framework::MicroSecTime time(_component.getClock().getTimeInMicros());
- for (CommandQueue<api::CreateVisitorCommand>::const_iterator it
- = _visitorQueue.begin(); it != _visitorQueue.end(); ++it)
- {
- std::shared_ptr<api::CreateVisitorCommand> cmd(it->_command);
+ const auto now = _component.getClock().getMonotonicTime();
+ for (const auto& enqueued : _visitorQueue) {
+ auto& cmd = enqueued._command;
assert(cmd);
out << "<li>" << cmd->getInstanceId() << " - "
<< vespalib::count_ms(cmd->getQueueTimeout()) << ", remaining timeout "
- << (it->_time - time.getTime()) / 1000000 << " ms\n";
+ << vespalib::count_ms(enqueued._deadline - now) << " ms\n";
}
if (_visitorQueue.empty()) {
out << "None\n";
}
out << "</ul>\n";
- if (_visitorMessages.size() > 0) {
+ if (!_visitorMessages.empty()) {
out << "<h3>Waiting for the following visitor replies</h3>"
<< "\n<table><tr>"
<< "<th>Storage API message id</th>"
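
The metric updates in this file now compute wait times as monotonic-clock differences and feed the double-valued metrics milliseconds via vespalib::to_s(delta) * 1000.0. The equivalent conversion in plain std::chrono, for reference (the helper name is illustrative):

```cpp
// Convert a monotonic duration to fractional milliseconds, matching
// to_s(delta) * 1000.0 in the hunks above.
#include <chrono>

double to_millis(std::chrono::steady_clock::duration delta) {
    return std::chrono::duration<double, std::milli>(delta).count();
}
```
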
diff --git a/storage/src/vespa/storage/visiting/visitormanager.h b/storage/src/vespa/storage/visiting/visitormanager.h
index 888c0873e9c..33703b392bc 100644
--- a/storage/src/vespa/storage/visiting/visitormanager.h
+++ b/storage/src/vespa/storage/visiting/visitormanager.h
@@ -84,9 +84,11 @@ private:
bool _enforceQueueUse;
VisitorFactory::Map _visitorFactories;
public:
- VisitorManager(const config::ConfigUri & configUri, StorageComponentRegister&,
+ VisitorManager(const config::ConfigUri & configUri,
+ StorageComponentRegister&,
VisitorMessageSessionFactory&,
- const VisitorFactory::Map& external = VisitorFactory::Map());
+ const VisitorFactory::Map& external = VisitorFactory::Map(),
+ bool defer_manager_thread_start = false);
~VisitorManager() override;
void onClose() override;
@@ -115,6 +117,8 @@ public:
}
/** For unit testing */
bool hasPendingMessageState() const;
+ // Must be called exactly once iff manager was created with defer_manager_thread_start == true
+ void create_and_start_manager_thread();
void enforceQueueUsage() { _enforceQueueUse = true; }
diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp
index ef01a684901..cc3e709a848 100644
--- a/storage/src/vespa/storage/visiting/visitorthread.cpp
+++ b/storage/src/vespa/storage/visiting/visitorthread.cpp
@@ -24,7 +24,7 @@ namespace storage {
VisitorThread::Event::Event(Event&& other) noexcept
: _visitorId(other._visitorId),
- _message(other._message),
+ _message(std::move(other._message)),
_mbusReply(std::move(other._mbusReply)),
_timer(other._timer),
_type(other._type)
@@ -37,18 +37,18 @@ VisitorThread::Event&
VisitorThread::Event::operator= (Event&& other) noexcept
{
_visitorId = other._visitorId;
- _message = other._message;
+ _message = std::move(other._message);
_mbusReply = std::move(other._mbusReply);
_timer = other._timer;
_type = other._type;
return *this;
}
-VisitorThread::Event::Event(api::VisitorId visitor, const std::shared_ptr<api::StorageMessage>& msg)
+VisitorThread::Event::Event(api::VisitorId visitor, std::shared_ptr<api::StorageMessage> msg)
: _visitorId(visitor),
- _message(msg),
+ _message(std::move(msg)),
_timer(),
- _type(PERSISTENCE)
+ _type(Type::PERSISTENCE)
{
}
@@ -56,7 +56,7 @@ VisitorThread::Event::Event(api::VisitorId visitor, mbus::Reply::UP reply)
: _visitorId(visitor),
_mbusReply(std::move(reply)),
_timer(),
- _type(MBUS)
+ _type(Type::MBUS)
{
}
@@ -102,7 +102,7 @@ VisitorThread::VisitorThread(uint32_t threadIndex,
VisitorThread::~VisitorThread()
{
- if (_thread.get() != 0) {
+ if (_thread) {
_thread->interruptAndJoin(_cond);
}
}
@@ -125,12 +125,11 @@ VisitorThread::shutdown()
// Answer all queued up commands and clear queue
{
std::lock_guard sync(_lock);
- for (const Event & event : _queue)
- {
+ for (const Event & event : _queue) {
if (event._message.get()) {
if (!event._message->getType().isReply()
- && (event._message->getType() != api::MessageType::INTERNAL
- || static_cast<const api::InternalCommand&>(*event._message).getType() != PropagateVisitorConfig::ID))
+ && (event._message->getType() != api::MessageType::INTERNAL
+ || static_cast<const api::InternalCommand&>(*event._message).getType() != PropagateVisitorConfig::ID))
{
std::shared_ptr<api::StorageReply> reply(
static_cast<api::StorageCommand&>(*event._message).makeReply());
@@ -142,9 +141,7 @@ VisitorThread::shutdown()
_queue.clear();
}
// Close all visitors. Send create visitor replies
- for (VisitorMap::iterator it = _visitors.begin();
- it != _visitors.end();)
- {
+ for (auto it = _visitors.begin(); it != _visitors.end();) {
LOG(debug, "Force-closing visitor %s as we're shutting down.",
it->second->getVisitorName().c_str());
_currentlyRunningVisitor = it++;
@@ -158,9 +155,8 @@ VisitorThread::processMessage(api::VisitorId id,
const std::shared_ptr<api::StorageMessage>& msg)
{
{
- Event m(id, msg);
std::unique_lock sync(_lock);
- _queue.push_back(Event(id, msg));
+ _queue.emplace_back(id, msg);
}
_cond.notify_one();
}
diff --git a/storage/src/vespa/storage/visiting/visitorthread.h b/storage/src/vespa/storage/visiting/visitorthread.h
index ebae51ed2b2..226e7c0631b 100644
--- a/storage/src/vespa/storage/visiting/visitorthread.h
+++ b/storage/src/vespa/storage/visiting/visitorthread.h
@@ -33,15 +33,15 @@ class VisitorThread : public framework::Runnable,
private api::MessageHandler,
private framework::MetricUpdateHook
{
- typedef std::map<std::string, std::shared_ptr<VisitorEnvironment> > LibMap;
+ using LibMap = std::map<std::string, std::shared_ptr<VisitorEnvironment>>;
LibMap _libs;
- typedef std::map<api::VisitorId, std::shared_ptr<Visitor> > VisitorMap;
+ using VisitorMap = std::map<api::VisitorId, std::shared_ptr<Visitor>>;
VisitorMap _visitors;
- std::deque<std::pair<api::VisitorId, framework::SecondTime> > _recentlyCompleted;
+ std::deque<std::pair<api::VisitorId, framework::SecondTime>> _recentlyCompleted;
struct Event {
- enum Type {
+ enum class Type {
MBUS,
PERSISTENCE,
NONE
@@ -50,21 +50,20 @@ class VisitorThread : public framework::Runnable,
api::VisitorId _visitorId;
std::shared_ptr<api::StorageMessage> _message;
mbus::Reply::UP _mbusReply;
-
metrics::MetricTimer _timer;
Type _type;
- Event() noexcept : _visitorId(0), _message(), _timer(), _type(NONE) {}
+ Event() noexcept : _visitorId(0), _message(), _timer(), _type(Type::NONE) {}
Event(Event&& other) noexcept;
Event& operator= (Event&& other) noexcept;
Event(const Event& other) = delete;
Event& operator= (const Event& other) = delete;
Event(api::VisitorId visitor, mbus::Reply::UP reply);
- Event(api::VisitorId visitor, const std::shared_ptr<api::StorageMessage>& msg);
+ Event(api::VisitorId visitor, std::shared_ptr<api::StorageMessage> msg);
~Event();
- bool empty() const noexcept {
- return (_type == NONE);
+ [[nodiscard]] bool empty() const noexcept {
+ return (_type == Type::NONE);
}
};
@@ -105,9 +104,6 @@ public:
void setTimeBetweenTicks(uint32_t time) { _timeBetweenTicks.store(time, std::memory_order_relaxed); }
void handleMessageBusReply(std::unique_ptr<mbus::Reply> reply, Visitor& visitor);
- /** For unit tests needing to pause thread. */
- std::mutex & getQueueMonitor() { return _lock; }
-
const VisitorThreadMetrics& getMetrics() const noexcept {
return _metrics;
}
@@ -130,8 +126,6 @@ private:
vespalib::asciistream & error);
bool onCreateVisitor(const std::shared_ptr<api::CreateVisitorCommand>&) override;
-
- bool onVisitorReply(const std::shared_ptr<api::StorageReply>& reply);
bool onInternal(const std::shared_ptr<api::InternalCommand>&) override;
bool onInternalReply(const std::shared_ptr<api::InternalReply>&) override;
diff --git a/storageapi/src/vespa/storageapi/messageapi/storagecommand.h b/storageapi/src/vespa/storageapi/messageapi/storagecommand.h
index b4157c7860a..30d59e5fe4b 100644
--- a/storageapi/src/vespa/storageapi/messageapi/storagecommand.h
+++ b/storageapi/src/vespa/storageapi/messageapi/storagecommand.h
@@ -36,9 +36,7 @@ public:
void setSourceIndex(uint16_t sourceIndex) { _sourceIndex = sourceIndex; }
uint16_t getSourceIndex() const { return _sourceIndex; }
- /** Set timeout in milliseconds. */
- void setTimeout(duration milliseconds) { _timeout = milliseconds; }
- /** Get timeout in milliseconds. */
+ void setTimeout(duration timeout) { _timeout = timeout; }
duration getTimeout() const { return _timeout; }
/** Used to set a new id so the message can be resent. */
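
The setTimeout change drops the misleading "milliseconds" wording: _timeout is a typed duration, so callers state the unit explicitly and the compiler handles the conversion (as in the test above, which passes 100 * 1000 * 1000ms). A tiny sketch of the same idea outside Vespa, assuming a milliseconds-typed member purely for illustration:

```cpp
// A typed duration parameter lets callers choose their own unit safely.
#include <chrono>
using namespace std::chrono_literals;

struct Command {
    std::chrono::milliseconds _timeout{0};
    void setTimeout(std::chrono::milliseconds timeout) { _timeout = timeout; }
};

int main() {
    Command cmd;
    cmd.setTimeout(100s);   // converted to 100000ms, no magic multipliers
    cmd.setTimeout(1500ms); // sub-second timeouts read naturally too
}
```
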
diff --git a/storageframework/src/vespa/storageframework/generic/clock/clock.h b/storageframework/src/vespa/storageframework/generic/clock/clock.h
index becd17da8a8..c9b8f652bfe 100644
--- a/storageframework/src/vespa/storageframework/generic/clock/clock.h
+++ b/storageframework/src/vespa/storageframework/generic/clock/clock.h
@@ -21,7 +21,7 @@ namespace storage::framework {
struct Clock {
using UP = std::unique_ptr<Clock>;
- virtual ~Clock() {}
+ virtual ~Clock() = default;
virtual MicroSecTime getTimeInMicros() const = 0;
virtual MilliSecTime getTimeInMillis() const = 0;