diff options
author | Tor Egge <Tor.Egge@oath.com> | 2017-12-11 13:09:18 +0000 |
---|---|---|
committer | Tor Egge <Tor.Egge@oath.com> | 2017-12-11 13:09:18 +0000 |
commit | a4a51fb869eb3c20431440a1b77e3e7ad5c81402 (patch) | |
tree | 6df6ca1986ef49767ab6e91d03de02167891ef79 /searchcore | |
parent | 361f32665f54c061640c4def02d6132f760d6026 (diff) |
Use standard locking in searchcore/proton/server (pass 2).
Diffstat (limited to 'searchcore')
8 files changed, 42 insertions, 32 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/server/frozenbuckets.cpp b/searchcore/src/vespa/searchcore/proton/server/frozenbuckets.cpp index c021d786216..d5deb4482c4 100644 --- a/searchcore/src/vespa/searchcore/proton/server/frozenbuckets.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/frozenbuckets.cpp @@ -9,13 +9,13 @@ using document::BucketId; using vespalib::makeClosure; using vespalib::makeTask; -using vespalib::MonitorGuard; namespace proton { -FrozenBucketsMap::FrozenBucketsMap() : - _lock(), - _map() +FrozenBucketsMap::FrozenBucketsMap() + : _lock(), + _cond(), + _map() { } FrozenBucketsMap::~FrozenBucketsMap() { @@ -25,12 +25,12 @@ FrozenBucketsMap::~FrozenBucketsMap() { void FrozenBucketsMap::freezeBucket(BucketId bucket) { - MonitorGuard guard(_lock); + std::unique_lock<std::mutex> guard(_lock); std::pair<BucketId, FrozenBucket> tryVal(std::make_pair(bucket, FrozenBucket(FrozenBucket::Reader))); std::pair<Map::iterator, bool> res; for (res = _map.insert(tryVal); !res.second && (res.first->second.isExclusive()); res = _map.insert(tryVal)) { - guard.wait(); + _cond.wait(guard); } if (!res.second) { @@ -42,7 +42,7 @@ FrozenBucketsMap::freezeBucket(BucketId bucket) { bool FrozenBucketsMap::thawBucket(BucketId bucket) { - MonitorGuard guard(_lock); + std::lock_guard<std::mutex> guard(_lock); Map::iterator it(_map.find(bucket)); assert(it != _map.end()); assert(it->second.hasReaders()); @@ -52,7 +52,7 @@ FrozenBucketsMap::thawBucket(BucketId bucket) isLastAndContended = true; } _map.erase(it); - guard.broadcast(); + _cond.notify_all(); } else { it->second.removeReader(); } @@ -63,7 +63,7 @@ FrozenBucketsMap::thawBucket(BucketId bucket) IFrozenBucketHandler::ExclusiveBucketGuard::UP FrozenBucketsMap::acquireExclusiveBucket(document::BucketId bucket) { - MonitorGuard guard(_lock); + std::lock_guard<std::mutex> guard(_lock); Map::iterator it(_map.find(bucket)); if (it != _map.end()) { assert(it->second.hasReaders()); @@ -77,11 +77,11 @@ FrozenBucketsMap::acquireExclusiveBucket(document::BucketId bucket) void FrozenBucketsMap::releaseExclusiveBucket(document::BucketId bucket) { - MonitorGuard guard(_lock); + std::lock_guard<std::mutex> guard(_lock); Map::const_iterator it(_map.find(bucket)); assert ((it != _map.end()) && (it->second.isExclusive())); _map.erase(it); - guard.broadcast(); + _cond.notify_all(); } FrozenBuckets::FrozenBuckets(IThreadService &masterThread) : diff --git a/searchcore/src/vespa/searchcore/proton/server/frozenbuckets.h b/searchcore/src/vespa/searchcore/proton/server/frozenbuckets.h index 9f389b89adf..be4d28f1e9e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/frozenbuckets.h +++ b/searchcore/src/vespa/searchcore/proton/server/frozenbuckets.h @@ -4,7 +4,9 @@ #include "ifrozenbuckethandler.h" #include "ibucketfreezer.h" -#include <vespa/vespalib/util/sync.h> +#include <mutex> +#include <condition_variable> +#include <cassert> #include <map> #include <vector> @@ -66,8 +68,9 @@ private: bool _notifyWriter; }; typedef std::map<document::BucketId, FrozenBucket> Map; - vespalib::Monitor _lock; - Map _map; + std::mutex _lock; + std::condition_variable _cond; + Map _map; }; /** diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index b6b995ee69b..8ce7e0c79fe 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -36,7 +36,6 @@ LOG_SETUP(".proton.server.proton"); using document::DocumentTypeRepo; using vespalib::FileHeader; using vespalib::IllegalStateException; -using vespalib::MonitorGuard; using vespalib::Slime; using vespalib::slime::ArrayInserter; using vespalib::slime::Cursor; @@ -195,7 +194,6 @@ Proton::Proton(const config::ConfigUri & configUri, _queryLimiter(), _clock(0.010), _threadPool(128 * 1024), - _configGenMonitor(), _configGen(0), _distributionKey(-1), _isInitializing(true), diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h index 532c6651cfa..8d1026340ca 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton.h @@ -116,7 +116,6 @@ private: matching::QueryLimiter _queryLimiter; vespalib::Clock _clock; FastOS_ThreadPool _threadPool; - vespalib::Monitor _configGenMonitor; int64_t _configGen; uint32_t _distributionKey; bool _isInitializing; diff --git a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp index 7f6fdf0e06f..913efa469d3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp @@ -4,6 +4,7 @@ #include "proton.h" #include <vespa/vespalib/util/closuretask.h> #include <vespa/fnet/frt/supervisor.h> +#include <chrono> #include <vespa/log/log.h> LOG_SETUP(".proton.server.rtchooks"); @@ -35,8 +36,8 @@ RPCHooksBase::checkState(StateArg::UP arg) { fastos::TimeStamp now(fastos::ClockSystem::now()); if (now < arg->_dueTime) { - MonitorGuard guard(_stateMonitor); - if ( guard.wait(std::min(1000L, (arg->_dueTime - now)/fastos::TimeStamp::MS)) ) { + std::unique_lock<std::mutex> guard(_stateLock); + if (_stateCond.wait_for(guard, std::chrono::milliseconds(std::min(1000L, (arg->_dueTime - now)/fastos::TimeStamp::MS))) == std::cv_status::no_timeout) { LOG(debug, "state has changed"); reportState(*arg->_session, arg->_req); arg->_req->Return(); @@ -203,6 +204,8 @@ RPCHooksBase::RPCHooksBase(Params ¶ms) _docsumByRPC(new DocsumByRPC(_proton.getDocsumBySlime())), _orb(std::make_unique<FRT_Supervisor>()), _regAPI(*_orb, params.slobrok_config), + _stateLock(), + _stateCond(), _executor(48, 128 * 1024) { } @@ -225,8 +228,8 @@ RPCHooksBase::close() _orb->ShutDown(true); _executor.shutdown(); { - MonitorGuard guard(_stateMonitor); - guard.broadcast(); + std::lock_guard<std::mutex> guard(_stateLock); + _stateCond.notify_all(); } _executor.sync(); } diff --git a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h index ab5980a39fa..994699ac480 100644 --- a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h +++ b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.h @@ -8,6 +8,8 @@ #include <vespa/vespalib/stllike/string.h> #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/searchlib/common/packets.h> +#include <mutex> +#include <condition_variable> namespace proton { @@ -61,7 +63,8 @@ private: std::unique_ptr<DocsumByRPC> _docsumByRPC; std::unique_ptr<FRT_Supervisor> _orb; slobrok::api::RegisterAPI _regAPI; - vespalib::Monitor _stateMonitor; + std::mutex _stateLock; + std::condition_variable _stateCond; vespalib::ThreadStackExecutor _executor; void initRPC(); diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp index 95f31f141d7..a271e34b6b7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.cpp @@ -16,7 +16,8 @@ TransactionLogManagerBase::TransactionLogManagerBase( _tlc(tlsSpec), _tlcSession(), _domainName(domainName), - _replayMonitor(), + _replayLock(), + _replayCond(), _replayDone(false), _replayStarted(false), _replayStartTime(0) @@ -65,7 +66,7 @@ TransactionLogManagerBase::init() void TransactionLogManagerBase::internalStartReplay() { - vespalib::MonitorGuard guard(_replayMonitor); + std::lock_guard<std::mutex> guard(_replayLock); _replayStarted = true; _replayDone = false; FastOS_Time timer; @@ -76,23 +77,23 @@ TransactionLogManagerBase::internalStartReplay() void TransactionLogManagerBase::markReplayStarted() { - vespalib::MonitorGuard guard(_replayMonitor); + std::lock_guard<std::mutex> guard(_replayLock); _replayStarted = true; } void TransactionLogManagerBase::changeReplayDone() { - vespalib::MonitorGuard guard(_replayMonitor); + std::lock_guard<std::mutex> guard(_replayLock); _replayDone = true; - guard.broadcast(); + _replayCond.notify_all(); } void TransactionLogManagerBase::waitForReplayDone() const { - vespalib::MonitorGuard guard(_replayMonitor); + std::unique_lock<std::mutex> guard(_replayLock); while (_replayStarted && !_replayDone) { - guard.wait(); + _replayCond.wait(guard); } } @@ -115,12 +116,12 @@ TransLogClient::Visitor::UP TransactionLogManagerBase::createTlcVisitor( } bool TransactionLogManagerBase::getReplayDone() const { - vespalib::MonitorGuard guard(_replayMonitor); + std::lock_guard<std::mutex> guard(_replayLock); return _replayDone; } bool TransactionLogManagerBase::isDoingReplay() const { - vespalib::MonitorGuard guard(_replayMonitor); + std::lock_guard<std::mutex> guard(_replayLock); return _replayStarted && !_replayDone; } diff --git a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h index 1b109d8d9e1..9f4e63842cd 100644 --- a/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h +++ b/searchcore/src/vespa/searchcore/proton/server/transactionlogmanagerbase.h @@ -3,6 +3,8 @@ #pragma once #include <vespa/searchlib/transactionlog/translogclient.h> +#include <mutex> +#include <condition_variable> namespace proton { @@ -14,7 +16,8 @@ class TransactionLogManagerBase { search::transactionlog::TransLogClient _tlc; search::transactionlog::TransLogClient::Session::UP _tlcSession; vespalib::string _domainName; - vespalib::Monitor _replayMonitor; + mutable std::mutex _replayLock; + mutable std::condition_variable _replayCond; volatile bool _replayDone; bool _replayStarted; double _replayStartTime; |