diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-12 18:27:24 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-12 18:27:24 +0000 |
commit | e072003cb1cc8d863ed0766fac78e6fb46b80176 (patch) | |
tree | 140ead97b0d81168f76a13f4db773b32efae1bae | |
parent | 01de6039781e8064d0846b41843b1b26133319cc (diff) |
vespalib::Monitor -> std::mutex and std::condition_variable
11 files changed, 53 insertions, 54 deletions
diff --git a/storage/src/tests/common/metricstest.cpp b/storage/src/tests/common/metricstest.cpp index 1e0144e9efb..c2e937da130 100644 --- a/storage/src/tests/common/metricstest.cpp +++ b/storage/src/tests/common/metricstest.cpp @@ -13,7 +13,6 @@ #include <vespa/config/common/exceptions.h> #include <vespa/vespalib/stllike/hash_map.hpp> #include <vespa/vespalib/gtest/gtest.h> -#include <vespa/vespalib/util/time.h> #include <gmock/gmock.h> #include <thread> diff --git a/storage/src/tests/persistence/persistencetestutils.h b/storage/src/tests/persistence/persistencetestutils.h index d4cbd896d11..d889deabbd5 100644 --- a/storage/src/tests/persistence/persistencetestutils.h +++ b/storage/src/tests/persistence/persistencetestutils.h @@ -123,7 +123,7 @@ public: return tracker->getResult(); } std::shared_ptr<api::StorageMessage> msg; - _replySender.queue.getNext(msg, 60000); + _replySender.queue.getNext(msg, 60s); return dynamic_cast<api::StorageReply &>(*msg).getResult(); } diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp index 9c2d6da39cb..39dae51721a 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp @@ -82,7 +82,7 @@ void BucketManager::onClose() { // Stop internal thread such that we don't send any more messages down. if (_thread) { - _thread->interruptAndJoin(_workerLock, _workerCond); + _thread->interruptAndJoin(_workerCond); _thread.reset(); } } diff --git a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp index ed83ad268e5..c5d70fda1ad 100644 --- a/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp +++ b/storage/src/vespa/storage/bucketdb/storagebucketdbinitializer.cpp @@ -187,9 +187,9 @@ StorageBucketDBInitializer::onOpen() void StorageBucketDBInitializer::onClose() { - if (_system._thread.get() != 0) { - _system._thread->interruptAndJoin(_state._workerLock, _state._workerCond); - _system._thread.reset(0); + if (_system._thread) { + _system._thread->interruptAndJoin(_state._workerCond); + _system._thread.reset(); } } diff --git a/storage/src/vespa/storage/common/storagelink.cpp b/storage/src/vespa/storage/common/storagelink.cpp index 0b6476909a3..8e889e5cc0b 100644 --- a/storage/src/vespa/storage/common/storagelink.cpp +++ b/storage/src/vespa/storage/common/storagelink.cpp @@ -241,7 +241,8 @@ StorageLink::stateToString(State state) } } -std::ostream& operator<<(std::ostream& out, StorageLink& link) { +std::ostream& +operator<<(std::ostream& out, StorageLink& link) { link.printChain(out); return out; } @@ -249,8 +250,9 @@ std::ostream& operator<<(std::ostream& out, StorageLink& link) { Queue::Queue() = default; Queue::~Queue() = default; -bool Queue::getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout) { - vespalib::MonitorGuard sync(_queueMonitor); +bool +Queue::getNext(std::shared_ptr<api::StorageMessage>& msg, vespalib::duration timeout) { + std::unique_lock sync(_lock); bool first = true; while (true) { // Max twice if (!_queue.empty()) { @@ -259,29 +261,33 @@ bool Queue::getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout) { _queue.pop(); return true; } - if (timeout == 0 || !first) { + if ((timeout == vespalib::duration::zero()) || !first) { return false; } - sync.wait(timeout); + _cond.wait_for(sync, timeout); first = false; } return false; } -void Queue::enqueue(std::shared_ptr<api::StorageMessage> msg) { - vespalib::MonitorGuard sync(_queueMonitor); - _queue.emplace(std::move(msg)); - sync.unsafeSignalUnlock(); +void +Queue::enqueue(std::shared_ptr<api::StorageMessage> msg) { + { + std::lock_guard sync(_lock); + _queue.emplace(std::move(msg)); + } + _cond.notify_one(); } -void Queue::signal() { - vespalib::MonitorGuard sync(_queueMonitor); - sync.unsafeSignalUnlock(); +void +Queue::signal() { + _cond.notify_one(); } -size_t Queue::size() const { - vespalib::MonitorGuard sync(_queueMonitor); +size_t +Queue::size() const { + std::lock_guard guard(_lock); return _queue.size(); } diff --git a/storage/src/vespa/storage/common/storagelink.h b/storage/src/vespa/storage/common/storagelink.h index b61c74b2236..b6460979d81 100644 --- a/storage/src/vespa/storage/common/storagelink.h +++ b/storage/src/vespa/storage/common/storagelink.h @@ -23,9 +23,10 @@ #include <vespa/storageapi/messageapi/messagehandler.h> #include <vespa/storageapi/messageapi/storagemessage.h> #include <vespa/document/util/printable.h> -#include <vespa/vespalib/util/sync.h> #include <atomic> #include <queue> +#include <mutex> +#include <condition_variable> namespace storage { @@ -185,8 +186,9 @@ private: class Queue { private: using QueueType = std::queue<std::shared_ptr<api::StorageMessage>>; - QueueType _queue; - vespalib::Monitor _queueMonitor; + QueueType _queue; + mutable std::mutex _lock; + std::condition_variable _cond; public: Queue(); @@ -199,7 +201,7 @@ public: * (0 = don't wait, -1 = forever) * @return true or false if the queue was empty. */ - bool getNext(std::shared_ptr<api::StorageMessage>& msg, int timeout); + bool getNext(std::shared_ptr<api::StorageMessage>& msg, vespalib::duration timeout); /** * Enqueue msg in FIFO order. diff --git a/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.cpp b/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.cpp index fad99e4e5d3..505828b853e 100644 --- a/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.cpp +++ b/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.cpp @@ -13,29 +13,26 @@ using document::BucketSpace; namespace storage { -DeadLockDetector::DeadLockDetector(StorageComponentRegister& compReg, - AppKiller::UP killer) +DeadLockDetector::DeadLockDetector(StorageComponentRegister& compReg, AppKiller::UP killer) : framework::HtmlStatusReporter("deadlockdetector", "Dead lock detector"), _killer(std::move(killer)), _states(), - _waiter(), + _lock(), + _cond(), _enableWarning(true), _enableShutdown(false), _processSlackMs(30 * 1000), _waitSlackMs(5 * 1000), _reportedBucketDBLocksAtState(OK) { - DistributorComponentRegister* dComp( - dynamic_cast<DistributorComponentRegister*>(&compReg)); + auto* dComp(dynamic_cast<DistributorComponentRegister*>(&compReg)); if (dComp) { - _dComponent.reset(new DistributorComponent(*dComp, "deadlockdetector")); + _dComponent = std::make_unique<DistributorComponent>(*dComp, "deadlockdetector"); _component = _dComponent.get(); } else { - ServiceLayerComponentRegister* slComp( - dynamic_cast<ServiceLayerComponentRegister*>(&compReg)); + auto* slComp(dynamic_cast<ServiceLayerComponentRegister*>(&compReg)); assert(slComp != 0); - _slComponent.reset(new ServiceLayerComponent( - *slComp, "deadlockdetector")); + _slComponent = std::make_unique<ServiceLayerComponent>(*slComp, "deadlockdetector"); _component = _slComponent.get(); } _component->registerStatusPage(*this); @@ -44,8 +41,8 @@ DeadLockDetector::DeadLockDetector(StorageComponentRegister& compReg, DeadLockDetector::~DeadLockDetector() { - if (_thread.get() != 0) { - _thread->interruptAndJoin(&_waiter); + if (_thread) { + _thread->interruptAndJoin(_cond); } } @@ -229,12 +226,12 @@ DeadLockDetector::handleDeadlock(const framework::MilliSecTime& currentTime, void DeadLockDetector::run(framework::ThreadHandle& thread) { - vespalib::MonitorGuard sync(_waiter); + std::unique_lock sync(_lock); while (!thread.interrupted()) { framework::MilliSecTime time(_component->getClock().getTimeInMillis()); ThreadChecker checker(*this, time); visitThreads(checker); - sync.wait(1000); + _cond.wait_for(sync, 1s); thread.registerTick(framework::WAIT_CYCLE); } } @@ -263,8 +260,7 @@ namespace { ~ThreadTable(); }; - ThreadTable::~ThreadTable() { - } + ThreadTable::~ThreadTable() = default; struct ThreadStatusWriter : public DeadLockDetector::ThreadVisitor { ThreadTable& _table; @@ -309,7 +305,7 @@ DeadLockDetector::reportHtmlStatus(std::ostream& os, vespalib::asciistream out; out << "<h2>Overview of latest thread ticks</h2>\n"; ThreadTable threads; - vespalib::MonitorGuard monitor(_waiter); + std::lock_guard guard(_lock); framework::MilliSecTime time(_component->getClock().getTimeInMillis()); ThreadStatusWriter writer(threads, time, getProcessSlack(), getWaitSlack()); visitThreads(writer); diff --git a/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.h b/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.h index 04a390c36b8..d438b4ab476 100644 --- a/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.h +++ b/storage/src/vespa/storage/frameworkimpl/thread/deadlockdetector.h @@ -17,7 +17,6 @@ #include <vespa/storage/common/servicelayercomponent.h> #include <vespa/storageframework/generic/status/htmlstatusreporter.h> #include <vespa/storageframework/generic/thread/threadpool.h> -#include <vespa/vespalib/util/sync.h> #include <map> #include <atomic> @@ -78,7 +77,8 @@ struct DeadLockDetector : private framework::Runnable, private: AppKiller::UP _killer; mutable std::map<vespalib::string, State> _states; - vespalib::Monitor _waiter; + mutable std::mutex _lock; + std::condition_variable _cond; bool _enableWarning; bool _enableShutdown; std::atomic<uint64_t> _processSlackMs; diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 8a7cc8ffc61..ad43fddfb2d 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -352,7 +352,7 @@ void CommunicationManager::onClose() std::shared_ptr<api::StorageMessage> msg; api::ReturnCode code(api::ReturnCode::ABORTED, "Node shutting down"); while (_eventQueue.size() > 0) { - assert(_eventQueue.getNext(msg, 0)); + assert(_eventQueue.getNext(msg, 0ms)); if (!msg->getType().isReply()) { std::shared_ptr<api::StorageReply> reply(static_cast<api::StorageCommand&>(*msg).makeReply()); reply->setResult(code); @@ -763,7 +763,7 @@ CommunicationManager::run(framework::ThreadHandle& thread) while (!thread.interrupted()) { thread.registerTick(); std::shared_ptr<api::StorageMessage> msg; - if (_eventQueue.getNext(msg, 100)) { + if (_eventQueue.getNext(msg, 100ms)) { process(msg); } std::lock_guard<std::mutex> guard(_earlierGenerationsLock); diff --git a/storageframework/src/vespa/storageframework/generic/thread/thread.cpp b/storageframework/src/vespa/storageframework/generic/thread/thread.cpp index 5ed3f7dc5e6..2a53b1de329 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/thread.cpp +++ b/storageframework/src/vespa/storageframework/generic/thread/thread.cpp @@ -3,14 +3,13 @@ #include "thread.h" #include <vespa/vespalib/util/sync.h> -namespace storage { -namespace framework { +namespace storage::framework { void Thread::interruptAndJoin(vespalib::Monitor* m) { interrupt(); - if (m != 0) { + if (m != nullptr) { vespalib::MonitorGuard monitorGuard(*m); monitorGuard.broadcast(); } @@ -18,15 +17,13 @@ Thread::interruptAndJoin(vespalib::Monitor* m) } void -Thread::interruptAndJoin(std::mutex &m, std::condition_variable &cv) +Thread::interruptAndJoin(std::condition_variable &cv) { interrupt(); { - std::lock_guard<std::mutex> guard(m); cv.notify_all(); } join(); } -} // framework -} // storage +} diff --git a/storageframework/src/vespa/storageframework/generic/thread/thread.h b/storageframework/src/vespa/storageframework/generic/thread/thread.h index 72054ff725a..26c59ce1330 100644 --- a/storageframework/src/vespa/storageframework/generic/thread/thread.h +++ b/storageframework/src/vespa/storageframework/generic/thread/thread.h @@ -14,7 +14,6 @@ #include "runnable.h" #include <vespa/vespalib/stllike/string.h> -#include <mutex> #include <condition_variable> namespace vespalib { @@ -61,7 +60,7 @@ public: */ void interruptAndJoin(vespalib::Monitor* m); - void interruptAndJoin(std::mutex &m, std::condition_variable &cv); + void interruptAndJoin(std::condition_variable &cv); }; } |