author    Tor Egge <Tor.Egge@oath.com>    2017-11-24 11:06:21 +0000
committer Tor Egge <Tor.Egge@oath.com>    2017-11-24 11:41:26 +0000
commit    ae7390c209c4ae06d14ba048ed8220af6493e732 (patch)
tree      36aefbc1779f25b3c174f545379a11b6725809d4
parent    49752dd142b687d74dcd00dfe1198d8933e74ee1 (diff)
Use standard locking in BucketManager.
-rw-r--r--  storage/src/vespa/storage/bucketdb/bucketmanager.cpp | 33
-rw-r--r--  storage/src/vespa/storage/bucketdb/bucketmanager.h   | 12
2 files changed, 24 insertions, 21 deletions
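
The change below swaps the combined vespalib::Monitor for a separate std::mutex plus std::condition_variable, and vespalib::LockGuard for std::lock_guard / std::unique_lock. As a rough sketch of the wait/notify shape the worker-thread hunks follow (illustrative names, not the actual BucketManager code):

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <utility>
#include <vector>

using namespace std::chrono_literals;

struct Request {};                      // stand-in for the real request type

std::mutex              workerLock;     // lock half of the old Monitor
std::condition_variable workerCond;     // wait/signal half of the old Monitor
std::vector<Request>    requests;

// Producer side (cf. onRequestBucketInfo): enqueue under the lock, then notify.
void enqueue(Request req) {
    std::lock_guard<std::mutex> guard(workerLock);
    requests.push_back(std::move(req));
    workerCond.notify_all();            // Monitor::signal() becomes notify_all()
}

// Worker side (cf. BucketManager::run): swap the queue out, process it outside
// the lock, then wait with a timeout if there was nothing to do.
void workerTick(bool didWork) {
    std::vector<Request> local;
    {
        std::lock_guard<std::mutex> guard(workerLock);
        local.swap(requests);
    }
    // ... process `local` here ...
    if (!didWork) {
        // wait_for needs std::unique_lock, not std::lock_guard, because the
        // condition variable unlocks and relocks the mutex while it waits.
        std::unique_lock<std::mutex> guard(workerLock);
        workerCond.wait_for(guard, 1s); // Monitor::wait(1000) in ms becomes 1s
    }
}

The 1s literal comes from std::chrono_literals, which is why the .cpp diff adds <chrono> and the using declaration.
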
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
index 00fa5c95c9b..677ca0a6309 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
@@ -19,12 +19,13 @@
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/vespalib/stllike/hash_map.hpp>
#include <vespa/config/config.h>
-#include <unordered_map>
+#include <chrono>
#include <vespa/log/bufferedlogger.h>
LOG_SETUP(".storage.bucketdb.manager");
using document::BucketSpace;
+using namespace std::chrono_literals;
namespace storage {
@@ -33,9 +34,9 @@ BucketManager::BucketManager(const config::ConfigUri & configUri,
: StorageLinkQueued("Bucket manager", compReg),
framework::StatusReporter("bucketdb", "Bucket database"),
_configUri(configUri),
- _stateAccess(),
_bucketDBMemoryToken(),
- _workerMonitor(),
+ _workerLock(),
+ _workerCond(),
_clusterStateLock(),
_queueProcessingLock(),
_queuedReplies(),
@@ -81,7 +82,7 @@ void BucketManager::onClose()
{
// Stop internal thread such that we don't send any more messages down.
if (_thread.get() != 0) {
- _thread->interruptAndJoin(&_workerMonitor);
+ _thread->interruptAndJoin(_workerLock, _workerCond);
_thread.reset(0);
}
StorageLinkQueued::onClose();
@@ -268,7 +269,7 @@ void BucketManager::run(framework::ThreadHandle& thread)
bool didWork = false;
BucketInfoRequestMap infoReqs;
{
- vespalib::MonitorGuard monitor(_workerMonitor);
+ std::lock_guard<std::mutex> guard(_workerLock);
infoReqs.swap(_bucketInfoRequests);
}
@@ -277,12 +278,12 @@ void BucketManager::run(framework::ThreadHandle& thread)
}
{
- vespalib::MonitorGuard monitor(_workerMonitor);
+ std::unique_lock<std::mutex> guard(_workerLock);
for (const auto &req : infoReqs) {
assert(req.second.empty());
}
if (!didWork) {
- monitor.wait(1000);
+ _workerCond.wait_for(guard, 1s);
thread.registerTick(framework::WAIT_CYCLE);
} else {
thread.registerTick(framework::PROCESS_CYCLE);
@@ -393,9 +394,9 @@ bool BucketManager::onRequestBucketInfo(
LOG(debug, "Got request bucket info command");
if (cmd->getBuckets().size() == 0 && cmd->hasSystemState()) {
- vespalib::MonitorGuard monitor(_workerMonitor);
+ std::lock_guard<std::mutex> guard(_workerLock);
_bucketInfoRequests[cmd->getBucketSpace()].push_back(cmd);
- monitor.signal();
+ _workerCond.notify_all();
LOG(spam, "Scheduled request bucket info request for retrieval");
return true;
}
@@ -472,7 +473,7 @@ BucketManager::ScopedQueueDispatchGuard::~ScopedQueueDispatchGuard()
void
BucketManager::enterQueueProtectedSection()
{
- vespalib::LockGuard guard(_queueProcessingLock);
+ std::lock_guard<std::mutex> guard(_queueProcessingLock);
++_requestsCurrentlyProcessing;
}
@@ -480,7 +481,7 @@ void
BucketManager::leaveQueueProtectedSection(ScopedQueueDispatchGuard& queueGuard)
{
(void) queueGuard; // Only used to enforce guard is held while calling.
- vespalib::LockGuard guard(_queueProcessingLock);
+ std::lock_guard<std::mutex> guard(_queueProcessingLock);
assert(_requestsCurrentlyProcessing > 0);
// Full bucket info fetches may be concurrently interleaved with bucket-
// specific fetches outside of the processing thread. We only allow queued
@@ -529,7 +530,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac
clusterState->toString().c_str(),
our_hash.c_str());
- vespalib::LockGuard lock(_clusterStateLock);
+ std::lock_guard<std::mutex> clusterStateGuard(_clusterStateLock);
for (auto it = reqs.rbegin(); it != reqs.rend(); ++it) {
// Currently small requests should not be forwarded to worker thread
assert((*it)->hasSystemState());
@@ -629,7 +630,7 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac
size_t
BucketManager::bucketInfoRequestsCurrentlyProcessing() const noexcept
{
- vespalib::LockGuard guard(_queueProcessingLock);
+ std::lock_guard<std::mutex> guard(_queueProcessingLock);
return _requestsCurrentlyProcessing;
}
@@ -694,7 +695,7 @@ BucketManager::onSetSystemState(
LOG(debug, "onSetSystemState(%s)", cmd->toString().c_str());
const lib::ClusterState& state(cmd->getSystemState());
std::string unified(unifyState(state));
- vespalib::LockGuard lock(_clusterStateLock);
+ std::lock_guard<std::mutex> lock(_clusterStateLock);
if (unified != _lastUnifiedClusterState
|| state.getVersion() != _lastClusterStateSeen + 1)
{
@@ -804,7 +805,7 @@ BucketManager::enqueueIfBucketHasConflicts(const api::BucketReply::SP& reply)
{
// Should very rarely contend, since persistence replies are all sent up
// via a single dispatcher thread.
- vespalib::LockGuard guard(_queueProcessingLock);
+ std::lock_guard<std::mutex> guard(_queueProcessingLock);
if (_requestsCurrentlyProcessing == 0) {
return false; // Nothing to do here; pass through reply.
}
@@ -841,7 +842,7 @@ bool
BucketManager::enqueueAsConflictIfProcessingRequest(
const api::StorageReply::SP& reply)
{
- vespalib::LockGuard guard(_queueProcessingLock);
+ std::lock_guard<std::mutex> guard(_queueProcessingLock);
if (_requestsCurrentlyProcessing != 0) {
LOG(debug, "Enqueued %s due to concurrent RequestBucketInfo",
reply->toString().c_str());
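
The onClose() hunk above now passes the mutex and the condition variable separately to interruptAndJoin, so the framework thread can be woken out of its timed wait before being joined. The vespalib framework provides that call itself; the following is only a generic sketch of the shutdown idea, with an illustrative stop flag and a stand-in stopAndJoin helper, not the framework implementation:

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

std::mutex              workerLock;
std::condition_variable workerCond;
bool                    stopRequested = false;   // illustrative stop flag

void workerLoop() {
    std::unique_lock<std::mutex> guard(workerLock);
    while (!stopRequested) {
        // Timed wait: wake up either when notified or once per second.
        workerCond.wait_for(guard, std::chrono::seconds(1));
    }
}

// Shutdown side: set the flag under the lock, wake the waiter, then join.
void stopAndJoin(std::thread& worker) {
    {
        std::lock_guard<std::mutex> guard(workerLock);
        stopRequested = true;
        workerCond.notify_all();
    }
    worker.join();
}
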
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.h b/storage/src/vespa/storage/bucketdb/bucketmanager.h
index 3b71230a8ed..5e4e928dfc7 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.h
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.h
@@ -27,6 +27,8 @@
#include <list>
#include <unordered_map>
#include <unordered_set>
+#include <mutex>
+#include <condition_variable>
namespace storage {
@@ -45,21 +47,21 @@ private:
config::ConfigUri _configUri;
uint32_t _chunkLevel;
- mutable vespalib::Lock _stateAccess;
framework::MemoryToken::UP _bucketDBMemoryToken;
BucketInfoRequestMap _bucketInfoRequests;
/**
* We have our own thread running, which we use to send messages down.
- * Take worker monitor, add to list and signal for messages to be sent.
+ * Take worker lock, add to list and signal for messages to be sent.
*/
- mutable vespalib::Monitor _workerMonitor;
+ mutable std::mutex _workerLock;
+ std::condition_variable _workerCond;
/**
* Lock kept for access to 3 values below concerning cluster state.
*/
- vespalib::Lock _clusterStateLock;
+ std::mutex _clusterStateLock;
- vespalib::Lock _queueProcessingLock;
+ mutable std::mutex _queueProcessingLock;
using ReplyQueue = std::vector<api::StorageReply::SP>;
using ConflictingBuckets = std::unordered_set<document::BucketId,
document::BucketId::hash>;
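
In the header, _workerLock and _queueProcessingLock are declared mutable std::mutex so that const members such as bucketInfoRequestsCurrentlyProcessing() const noexcept can still lock them; std::lock_guard requires a non-const mutex. A minimal illustration of the same idea on a hypothetical class, not the real BucketManager:

#include <cstddef>
#include <mutex>

class RequestCounter {                  // hypothetical example class
public:
    // A const accessor can still take the lock because the mutex is mutable.
    std::size_t current() const noexcept {
        std::lock_guard<std::mutex> guard(_lock);
        return _value;
    }
    void increment() {
        std::lock_guard<std::mutex> guard(_lock);
        ++_value;
    }
private:
    mutable std::mutex _lock;           // mutable: locking is not logical mutation
    std::size_t        _value = 0;
};
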