diff options
author | Tor Egge <Tor.Egge@oath.com> | 2017-11-24 11:34:12 +0000 |
---|---|---|
committer | Tor Egge <Tor.Egge@oath.com> | 2017-11-24 11:41:27 +0000 |
commit | e47d6d47edec6779f11a5160e469aef4649293d5 (patch) | |
tree | 5fc80b54aed3b328f2f21dfe2963cb0cc51cb0bf /storage | |
parent | fe86f187aafd0c4955789a2aac5a72f08091082f (diff) |
Use standard locking in StorageLinkQueued.
Diffstat (limited to 'storage')
-rw-r--r-- | storage/src/vespa/storage/common/storagelinkqueued.h | 5 | ||||
-rw-r--r-- | storage/src/vespa/storage/common/storagelinkqueued.hpp | 27 |
2 files changed, 20 insertions, 12 deletions
diff --git a/storage/src/vespa/storage/common/storagelinkqueued.h b/storage/src/vespa/storage/common/storagelinkqueued.h index 570ff262901..ee5573bcc22 100644 --- a/storage/src/vespa/storage/common/storagelinkqueued.h +++ b/storage/src/vespa/storage/common/storagelinkqueued.h @@ -19,6 +19,8 @@ #include <vespa/vespalib/util/document_runnable.h> #include <deque> #include <limits> +#include <mutex> +#include <condition_variable> namespace storage { @@ -75,7 +77,8 @@ private: protected: StorageLinkQueued& _parent; unsigned int _maxQueueSize; - vespalib::Monitor _sync; + std::mutex _sync; + std::condition_variable _syncCond; std::deque< std::shared_ptr<Message> > _messages; bool _replyDispatcher; std::unique_ptr<framework::Component> _component; diff --git a/storage/src/vespa/storage/common/storagelinkqueued.hpp b/storage/src/vespa/storage/common/storagelinkqueued.hpp index 1210603980a..22c5e9ba5f2 100644 --- a/storage/src/vespa/storage/common/storagelinkqueued.hpp +++ b/storage/src/vespa/storage/common/storagelinkqueued.hpp @@ -7,6 +7,7 @@ #include <vespa/storageframework/generic/component/component.h> #include <vespa/vespalib/util/stringfmt.h> #include <sstream> +#include <chrono> namespace storage { @@ -16,8 +17,8 @@ StorageLinkQueued::Dispatcher<Message>::terminate() { if (_thread.get()) { _thread->interrupt(); { - vespalib::MonitorGuard sync(_sync); - sync.signal(); + std::lock_guard<std::mutex> guard(_sync); + _syncCond.notify_one(); } _thread->join(); _thread.reset(0); @@ -29,6 +30,7 @@ StorageLinkQueued::Dispatcher<Message>::Dispatcher(StorageLinkQueued& parent, un : _parent(parent), _maxQueueSize(maxQueueSize), _sync(), + _syncCond(), _messages(), _replyDispatcher(replyDispatcher) { @@ -58,26 +60,28 @@ template<typename Message> void StorageLinkQueued::Dispatcher<Message>::add( const std::shared_ptr<Message>& m) { - vespalib::MonitorGuard sync(_sync); + using namespace std::chrono_literals; + std::unique_lock<std::mutex> guard(_sync); if (_thread.get() == 0) start(); while ((_messages.size() > _maxQueueSize) && !_thread->interrupted()) { - sync.wait(100); + _syncCond.wait_for(guard, 100ms); } _messages.push_back(m); - sync.signal(); + _syncCond.notify_one(); } template<typename Message> void StorageLinkQueued::Dispatcher<Message>::run(framework::ThreadHandle& h) { + using namespace std::chrono_literals; while (!h.interrupted()) { h.registerTick(framework::PROCESS_CYCLE); std::shared_ptr<Message> message; { - vespalib::MonitorGuard sync(_sync); + std::unique_lock<std::mutex> guard(_sync); while (!h.interrupted() && _messages.empty()) { - sync.wait(100); + _syncCond.wait_for(guard, 100ms); h.registerTick(framework::WAIT_CYCLE); } if (h.interrupted()) break; @@ -96,9 +100,9 @@ void StorageLinkQueued::Dispatcher<Message>::run(framework::ThreadHandle& h) { // Since flush() only waits for stack to be empty, we must // pop stack AFTER send have been called. - vespalib::MonitorGuard sync(_sync); + std::lock_guard<std::mutex> guard(_sync); _messages.pop_front(); - sync.signal(); + _syncCond.notify_one(); } } _parent.logDebug("Finished storage link queued thread"); @@ -107,9 +111,10 @@ void StorageLinkQueued::Dispatcher<Message>::run(framework::ThreadHandle& h) template<typename Message> void StorageLinkQueued::Dispatcher<Message>::flush() { - vespalib::MonitorGuard sync(_sync); + using namespace std::chrono_literals; + std::unique_lock<std::mutex> guard(_sync); while (!_messages.empty()) { - sync.wait(100); + _syncCond.wait_for(guard, 100ms); } } |