diff options
author | Tor Egge <Tor.Egge@oath.com> | 2017-11-28 13:14:53 +0000 |
---|---|---|
committer | Tor Egge <Tor.Egge@oath.com> | 2017-11-28 13:21:08 +0000 |
commit | 58e1f3b9b4dde495f6b9554f670367f45aed49ef (patch) | |
tree | 77f73cd5069943f344ccdd4a4a4e8f40b1cf8541 /searchcore | |
parent | 366549a4805df6e42d97c814216f6693ffc1a7b6 (diff) |
Use standard locking in FlushEngine.
Diffstat (limited to 'searchcore')
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp | 64 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h | 17 |
2 files changed, 43 insertions, 38 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp index fb25f8cf161..00b9b5c8fd7 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp @@ -12,10 +12,10 @@ #include <vespa/log/log.h> LOG_SETUP(".proton.flushengine.flushengine"); -using vespalib::MonitorGuard; typedef vespalib::Executor::Task Task; using searchcorespi::IFlushTarget; using searchcorespi::FlushStats; +using namespace std::chrono_literals; namespace proton { @@ -71,11 +71,13 @@ FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> _strategy(strategy), _priorityStrategy(), _executor(numThreads, 128 * 1024), - _monitor(), + _lock(), + _cond(), _handlers(), _flushing(), + _setStrategyLock(), _strategyLock(), - _strategyMonitor(), + _strategyCond(), _tlsStatsFactory(tlsStatsFactory), _pendingPrune() { @@ -100,10 +102,10 @@ FlushEngine & FlushEngine::close() { { - MonitorGuard strategyGuard(_strategyMonitor); - MonitorGuard guard(_monitor); + std::lock_guard<std::mutex> strategyGuard(_strategyLock); + std::lock_guard<std::mutex> guard(_lock); _closed = true; - guard.broadcast(); + _cond.notify_all(); } _threadPool.Close(); _executor.shutdown(); @@ -120,13 +122,13 @@ FlushEngine::triggerFlush() void FlushEngine::kick() { - MonitorGuard guard(_monitor); + std::lock_guard<std::mutex> guard(_lock); LOG(debug, "Kicking flush engine"); - guard.broadcast(); + _cond.notify_all(); } bool -FlushEngine::canFlushMore(const MonitorGuard & guard) const +FlushEngine::canFlushMore(const std::unique_lock<std::mutex> &guard) const { (void) guard; return _maxConcurrent > _flushing.size(); @@ -135,12 +137,12 @@ FlushEngine::canFlushMore(const MonitorGuard & guard) const bool FlushEngine::wait(size_t minimumWaitTimeIfReady) { - MonitorGuard guard(_monitor); + std::unique_lock<std::mutex> guard(_lock); if ( (minimumWaitTimeIfReady > 0) && canFlushMore(guard) && _pendingPrune.empty()) { - guard.wait(minimumWaitTimeIfReady); + _cond.wait_for(guard, std::chrono::milliseconds(minimumWaitTimeIfReady)); } while ( ! canFlushMore(guard) && _pendingPrune.empty()) { - guard.wait(1000); // broadcast when flush done + _cond.wait_for(guard, 1s); // broadcast when flush done } return !_closed; } @@ -174,7 +176,7 @@ FlushEngine::prune() { std::set<IFlushHandler::SP> toPrune; { - MonitorGuard guard(_monitor); + std::lock_guard<std::mutex> guard(_lock); if (_pendingPrune.empty()) { return false; } @@ -187,7 +189,7 @@ FlushEngine::prune() return true; } -bool FlushEngine::isFlushing(const MonitorGuard & guard, const vespalib::string & name) const +bool FlushEngine::isFlushing(const std::lock_guard<std::mutex> & guard, const vespalib::string & name) const { (void) guard; for(const auto & it : _flushing) { @@ -203,7 +205,7 @@ FlushEngine::getTargetList(bool includeFlushingTargets) const { FlushContext::List ret; { - MonitorGuard guard(_monitor); + std::lock_guard<std::mutex> guard(_lock); for (const auto & it : _handlers) { IFlushHandler & handler(*it.second); search::SerialNum serial(handler.getCurrentSerialNumber()); @@ -227,7 +229,7 @@ FlushEngine::getTargetList(bool includeFlushingTargets) const } std::pair<FlushContext::List,bool> -FlushEngine::getSortedTargetList(MonitorGuard &strategyGuard) const +FlushEngine::getSortedTargetList(const std::lock_guard<std::mutex> &strategyGuard) const { (void) strategyGuard; FlushContext::List unsortedTargets = getTargetList(false); @@ -291,14 +293,14 @@ FlushEngine::flushAll(const FlushContext::List &lst) vespalib::string FlushEngine::flushNextTarget(const vespalib::string & name) { - MonitorGuard strategyGuard(_strategyMonitor); + std::lock_guard<std::mutex> strategyGuard(_strategyLock); std::pair<FlushContext::List,bool> lst = getSortedTargetList(strategyGuard); if (lst.second) { // Everything returned from a priority strategy should be flushed flushAll(lst.first); _executor.sync(); _priorityStrategy.reset(); - strategyGuard.broadcast(); + _strategyCond.notify_all(); return ""; } if (lst.first.empty()) { @@ -340,7 +342,7 @@ FlushEngine::flushDone(const FlushContext &ctx, uint32_t taskId) { fastos::TimeStamp duration; { - MonitorGuard guard(_monitor); + std::lock_guard<std::mutex> guard(_lock); duration = fastos::TimeStamp(fastos::ClockSystem::now()) - _flushing[taskId].getStart(); } if (LOG_WOULD_LOG(event)) { @@ -351,20 +353,20 @@ FlushEngine::flushDone(const FlushContext &ctx, uint32_t taskId) stats.getPathElementsToLog()); } LOG(debug, "FlushEngine::flushDone(taskId='%d') took '%f' secs", taskId, duration.sec()); - MonitorGuard guard(_monitor); + std::lock_guard<std::mutex> guard(_lock); _flushing.erase(taskId); assert(ctx.getHandler()); if (_handlers.hasHandler(ctx.getHandler())) { _pendingPrune.insert(ctx.getHandler()); } - guard.broadcast(); + _cond.notify_all(); } IFlushHandler::SP FlushEngine::putFlushHandler(const DocTypeName &docTypeName, const IFlushHandler::SP &flushHandler) { - MonitorGuard guard(_monitor); + std::lock_guard<std::mutex> guard(_lock); IFlushHandler::SP result(_handlers.putHandler(docTypeName, flushHandler)); if (result) { _pendingPrune.erase(result); @@ -376,14 +378,14 @@ FlushEngine::putFlushHandler(const DocTypeName &docTypeName, IFlushHandler::SP FlushEngine::getFlushHandler(const DocTypeName &docTypeName) const { - MonitorGuard guard(_monitor); + std::lock_guard<std::mutex> guard(_lock); return _handlers.getHandler(docTypeName); } IFlushHandler::SP FlushEngine::removeFlushHandler(const DocTypeName &docTypeName) { - MonitorGuard guard(_monitor); + std::lock_guard<std::mutex> guard(_lock); IFlushHandler::SP result(_handlers.removeHandler(docTypeName)); _pendingPrune.erase(result); return std::move(result); @@ -393,7 +395,7 @@ FlushEngine::FlushMetaSet FlushEngine::getCurrentlyFlushingSet() const { FlushMetaSet s; - vespalib::LockGuard guard(_monitor); + std::lock_guard<std::mutex> guard(_lock); for (const auto & it : _flushing) { s.insert(it.second); } @@ -405,7 +407,7 @@ FlushEngine::initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP { uint32_t taskId(0); { - vespalib::LockGuard guard(_monitor); + std::lock_guard<std::mutex> guard(_lock); taskId = _taskId++; vespalib::string name(FlushContext::createName(*handler, *target)); FlushInfo flush(taskId, target, name); @@ -419,19 +421,19 @@ FlushEngine::initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP void FlushEngine::setStrategy(IFlushStrategy::SP strategy) { - vespalib::LockGuard strategyLock(_strategyLock); - MonitorGuard strategyGuard(_strategyMonitor); + std::lock_guard<std::mutex> setStrategyGuard(_setStrategyLock); + std::unique_lock<std::mutex> strategyGuard(_strategyLock); if (_closed) { return; } assert(!_priorityStrategy); _priorityStrategy = strategy; { - MonitorGuard guard(_monitor); - guard.broadcast(); + std::lock_guard<std::mutex> guard(_lock); + _cond.notify_all(); } while (_priorityStrategy) { - strategyGuard.wait(); + _strategyCond.wait(strategyGuard); } } diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h index 7f4698610c8..7346810c5a5 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h @@ -5,10 +5,11 @@ #include "iflushstrategy.h" #include <vespa/searchcore/proton/common/handlermap.hpp> #include <vespa/searchcore/proton/common/doctypename.h> -#include <vespa/vespalib/util/sync.h> #include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/fastos/thread.h> #include <set> +#include <mutex> +#include <condition_variable> namespace proton { @@ -53,16 +54,18 @@ private: IFlushStrategy::SP _strategy; mutable IFlushStrategy::SP _priorityStrategy; vespalib::ThreadStackExecutor _executor; - vespalib::Monitor _monitor; + mutable std::mutex _lock; + std::condition_variable _cond; FlushHandlerMap _handlers; FlushMap _flushing; - vespalib::Lock _strategyLock; // serialize setStrategy calls - vespalib::Monitor _strategyMonitor; + std::mutex _setStrategyLock; // serialize setStrategy calls + std::mutex _strategyLock; + std::condition_variable _strategyCond; std::shared_ptr<flushengine::ITlsStatsFactory> _tlsStatsFactory; std::set<IFlushHandler::SP> _pendingPrune; FlushContext::List getTargetList(bool includeFlushingTargets) const; - std::pair<FlushContext::List,bool> getSortedTargetList(vespalib::MonitorGuard &strategyGuard) const; + std::pair<FlushContext::List,bool> getSortedTargetList(const std::lock_guard<std::mutex> &strategyGuard) const; FlushContext::SP initNextFlush(const FlushContext::List &lst); vespalib::string flushNextTarget(const vespalib::string & name); void flushAll(const FlushContext::List &lst); @@ -70,9 +73,9 @@ private: uint32_t initFlush(const FlushContext &ctx); uint32_t initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP &target); void flushDone(const FlushContext &ctx, uint32_t taskId); - bool canFlushMore(const vespalib::MonitorGuard & guard) const; + bool canFlushMore(const std::unique_lock<std::mutex> &guard) const; bool wait(size_t minimumWaitTimeIfReady); - bool isFlushing(const vespalib::MonitorGuard & guard, const vespalib::string & name) const; + bool isFlushing(const std::lock_guard<std::mutex> &guard, const vespalib::string & name) const; friend class FlushTask; friend class FlushEngineExplorer; |