summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@oath.com>2017-11-28 13:14:53 +0000
committerTor Egge <Tor.Egge@oath.com>2017-11-28 13:21:08 +0000
commit58e1f3b9b4dde495f6b9554f670367f45aed49ef (patch)
tree77f73cd5069943f344ccdd4a4a4e8f40b1cf8541 /searchcore
parent366549a4805df6e42d97c814216f6693ffc1a7b6 (diff)
Use standard locking in FlushEngine.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp64
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h17
2 files changed, 43 insertions, 38 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
index fb25f8cf161..00b9b5c8fd7 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
@@ -12,10 +12,10 @@
#include <vespa/log/log.h>
LOG_SETUP(".proton.flushengine.flushengine");
-using vespalib::MonitorGuard;
typedef vespalib::Executor::Task Task;
using searchcorespi::IFlushTarget;
using searchcorespi::FlushStats;
+using namespace std::chrono_literals;
namespace proton {
@@ -71,11 +71,13 @@ FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory>
_strategy(strategy),
_priorityStrategy(),
_executor(numThreads, 128 * 1024),
- _monitor(),
+ _lock(),
+ _cond(),
_handlers(),
_flushing(),
+ _setStrategyLock(),
_strategyLock(),
- _strategyMonitor(),
+ _strategyCond(),
_tlsStatsFactory(tlsStatsFactory),
_pendingPrune()
{
@@ -100,10 +102,10 @@ FlushEngine &
FlushEngine::close()
{
{
- MonitorGuard strategyGuard(_strategyMonitor);
- MonitorGuard guard(_monitor);
+ std::lock_guard<std::mutex> strategyGuard(_strategyLock);
+ std::lock_guard<std::mutex> guard(_lock);
_closed = true;
- guard.broadcast();
+ _cond.notify_all();
}
_threadPool.Close();
_executor.shutdown();
@@ -120,13 +122,13 @@ FlushEngine::triggerFlush()
void
FlushEngine::kick()
{
- MonitorGuard guard(_monitor);
+ std::lock_guard<std::mutex> guard(_lock);
LOG(debug, "Kicking flush engine");
- guard.broadcast();
+ _cond.notify_all();
}
bool
-FlushEngine::canFlushMore(const MonitorGuard & guard) const
+FlushEngine::canFlushMore(const std::unique_lock<std::mutex> &guard) const
{
(void) guard;
return _maxConcurrent > _flushing.size();
@@ -135,12 +137,12 @@ FlushEngine::canFlushMore(const MonitorGuard & guard) const
bool
FlushEngine::wait(size_t minimumWaitTimeIfReady)
{
- MonitorGuard guard(_monitor);
+ std::unique_lock<std::mutex> guard(_lock);
if ( (minimumWaitTimeIfReady > 0) && canFlushMore(guard) && _pendingPrune.empty()) {
- guard.wait(minimumWaitTimeIfReady);
+ _cond.wait_for(guard, std::chrono::milliseconds(minimumWaitTimeIfReady));
}
while ( ! canFlushMore(guard) && _pendingPrune.empty()) {
- guard.wait(1000); // broadcast when flush done
+ _cond.wait_for(guard, 1s); // broadcast when flush done
}
return !_closed;
}
@@ -174,7 +176,7 @@ FlushEngine::prune()
{
std::set<IFlushHandler::SP> toPrune;
{
- MonitorGuard guard(_monitor);
+ std::lock_guard<std::mutex> guard(_lock);
if (_pendingPrune.empty()) {
return false;
}
@@ -187,7 +189,7 @@ FlushEngine::prune()
return true;
}
-bool FlushEngine::isFlushing(const MonitorGuard & guard, const vespalib::string & name) const
+bool FlushEngine::isFlushing(const std::lock_guard<std::mutex> & guard, const vespalib::string & name) const
{
(void) guard;
for(const auto & it : _flushing) {
@@ -203,7 +205,7 @@ FlushEngine::getTargetList(bool includeFlushingTargets) const
{
FlushContext::List ret;
{
- MonitorGuard guard(_monitor);
+ std::lock_guard<std::mutex> guard(_lock);
for (const auto & it : _handlers) {
IFlushHandler & handler(*it.second);
search::SerialNum serial(handler.getCurrentSerialNumber());
@@ -227,7 +229,7 @@ FlushEngine::getTargetList(bool includeFlushingTargets) const
}
std::pair<FlushContext::List,bool>
-FlushEngine::getSortedTargetList(MonitorGuard &strategyGuard) const
+FlushEngine::getSortedTargetList(const std::lock_guard<std::mutex> &strategyGuard) const
{
(void) strategyGuard;
FlushContext::List unsortedTargets = getTargetList(false);
@@ -291,14 +293,14 @@ FlushEngine::flushAll(const FlushContext::List &lst)
vespalib::string
FlushEngine::flushNextTarget(const vespalib::string & name)
{
- MonitorGuard strategyGuard(_strategyMonitor);
+ std::lock_guard<std::mutex> strategyGuard(_strategyLock);
std::pair<FlushContext::List,bool> lst = getSortedTargetList(strategyGuard);
if (lst.second) {
// Everything returned from a priority strategy should be flushed
flushAll(lst.first);
_executor.sync();
_priorityStrategy.reset();
- strategyGuard.broadcast();
+ _strategyCond.notify_all();
return "";
}
if (lst.first.empty()) {
@@ -340,7 +342,7 @@ FlushEngine::flushDone(const FlushContext &ctx, uint32_t taskId)
{
fastos::TimeStamp duration;
{
- MonitorGuard guard(_monitor);
+ std::lock_guard<std::mutex> guard(_lock);
duration = fastos::TimeStamp(fastos::ClockSystem::now()) - _flushing[taskId].getStart();
}
if (LOG_WOULD_LOG(event)) {
@@ -351,20 +353,20 @@ FlushEngine::flushDone(const FlushContext &ctx, uint32_t taskId)
stats.getPathElementsToLog());
}
LOG(debug, "FlushEngine::flushDone(taskId='%d') took '%f' secs", taskId, duration.sec());
- MonitorGuard guard(_monitor);
+ std::lock_guard<std::mutex> guard(_lock);
_flushing.erase(taskId);
assert(ctx.getHandler());
if (_handlers.hasHandler(ctx.getHandler())) {
_pendingPrune.insert(ctx.getHandler());
}
- guard.broadcast();
+ _cond.notify_all();
}
IFlushHandler::SP
FlushEngine::putFlushHandler(const DocTypeName &docTypeName,
const IFlushHandler::SP &flushHandler)
{
- MonitorGuard guard(_monitor);
+ std::lock_guard<std::mutex> guard(_lock);
IFlushHandler::SP result(_handlers.putHandler(docTypeName, flushHandler));
if (result) {
_pendingPrune.erase(result);
@@ -376,14 +378,14 @@ FlushEngine::putFlushHandler(const DocTypeName &docTypeName,
IFlushHandler::SP
FlushEngine::getFlushHandler(const DocTypeName &docTypeName) const
{
- MonitorGuard guard(_monitor);
+ std::lock_guard<std::mutex> guard(_lock);
return _handlers.getHandler(docTypeName);
}
IFlushHandler::SP
FlushEngine::removeFlushHandler(const DocTypeName &docTypeName)
{
- MonitorGuard guard(_monitor);
+ std::lock_guard<std::mutex> guard(_lock);
IFlushHandler::SP result(_handlers.removeHandler(docTypeName));
_pendingPrune.erase(result);
return std::move(result);
@@ -393,7 +395,7 @@ FlushEngine::FlushMetaSet
FlushEngine::getCurrentlyFlushingSet() const
{
FlushMetaSet s;
- vespalib::LockGuard guard(_monitor);
+ std::lock_guard<std::mutex> guard(_lock);
for (const auto & it : _flushing) {
s.insert(it.second);
}
@@ -405,7 +407,7 @@ FlushEngine::initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP
{
uint32_t taskId(0);
{
- vespalib::LockGuard guard(_monitor);
+ std::lock_guard<std::mutex> guard(_lock);
taskId = _taskId++;
vespalib::string name(FlushContext::createName(*handler, *target));
FlushInfo flush(taskId, target, name);
@@ -419,19 +421,19 @@ FlushEngine::initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP
void
FlushEngine::setStrategy(IFlushStrategy::SP strategy)
{
- vespalib::LockGuard strategyLock(_strategyLock);
- MonitorGuard strategyGuard(_strategyMonitor);
+ std::lock_guard<std::mutex> setStrategyGuard(_setStrategyLock);
+ std::unique_lock<std::mutex> strategyGuard(_strategyLock);
if (_closed) {
return;
}
assert(!_priorityStrategy);
_priorityStrategy = strategy;
{
- MonitorGuard guard(_monitor);
- guard.broadcast();
+ std::lock_guard<std::mutex> guard(_lock);
+ _cond.notify_all();
}
while (_priorityStrategy) {
- strategyGuard.wait();
+ _strategyCond.wait(strategyGuard);
}
}
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
index 7f4698610c8..7346810c5a5 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
@@ -5,10 +5,11 @@
#include "iflushstrategy.h"
#include <vespa/searchcore/proton/common/handlermap.hpp>
#include <vespa/searchcore/proton/common/doctypename.h>
-#include <vespa/vespalib/util/sync.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
#include <vespa/fastos/thread.h>
#include <set>
+#include <mutex>
+#include <condition_variable>
namespace proton {
@@ -53,16 +54,18 @@ private:
IFlushStrategy::SP _strategy;
mutable IFlushStrategy::SP _priorityStrategy;
vespalib::ThreadStackExecutor _executor;
- vespalib::Monitor _monitor;
+ mutable std::mutex _lock;
+ std::condition_variable _cond;
FlushHandlerMap _handlers;
FlushMap _flushing;
- vespalib::Lock _strategyLock; // serialize setStrategy calls
- vespalib::Monitor _strategyMonitor;
+ std::mutex _setStrategyLock; // serialize setStrategy calls
+ std::mutex _strategyLock;
+ std::condition_variable _strategyCond;
std::shared_ptr<flushengine::ITlsStatsFactory> _tlsStatsFactory;
std::set<IFlushHandler::SP> _pendingPrune;
FlushContext::List getTargetList(bool includeFlushingTargets) const;
- std::pair<FlushContext::List,bool> getSortedTargetList(vespalib::MonitorGuard &strategyGuard) const;
+ std::pair<FlushContext::List,bool> getSortedTargetList(const std::lock_guard<std::mutex> &strategyGuard) const;
FlushContext::SP initNextFlush(const FlushContext::List &lst);
vespalib::string flushNextTarget(const vespalib::string & name);
void flushAll(const FlushContext::List &lst);
@@ -70,9 +73,9 @@ private:
uint32_t initFlush(const FlushContext &ctx);
uint32_t initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP &target);
void flushDone(const FlushContext &ctx, uint32_t taskId);
- bool canFlushMore(const vespalib::MonitorGuard & guard) const;
+ bool canFlushMore(const std::unique_lock<std::mutex> &guard) const;
bool wait(size_t minimumWaitTimeIfReady);
- bool isFlushing(const vespalib::MonitorGuard & guard, const vespalib::string & name) const;
+ bool isFlushing(const std::lock_guard<std::mutex> &guard, const vespalib::string & name) const;
friend class FlushTask;
friend class FlushEngineExplorer;