diff options
author | Tor Egge <Tor.Egge@oath.com> | 2017-11-24 11:06:21 +0000 |
---|---|---|
committer | Tor Egge <Tor.Egge@oath.com> | 2017-11-24 11:41:26 +0000 |
commit | ae7390c209c4ae06d14ba048ed8220af6493e732 (patch) | |
tree | 36aefbc1779f25b3c174f545379a11b6725809d4 | |
parent | 49752dd142b687d74dcd00dfe1198d8933e74ee1 (diff) |
Use standard locking in BucketManager.
-rw-r--r-- | storage/src/vespa/storage/bucketdb/bucketmanager.cpp | 33 | ||||
-rw-r--r-- | storage/src/vespa/storage/bucketdb/bucketmanager.h | 12 |
2 files changed, 24 insertions, 21 deletions
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp index 00fa5c95c9b..677ca0a6309 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp @@ -19,12 +19,13 @@ #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/config/config.h> -#include <unordered_map> +#include <chrono> #include <vespa/log/bufferedlogger.h> LOG_SETUP(".storage.bucketdb.manager"); using document::BucketSpace; +using namespace std::chrono_literals; namespace storage { @@ -33,9 +34,9 @@ BucketManager::BucketManager(const config::ConfigUri & configUri, : StorageLinkQueued("Bucket manager", compReg), framework::StatusReporter("bucketdb", "Bucket database"), _configUri(configUri), - _stateAccess(), _bucketDBMemoryToken(), - _workerMonitor(), + _workerLock(), + _workerCond(), _clusterStateLock(), _queueProcessingLock(), _queuedReplies(), @@ -81,7 +82,7 @@ void BucketManager::onClose() { // Stop internal thread such that we don't send any more messages down. if (_thread.get() != 0) { - _thread->interruptAndJoin(&_workerMonitor); + _thread->interruptAndJoin(_workerLock, _workerCond); _thread.reset(0); } StorageLinkQueued::onClose(); @@ -268,7 +269,7 @@ void BucketManager::run(framework::ThreadHandle& thread) bool didWork = false; BucketInfoRequestMap infoReqs; { - vespalib::MonitorGuard monitor(_workerMonitor); + std::lock_guard<std::mutex> guard(_workerLock); infoReqs.swap(_bucketInfoRequests); } @@ -277,12 +278,12 @@ void BucketManager::run(framework::ThreadHandle& thread) } { - vespalib::MonitorGuard monitor(_workerMonitor); + std::unique_lock<std::mutex> guard(_workerLock); for (const auto &req : infoReqs) { assert(req.second.empty()); } if (!didWork) { - monitor.wait(1000); + _workerCond.wait_for(guard, 1s); thread.registerTick(framework::WAIT_CYCLE); } else { thread.registerTick(framework::PROCESS_CYCLE); @@ -393,9 +394,9 @@ bool BucketManager::onRequestBucketInfo( LOG(debug, "Got request bucket info command"); if (cmd->getBuckets().size() == 0 && cmd->hasSystemState()) { - vespalib::MonitorGuard monitor(_workerMonitor); + std::lock_guard<std::mutex> guard(_workerLock); _bucketInfoRequests[cmd->getBucketSpace()].push_back(cmd); - monitor.signal(); + _workerCond.notify_all(); LOG(spam, "Scheduled request bucket info request for retrieval"); return true; } @@ -472,7 +473,7 @@ BucketManager::ScopedQueueDispatchGuard::~ScopedQueueDispatchGuard() void BucketManager::enterQueueProtectedSection() { - vespalib::LockGuard guard(_queueProcessingLock); + std::lock_guard<std::mutex> guard(_queueProcessingLock); ++_requestsCurrentlyProcessing; } @@ -480,7 +481,7 @@ void BucketManager::leaveQueueProtectedSection(ScopedQueueDispatchGuard& queueGuard) { (void) queueGuard; // Only used to enforce guard is held while calling. - vespalib::LockGuard guard(_queueProcessingLock); + std::lock_guard<std::mutex> guard(_queueProcessingLock); assert(_requestsCurrentlyProcessing > 0); // Full bucket info fetches may be concurrently interleaved with bucket- // specific fetches outside of the processing thread. We only allow queued @@ -529,7 +530,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac clusterState->toString().c_str(), our_hash.c_str()); - vespalib::LockGuard lock(_clusterStateLock); + std::lock_guard<std::mutex> clusterStateGuard(_clusterStateLock); for (auto it = reqs.rbegin(); it != reqs.rend(); ++it) { // Currently small requests should not be forwarded to worker thread assert((*it)->hasSystemState()); @@ -629,7 +630,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac size_t BucketManager::bucketInfoRequestsCurrentlyProcessing() const noexcept { - vespalib::LockGuard guard(_queueProcessingLock); + std::lock_guard<std::mutex> guard(_queueProcessingLock); return _requestsCurrentlyProcessing; } @@ -694,7 +695,7 @@ BucketManager::onSetSystemState( LOG(debug, "onSetSystemState(%s)", cmd->toString().c_str()); const lib::ClusterState& state(cmd->getSystemState()); std::string unified(unifyState(state)); - vespalib::LockGuard lock(_clusterStateLock); + std::lock_guard<std::mutex> lock(_clusterStateLock); if (unified != _lastUnifiedClusterState || state.getVersion() != _lastClusterStateSeen + 1) { @@ -804,7 +805,7 @@ BucketManager::enqueueIfBucketHasConflicts(const api::BucketReply::SP& reply) { // Should very rarely contend, since persistence replies are all sent up // via a single dispatcher thread. - vespalib::LockGuard guard(_queueProcessingLock); + std::lock_guard<std::mutex> guard(_queueProcessingLock); if (_requestsCurrentlyProcessing == 0) { return false; // Nothing to do here; pass through reply. } @@ -841,7 +842,7 @@ bool BucketManager::enqueueAsConflictIfProcessingRequest( const api::StorageReply::SP& reply) { - vespalib::LockGuard guard(_queueProcessingLock); + std::lock_guard<std::mutex> guard(_queueProcessingLock); if (_requestsCurrentlyProcessing != 0) { LOG(debug, "Enqueued %s due to concurrent RequestBucketInfo", reply->toString().c_str()); diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.h b/storage/src/vespa/storage/bucketdb/bucketmanager.h index 3b71230a8ed..5e4e928dfc7 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.h +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.h @@ -27,6 +27,8 @@ #include <list> #include <unordered_map> #include <unordered_set> +#include <mutex> +#include <condition_variable> namespace storage { @@ -45,21 +47,21 @@ private: config::ConfigUri _configUri; uint32_t _chunkLevel; - mutable vespalib::Lock _stateAccess; framework::MemoryToken::UP _bucketDBMemoryToken; BucketInfoRequestMap _bucketInfoRequests; /** * We have our own thread running, which we use to send messages down. - * Take worker monitor, add to list and signal for messages to be sent. + * Take worker lock, add to list and signal for messages to be sent. */ - mutable vespalib::Monitor _workerMonitor; + mutable std::mutex _workerLock; + std::condition_variable _workerCond; /** * Lock kept for access to 3 values below concerning cluster state. */ - vespalib::Lock _clusterStateLock; + std::mutex _clusterStateLock; - vespalib::Lock _queueProcessingLock; + mutable std::mutex _queueProcessingLock; using ReplyQueue = std::vector<api::StorageReply::SP>; using ConflictingBuckets = std::unordered_set<document::BucketId, document::BucketId::hash>; |