summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-10-12 20:24:58 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-10-12 20:24:58 +0000
commit1da49f753629e2c84d09c0cad4cb1bb88c4dc0e5 (patch)
tree3a6ecfb09a7f21a2e132e07fb447221c6b8e6cd1
parente1996c4514f65c66aa31fc9290406a2f0f4a83c3 (diff)
vespalib::Monitor -> std::mutex and std::condition_variable
-rw-r--r--storage/src/tests/distributor/distributortest.cpp2
-rw-r--r--storage/src/tests/visiting/visitormanagertest.cpp2
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp65
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h11
-rw-r--r--storage/src/vespa/storage/storageserver/statemanager.cpp16
-rw-r--r--storage/src/vespa/storage/storageserver/statemanager.h5
-rw-r--r--storage/src/vespa/storage/visiting/visitorthread.cpp59
-rw-r--r--storage/src/vespa/storage/visiting/visitorthread.h10
-rw-r--r--storageframework/src/vespa/storageframework/generic/thread/thread.cpp7
-rw-r--r--storageframework/src/vespa/storageframework/generic/thread/thread.h2
10 files changed, 90 insertions, 89 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index dc0a01448af..454db5d2404 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -414,7 +414,7 @@ TEST_F(DistributorTest, tick_processes_status_requests) {
}
ASSERT_TRUE(tick());
- tp->interruptAndJoin(nullptr);
+ tp->interruptAndJoin();
EXPECT_THAT(thread.getResult(), HasSubstr("BucketId(0x4000000000000001)"));
}
diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp
index 0ce305fb8bb..bd78f71712b 100644
--- a/storage/src/tests/visiting/visitormanagertest.cpp
+++ b/storage/src/tests/visiting/visitormanagertest.cpp
@@ -778,7 +778,7 @@ TEST_F(VisitorManagerTest, visitor_queue_timeout) {
_manager->enforceQueueUsage();
{
- vespalib::MonitorGuard guard(_manager->getThread(0).getQueueMonitor());
+ std::lock_guard guard(_manager->getThread(0).getQueueMonitor());
auto cmd = std::make_shared<api::CreateVisitorCommand>(makeBucketSpace(), "DumpVisitor", "testvis", "");
cmd->addBucketToBeVisited(document::BucketId(16, 3));
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index 7999514ac00..34d16c6c1d3 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -271,7 +271,7 @@ MergeThrottler::onClose()
// Avoid getting config on shutdown
_configFetcher.close();
{
- vespalib::MonitorGuard guard(_messageLock);
+ std::lock_guard guard(_messageLock);
// Note: used to prevent taking locks in different order if onFlush
// and abortOutdatedMerges are called concurrently, as these need to
// take both locks in differing orders.
@@ -283,7 +283,7 @@ MergeThrottler::onClose()
_merges.size(), _queue.size());
}
if (_thread) {
- _thread->interruptAndJoin(&_messageLock);
+ _thread->interruptAndJoin(_messageCond);
_thread.reset();
}
}
@@ -294,7 +294,7 @@ MergeThrottler::onFlush(bool /*downwards*/)
// Lock state before messages since the latter must be unlocked
// before the guard starts hauling messages up the chain.
MessageGuard msgGuard(_stateLock, *this);
- vespalib::MonitorGuard lock(_messageLock);
+ std::lock_guard lock(_messageLock);
// Abort active merges, queued and up/down pending
std::vector<api::StorageMessage::SP> flushable;
@@ -593,19 +593,19 @@ MergeThrottler::processQueuedMerges(MessageGuard& msgGuard)
}
void
-MergeThrottler::handleRendezvous(vespalib::MonitorGuard& guard)
+MergeThrottler::handleRendezvous(std::unique_lock<std::mutex> & guard, std::condition_variable & cond)
{
if (_rendezvous != RENDEZVOUS_NONE) {
LOG(spam, "rendezvous requested by external thread; establishing");
assert(_rendezvous == RENDEZVOUS_REQUESTED);
_rendezvous = RENDEZVOUS_ESTABLISHED;
- guard.broadcast();
+ cond.notify_all();
while (_rendezvous != RENDEZVOUS_RELEASED) {
- guard.wait();
+ cond.wait(guard);
}
LOG(spam, "external thread rendezvous released");
_rendezvous = RENDEZVOUS_NONE;
- guard.broadcast();
+ cond.notify_all();
}
}
@@ -617,7 +617,7 @@ MergeThrottler::run(framework::ThreadHandle& thread)
std::vector<api::StorageMessage::SP> up;
std::vector<api::StorageMessage::SP> down;
{
- vespalib::MonitorGuard msgLock(_messageLock);
+ std::unique_lock msgLock(_messageLock);
// If a rendezvous is requested, we must do this here _before_ we
// swap the message queues. This is so the caller can remove aborted
// messages from the queues when it knows exactly where this thread
@@ -628,10 +628,10 @@ MergeThrottler::run(framework::ThreadHandle& thread)
&& !thread.interrupted()
&& _rendezvous == RENDEZVOUS_NONE)
{
- msgLock.wait(1000);
+ _messageCond.wait_for(msgLock, 1000ms);
thread.registerTick(framework::WAIT_CYCLE);
}
- handleRendezvous(msgLock);
+ handleRendezvous(msgLock, _messageCond);
down.swap(_messagesDown);
up.swap(_messagesUp);
}
@@ -1068,9 +1068,11 @@ bool
MergeThrottler::onDown(const std::shared_ptr<api::StorageMessage>& msg)
{
if (isMergeCommand(*msg) || isMergeReply(*msg)) {
- vespalib::MonitorGuard lock(_messageLock);
- _messagesDown.push_back(msg);
- lock.broadcast();
+ {
+ std::lock_guard lock(_messageLock);
+ _messagesDown.push_back(msg);
+ }
+ _messageCond.notify_all();
return true;
} else if (isDiffCommand(*msg)) {
std::lock_guard lock(_stateLock);
@@ -1133,51 +1135,54 @@ MergeThrottler::onUp(const std::shared_ptr<api::StorageMessage>& msg)
LOG(spam, "Received %s from persistence layer",
mergeReply.toString().c_str());
- vespalib::MonitorGuard lock(_messageLock);
- _messagesUp.push_back(msg);
- lock.broadcast();
+ {
+ std::lock_guard lock(_messageLock);
+ _messagesUp.push_back(msg);
+ }
+ _messageCond.notify_all();
return true;
}
return false;
}
void
-MergeThrottler::rendezvousWithWorkerThread(vespalib::MonitorGuard& guard)
+MergeThrottler::rendezvousWithWorkerThread(std::unique_lock<std::mutex> & guard, std::condition_variable & cond)
{
LOG(spam, "establishing rendezvous with worker thread");
assert(_rendezvous == RENDEZVOUS_NONE);
_rendezvous = RENDEZVOUS_REQUESTED;
- guard.broadcast();
+ cond.notify_all();
while (_rendezvous != RENDEZVOUS_ESTABLISHED) {
- guard.wait();
+ cond.wait(guard);
}
LOG(spam, "rendezvous established with worker thread");
}
void
-MergeThrottler::releaseWorkerThreadRendezvous(vespalib::MonitorGuard& guard)
+MergeThrottler::releaseWorkerThreadRendezvous(std::unique_lock<std::mutex> & guard, std::condition_variable & cond)
{
_rendezvous = RENDEZVOUS_RELEASED;
- guard.broadcast();
+ cond.notify_all();
while (_rendezvous != RENDEZVOUS_NONE) {
- guard.wait();
+ cond.wait(guard);
}
}
class ThreadRendezvousGuard {
MergeThrottler& _throttler;
- vespalib::MonitorGuard& _guard;
+ std::unique_lock<std::mutex> & _guard;
+ std::condition_variable & _cond;
public:
- ThreadRendezvousGuard(MergeThrottler& throttler,
- vespalib::MonitorGuard& guard)
+ ThreadRendezvousGuard(MergeThrottler& throttler, std::unique_lock<std::mutex> & guard, std::condition_variable & cond)
: _throttler(throttler),
- _guard(guard)
+ _guard(guard),
+ _cond(cond)
{
- _throttler.rendezvousWithWorkerThread(_guard);
+ _throttler.rendezvousWithWorkerThread(_guard, _cond);
}
~ThreadRendezvousGuard() {
- _throttler.releaseWorkerThreadRendezvous(_guard);
+ _throttler.releaseWorkerThreadRendezvous(_guard, _cond);
}
};
@@ -1193,8 +1198,8 @@ MergeThrottler::handleOutdatedMerges(const api::SetSystemStateCommand& cmd)
// receiving merges, but this uses the _server_ object's cluster state,
// which isn't set yet at the time we get the new state command, so
// there exists a time window where outdated merges can be accepted. Blarg!
- vespalib::MonitorGuard guard(_messageLock);
- ThreadRendezvousGuard rzGuard(*this, guard);
+ std::unique_lock guard(_messageLock);
+ ThreadRendezvousGuard rzGuard(*this, guard, _messageCond);
if (_closing) return; // Shutting down anyway.
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index b2087600105..9e0e9c08b3c 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -154,7 +154,8 @@ private:
std::size_t _maxQueueSize;
mbus::StaticThrottlePolicy::UP _throttlePolicy;
uint64_t _queueSequence; // TODO: move into a stable priority queue class
- vespalib::Monitor _messageLock;
+ mutable std::mutex _messageLock;
+ std::condition_variable _messageCond;
mutable std::mutex _stateLock;
config::ConfigFetcher _configFetcher;
// Messages pending to be processed by the worker thread
@@ -204,7 +205,7 @@ public:
const mbus::StaticThrottlePolicy& getThrottlePolicy() const { return *_throttlePolicy; }
mbus::StaticThrottlePolicy& getThrottlePolicy() { return *_throttlePolicy; }
// For unit testing only
- vespalib::Monitor& getMonitor() { return _messageLock; }
+ std::mutex & getMonitor() { return _messageLock; }
std::mutex & getStateLock() { return _stateLock; }
Metrics& getMetrics() { return *_metrics; }
@@ -365,9 +366,9 @@ private:
void rejectOutdatedQueuedMerges(MessageGuard& msgGuard, uint32_t rejectLessThanVersion);
bool attemptProcessNextQueuedMerge(MessageGuard& msgGuard);
bool processQueuedMerges(MessageGuard& msgGuard);
- void handleRendezvous(vespalib::MonitorGuard& guard);
- void rendezvousWithWorkerThread(vespalib::MonitorGuard&);
- void releaseWorkerThreadRendezvous(vespalib::MonitorGuard&);
+ void handleRendezvous(std::unique_lock<std::mutex> & guard, std::condition_variable & cond);
+ void rendezvousWithWorkerThread(std::unique_lock<std::mutex> & guard, std::condition_variable & cond);
+ void releaseWorkerThreadRendezvous(std::unique_lock<std::mutex> & guard, std::condition_variable & cond);
bool isDiffCommand(const api::StorageMessage& msg) const;
bool isMergeCommand(const api::StorageMessage& msg) const;
bool isMergeReply(const api::StorageMessage& msg) const;
diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp
index db4dfd0323e..7dc6b20a2ef 100644
--- a/storage/src/vespa/storage/storageserver/statemanager.cpp
+++ b/storage/src/vespa/storage/storageserver/statemanager.cpp
@@ -42,7 +42,7 @@ StateManager::StateManager(StorageComponentRegister& compReg,
_nextSystemState(),
_stateListeners(),
_queuedStateRequests(),
- _threadMonitor(),
+ _threadLock(),
_lastProgressUpdateCausingSend(0),
_progressLastInitStateSend(-1),
_systemStateHistory(),
@@ -54,8 +54,7 @@ StateManager::StateManager(StorageComponentRegister& compReg,
_notifyingListeners(false)
{
_nodeState->setMinUsedBits(58);
- _nodeState->setStartTimestamp(
- _component.getClock().getTimeInSeconds().getTime());
+ _nodeState->setStartTimestamp(_component.getClock().getTimeInSeconds().getTime());
_component.registerStatusPage(*this);
}
@@ -65,7 +64,7 @@ StateManager::~StateManager()
LOG(debug, "Deleting link %s.", toString().c_str());
if (_thread) {
LOG(debug, "onClose() not called before destructor");
- _thread->interruptAndJoin(&_threadMonitor);
+ _thread->interruptAndJoin(_threadCond);
}
}
@@ -82,7 +81,7 @@ void
StateManager::onClose()
{
if (_thread) {
- _thread->interruptAndJoin(&_threadMonitor);
+ _thread->interruptAndJoin(_threadCond);
_thread.reset();
}
sendGetNodeStateReplies();
@@ -532,14 +531,14 @@ StateManager::run(framework::ThreadHandle& thread)
{
while (true) {
thread.registerTick();
- vespalib::MonitorGuard guard(_threadMonitor);
+ std::unique_lock guard(_threadLock);
// Take lock before doing stuff, to be sure we don't wait after
// destructor have grabbed lock to stop() us.
if (thread.interrupted()) {
break;
}
tick();
- guard.wait(1000);
+ _threadCond.wait_for(guard, 1000ms);
}
}
@@ -551,8 +550,7 @@ StateManager::tick() {
}
bool
-StateManager::sendGetNodeStateReplies(framework::MilliSecTime olderThanTime,
- uint16_t node)
+StateManager::sendGetNodeStateReplies(framework::MilliSecTime olderThanTime, uint16_t node)
{
std::vector<std::shared_ptr<api::GetNodeStateReply>> replies;
{
diff --git a/storage/src/vespa/storage/storageserver/statemanager.h b/storage/src/vespa/storage/storageserver/statemanager.h
index b6f43eb270e..215e344e4a4 100644
--- a/storage/src/vespa/storage/storageserver/statemanager.h
+++ b/storage/src/vespa/storage/storageserver/statemanager.h
@@ -55,7 +55,8 @@ class StateManager : public NodeStateUpdater,
typedef std::pair<framework::MilliSecTime,
api::GetNodeStateCommand::SP> TimeStatePair;
std::list<TimeStatePair> _queuedStateRequests;
- mutable vespalib::Monitor _threadMonitor;
+ mutable std::mutex _threadLock;
+ std::condition_variable _threadCond;
framework::MilliSecTime _lastProgressUpdateCausingSend;
vespalib::Double _progressLastInitStateSend;
using TimeSysStatePair = std::pair<framework::MilliSecTime, std::shared_ptr<const ClusterStateBundle>>;
@@ -73,7 +74,7 @@ class StateManager : public NodeStateUpdater,
public:
explicit StateManager(StorageComponentRegister&, metrics::MetricManager&,
std::unique_ptr<HostInfo>, bool testMode = false);
- ~StateManager();
+ ~StateManager() override;
void onOpen() override;
void onClose() override;
diff --git a/storage/src/vespa/storage/visiting/visitorthread.cpp b/storage/src/vespa/storage/visiting/visitorthread.cpp
index 73f4a70d80d..e1daeb729b7 100644
--- a/storage/src/vespa/storage/visiting/visitorthread.cpp
+++ b/storage/src/vespa/storage/visiting/visitorthread.cpp
@@ -77,7 +77,8 @@ VisitorThread::VisitorThread(uint32_t threadIndex,
: _visitors(),
_recentlyCompleted(),
_queue(),
- _queueMonitor(),
+ _lock(),
+ _cond(),
_currentlyRunningVisitor(_visitors.end()),
_messageSender(sender),
_metrics(metrics),
@@ -104,13 +105,13 @@ VisitorThread::VisitorThread(uint32_t threadIndex,
VisitorThread::~VisitorThread()
{
if (_thread.get() != 0) {
- _thread->interruptAndJoin(&_queueMonitor);
+ _thread->interruptAndJoin(_cond);
}
}
void
VisitorThread::updateMetrics(const MetricLockGuard &) {
- vespalib::MonitorGuard sync(_queueMonitor);
+ std::lock_guard sync(_lock);
_metrics.queueSize.addValue(_queue.size());
}
@@ -118,27 +119,24 @@ void
VisitorThread::shutdown()
{
// Stop event thread
- if (_thread.get() != 0) {
- _thread->interruptAndJoin(&_queueMonitor);
- _thread.reset(0);
+ if (_thread) {
+ _thread->interruptAndJoin(_cond);
+ _thread.reset();
}
// Answer all queued up commands and clear queue
{
- vespalib::MonitorGuard sync(_queueMonitor);
- for (std::deque<Event>::iterator it = _queue.begin();
- it != _queue.end(); ++it)
+ std::lock_guard sync(_lock);
+ for (const Event & event : _queue)
{
- if (it->_message.get()) {
- if (!it->_message->getType().isReply()
- && (it->_message->getType() != api::MessageType::INTERNAL
- || static_cast<const api::InternalCommand&>(*it->_message)
- .getType() != PropagateVisitorConfig::ID))
+ if (event._message.get()) {
+ if (!event._message->getType().isReply()
+ && (event._message->getType() != api::MessageType::INTERNAL
+ || static_cast<const api::InternalCommand&>(*event._message).getType() != PropagateVisitorConfig::ID))
{
std::shared_ptr<api::StorageReply> reply(
- static_cast<api::StorageCommand&>(*it->_message).makeReply());
- reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED,
- "Shutting down storage node."));
+ static_cast<api::StorageCommand&>(*event._message).makeReply());
+ reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "Shutting down storage node."));
_messageSender.send(reply);
}
}
@@ -161,16 +159,18 @@ void
VisitorThread::processMessage(api::VisitorId id,
const std::shared_ptr<api::StorageMessage>& msg)
{
- Event m(id, msg);
- vespalib::MonitorGuard sync(_queueMonitor);
- _queue.push_back(Event(id, msg));
- sync.signal();
+ {
+ Event m(id, msg);
+ std::unique_lock sync(_lock);
+ _queue.push_back(Event(id, msg));
+ }
+ _cond.notify_one();
}
VisitorThread::Event
VisitorThread::popNextQueuedEventIfAvailable()
{
- vespalib::MonitorGuard guard(_queueMonitor);
+ std::lock_guard guard(_lock);
if (!_queue.empty()) {
Event e(std::move(_queue.front()));
_queue.pop_front();
@@ -194,9 +194,9 @@ VisitorThread::run(framework::ThreadHandle& thread)
if (entry.empty()) {
// If none, give visitors something to trigger of.
tick();
- vespalib::MonitorGuard guard(_queueMonitor);
+ std::unique_lock guard(_lock);
if (_queue.empty()) {
- guard.wait(_timeBetweenTicks.load(std::memory_order_relaxed));
+ _cond.wait_for(guard, std::chrono::milliseconds(_timeBetweenTicks.load(std::memory_order_relaxed)));
thread.registerTick(framework::WAIT_CYCLE);
}
continue;
@@ -545,12 +545,13 @@ VisitorThread::onCreateVisitor(
}
void
-VisitorThread::handleMessageBusReply(mbus::Reply::UP reply,
- Visitor& visitor)
+VisitorThread::handleMessageBusReply(mbus::Reply::UP reply, Visitor& visitor)
{
- vespalib::MonitorGuard sync(_queueMonitor);
- _queue.emplace_back(visitor.getVisitorId(), std::move(reply));
- sync.broadcast();
+ {
+ std::lock_guard sync(_lock);
+ _queue.emplace_back(visitor.getVisitorId(), std::move(reply));
+ }
+ _cond.notify_all();
}
bool
diff --git a/storage/src/vespa/storage/visiting/visitorthread.h b/storage/src/vespa/storage/visiting/visitorthread.h
index 2b15468fd3f..a682544ec23 100644
--- a/storage/src/vespa/storage/visiting/visitorthread.h
+++ b/storage/src/vespa/storage/visiting/visitorthread.h
@@ -22,7 +22,6 @@
#include <vespa/storageapi/messageapi/messagehandler.h>
#include <vespa/metrics/metrictimer.h>
#include <vespa/vespalib/util/document_runnable.h>
-#include <vespa/vespalib/util/sync.h>
#include <atomic>
#include <deque>
@@ -70,8 +69,9 @@ class VisitorThread : public framework::Runnable,
}
};
- std::deque<Event> _queue;
- vespalib::Monitor _queueMonitor;
+ std::deque<Event> _queue;
+ std::mutex _lock;
+ std::condition_variable _cond;
VisitorMap::iterator _currentlyRunningVisitor;
VisitorMessageHandler& _messageSender;
@@ -99,7 +99,7 @@ public:
VisitorFactory::Map&,
VisitorThreadMetrics& metrics,
VisitorMessageHandler& sender);
- ~VisitorThread();
+ ~VisitorThread() override;
void processMessage(api::VisitorId visitorId, const std::shared_ptr<api::StorageMessage>& msg);
void shutdown();
@@ -107,7 +107,7 @@ public:
void handleMessageBusReply(std::unique_ptr<mbus::Reply> reply, Visitor& visitor);
/** For unit tests needing to pause thread. */
- vespalib::Monitor& getQueueMonitor() { return _queueMonitor; }
+ std::mutex & getQueueMonitor() { return _lock; }
const VisitorThreadMetrics& getMetrics() const noexcept {
return _metrics;
diff --git a/storageframework/src/vespa/storageframework/generic/thread/thread.cpp b/storageframework/src/vespa/storageframework/generic/thread/thread.cpp
index 24ed28d077b..ee92753a23c 100644
--- a/storageframework/src/vespa/storageframework/generic/thread/thread.cpp
+++ b/storageframework/src/vespa/storageframework/generic/thread/thread.cpp
@@ -1,18 +1,13 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "thread.h"
-#include <vespa/vespalib/util/sync.h>
namespace storage::framework {
void
-Thread::interruptAndJoin(vespalib::Monitor* m)
+Thread::interruptAndJoin()
{
interrupt();
- if (m != nullptr) {
- vespalib::MonitorGuard monitorGuard(*m);
- monitorGuard.broadcast();
- }
join();
}
diff --git a/storageframework/src/vespa/storageframework/generic/thread/thread.h b/storageframework/src/vespa/storageframework/generic/thread/thread.h
index 26c59ce1330..814686eb4fa 100644
--- a/storageframework/src/vespa/storageframework/generic/thread/thread.h
+++ b/storageframework/src/vespa/storageframework/generic/thread/thread.h
@@ -58,7 +58,7 @@ public:
* Utility function to interrupt and join a thread, possibly broadcasting
* through a monitor after the signalling face.
*/
- void interruptAndJoin(vespalib::Monitor* m);
+ void interruptAndJoin();
void interruptAndJoin(std::condition_variable &cv);
};