summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@oath.com>2017-11-24 11:34:12 +0000
committerTor Egge <Tor.Egge@oath.com>2017-11-24 11:41:27 +0000
commite47d6d47edec6779f11a5160e469aef4649293d5 (patch)
tree5fc80b54aed3b328f2f21dfe2963cb0cc51cb0bf
parentfe86f187aafd0c4955789a2aac5a72f08091082f (diff)
Use standard locking in StorageLinkQueued.
-rw-r--r--storage/src/vespa/storage/common/storagelinkqueued.h5
-rw-r--r--storage/src/vespa/storage/common/storagelinkqueued.hpp27
2 files changed, 20 insertions, 12 deletions
diff --git a/storage/src/vespa/storage/common/storagelinkqueued.h b/storage/src/vespa/storage/common/storagelinkqueued.h
index 570ff262901..ee5573bcc22 100644
--- a/storage/src/vespa/storage/common/storagelinkqueued.h
+++ b/storage/src/vespa/storage/common/storagelinkqueued.h
@@ -19,6 +19,8 @@
#include <vespa/vespalib/util/document_runnable.h>
#include <deque>
#include <limits>
+#include <mutex>
+#include <condition_variable>
namespace storage {
@@ -75,7 +77,8 @@ private:
protected:
StorageLinkQueued& _parent;
unsigned int _maxQueueSize;
- vespalib::Monitor _sync;
+ std::mutex _sync;
+ std::condition_variable _syncCond;
std::deque< std::shared_ptr<Message> > _messages;
bool _replyDispatcher;
std::unique_ptr<framework::Component> _component;
diff --git a/storage/src/vespa/storage/common/storagelinkqueued.hpp b/storage/src/vespa/storage/common/storagelinkqueued.hpp
index 1210603980a..22c5e9ba5f2 100644
--- a/storage/src/vespa/storage/common/storagelinkqueued.hpp
+++ b/storage/src/vespa/storage/common/storagelinkqueued.hpp
@@ -7,6 +7,7 @@
#include <vespa/storageframework/generic/component/component.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <sstream>
+#include <chrono>
namespace storage {
@@ -16,8 +17,8 @@ StorageLinkQueued::Dispatcher<Message>::terminate() {
if (_thread.get()) {
_thread->interrupt();
{
- vespalib::MonitorGuard sync(_sync);
- sync.signal();
+ std::lock_guard<std::mutex> guard(_sync);
+ _syncCond.notify_one();
}
_thread->join();
_thread.reset(0);
@@ -29,6 +30,7 @@ StorageLinkQueued::Dispatcher<Message>::Dispatcher(StorageLinkQueued& parent, un
: _parent(parent),
_maxQueueSize(maxQueueSize),
_sync(),
+ _syncCond(),
_messages(),
_replyDispatcher(replyDispatcher)
{
@@ -58,26 +60,28 @@ template<typename Message>
void StorageLinkQueued::Dispatcher<Message>::add(
const std::shared_ptr<Message>& m)
{
- vespalib::MonitorGuard sync(_sync);
+ using namespace std::chrono_literals;
+ std::unique_lock<std::mutex> guard(_sync);
if (_thread.get() == 0) start();
while ((_messages.size() > _maxQueueSize) && !_thread->interrupted()) {
- sync.wait(100);
+ _syncCond.wait_for(guard, 100ms);
}
_messages.push_back(m);
- sync.signal();
+ _syncCond.notify_one();
}
template<typename Message>
void StorageLinkQueued::Dispatcher<Message>::run(framework::ThreadHandle& h)
{
+ using namespace std::chrono_literals;
while (!h.interrupted()) {
h.registerTick(framework::PROCESS_CYCLE);
std::shared_ptr<Message> message;
{
- vespalib::MonitorGuard sync(_sync);
+ std::unique_lock<std::mutex> guard(_sync);
while (!h.interrupted() && _messages.empty()) {
- sync.wait(100);
+ _syncCond.wait_for(guard, 100ms);
h.registerTick(framework::WAIT_CYCLE);
}
if (h.interrupted()) break;
@@ -96,9 +100,9 @@ void StorageLinkQueued::Dispatcher<Message>::run(framework::ThreadHandle& h)
{
// Since flush() only waits for stack to be empty, we must
// pop stack AFTER send have been called.
- vespalib::MonitorGuard sync(_sync);
+ std::lock_guard<std::mutex> guard(_sync);
_messages.pop_front();
- sync.signal();
+ _syncCond.notify_one();
}
}
_parent.logDebug("Finished storage link queued thread");
@@ -107,9 +111,10 @@ void StorageLinkQueued::Dispatcher<Message>::run(framework::ThreadHandle& h)
template<typename Message>
void StorageLinkQueued::Dispatcher<Message>::flush()
{
- vespalib::MonitorGuard sync(_sync);
+ using namespace std::chrono_literals;
+ std::unique_lock<std::mutex> guard(_sync);
while (!_messages.empty()) {
- sync.wait(100);
+ _syncCond.wait_for(guard, 100ms);
}
}