author     Geir Storli <geirstorli@yahoo.no>    2017-11-27 15:19:30 +0100
committer  GitHub <noreply@github.com>          2017-11-27 15:19:30 +0100
commit     775cbe184dcf306b38091e07d04f798d77f8873d (patch)
tree       57352a349c3bc22a2370b2c57a25a09603c3efc0
parent     6d0875eb6ea703da1d707f07047f3f406e5e4454 (diff)
parent     ea0cf7742e910dbb9c60cb6f3a8ac8ba92b5744e (diff)
Merge pull request #4272 from vespa-engine/toregge/use-standard-locking-in-storage-classes
Use standard locking in storage classes
-rw-r--r--  storage/src/vespa/storage/bucketdb/bucketmanager.cpp | 33
-rw-r--r--  storage/src/vespa/storage/bucketdb/bucketmanager.h | 12
-rw-r--r--  storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp | 1
-rw-r--r--  storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp | 23
-rw-r--r--  storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h | 8
-rw-r--r--  storage/src/vespa/storage/common/bucketoperationlogger.cpp | 8
-rw-r--r--  storage/src/vespa/storage/common/bucketoperationlogger.h | 4
-rw-r--r--  storage/src/vespa/storage/common/storagecomponent.cpp | 16
-rw-r--r--  storage/src/vespa/storage/common/storagecomponent.h | 4
-rw-r--r--  storage/src/vespa/storage/common/storagelinkqueued.h | 10
-rw-r--r--  storage/src/vespa/storage/common/storagelinkqueued.hpp | 35
-rw-r--r--  storageframework/src/vespa/storageframework/generic/thread/thread.cpp | 11
-rw-r--r--  storageframework/src/vespa/storageframework/generic/thread/thread.h | 4
13 files changed, 94 insertions, 75 deletions
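The commit systematically replaces vespalib::Lock/LockGuard with std::mutex/std::lock_guard and vespalib::Monitor/MonitorGuard with a std::mutex plus std::condition_variable pair, expressing timeouts with std::chrono literals instead of raw millisecond counts. A minimal, self-contained sketch of the resulting wait/notify shape (it mirrors BucketManager::onRequestBucketInfo() and run(), but every name below is hypothetical, not taken from the commit):

// Sketch of the std::mutex + std::condition_variable pattern this commit adopts.
#include <chrono>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

using namespace std::chrono_literals;

struct RequestQueue {
    std::mutex lock;                    // replaces vespalib::Monitor
    std::condition_variable cond;       // the signalling half of the old monitor
    std::vector<int> requests;

    void add(int request) {
        std::lock_guard<std::mutex> guard(lock);    // was vespalib::MonitorGuard
        requests.push_back(request);
        cond.notify_all();                          // was monitor.signal()
    }

    void runOnce() {
        std::unique_lock<std::mutex> guard(lock);
        if (requests.empty()) {
            cond.wait_for(guard, 1s);               // was monitor.wait(1000)
        }
        for (int request : requests) {
            std::cout << "processing request " << request << '\n';
        }
        requests.clear();
    }
};

int main() {
    RequestQueue queue;
    std::thread worker([&queue] { queue.runOnce(); });
    queue.add(42);
    worker.join();
}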
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
index 00fa5c95c9b..677ca0a6309 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
@@ -19,12 +19,13 @@
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
#include <vespa/config/config.h>
-#include <unordered_map>
+#include <chrono>
#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".storage.bucketdb.manager");
using document::BucketSpace;
+using namespace std::chrono_literals;
namespace storage {
@@ -33,9 +34,9 @@ BucketManager::BucketManager(const config::ConfigUri & configUri,
: StorageLinkQueued("Bucket manager", compReg),
framework::StatusReporter("bucketdb", "Bucket database"),
_configUri(configUri),
- _stateAccess(),
_bucketDBMemoryToken(),
- _workerMonitor(),
+ _workerLock(),
+ _workerCond(),
_clusterStateLock(),
_queueProcessingLock(),
_queuedReplies(),
@@ -81,7 +82,7 @@ void BucketManager::onClose()
{
// Stop internal thread such that we don't send any more messages down.
if (_thread.get() != 0) {
- _thread->interruptAndJoin(&_workerMonitor);
+ _thread->interruptAndJoin(_workerLock, _workerCond);
_thread.reset(0);
}
StorageLinkQueued::onClose();
@@ -268,7 +269,7 @@ void BucketManager::run(framework::ThreadHandle& thread)
bool didWork = false;
BucketInfoRequestMap infoReqs;
{
- vespalib::MonitorGuard monitor(_workerMonitor);
+ std::lock_guard<std::mutex> guard(_workerLock);
infoReqs.swap(_bucketInfoRequests);
}
@@ -277,12 +278,12 @@ void BucketManager::run(framework::ThreadHandle& thread)
}
{
- vespalib::MonitorGuard monitor(_workerMonitor);
+ std::unique_lock<std::mutex> guard(_workerLock);
for (const auto &req : infoReqs) {
assert(req.second.empty());
}
if (!didWork) {
- monitor.wait(1000);
+ _workerCond.wait_for(guard, 1s);
thread.registerTick(framework::WAIT_CYCLE);
} else {
thread.registerTick(framework::PROCESS_CYCLE);
@@ -393,9 +394,9 @@ bool BucketManager::onRequestBucketInfo(
LOG(debug, "Got request bucket info command");
if (cmd->getBuckets().size() == 0 && cmd->hasSystemState()) {
- vespalib::MonitorGuard monitor(_workerMonitor);
+ std::lock_guard<std::mutex> guard(_workerLock);
_bucketInfoRequests[cmd->getBucketSpace()].push_back(cmd);
- monitor.signal();
+ _workerCond.notify_all();
LOG(spam, "Scheduled request bucket info request for retrieval");
return true;
}
@@ -472,7 +473,7 @@ BucketManager::ScopedQueueDispatchGuard::~ScopedQueueDispatchGuard()
void
BucketManager::enterQueueProtectedSection()
{
- vespalib::LockGuard guard(_queueProcessingLock);
+ std::lock_guard<std::mutex> guard(_queueProcessingLock);
++_requestsCurrentlyProcessing;
}
@@ -480,7 +481,7 @@ void
BucketManager::leaveQueueProtectedSection(ScopedQueueDispatchGuard& queueGuard)
{
(void) queueGuard; // Only used to enforce guard is held while calling.
- vespalib::LockGuard guard(_queueProcessingLock);
+ std::lock_guard<std::mutex> guard(_queueProcessingLock);
assert(_requestsCurrentlyProcessing > 0);
// Full bucket info fetches may be concurrently interleaved with bucket-
// specific fetches outside of the processing thread. We only allow queued
@@ -529,7 +530,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac
clusterState->toString().c_str(),
our_hash.c_str());
- vespalib::LockGuard lock(_clusterStateLock);
+ std::lock_guard<std::mutex> clusterStateGuard(_clusterStateLock);
for (auto it = reqs.rbegin(); it != reqs.rend(); ++it) {
// Currently small requests should not be forwarded to worker thread
assert((*it)->hasSystemState());
@@ -629,7 +630,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac
size_t
BucketManager::bucketInfoRequestsCurrentlyProcessing() const noexcept
{
- vespalib::LockGuard guard(_queueProcessingLock);
+ std::lock_guard<std::mutex> guard(_queueProcessingLock);
return _requestsCurrentlyProcessing;
}
@@ -694,7 +695,7 @@ BucketManager::onSetSystemState(
LOG(debug, "onSetSystemState(%s)", cmd->toString().c_str());
const lib::ClusterState& state(cmd->getSystemState());
std::string unified(unifyState(state));
- vespalib::LockGuard lock(_clusterStateLock);
+ std::lock_guard<std::mutex> lock(_clusterStateLock);
if (unified != _lastUnifiedClusterState
|| state.getVersion() != _lastClusterStateSeen + 1)
{
@@ -804,7 +805,7 @@ BucketManager::enqueueIfBucketHasConflicts(const api::BucketReply::SP& reply)
{
// Should very rarely contend, since persistence replies are all sent up
// via a single dispatcher thread.
- vespalib::LockGuard guard(_queueProcessingLock);
+ std::lock_guard<std::mutex> guard(_queueProcessingLock);
if (_requestsCurrentlyProcessing == 0) {
return false; // Nothing to do here; pass through reply.
}
@@ -841,7 +842,7 @@ bool
BucketManager::enqueueAsConflictIfProcessingRequest(
const api::StorageReply::SP& reply)
{
- vespalib::LockGuard guard(_queueProcessingLock);
+ std::lock_guard<std::mutex> guard(_queueProcessingLock);
if (_requestsCurrentlyProcessing != 0) {
LOG(debug, "Enqueued %s due to concurrent RequestBucketInfo",
reply->toString().c_str());
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.h b/storage/src/vespa/storage/bucketdb/bucketmanager.h
index 3b71230a8ed..5e4e928dfc7 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.h
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.h
@@ -27,6 +27,8 @@
#include <list>
#include <unordered_map>
#include <unordered_set>
+#include <mutex>
+#include <condition_variable>
namespace storage {
@@ -45,21 +47,21 @@ private:
config::ConfigUri _configUri;
uint32_t _chunkLevel;
- mutable vespalib::Lock _stateAccess;
framework::MemoryToken::UP _bucketDBMemoryToken;
BucketInfoRequestMap _bucketInfoRequests;
/**
* We have our own thread running, which we use to send messages down.
- * Take worker monitor, add to list and signal for messages to be sent.
+ * Take worker lock, add to list and signal for messages to be sent.
*/
- mutable vespalib::Monitor _workerMonitor;
+ mutable std::mutex _workerLock;
+ std::condition_variable _workerCond;
/**
* Lock kept for access to 3 values below concerning cluster state.
*/
- vespalib::Lock _clusterStateLock;
+ std::mutex _clusterStateLock;
- vespalib::Lock _queueProcessingLock;
+ mutable std::mutex _queueProcessingLock;
using ReplyQueue = std::vector<api::StorageReply::SP>;
using ConflictingBuckets = std::unordered_set<document::BucketId,
document::BucketId::hash>;
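Several of the mutexes above are declared mutable (_workerLock, _queueProcessingLock, and later _lock in StorageComponent) because const member functions such as bucketInfoRequestsCurrentlyProcessing() const still need to lock them. A small sketch of that idiom, with hypothetical names:

// Minimal sketch of the mutable-mutex idiom used for the const accessors in
// this commit; the class and member names here are hypothetical.
#include <mutex>
#include <string>

class Component {
public:
    void setName(const std::string& name) {
        std::lock_guard<std::mutex> guard(_lock);
        _name = name;
    }

    std::string getName() const {
        // Locking mutates the mutex, so it must be mutable for a
        // const member function to take the guard.
        std::lock_guard<std::mutex> guard(_lock);
        return _name;
    }

private:
    mutable std::mutex _lock;
    std::string _name;
};

int main() {
    Component component;
    component.setName("storage.0");
    return component.getName().empty() ? 1 : 0;
}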
diff --git a/storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp b/storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp
index 5ac4c4b1ee7..ad56a4083f8 100644
--- a/storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp
+++ b/storage/src/vespa/storage/bucketdb/mapbucketdatabase.cpp
@@ -4,6 +4,7 @@
#include <vespa/storage/common/bucketoperationlogger.h>
#include <vespa/vespalib/util/backtrace.h>
#include <ostream>
+#include <cassert>
#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".mapbucketdatabase");
diff --git a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp
index fc2c2066b6f..1bc473308a4 100644
--- a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp
+++ b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp
@@ -13,11 +13,13 @@
#include <vespa/vespalib/stllike/hash_map.hpp>
#include <vespa/config/helper/configgetter.hpp>
#include <iomanip>
+#include <chrono>
#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".storage.bucketdb.initializer");
using document::BucketSpace;
+using namespace std::chrono_literals;
namespace storage {
@@ -117,10 +119,11 @@ StorageBucketDBInitializer::Metrics::Metrics(framework::Component& component)
StorageBucketDBInitializer::Metrics::~Metrics() {}
StorageBucketDBInitializer::GlobalState::GlobalState()
- : _insertedCount(0), _infoReadCount(0),
- _infoSetByLoad(0), _dirsListed(0), _dirsToList(0),
- _gottenInitProgress(false), _doneListing(false),
- _doneInitializing(false)
+ : _lists(), _joins(), _infoRequests(), _replies(),
+ _insertedCount(0), _infoReadCount(0),
+ _infoSetByLoad(0), _dirsListed(0), _dirsToList(0),
+ _gottenInitProgress(false), _doneListing(false),
+ _doneInitializing(false), _workerLock(), _workerCond(), _replyLock()
{ }
StorageBucketDBInitializer::GlobalState::~GlobalState() { }
@@ -183,7 +186,7 @@ void
StorageBucketDBInitializer::onClose()
{
if (_system._thread.get() != 0) {
- _system._thread->interruptAndJoin(&_state._workerMonitor);
+ _system._thread->interruptAndJoin(_state._workerLock, _state._workerCond);
_system._thread.reset(0);
}
}
@@ -191,11 +194,11 @@ StorageBucketDBInitializer::onClose()
void
StorageBucketDBInitializer::run(framework::ThreadHandle& thread)
{
- vespalib::MonitorGuard monitor(_state._workerMonitor);
+ std::unique_lock<std::mutex> guard(_state._workerLock);
while (!thread.interrupted() && !_state._doneInitializing) {
std::list<api::StorageMessage::SP> replies;
{
- vespalib::LockGuard lock(_state._replyLock);
+ std::lock_guard<std::mutex> replyGuard(_state._replyLock);
_state._replies.swap(replies);
}
for (std::list<api::StorageMessage::SP>::iterator it = replies.begin();
@@ -218,7 +221,7 @@ StorageBucketDBInitializer::run(framework::ThreadHandle& thread)
updateInitProgress();
}
if (replies.empty()) {
- monitor.wait(10);
+ _state._workerCond.wait_for(guard, 10ms);
thread.registerTick(framework::WAIT_CYCLE);
} else {
thread.registerTick(framework::PROCESS_CYCLE);
@@ -258,7 +261,7 @@ void
StorageBucketDBInitializer::reportHtmlStatus(
std::ostream& out, const framework::HttpUrlPath&) const
{
- vespalib::Monitor monitor(_state._workerMonitor);
+ std::lock_guard<std::mutex> guard(_state._workerLock);
out << "\n <h2>Config</h2>\n"
<< " <table>\n"
<< " <tr><td>Max pending info reads per disk</td><td>"
@@ -583,7 +586,7 @@ StorageBucketDBInitializer::onInternalReply(
case ReadBucketInfoReply::ID:
case InternalBucketJoinReply::ID:
{
- vespalib::LockGuard lock(_state._replyLock);
+ std::lock_guard<std::mutex> guard(_state._replyLock);
_state._replies.push_back(reply);
return true;
}
diff --git a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h
index 57b95e14f48..642fe43ad8a 100644
--- a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h
+++ b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.h
@@ -47,11 +47,12 @@
#include <vespa/storageframework/generic/status/htmlstatusreporter.h>
#include <vespa/storageframework/generic/clock/timer.h>
#include <vespa/vespalib/stllike/hash_map.h>
-#include <vespa/vespalib/util/sync.h>
#include <vespa/vdslib/state/nodestate.h>
#include <vespa/config/subscription/configuri.h>
#include <list>
#include <unordered_map>
+#include <mutex>
+#include <condition_variable>
namespace storage {
@@ -120,9 +121,10 @@ class StorageBucketDBInitializer : public StorageLink,
// This lock is held while the worker thread is working, such that
// status retrieval can lock it. Listing part only grabs it when
// needed to support listing in multiple threads
- vespalib::Monitor _workerMonitor;
+ mutable std::mutex _workerLock;
+ std::condition_variable _workerCond;
// This lock protects the reply list.
- vespalib::Monitor _replyLock;
+ std::mutex _replyLock;
GlobalState();
~GlobalState();
diff --git a/storage/src/vespa/storage/common/bucketoperationlogger.cpp b/storage/src/vespa/storage/common/bucketoperationlogger.cpp
index d6df7f928e2..905b704409f 100644
--- a/storage/src/vespa/storage/common/bucketoperationlogger.cpp
+++ b/storage/src/vespa/storage/common/bucketoperationlogger.cpp
@@ -34,7 +34,7 @@ BucketOperationLogger::log(const document::BucketId& id,
bool hasError = false;
{
- vespalib::LockGuard lock(_logLock);
+ std::lock_guard<std::mutex> guard(_logLock);
BucketMapType::iterator i = _bucketMap.lower_bound(id);
if (i != _bucketMap.end() && i->first == id) {
if (i->second._history.size() >= MAX_ENTRIES) {
@@ -145,7 +145,7 @@ void
BucketOperationLogger::dumpHistoryToLog(const document::BucketId& id) const
{
LogWarnAppender handler;
- vespalib::LockGuard lock(_logLock);
+ std::lock_guard<std::mutex> guard(_logLock);
processHistory(*this, id, handler);
}
@@ -153,7 +153,7 @@ vespalib::string
BucketOperationLogger::getHistory(const document::BucketId& id) const
{
LogStringBuilder handler;
- vespalib::LockGuard lock(_logLock);
+ std::lock_guard<std::mutex> lock(_logLock);
processHistory(*this, id, handler);
return handler.ss.str();
}
@@ -167,7 +167,7 @@ BucketOperationLogger::searchBucketHistories(
ss << "<ul>\n";
// This may block for a while... Assuming such searches run when system
// is otherwise idle.
- vespalib::LockGuard lock(_logLock);
+ std::lock_guard<std::mutex> guard(_logLock);
for (BucketMapType::const_iterator
bIt(_bucketMap.begin()), bEnd(_bucketMap.end());
bIt != bEnd; ++bIt)
diff --git a/storage/src/vespa/storage/common/bucketoperationlogger.h b/storage/src/vespa/storage/common/bucketoperationlogger.h
index dce9334a9cf..af4b539a4c8 100644
--- a/storage/src/vespa/storage/common/bucketoperationlogger.h
+++ b/storage/src/vespa/storage/common/bucketoperationlogger.h
@@ -2,10 +2,10 @@
#pragma once
#include <vespa/vespalib/stllike/string.h>
-#include <vespa/vespalib/util/sync.h>
#include <vespa/document/bucket/bucketid.h>
#include <map>
#include <list>
+#include <mutex>
/**
* Enable this to log most slotfile operations (such as all mutations) as
@@ -85,7 +85,7 @@ struct BucketOperationLogger
typedef std::map<document::BucketId, State> BucketMapType;
- vespalib::Lock _logLock;
+ std::mutex _logLock;
BucketMapType _bucketMap;
void log(const document::BucketId& id,
diff --git a/storage/src/vespa/storage/common/storagecomponent.cpp b/storage/src/vespa/storage/common/storagecomponent.cpp
index a519e39e2ed..bf387240dc5 100644
--- a/storage/src/vespa/storage/common/storagecomponent.cpp
+++ b/storage/src/vespa/storage/common/storagecomponent.cpp
@@ -28,14 +28,14 @@ StorageComponent::setNodeInfo(vespalib::stringref clusterName,
void
StorageComponent::setDocumentTypeRepo(DocumentTypeRepoSP repo)
{
- vespalib::LockGuard guard(_lock);
+ std::lock_guard<std::mutex> guard(_lock);
_docTypeRepo = repo;
}
void
StorageComponent::setLoadTypes(LoadTypeSetSP loadTypes)
{
- vespalib::LockGuard guard(_lock);
+ std::lock_guard<std::mutex> guard(_lock);
_loadTypes = loadTypes;
}
@@ -57,14 +57,14 @@ StorageComponent::setBucketIdFactory(const document::BucketIdFactory& factory)
void
StorageComponent::setDistribution(DistributionSP distribution)
{
- vespalib::LockGuard guard(_lock);
+ std::lock_guard<std::mutex> guard(_lock);
_distribution = distribution;
}
void
StorageComponent::setNodeStateUpdater(NodeStateUpdater& updater)
{
- vespalib::LockGuard guard(_lock);
+ std::lock_guard<std::mutex> guard(_lock);
if (_nodeStateUpdater != 0) {
throw vespalib::IllegalStateException(
"Node state updater is already set", VESPA_STRLOC);
@@ -87,7 +87,7 @@ StorageComponent::StorageComponent(StorageComponentRegister& compReg,
NodeStateUpdater&
StorageComponent::getStateUpdater() const
{
- vespalib::LockGuard guard(_lock);
+ std::lock_guard<std::mutex> guard(_lock);
if (_nodeStateUpdater == 0) {
throw vespalib::IllegalStateException(
"Component need node state updater at this time, but it has "
@@ -114,21 +114,21 @@ StorageComponent::getPriority(const documentapi::LoadType& lt) const
StorageComponent::DocumentTypeRepoSP
StorageComponent::getTypeRepo() const
{
- vespalib::LockGuard guard(_lock);
+ std::lock_guard<std::mutex> guard(_lock);
return _docTypeRepo;
}
StorageComponent::LoadTypeSetSP
StorageComponent::getLoadTypes() const
{
- vespalib::LockGuard guard(_lock);
+ std::lock_guard<std::mutex> guard(_lock);
return _loadTypes;
}
StorageComponent::DistributionSP
StorageComponent::getDistribution() const
{
- vespalib::LockGuard guard(_lock);
+ std::lock_guard<std::mutex> guard(_lock);
return _distribution;
}
diff --git a/storage/src/vespa/storage/common/storagecomponent.h b/storage/src/vespa/storage/common/storagecomponent.h
index f34b6ace745..d469540b55f 100644
--- a/storage/src/vespa/storage/common/storagecomponent.h
+++ b/storage/src/vespa/storage/common/storagecomponent.h
@@ -35,7 +35,7 @@
#include <vespa/storageframework/generic/component/componentregister.h>
#include <vespa/document/bucket/bucketidfactory.h>
#include <vespa/vdslib/state/node.h>
-#include <vespa/vespalib/util/sync.h>
+#include <mutex>
namespace vespa { namespace config { namespace content { namespace core {
namespace internal {
@@ -113,7 +113,7 @@ private:
document::BucketIdFactory _bucketIdFactory;
DistributionSP _distribution;
NodeStateUpdater* _nodeStateUpdater;
- vespalib::Lock _lock;
+ mutable std::mutex _lock;
};
struct StorageComponentRegister : public virtual framework::ComponentRegister
diff --git a/storage/src/vespa/storage/common/storagelinkqueued.h b/storage/src/vespa/storage/common/storagelinkqueued.h
index b7891cc48dc..ee5573bcc22 100644
--- a/storage/src/vespa/storage/common/storagelinkqueued.h
+++ b/storage/src/vespa/storage/common/storagelinkqueued.h
@@ -19,6 +19,8 @@
#include <vespa/vespalib/util/document_runnable.h>
#include <deque>
#include <limits>
+#include <mutex>
+#include <condition_variable>
namespace storage {
@@ -75,7 +77,8 @@ private:
protected:
StorageLinkQueued& _parent;
unsigned int _maxQueueSize;
- vespalib::Monitor _sync;
+ std::mutex _sync;
+ std::condition_variable _syncCond;
std::deque< std::shared_ptr<Message> > _messages;
bool _replyDispatcher;
std::unique_ptr<framework::Component> _component;
@@ -92,11 +95,6 @@ private:
void add(const std::shared_ptr<Message>&);
void flush();
- // You can use the given functions if you need to keep the
- // dispatcher thread locked while you process a message. Bucket
- // manager does this during bucket dumps
- vespalib::Monitor& getMonitor() { return _sync; }
- void addWithoutLocking(const std::shared_ptr<Message>&);
virtual void send(const std::shared_ptr<Message> & ) = 0;
};
diff --git a/storage/src/vespa/storage/common/storagelinkqueued.hpp b/storage/src/vespa/storage/common/storagelinkqueued.hpp
index b34ce58bbb7..22c5e9ba5f2 100644
--- a/storage/src/vespa/storage/common/storagelinkqueued.hpp
+++ b/storage/src/vespa/storage/common/storagelinkqueued.hpp
@@ -7,6 +7,7 @@
#include <vespa/storageframework/generic/component/component.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <sstream>
+#include <chrono>
namespace storage {
@@ -16,8 +17,8 @@ StorageLinkQueued::Dispatcher<Message>::terminate() {
if (_thread.get()) {
_thread->interrupt();
{
- vespalib::MonitorGuard sync(_sync);
- sync.signal();
+ std::lock_guard<std::mutex> guard(_sync);
+ _syncCond.notify_one();
}
_thread->join();
_thread.reset(0);
@@ -29,6 +30,7 @@ StorageLinkQueued::Dispatcher<Message>::Dispatcher(StorageLinkQueued& parent, un
: _parent(parent),
_maxQueueSize(maxQueueSize),
_sync(),
+ _syncCond(),
_messages(),
_replyDispatcher(replyDispatcher)
{
@@ -58,34 +60,28 @@ template<typename Message>
void StorageLinkQueued::Dispatcher<Message>::add(
const std::shared_ptr<Message>& m)
{
- vespalib::MonitorGuard sync(_sync);
+ using namespace std::chrono_literals;
+ std::unique_lock<std::mutex> guard(_sync);
if (_thread.get() == 0) start();
while ((_messages.size() > _maxQueueSize) && !_thread->interrupted()) {
- sync.wait(100);
+ _syncCond.wait_for(guard, 100ms);
}
_messages.push_back(m);
- sync.signal();
-}
-
-template<typename Message>
-void StorageLinkQueued::Dispatcher<Message>::addWithoutLocking(
- const std::shared_ptr<Message>& m)
-{
- if (_thread.get() == 0) start();
- _messages.push_back(m);
+ _syncCond.notify_one();
}
template<typename Message>
void StorageLinkQueued::Dispatcher<Message>::run(framework::ThreadHandle& h)
{
+ using namespace std::chrono_literals;
while (!h.interrupted()) {
h.registerTick(framework::PROCESS_CYCLE);
std::shared_ptr<Message> message;
{
- vespalib::MonitorGuard sync(_sync);
+ std::unique_lock<std::mutex> guard(_sync);
while (!h.interrupted() && _messages.empty()) {
- sync.wait(100);
+ _syncCond.wait_for(guard, 100ms);
h.registerTick(framework::WAIT_CYCLE);
}
if (h.interrupted()) break;
@@ -104,9 +100,9 @@ void StorageLinkQueued::Dispatcher<Message>::run(framework::ThreadHandle& h)
{
// Since flush() only waits for stack to be empty, we must
// pop stack AFTER send has been called.
- vespalib::MonitorGuard sync(_sync);
+ std::lock_guard<std::mutex> guard(_sync);
_messages.pop_front();
- sync.signal();
+ _syncCond.notify_one();
}
}
_parent.logDebug("Finished storage link queued thread");
@@ -115,9 +111,10 @@ void StorageLinkQueued::Dispatcher<Message>::run(framework::ThreadHandle& h)
template<typename Message>
void StorageLinkQueued::Dispatcher<Message>::flush()
{
- vespalib::MonitorGuard sync(_sync);
+ using namespace std::chrono_literals;
+ std::unique_lock<std::mutex> guard(_sync);
while (!_messages.empty()) {
- sync.wait(100);
+ _syncCond.wait_for(guard, 100ms);
}
}
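Dispatcher<Message> keeps its bounded-queue behaviour after the conversion: add() applies backpressure while the queue is over _maxQueueSize and flush() waits until it drains, both now via std::condition_variable::wait_for with a periodic 100ms wake-up. A self-contained sketch of that idiom (all names hypothetical):

// Sketch of the bounded-queue backpressure/flush idiom used by Dispatcher.
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>

using namespace std::chrono_literals;

class BoundedQueue {
public:
    explicit BoundedQueue(std::size_t maxSize) : _maxSize(maxSize) {}

    void add(int message) {
        std::unique_lock<std::mutex> guard(_sync);
        while (_messages.size() > _maxSize) {
            _syncCond.wait_for(guard, 100ms);   // periodic wake-up, as in add()
        }
        _messages.push_back(message);
        _syncCond.notify_one();
    }

    bool pop(int& message) {
        std::lock_guard<std::mutex> guard(_sync);
        if (_messages.empty()) {
            return false;
        }
        message = _messages.front();
        _messages.pop_front();
        _syncCond.notify_one();                 // wake threads waiting in add() or flush()
        return true;
    }

    void flush() {
        std::unique_lock<std::mutex> guard(_sync);
        while (!_messages.empty()) {
            _syncCond.wait_for(guard, 100ms);   // as in Dispatcher::flush()
        }
    }

private:
    std::size_t _maxSize;
    std::mutex _sync;
    std::condition_variable _syncCond;
    std::deque<int> _messages;
};

int main() {
    BoundedQueue queue(4);
    queue.add(1);
    int message = 0;
    queue.pop(message);
    queue.flush();   // returns immediately once the queue is empty
}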
diff --git a/storageframework/src/vespa/storageframework/generic/thread/thread.cpp b/storageframework/src/vespa/storageframework/generic/thread/thread.cpp
index 1a88b4d2044..5ed3f7dc5e6 100644
--- a/storageframework/src/vespa/storageframework/generic/thread/thread.cpp
+++ b/storageframework/src/vespa/storageframework/generic/thread/thread.cpp
@@ -17,5 +17,16 @@ Thread::interruptAndJoin(vespalib::Monitor* m)
join();
}
+void
+Thread::interruptAndJoin(std::mutex &m, std::condition_variable &cv)
+{
+ interrupt();
+ {
+ std::lock_guard<std::mutex> guard(m);
+ cv.notify_all();
+ }
+ join();
+}
+
} // framework
} // storage
diff --git a/storageframework/src/vespa/storageframework/generic/thread/thread.h b/storageframework/src/vespa/storageframework/generic/thread/thread.h
index ceeba79ebe2..72054ff725a 100644
--- a/storageframework/src/vespa/storageframework/generic/thread/thread.h
+++ b/storageframework/src/vespa/storageframework/generic/thread/thread.h
@@ -14,6 +14,8 @@
#include "runnable.h"
#include <vespa/vespalib/stllike/string.h>
+#include <mutex>
+#include <condition_variable>
namespace vespalib {
class Monitor;
@@ -58,6 +60,8 @@ public:
* through a monitor after the signalling phase.
*/
void interruptAndJoin(vespalib::Monitor* m);
+
+ void interruptAndJoin(std::mutex &m, std::condition_variable &cv);
};
}
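The new interruptAndJoin(std::mutex&, std::condition_variable&) overload encodes the shutdown idiom used by BucketManager::onClose() and StorageBucketDBInitializer::onClose(): set the interrupt flag, notify while holding the lock so a worker blocked in wait_for() wakes promptly, then join. A standalone sketch of the same idiom using only the standard library (framework::Thread details omitted; all names hypothetical):

// Sketch of the interrupt-and-join shutdown idiom behind the new overload.
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

using namespace std::chrono_literals;

int main() {
    std::mutex workerLock;
    std::condition_variable workerCond;
    std::atomic<bool> interrupted{false};

    std::thread worker([&] {
        std::unique_lock<std::mutex> guard(workerLock);
        while (!interrupted.load()) {
            workerCond.wait_for(guard, 1s);   // periodic wake-up, as in the worker run() loops
        }
    });

    // interruptAndJoin(workerLock, workerCond), spelled out:
    interrupted.store(true);                  // interrupt()
    {
        std::lock_guard<std::mutex> guard(workerLock);
        workerCond.notify_all();              // wake the worker while holding the lock
    }
    worker.join();                            // join()
}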