diff options
author | Geir Storli <geirstorli@yahoo.no> | 2017-11-27 15:19:30 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-11-27 15:19:30 +0100 |
commit | 775cbe184dcf306b38091e07d04f798d77f8873d (patch) | |
tree | 57352a349c3bc22a2370b2c57a25a09603c3efc0 | |
parent | 6d0875eb6ea703da1d707f07047f3f406e5e4454 (diff) | |
parent | ea0cf7742e910dbb9c60cb6f3a8ac8ba92b5744e (diff) |
Merge pull request #4272 from vespa-engine/toregge/use-standard-locking-in-storage-classes
Use standard locking in storage classes
13 files changed, 94 insertions, 75 deletions
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp index 00fa5c95c9b..677ca0a6309 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp @@ -19,12 +19,13 @@ #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/config/config.h> -#include <unordered_map> +#include <chrono> #include <vespa/log/bufferedlogger.h> LOG_SETUP(".storage.bucketdb.manager"); using document::BucketSpace; +using namespace std::chrono_literals; namespace storage { @@ -33,9 +34,9 @@ BucketManager::BucketManager(const config::ConfigUri & configUri, : StorageLinkQueued("Bucket manager", compReg), framework::StatusReporter("bucketdb", "Bucket database"), _configUri(configUri), - _stateAccess(), _bucketDBMemoryToken(), - _workerMonitor(), + _workerLock(), + _workerCond(), _clusterStateLock(), _queueProcessingLock(), _queuedReplies(), @@ -81,7 +82,7 @@ void BucketManager::onClose() { // Stop internal thread such that we don't send any more messages down. if (_thread.get() != 0) { - _thread->interruptAndJoin(&_workerMonitor); + _thread->interruptAndJoin(_workerLock, _workerCond); _thread.reset(0); } StorageLinkQueued::onClose(); @@ -268,7 +269,7 @@ void BucketManager::run(framework::ThreadHandle& thread) bool didWork = false; BucketInfoRequestMap infoReqs; { - vespalib::MonitorGuard monitor(_workerMonitor); + std::lock_guard<std::mutex> guard(_workerLock); infoReqs.swap(_bucketInfoRequests); } @@ -277,12 +278,12 @@ void BucketManager::run(framework::ThreadHandle& thread) } { - vespalib::MonitorGuard monitor(_workerMonitor); + std::unique_lock<std::mutex> guard(_workerLock); for (const auto &req : infoReqs) { assert(req.second.empty()); } if (!didWork) { - monitor.wait(1000); + _workerCond.wait_for(guard, 1s); thread.registerTick(framework::WAIT_CYCLE); } else { thread.registerTick(framework::PROCESS_CYCLE); @@ -393,9 +394,9 @@ bool BucketManager::onRequestBucketInfo( LOG(debug, "Got request bucket info command"); if (cmd->getBuckets().size() == 0 && cmd->hasSystemState()) { - vespalib::MonitorGuard monitor(_workerMonitor); + std::lock_guard<std::mutex> guard(_workerLock); _bucketInfoRequests[cmd->getBucketSpace()].push_back(cmd); - monitor.signal(); + _workerCond.notify_all(); LOG(spam, "Scheduled request bucket info request for retrieval"); return true; } @@ -472,7 +473,7 @@ BucketManager::ScopedQueueDispatchGuard::~ScopedQueueDispatchGuard() void BucketManager::enterQueueProtectedSection() { - vespalib::LockGuard guard(_queueProcessingLock); + std::lock_guard<std::mutex> guard(_queueProcessingLock); ++_requestsCurrentlyProcessing; } @@ -480,7 +481,7 @@ void BucketManager::leaveQueueProtectedSection(ScopedQueueDispatchGuard& queueGuard) { (void) queueGuard; // Only used to enforce guard is held while calling. - vespalib::LockGuard guard(_queueProcessingLock); + std::lock_guard<std::mutex> guard(_queueProcessingLock); assert(_requestsCurrentlyProcessing > 0); // Full bucket info fetches may be concurrently interleaved with bucket- // specific fetches outside of the processing thread. We only allow queued @@ -529,7 +530,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac clusterState->toString().c_str(), our_hash.c_str()); - vespalib::LockGuard lock(_clusterStateLock); + std::lock_guard<std::mutex> clusterStateGuard(_clusterStateLock); for (auto it = reqs.rbegin(); it != reqs.rend(); ++it) { // Currently small requests should not be forwarded to worker thread assert((*it)->hasSystemState()); @@ -629,7 +630,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac size_t BucketManager::bucketInfoRequestsCurrentlyProcessing() const noexcept { - vespalib::LockGuard guard(_queueProcessingLock); + std::lock_guard<std::mutex> guard(_queueProcessingLock); return _requestsCurrentlyProcessing; } @@ -694,7 +695,7 @@ BucketManager::onSetSystemState( LOG(debug, "onSetSystemState(%s)", cmd->toString().c_str()); const lib::ClusterState& state(cmd->getSystemState()); std::string unified(unifyState(state)); - vespalib::LockGuard lock(_clusterStateLock); + std::lock_guard<std::mutex> lock(_clusterStateLock); if (unified != _lastUnifiedClusterState || state.getVersion() != _lastClusterStateSeen + 1) { @@ -804,7 +805,7 @@ BucketManager::enqueueIfBucketHasConflicts(const api::BucketReply::SP& reply) { // Should very rarely contend, since persistence replies are all sent up // via a single dispatcher thread. - vespalib::LockGuard guard(_queueProcessingLock); + std::lock_guard<std::mutex> guard(_queueProcessingLock); if (_requestsCurrentlyProcessing == 0) { return false; // Nothing to do here; pass through reply. } @@ -841,7 +842,7 @@ bool BucketManager::enqueueAsConflictIfProcessingRequest( const api::StorageReply::SP& reply) { - vespalib::LockGuard guard(_queueProcessingLock); + std::lock_guard<std::mutex> guard(_queueProcessingLock); if (_requestsCurrentlyProcessing != 0) { LOG(debug, "Enqueued %s due to concurrent RequestBucketInfo", reply->toString().c_str()); diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.h b/storage/src/vespa/storage/bucketdb/bucketmanager.h index 3b71230a8ed..5e4e928dfc7 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.h +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.h @@ -27,6 +27,8 @@ #include <list> #include <unordered_map> #include <unordered_set> +#include <mutex> +#include <condition_variable> namespace storage { @@ -45,21 +47,21 @@ private: config::ConfigUri _configUri; uint32_t _chunkLevel; - mutable vespalib::Lock _stateAccess; framework::MemoryToken::UP _bucketDBMemoryToken; BucketInfoRequestMap _bucketInfoRequests; /** * We have our own thread running, which we use to send messages down. - * Take worker monitor, add to list and signal for messages to be sent. + * Take worker lock, add to list and signal for messages to be sent. */ - mutable vespalib::Monitor _workerMonitor; + mutable std::mutex _workerLock; + std::condition_variable _workerCond; /** * Lock kept for access to 3 values below concerning cluster state. */ - vespalib::Lock _clusterStateLock; + std::mutex _clusterStateLock; - vespalib::Lock _queueProcessingLock; + mutable std::mutex _queueProcessingLock; using ReplyQueue = std::vector<api::StorageReply::SP>; using ConflictingBuckets = std::unordered_set<document::BucketId, document::BucketId::hash>; diff --git a/storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp b/storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp index 5ac4c4b1ee7..ad56a4083f8 100644 --- a/storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp +++ b/storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp @@ -4,6 +4,7 @@ #include <vespa/storage/common/bucketoperationlogger.h> #include <vespa/vespalib/util/backtrace.h> #include <ostream> +#include <cassert> #include <vespa/log/bufferedlogger.h> LOG_SETUP(".mapbucketdatabase"); diff --git a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp index fc2c2066b6f..1bc473308a4 100644 --- a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp +++ b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp @@ -13,11 +13,13 @@ #include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/config/helper/configgetter.hpp> #include <iomanip> +#include <chrono> #include <vespa/log/bufferedlogger.h> LOG_SETUP(".storage.bucketdb.initializer"); using document::BucketSpace; +using namespace std::chrono_literals; namespace storage { @@ -117,10 +119,11 @@ StorageBucketDBInitializer::Metrics::Metrics(framework::Component& component) StorageBucketDBInitializer::Metrics::~Metrics() {} StorageBucketDBInitializer::GlobalState::GlobalState() - : _insertedCount(0), _infoReadCount(0), - _infoSetByLoad(0), _dirsListed(0), _dirsToList(0), - _gottenInitProgress(false), _doneListing(false), - _doneInitializing(false) + : _lists(), _joins(), _infoRequests(), _replies(), + _insertedCount(0), _infoReadCount(0), + _infoSetByLoad(0), _dirsListed(0), _dirsToList(0), + _gottenInitProgress(false), _doneListing(false), + _doneInitializing(false), _workerLock(), _workerCond(), _replyLock() { } StorageBucketDBInitializer::GlobalState::~GlobalState() { } @@ -183,7 +186,7 @@ void StorageBucketDBInitializer::onClose() { if (_system._thread.get() != 0) { - _system._thread->interruptAndJoin(&_state._workerMonitor); + _system._thread->interruptAndJoin(_state._workerLock, _state._workerCond); _system._thread.reset(0); } } @@ -191,11 +194,11 @@ StorageBucketDBInitializer::onClose() void StorageBucketDBInitializer::run(framework::ThreadHandle& thread) { - vespalib::MonitorGuard monitor(_state._workerMonitor); + std::unique_lock<std::mutex> guard(_state._workerLock); while (!thread.interrupted() && !_state._doneInitializing) { std::list<api::StorageMessage::SP> replies; { - vespalib::LockGuard lock(_state._replyLock); + std::lock_guard<std::mutex> replyGuard(_state._replyLock); _state._replies.swap(replies); } for (std::list<api::StorageMessage::SP>::iterator it = replies.begin(); @@ -218,7 +221,7 @@ StorageBucketDBInitializer::run(framework::ThreadHandle& thread) updateInitProgress(); } if (replies.empty()) { - monitor.wait(10); + _state._workerCond.wait_for(guard, 10ms); thread.registerTick(framework::WAIT_CYCLE); } else { thread.registerTick(framework::PROCESS_CYCLE); @@ -258,7 +261,7 @@ void StorageBucketDBInitializer::reportHtmlStatus( std::ostream& out, const framework::HttpUrlPath&) const { - vespalib::Monitor monitor(_state._workerMonitor); + std::lock_guard<std::mutex> guard(_state._workerLock); out << "\n <h2>Config</h2>\n" << " <table>\n" << " <tr><td>Max pending info reads per disk</td><td>" @@ -583,7 +586,7 @@ StorageBucketDBInitializer::onInternalReply( case ReadBucketInfoReply::ID: case InternalBucketJoinReply::ID: { - vespalib::LockGuard lock(_state._replyLock); + std::lock_guard<std::mutex> guard(_state._replyLock); _state._replies.push_back(reply); return true; } diff --git a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h index 57b95e14f48..642fe43ad8a 100644 --- a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h +++ b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h @@ -47,11 +47,12 @@ #include <vespa/storageframework/generic/status/htmlstatusreporter.h> #include <vespa/storageframework/generic/clock/timer.h> #include <vespa/vespalib/stllike/hash_map.h> -#include <vespa/vespalib/util/sync.h> #include <vespa/vdslib/state/nodestate.h> #include <vespa/config/subscription/configuri.h> #include <list> #include <unordered_map> +#include <mutex> +#include <condition_variable> namespace storage { @@ -120,9 +121,10 @@ class StorageBucketDBInitializer : public StorageLink, // This lock is held while the worker thread is working, such that // status retrieval can lock it. Listing part only grabs it when // needed to supporting listing in multiple threads - vespalib::Monitor _workerMonitor; + mutable std::mutex _workerLock; + std::condition_variable _workerCond; // This lock protects the reply list. - vespalib::Monitor _replyLock; + std::mutex _replyLock; GlobalState(); ~GlobalState(); diff --git a/storage/src/vespa/storage/common/bucketoperationlogger.cpp b/storage/src/vespa/storage/common/bucketoperationlogger.cpp index d6df7f928e2..905b704409f 100644 --- a/storage/src/vespa/storage/common/bucketoperationlogger.cpp +++ b/storage/src/vespa/storage/common/bucketoperationlogger.cpp @@ -34,7 +34,7 @@ BucketOperationLogger::log(const document::BucketId& id, bool hasError = false; { - vespalib::LockGuard lock(_logLock); + std::lock_guard<std:.mutex> guard(_logLock); BucketMapType::iterator i = _bucketMap.lower_bound(id); if (i != _bucketMap.end() && i->first == id) { if (i->second._history.size() >= MAX_ENTRIES) { @@ -145,7 +145,7 @@ void BucketOperationLogger::dumpHistoryToLog(const document::BucketId& id) const { LogWarnAppender handler; - vespalib::LockGuard lock(_logLock); + std::lock_guard<std::mutex> guard(_logLock); processHistory(*this, id, handler); } @@ -153,7 +153,7 @@ vespalib::string BucketOperationLogger::getHistory(const document::BucketId& id) const { LogStringBuilder handler; - vespalib::LockGuard lock(_logLock); + std::lock_guard<std::mutex> lock(_logLock); processHistory(*this, id, handler); return handler.ss.str(); } @@ -167,7 +167,7 @@ BucketOperationLogger::searchBucketHistories( ss << "<ul>\n"; // This may block for a while... Assuming such searches run when system // is otherwise idle. - vespalib::LockGuard lock(_logLock); + std::lock_guard<std::mutex> guard(_logLock); for (BucketMapType::const_iterator bIt(_bucketMap.begin()), bEnd(_bucketMap.end()); bIt != bEnd; ++bIt) diff --git a/storage/src/vespa/storage/common/bucketoperationlogger.h b/storage/src/vespa/storage/common/bucketoperationlogger.h index dce9334a9cf..af4b539a4c8 100644 --- a/storage/src/vespa/storage/common/bucketoperationlogger.h +++ b/storage/src/vespa/storage/common/bucketoperationlogger.h @@ -2,10 +2,10 @@ #pragma once #include <vespa/vespalib/stllike/string.h> -#include <vespa/vespalib/util/sync.h> #include <vespa/document/bucket/bucketid.h> #include <map> #include <list> +#include <mutex> /** * Enable this to log most slotfile operations (such as all mutations) as @@ -85,7 +85,7 @@ struct BucketOperationLogger typedef std::map<document::BucketId, State> BucketMapType; - vespalib::Lock _logLock; + std::mutex _logLock; BucketMapType _bucketMap; void log(const document::BucketId& id, diff --git a/storage/src/vespa/storage/common/storagecomponent.cpp b/storage/src/vespa/storage/common/storagecomponent.cpp index a519e39e2ed..bf387240dc5 100644 --- a/storage/src/vespa/storage/common/storagecomponent.cpp +++ b/storage/src/vespa/storage/common/storagecomponent.cpp @@ -28,14 +28,14 @@ StorageComponent::setNodeInfo(vespalib::stringref clusterName, void StorageComponent::setDocumentTypeRepo(DocumentTypeRepoSP repo) { - vespalib::LockGuard guard(_lock); + std::lock_guard<std::mutex> guard(_lock); _docTypeRepo = repo; } void StorageComponent::setLoadTypes(LoadTypeSetSP loadTypes) { - vespalib::LockGuard guard(_lock); + std::lock_guard<std::mutex> guard(_lock); _loadTypes = loadTypes; } @@ -57,14 +57,14 @@ StorageComponent::setBucketIdFactory(const document::BucketIdFactory& factory) void StorageComponent::setDistribution(DistributionSP distribution) { - vespalib::LockGuard guard(_lock); + std::lock_guard<std::mutex> guard(_lock); _distribution = distribution; } void StorageComponent::setNodeStateUpdater(NodeStateUpdater& updater) { - vespalib::LockGuard guard(_lock); + std::lock_guard<std::mutex> guard(_lock); if (_nodeStateUpdater != 0) { throw vespalib::IllegalStateException( "Node state updater is already set", VESPA_STRLOC); @@ -87,7 +87,7 @@ StorageComponent::StorageComponent(StorageComponentRegister& compReg, NodeStateUpdater& StorageComponent::getStateUpdater() const { - vespalib::LockGuard guard(_lock); + std::lock_guard<std::mutex> guard(_lock); if (_nodeStateUpdater == 0) { throw vespalib::IllegalStateException( "Component need node state updater at this time, but it has " @@ -114,21 +114,21 @@ StorageComponent::getPriority(const documentapi::LoadType& lt) const StorageComponent::DocumentTypeRepoSP StorageComponent::getTypeRepo() const { - vespalib::LockGuard guard(_lock); + std::lock_guard<std::mutex> guard(_lock); return _docTypeRepo; } StorageComponent::LoadTypeSetSP StorageComponent::getLoadTypes() const { - vespalib::LockGuard guard(_lock); + std::lock_guard<std::mutex> guard(_lock); return _loadTypes; } StorageComponent::DistributionSP StorageComponent::getDistribution() const { - vespalib::LockGuard guard(_lock); + std::lock_guard<std::mutex> guard(_lock); return _distribution; } diff --git a/storage/src/vespa/storage/common/storagecomponent.h b/storage/src/vespa/storage/common/storagecomponent.h index f34b6ace745..d469540b55f 100644 --- a/storage/src/vespa/storage/common/storagecomponent.h +++ b/storage/src/vespa/storage/common/storagecomponent.h @@ -35,7 +35,7 @@ #include <vespa/storageframework/generic/component/componentregister.h> #include <vespa/document/bucket/bucketidfactory.h> #include <vespa/vdslib/state/node.h> -#include <vespa/vespalib/util/sync.h> +#include <mutex> namespace vespa { namespace config { namespace content { namespace core { namespace internal { @@ -113,7 +113,7 @@ private: document::BucketIdFactory _bucketIdFactory; DistributionSP _distribution; NodeStateUpdater* _nodeStateUpdater; - vespalib::Lock _lock; + mutable std::mutex _lock; }; struct StorageComponentRegister : public virtual framework::ComponentRegister diff --git a/storage/src/vespa/storage/common/storagelinkqueued.h b/storage/src/vespa/storage/common/storagelinkqueued.h index b7891cc48dc..ee5573bcc22 100644 --- a/storage/src/vespa/storage/common/storagelinkqueued.h +++ b/storage/src/vespa/storage/common/storagelinkqueued.h @@ -19,6 +19,8 @@ #include <vespa/vespalib/util/document_runnable.h> #include <deque> #include <limits> +#include <mutex> +#include <condition_variable> namespace storage { @@ -75,7 +77,8 @@ private: protected: StorageLinkQueued& _parent; unsigned int _maxQueueSize; - vespalib::Monitor _sync; + std::mutex _sync; + std::condition_variable _syncCond; std::deque< std::shared_ptr<Message> > _messages; bool _replyDispatcher; std::unique_ptr<framework::Component> _component; @@ -92,11 +95,6 @@ private: void add(const std::shared_ptr<Message>&); void flush(); - // You can use the given functions if you need to keep the - // dispatcher thread locked while you process a message. Bucket - // manager does this during bucket dumps - vespalib::Monitor& getMonitor() { return _sync; } - void addWithoutLocking(const std::shared_ptr<Message>&); virtual void send(const std::shared_ptr<Message> & ) = 0; }; diff --git a/storage/src/vespa/storage/common/storagelinkqueued.hpp b/storage/src/vespa/storage/common/storagelinkqueued.hpp index b34ce58bbb7..22c5e9ba5f2 100644 --- a/storage/src/vespa/storage/common/storagelinkqueued.hpp +++ b/storage/src/vespa/storage/common/storagelinkqueued.hpp @@ -7,6 +7,7 @@ #include <vespa/storageframework/generic/component/component.h> #include <vespa/vespalib/util/stringfmt.h> #include <sstream> +#include <chrono> namespace storage { @@ -16,8 +17,8 @@ StorageLinkQueued::Dispatcher<Message>::terminate() { if (_thread.get()) { _thread->interrupt(); { - vespalib::MonitorGuard sync(_sync); - sync.signal(); + std::lock_guard<std::mutex> guard(_sync); + _syncCond.notify_one(); } _thread->join(); _thread.reset(0); @@ -29,6 +30,7 @@ StorageLinkQueued::Dispatcher<Message>::Dispatcher(StorageLinkQueued& parent, un : _parent(parent), _maxQueueSize(maxQueueSize), _sync(), + _syncCond(), _messages(), _replyDispatcher(replyDispatcher) { @@ -58,34 +60,28 @@ template<typename Message> void StorageLinkQueued::Dispatcher<Message>::add( const std::shared_ptr<Message>& m) { - vespalib::MonitorGuard sync(_sync); + using namespace std::chrono_literals; + std::unique_lock<std::mutex> guard(_sync); if (_thread.get() == 0) start(); while ((_messages.size() > _maxQueueSize) && !_thread->interrupted()) { - sync.wait(100); + _syncCond.wait_for(guard, 100ms); } _messages.push_back(m); - sync.signal(); -} - -template<typename Message> -void StorageLinkQueued::Dispatcher<Message>::addWithoutLocking( - const std::shared_ptr<Message>& m) -{ - if (_thread.get() == 0) start(); - _messages.push_back(m); + _syncCond.notify_one(); } template<typename Message> void StorageLinkQueued::Dispatcher<Message>::run(framework::ThreadHandle& h) { + using namespace std::chrono_literals; while (!h.interrupted()) { h.registerTick(framework::PROCESS_CYCLE); std::shared_ptr<Message> message; { - vespalib::MonitorGuard sync(_sync); + std::unique_lock<std::mutex> guard(_sync); while (!h.interrupted() && _messages.empty()) { - sync.wait(100); + _syncCond.wait_for(guard, 100ms); h.registerTick(framework::WAIT_CYCLE); } if (h.interrupted()) break; @@ -104,9 +100,9 @@ void StorageLinkQueued::Dispatcher<Message>::run(framework::ThreadHandle& h) { // Since flush() only waits for stack to be empty, we must // pop stack AFTER send have been called. - vespalib::MonitorGuard sync(_sync); + std::lock_guard<std::mutex> guard(_sync); _messages.pop_front(); - sync.signal(); + _syncCond.notify_one(); } } _parent.logDebug("Finished storage link queued thread"); @@ -115,9 +111,10 @@ void StorageLinkQueued::Dispatcher<Message>::run(framework::ThreadHandle& h) template<typename Message> void StorageLinkQueued::Dispatcher<Message>::flush() { - vespalib::MonitorGuard sync(_sync); + using namespace std::chrono_literals; + std::unique_lock<std::mutex> guard(_sync); while (!_messages.empty()) { - sync.wait(100); + _syncCond.wait_for(guard, 100ms); } } diff --git a/storageframework/src/vespa/storageframework/generic/thread/thread.cpp b/storageframework/src/vespa/storageframework/generic/thread/thread.cpp index 1a88b4d2044..5ed3f7dc5e6 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/thread.cpp +++ b/storageframework/src/vespa/storageframework/generic/thread/thread.cpp @@ -17,5 +17,16 @@ Thread::interruptAndJoin(vespalib::Monitor* m) join(); } +void +Thread::interruptAndJoin(std::mutex &m, std::condition_variable &cv) +{ + interrupt(); + { + std::lock_guard<std::mutex> guard(m); + cv.notify_all(); + } + join(); +} + } // framework } // storage diff --git a/storageframework/src/vespa/storageframework/generic/thread/thread.h b/storageframework/src/vespa/storageframework/generic/thread/thread.h index ceeba79ebe2..72054ff725a 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/thread.h +++ b/storageframework/src/vespa/storageframework/generic/thread/thread.h @@ -14,6 +14,8 @@ #include "runnable.h" #include <vespa/vespalib/stllike/string.h> +#include <mutex> +#include <condition_variable> namespace vespalib { class Monitor; @@ -58,6 +60,8 @@ public: * through a monitor after the signalling face. */ void interruptAndJoin(vespalib::Monitor* m); + + void interruptAndJoin(std::mutex &m, std::condition_variable &cv); }; } |