diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-14 13:06:26 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-14 13:06:26 +0200 |
commit | bb7814f7e0619a8b481969b407275b7b4c709385 (patch) | |
tree | 6be25744ed37230267e649a91f686f86dc626184 /document | |
parent | f479c8aba5657d961d488d6370406ebbd933e17a (diff) | |
parent | e243bfa017fa78db756edc3e90ee1ca06535340e (diff) |
Merge pull request #14847 from vespa-engine/balder/monitor-2-mutex-and-cond-4
Balder/monitor 2 mutex and cond 4
Diffstat (limited to 'document')
-rw-r--r-- | document/src/vespa/document/util/queue.h | 109 |
1 files changed, 17 insertions, 92 deletions
diff --git a/document/src/vespa/document/util/queue.h b/document/src/vespa/document/util/queue.h index 7e3c98333c8..ad5ad07b8ac 100644 --- a/document/src/vespa/document/util/queue.h +++ b/document/src/vespa/document/util/queue.h @@ -2,7 +2,7 @@ #pragma once #include <queue> -#include <vespa/vespalib/util/sync.h> +#include <mutex> #define UNUSED_PARAM(p) namespace document { @@ -16,35 +16,25 @@ class Semaphore private: int _count; int _numWaiters; - vespalib::Monitor _sync; - - // assignment would be unsafe - Semaphore& operator= (const Semaphore& other); + std::mutex _lock; + std::condition_variable _cond; public: - // XXX is it really safe to just copy other._count here? - Semaphore(const Semaphore& other) : _count(other._count), _numWaiters(0), _sync() {} - - Semaphore(int count=0) : _count(count), _numWaiters(0), _sync() { } + Semaphore(int count=0) : _count(count), _numWaiters(0), _lock() { } - virtual ~Semaphore() { - // XXX alternative: assert(_numWaiters == 0) - while (true) { - vespalib::MonitorGuard guard(_sync); - if (_numWaiters == 0) break; - _count++; - guard.signal(); - } + ~Semaphore() { + std::lock_guard guard(_lock); + assert(_numWaiters == 0); } bool wait(int ms) { bool gotSemaphore = false; - vespalib::MonitorGuard guard(_sync); + std::unique_lock guard(_lock); if (_count == 0) { _numWaiters++; // we could retry if we get a signal but not the semaphore, // but then we risk waiting longer than expected, so // just ignore the return value here. - guard.wait(ms); + _cond.wait_for(guard, std::chrono::milliseconds(ms)); _numWaiters--; } if (_count > 0) { @@ -56,10 +46,10 @@ public: } bool wait() { - vespalib::MonitorGuard guard(_sync); + std::unique_lock guard(_lock); while (_count == 0) { _numWaiters++; - guard.wait(); + _cond.wait(guard); _numWaiters--; } _count--; @@ -68,11 +58,11 @@ public: } void post() { - vespalib::MonitorGuard guard(_sync); + std::unique_lock guard(_lock); assert(_count >= 0); _count++; if (_numWaiters > 0) { - guard.signal(); + _cond.notify_one(); } } }; @@ -82,12 +72,12 @@ template <typename T, typename Q=std::queue<T> > class QueueBase { public: - QueueBase() : _cond(), _count(0), _q() { } + QueueBase() : _lock(), _count(0), _q() { } virtual ~QueueBase() { } size_t size() const { return internal_size(); } bool empty() const { return size() == 0; } protected: - vespalib::Monitor _cond; + std::mutex _lock; document::Semaphore _count; Q _q; @@ -119,7 +109,7 @@ public: (void)timeout; bool retval; { - vespalib::MonitorGuard guard(this->_cond); + std::lock_guard guard(this->_lock); retval = this->internal_push(msg); } this->_count.post(); @@ -131,77 +121,12 @@ public: this->_count.wait() : this->_count.wait(timeout)); if ( retval ) { - vespalib::MonitorGuard guard(this->_cond); + std::lock_guard guard(this->_lock); retval = this->internal_pop(msg); } return retval; } }; -template <typename T, typename Q=std::queue<T> > -class QueueWithMax : public QueueBase<T, Q> -{ -protected: - size_t _size; - size_t storesize() const { return _size; } - virtual void add(const T& UNUSED_PARAM(msg)) { _size++; } - virtual void sub(const T& UNUSED_PARAM(msg)) { _size--; } -private: - size_t _max; - size_t _lowWaterMark; - int _writersWaiting; -public: - QueueWithMax(size_t max_=1000, size_t lowWaterMark_=500) - : QueueBase<T, Q>(), - _size(0), - _max(max_), - _lowWaterMark(lowWaterMark_), - _writersWaiting(0) - { } - bool push(const T& msg, int timeout=-1) - { - bool retval=true; - { - vespalib::MonitorGuard guard(this->_cond); - if (storesize() >= _max) { - ++_writersWaiting; - if (timeout >= 0) { - retval = guard.wait(timeout); - } else { - guard.wait(); - } - --_writersWaiting; - } - if (retval) { - retval = internal_push(msg); - } - if (retval) { - add(msg); - } - } - if (retval) { - this->_count.post(); - } - return retval; - } - bool pop(T& msg, int timeout=-1) - { - bool retval((timeout == -1) ? - this->_count.wait() : - this->_count.wait(timeout)); - if ( retval ) { - vespalib::MonitorGuard guard(this->_cond); - retval = internal_pop(msg); - if (retval) { - sub(msg); - if (_writersWaiting > 0 && storesize() < _lowWaterMark) { - guard.signal(); - } - } - } - return retval; - } -}; - } // namespace document |