diff options
10 files changed, 90 insertions, 89 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index dc0a01448af..454db5d2404 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -414,7 +414,7 @@ TEST_F(DistributorTest, tick_processes_status_requests) { } ASSERT_TRUE(tick()); - tp->interruptAndJoin(nullptr); + tp->interruptAndJoin(); EXPECT_THAT(thread.getResult(), HasSubstr("BucketId(0x4000000000000001)")); } diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp index 0ce305fb8bb..bd78f71712b 100644 --- a/storage/src/tests/visiting/visitormanagertest.cpp +++ b/storage/src/tests/visiting/visitormanagertest.cpp @@ -778,7 +778,7 @@ TEST_F(VisitorManagerTest, visitor_queue_timeout) { _manager->enforceQueueUsage(); { - vespalib::MonitorGuard guard(_manager->getThread(0).getQueueMonitor()); + std::lock_guard guard(_manager->getThread(0).getQueueMonitor()); auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", ""); cmd->addBucketToBeVisited(document::BucketId(16, 3)); diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index 7999514ac00..34d16c6c1d3 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -271,7 +271,7 @@ MergeThrottler::onClose() // Avoid getting config on shutdown _configFetcher.close(); { - vespalib::MonitorGuard guard(_messageLock); + std::lock_guard guard(_messageLock); // Note: used to prevent taking locks in different order if onFlush // and abortOutdatedMerges are called concurrently, as these need to // take both locks in differing orders. @@ -283,7 +283,7 @@ MergeThrottler::onClose() _merges.size(), _queue.size()); } if (_thread) { - _thread->interruptAndJoin(&_messageLock); + _thread->interruptAndJoin(_messageCond); _thread.reset(); } } @@ -294,7 +294,7 @@ MergeThrottler::onFlush(bool /*downwards*/) // Lock state before messages since the latter must be unlocked // before the guard starts hauling messages up the chain. MessageGuard msgGuard(_stateLock, *this); - vespalib::MonitorGuard lock(_messageLock); + std::lock_guard lock(_messageLock); // Abort active merges, queued and up/down pending std::vector<api::StorageMessage::SP> flushable; @@ -593,19 +593,19 @@ MergeThrottler::processQueuedMerges(MessageGuard& msgGuard) } void -MergeThrottler::handleRendezvous(vespalib::MonitorGuard& guard) +MergeThrottler::handleRendezvous(std::unique_lock<std::mutex> & guard, std::condition_variable & cond) { if (_rendezvous != RENDEZVOUS_NONE) { LOG(spam, "rendezvous requested by external thread; establishing"); assert(_rendezvous == RENDEZVOUS_REQUESTED); _rendezvous = RENDEZVOUS_ESTABLISHED; - guard.broadcast(); + cond.notify_all(); while (_rendezvous != RENDEZVOUS_RELEASED) { - guard.wait(); + cond.wait(guard); } LOG(spam, "external thread rendezvous released"); _rendezvous = RENDEZVOUS_NONE; - guard.broadcast(); + cond.notify_all(); } } @@ -617,7 +617,7 @@ MergeThrottler::run(framework::ThreadHandle& thread) std::vector<api::StorageMessage::SP> up; std::vector<api::StorageMessage::SP> down; { - vespalib::MonitorGuard msgLock(_messageLock); + std::unique_lock msgLock(_messageLock); // If a rendezvous is requested, we must do this here _before_ we // swap the message queues. This is so the caller can remove aborted // messages from the queues when it knows exactly where this thread @@ -628,10 +628,10 @@ MergeThrottler::run(framework::ThreadHandle& thread) && !thread.interrupted() && _rendezvous == RENDEZVOUS_NONE) { - msgLock.wait(1000); + _messageCond.wait_for(msgLock, 1000ms); thread.registerTick(framework::WAIT_CYCLE); } - handleRendezvous(msgLock); + handleRendezvous(msgLock, _messageCond); down.swap(_messagesDown); up.swap(_messagesUp); } @@ -1068,9 +1068,11 @@ bool MergeThrottler::onDown(const std::shared_ptr<api::StorageMessage>& msg) { if (isMergeCommand(*msg) || isMergeReply(*msg)) { - vespalib::MonitorGuard lock(_messageLock); - _messagesDown.push_back(msg); - lock.broadcast(); + { + std::lock_guard lock(_messageLock); + _messagesDown.push_back(msg); + } + _messageCond.notify_all(); return true; } else if (isDiffCommand(*msg)) { std::lock_guard lock(_stateLock); @@ -1133,51 +1135,54 @@ MergeThrottler::onUp(const std::shared_ptr<api::StorageMessage>& msg) LOG(spam, "Received %s from persistence layer", mergeReply.toString().c_str()); - vespalib::MonitorGuard lock(_messageLock); - _messagesUp.push_back(msg); - lock.broadcast(); + { + std::lock_guard lock(_messageLock); + _messagesUp.push_back(msg); + } + _messageCond.notify_all(); return true; } return false; } void -MergeThrottler::rendezvousWithWorkerThread(vespalib::MonitorGuard& guard) +MergeThrottler::rendezvousWithWorkerThread(std::unique_lock<std::mutex> & guard, std::condition_variable & cond) { LOG(spam, "establishing rendezvous with worker thread"); assert(_rendezvous == RENDEZVOUS_NONE); _rendezvous = RENDEZVOUS_REQUESTED; - guard.broadcast(); + cond.notify_all(); while (_rendezvous != RENDEZVOUS_ESTABLISHED) { - guard.wait(); + cond.wait(guard); } LOG(spam, "rendezvous established with worker thread"); } void -MergeThrottler::releaseWorkerThreadRendezvous(vespalib::MonitorGuard& guard) +MergeThrottler::releaseWorkerThreadRendezvous(std::unique_lock<std::mutex> & guard, std::condition_variable & cond) { _rendezvous = RENDEZVOUS_RELEASED; - guard.broadcast(); + cond.notify_all(); while (_rendezvous != RENDEZVOUS_NONE) { - guard.wait(); + cond.wait(guard); } } class ThreadRendezvousGuard { MergeThrottler& _throttler; - vespalib::MonitorGuard& _guard; + std::unique_lock<std::mutex> & _guard; + std::condition_variable & _cond; public: - ThreadRendezvousGuard(MergeThrottler& throttler, - vespalib::MonitorGuard& guard) + ThreadRendezvousGuard(MergeThrottler& throttler, std::unique_lock<std::mutex> & guard, std::condition_variable & cond) : _throttler(throttler), - _guard(guard) + _guard(guard), + _cond(cond) { - _throttler.rendezvousWithWorkerThread(_guard); + _throttler.rendezvousWithWorkerThread(_guard, _cond); } ~ThreadRendezvousGuard() { - _throttler.releaseWorkerThreadRendezvous(_guard); + _throttler.releaseWorkerThreadRendezvous(_guard, _cond); } }; @@ -1193,8 +1198,8 @@ MergeThrottler::handleOutdatedMerges(const api::SetSystemStateCommand& cmd) // receiving merges, but this uses the _server_ object's cluster state, // which isn't set yet at the time we get the new state command, so // there exists a time window where outdated merges can be accepted. Blarg! - vespalib::MonitorGuard guard(_messageLock); - ThreadRendezvousGuard rzGuard(*this, guard); + std::unique_lock guard(_messageLock); + ThreadRendezvousGuard rzGuard(*this, guard, _messageCond); if (_closing) return; // Shutting down anyway. diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index b2087600105..9e0e9c08b3c 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -154,7 +154,8 @@ private: std::size_t _maxQueueSize; mbus::StaticThrottlePolicy::UP _throttlePolicy; uint64_t _queueSequence; // TODO: move into a stable priority queue class - vespalib::Monitor _messageLock; + mutable std::mutex _messageLock; + std::condition_variable _messageCond; mutable std::mutex _stateLock; config::ConfigFetcher _configFetcher; // Messages pending to be processed by the worker thread @@ -204,7 +205,7 @@ public: const mbus::StaticThrottlePolicy& getThrottlePolicy() const { return *_throttlePolicy; } mbus::StaticThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; } // For unit testing only - vespalib::Monitor& getMonitor() { return _messageLock; } + std::mutex & getMonitor() { return _messageLock; } std::mutex & getStateLock() { return _stateLock; } Metrics& getMetrics() { return *_metrics; } @@ -365,9 +366,9 @@ private: void rejectOutdatedQueuedMerges(MessageGuard& msgGuard, uint32_t rejectLessThanVersion); bool attemptProcessNextQueuedMerge(MessageGuard& msgGuard); bool processQueuedMerges(MessageGuard& msgGuard); - void handleRendezvous(vespalib::MonitorGuard& guard); - void rendezvousWithWorkerThread(vespalib::MonitorGuard&); - void releaseWorkerThreadRendezvous(vespalib::MonitorGuard&); + void handleRendezvous(std::unique_lock<std::mutex> & guard, std::condition_variable & cond); + void rendezvousWithWorkerThread(std::unique_lock<std::mutex> & guard, std::condition_variable & cond); + void releaseWorkerThreadRendezvous(std::unique_lock<std::mutex> & guard, std::condition_variable & cond); bool isDiffCommand(const api::StorageMessage& msg) const; bool isMergeCommand(const api::StorageMessage& msg) const; bool isMergeReply(const api::StorageMessage& msg) const; diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp index db4dfd0323e..7dc6b20a2ef 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.cpp +++ b/storage/src/vespa/storage/storageserver/statemanager.cpp @@ -42,7 +42,7 @@ StateManager::StateManager(StorageComponentRegister& compReg, _nextSystemState(), _stateListeners(), _queuedStateRequests(), - _threadMonitor(), + _threadLock(), _lastProgressUpdateCausingSend(0), _progressLastInitStateSend(-1), _systemStateHistory(), @@ -54,8 +54,7 @@ StateManager::StateManager(StorageComponentRegister& compReg, _notifyingListeners(false) { _nodeState->setMinUsedBits(58); - _nodeState->setStartTimestamp( - _component.getClock().getTimeInSeconds().getTime()); + _nodeState->setStartTimestamp(_component.getClock().getTimeInSeconds().getTime()); _component.registerStatusPage(*this); } @@ -65,7 +64,7 @@ StateManager::~StateManager() LOG(debug, "Deleting link %s.", toString().c_str()); if (_thread) { LOG(debug, "onClose() not called before destructor"); - _thread->interruptAndJoin(&_threadMonitor); + _thread->interruptAndJoin(_threadCond); } } @@ -82,7 +81,7 @@ void StateManager::onClose() { if (_thread) { - _thread->interruptAndJoin(&_threadMonitor); + _thread->interruptAndJoin(_threadCond); _thread.reset(); } sendGetNodeStateReplies(); @@ -532,14 +531,14 @@ StateManager::run(framework::ThreadHandle& thread) { while (true) { thread.registerTick(); - vespalib::MonitorGuard guard(_threadMonitor); + std::unique_lock guard(_threadLock); // Take lock before doing stuff, to be sure we don't wait after // destructor have grabbed lock to stop() us. if (thread.interrupted()) { break; } tick(); - guard.wait(1000); + _threadCond.wait_for(guard, 1000ms); } } @@ -551,8 +550,7 @@ StateManager::tick() { } bool -StateManager::sendGetNodeStateReplies(framework::MilliSecTime olderThanTime, - uint16_t node) +StateManager::sendGetNodeStateReplies(framework::MilliSecTime olderThanTime, uint16_t node) { std::vector<std::shared_ptr<api::GetNodeStateReply>> replies; { diff --git a/storage/src/vespa/storage/storageserver/statemanager.h b/storage/src/vespa/storage/storageserver/statemanager.h index b6f43eb270e..215e344e4a4 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.h +++ b/storage/src/vespa/storage/storageserver/statemanager.h @@ -55,7 +55,8 @@ class StateManager : public NodeStateUpdater, typedef std::pair<framework::MilliSecTime, api::GetNodeStateCommand::SP> TimeStatePair; std::list<TimeStatePair> _queuedStateRequests; - mutable vespalib::Monitor _threadMonitor; + mutable std::mutex _threadLock; + std::condition_variable _threadCond; framework::MilliSecTime _lastProgressUpdateCausingSend; vespalib::Double _progressLastInitStateSend; using TimeSysStatePair = std::pair<framework::MilliSecTime, std::shared_ptr<const ClusterStateBundle>>; @@ -73,7 +74,7 @@ class StateManager : public NodeStateUpdater, public: explicit StateManager(StorageComponentRegister&, metrics::MetricManager&, std::unique_ptr<HostInfo>, bool testMode = false); - ~StateManager(); + ~StateManager() override; void onOpen() override; void onClose() override; diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp index 73f4a70d80d..e1daeb729b7 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.cpp +++ b/storage/src/vespa/storage/visiting/visitorthread.cpp @@ -77,7 +77,8 @@ VisitorThread::VisitorThread(uint32_t threadIndex, : _visitors(), _recentlyCompleted(), _queue(), - _queueMonitor(), + _lock(), + _cond(), _currentlyRunningVisitor(_visitors.end()), _messageSender(sender), _metrics(metrics), @@ -104,13 +105,13 @@ VisitorThread::VisitorThread(uint32_t threadIndex, VisitorThread::~VisitorThread() { if (_thread.get() != 0) { - _thread->interruptAndJoin(&_queueMonitor); + _thread->interruptAndJoin(_cond); } } void VisitorThread::updateMetrics(const MetricLockGuard &) { - vespalib::MonitorGuard sync(_queueMonitor); + std::lock_guard sync(_lock); _metrics.queueSize.addValue(_queue.size()); } @@ -118,27 +119,24 @@ void VisitorThread::shutdown() { // Stop event thread - if (_thread.get() != 0) { - _thread->interruptAndJoin(&_queueMonitor); - _thread.reset(0); + if (_thread) { + _thread->interruptAndJoin(_cond); + _thread.reset(); } // Answer all queued up commands and clear queue { - vespalib::MonitorGuard sync(_queueMonitor); - for (std::deque<Event>::iterator it = _queue.begin(); - it != _queue.end(); ++it) + std::lock_guard sync(_lock); + for (const Event & event : _queue) { - if (it->_message.get()) { - if (!it->_message->getType().isReply() - && (it->_message->getType() != api::MessageType::INTERNAL - || static_cast<const api::InternalCommand&>(*it->_message) - .getType() != PropagateVisitorConfig::ID)) + if (event._message.get()) { + if (!event._message->getType().isReply() + && (event._message->getType() != api::MessageType::INTERNAL + || static_cast<const api::InternalCommand&>(*event._message).getType() != PropagateVisitorConfig::ID)) { std::shared_ptr<api::StorageReply> reply( - static_cast<api::StorageCommand&>(*it->_message).makeReply()); - reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, - "Shutting down storage node.")); + static_cast<api::StorageCommand&>(*event._message).makeReply()); + reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node.")); _messageSender.send(reply); } } @@ -161,16 +159,18 @@ void VisitorThread::processMessage(api::VisitorId id, const std::shared_ptr<api::StorageMessage>& msg) { - Event m(id, msg); - vespalib::MonitorGuard sync(_queueMonitor); - _queue.push_back(Event(id, msg)); - sync.signal(); + { + Event m(id, msg); + std::unique_lock sync(_lock); + _queue.push_back(Event(id, msg)); + } + _cond.notify_one(); } VisitorThread::Event VisitorThread::popNextQueuedEventIfAvailable() { - vespalib::MonitorGuard guard(_queueMonitor); + std::lock_guard guard(_lock); if (!_queue.empty()) { Event e(std::move(_queue.front())); _queue.pop_front(); @@ -194,9 +194,9 @@ VisitorThread::run(framework::ThreadHandle& thread) if (entry.empty()) { // If none, give visitors something to trigger of. tick(); - vespalib::MonitorGuard guard(_queueMonitor); + std::unique_lock guard(_lock); if (_queue.empty()) { - guard.wait(_timeBetweenTicks.load(std::memory_order_relaxed)); + _cond.wait_for(guard, std::chrono::milliseconds(_timeBetweenTicks.load(std::memory_order_relaxed))); thread.registerTick(framework::WAIT_CYCLE); } continue; @@ -545,12 +545,13 @@ VisitorThread::onCreateVisitor( } void -VisitorThread::handleMessageBusReply(mbus::Reply::UP reply, - Visitor& visitor) +VisitorThread::handleMessageBusReply(mbus::Reply::UP reply, Visitor& visitor) { - vespalib::MonitorGuard sync(_queueMonitor); - _queue.emplace_back(visitor.getVisitorId(), std::move(reply)); - sync.broadcast(); + { + std::lock_guard sync(_lock); + _queue.emplace_back(visitor.getVisitorId(), std::move(reply)); + } + _cond.notify_all(); } bool diff --git a/storage/src/vespa/storage/visiting/visitorthread.h b/storage/src/vespa/storage/visiting/visitorthread.h index 2b15468fd3f..a682544ec23 100644 --- a/storage/src/vespa/storage/visiting/visitorthread.h +++ b/storage/src/vespa/storage/visiting/visitorthread.h @@ -22,7 +22,6 @@ #include <vespa/storageapi/messageapi/messagehandler.h> #include <vespa/metrics/metrictimer.h> #include <vespa/vespalib/util/document_runnable.h> -#include <vespa/vespalib/util/sync.h> #include <atomic> #include <deque> @@ -70,8 +69,9 @@ class VisitorThread : public framework::Runnable, } }; - std::deque<Event> _queue; - vespalib::Monitor _queueMonitor; + std::deque<Event> _queue; + std::mutex _lock; + std::condition_variable _cond; VisitorMap::iterator _currentlyRunningVisitor; VisitorMessageHandler& _messageSender; @@ -99,7 +99,7 @@ public: VisitorFactory::Map&, VisitorThreadMetrics& metrics, VisitorMessageHandler& sender); - ~VisitorThread(); + ~VisitorThread() override; void processMessage(api::VisitorId visitorId, const std::shared_ptr<api::StorageMessage>& msg); void shutdown(); @@ -107,7 +107,7 @@ public: void handleMessageBusReply(std::unique_ptr<mbus::Reply> reply, Visitor& visitor); /** For unit tests needing to pause thread. */ - vespalib::Monitor& getQueueMonitor() { return _queueMonitor; } + std::mutex & getQueueMonitor() { return _lock; } const VisitorThreadMetrics& getMetrics() const noexcept { return _metrics; diff --git a/storageframework/src/vespa/storageframework/generic/thread/thread.cpp b/storageframework/src/vespa/storageframework/generic/thread/thread.cpp index 24ed28d077b..ee92753a23c 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/thread.cpp +++ b/storageframework/src/vespa/storageframework/generic/thread/thread.cpp @@ -1,18 +1,13 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "thread.h" -#include <vespa/vespalib/util/sync.h> namespace storage::framework { void -Thread::interruptAndJoin(vespalib::Monitor* m) +Thread::interruptAndJoin() { interrupt(); - if (m != nullptr) { - vespalib::MonitorGuard monitorGuard(*m); - monitorGuard.broadcast(); - } join(); } diff --git a/storageframework/src/vespa/storageframework/generic/thread/thread.h b/storageframework/src/vespa/storageframework/generic/thread/thread.h index 26c59ce1330..814686eb4fa 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/thread.h +++ b/storageframework/src/vespa/storageframework/generic/thread/thread.h @@ -58,7 +58,7 @@ public: * Utility function to interrupt and join a thread, possibly broadcasting * through a monitor after the signalling face. */ - void interruptAndJoin(vespalib::Monitor* m); + void interruptAndJoin(); void interruptAndJoin(std::condition_variable &cv); }; |