diff options
Diffstat (limited to 'searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp')
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp | 73 |
1 files changed, 56 insertions, 17 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp index 1916a324c6e..768800ee781 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp @@ -1,4 +1,4 @@ -// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "flushengine.h" #include "active_flush_stats.h" @@ -66,16 +66,18 @@ FlushEngine::FlushMeta::~FlushMeta() = default; FlushEngine::FlushInfo::FlushInfo() : FlushMeta("", "", 0), - _target() + _target(), + _priority_flush_token() { } FlushEngine::FlushInfo::~FlushInfo() = default; -FlushEngine::FlushInfo::FlushInfo(uint32_t taskId, const vespalib::string& handler_name, const IFlushTarget::SP& target) +FlushEngine::FlushInfo::FlushInfo(uint32_t taskId, const vespalib::string& handler_name, const IFlushTarget::SP& target, std::shared_ptr<PriorityFlushToken> priority_flush_token) : FlushMeta(handler_name, target->getName(), taskId), - _target(target) + _target(target), + _priority_flush_token(std::move(priority_flush_token)) { } @@ -89,6 +91,7 @@ FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> tlsStats _has_thread(false), _strategy(std::move(strategy)), _priorityStrategy(), + _priority_flush_token(), _executor(maxConcurrentTotal(), CpuUsage::wrap(flush_engine_executor, CpuUsage::Category::COMPACT)), _lock(), _cond(), @@ -249,7 +252,7 @@ createName(const IFlushHandler &handler, const vespalib::string &targetName) bool FlushEngine::prune() { - std::set<IFlushHandler::SP> toPrune; + PendingPrunes toPrune; { std::lock_guard<std::mutex> guard(_lock); if (_pendingPrune.empty()) { @@ -257,7 +260,8 @@ FlushEngine::prune() } _pendingPrune.swap(toPrune); } - for (const auto &handler : toPrune) { + for (const auto& kv : toPrune) { + const auto& handler = kv.first; IFlushTarget::List lst = handler->getFlushTargets(); auto oldestFlushed = findOldestFlushedTarget(lst, *handler); if (LOG_WOULD_LOG(event)) { @@ -368,21 +372,21 @@ FlushEngine::initNextFlush(const FlushContext::List &lst) void FlushEngine::flushAll(const FlushContext::List &lst) { + mark_currently_flushing_tasks(_priority_flush_token); LOG(debug, "%ld targets to flush.", lst.size()); for (const FlushContext::SP & ctx : lst) { if (wait_for_slot(IFlushTarget::Priority::NORMAL)) { if (ctx->initFlush(get_flush_token(*ctx))) { logTarget("initiated", *ctx); - _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx)); + _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx, _priority_flush_token), *this, ctx)); } else { logTarget("failed to initiate", *ctx); } } } - _executor.sync(); - prune(); std::lock_guard<std::mutex> strategyGuard(_strategyLock); _priorityStrategy.reset(); + _priority_flush_token.reset(); _strategyCond.notify_all(); } @@ -404,19 +408,19 @@ FlushEngine::flushNextTarget(const vespalib::string & name, const FlushContext:: name.c_str(), contexts.size()); std::this_thread::sleep_for(100ms); } - _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx)); + _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx, {}), *this, ctx)); return ctx->getName(); } uint32_t -FlushEngine::initFlush(const FlushContext &ctx) +FlushEngine::initFlush(const FlushContext &ctx, std::shared_ptr<PriorityFlushToken> priority_flush_token) { if (LOG_WOULD_LOG(event)) { IFlushTarget::MemoryGain mgain(ctx.getTarget()->getApproxMemoryGain()); EventLogger::flushStart(ctx.getName(), mgain.getBefore(), mgain.getAfter(), mgain.gain(), ctx.getTarget()->getFlushedSerialNum() + 1, ctx.getHandler()->getCurrentSerialNumber()); } - return initFlush(ctx.getHandler(), ctx.getTarget()); + return initFlush(ctx.getHandler(), ctx.getTarget(), std::move(priority_flush_token)); } void @@ -434,10 +438,25 @@ FlushEngine::flushDone(const FlushContext &ctx, uint32_t taskId) } LOG(debug, "FlushEngine::flushDone(taskId='%d') took '%f' secs", taskId, vespalib::to_s(duration)); std::lock_guard<std::mutex> guard(_lock); - _flushing.erase(taskId); + /* + * Hand over any priority flush token for completed flush to + * _pendingPrune, to ensure that setStrategy will wait until + * flush engine has called prune(). + */ + std::shared_ptr<PriorityFlushToken> priority_flush_token; + { + auto itr = _flushing.find(taskId); + if (itr != _flushing.end()) { + priority_flush_token = std::move(itr->second._priority_flush_token); + _flushing.erase(itr); + } + } assert(ctx.getHandler()); if (_handlers.hasHandler(ctx.getHandler())) { - _pendingPrune.insert(ctx.getHandler()); + auto ins_res = _pendingPrune.emplace(ctx.getHandler(), PendingPrunes::mapped_type()); + if (priority_flush_token) { + ins_res.first->second = std::move(priority_flush_token); + } } _cond.notify_all(); } @@ -450,7 +469,7 @@ FlushEngine::putFlushHandler(const DocTypeName &docTypeName, const IFlushHandler if (result) { _pendingPrune.erase(result); } - _pendingPrune.insert(flushHandler); + _pendingPrune.emplace(flushHandler, PendingPrunes::mapped_type()); return result; } @@ -475,13 +494,13 @@ FlushEngine::getCurrentlyFlushingSet() const } uint32_t -FlushEngine::initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP &target) +FlushEngine::initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP &target, std::shared_ptr<PriorityFlushToken> priority_flush_token) { uint32_t taskId; { std::lock_guard<std::mutex> guard(_lock); taskId = _taskId++; - FlushInfo flush(taskId, handler->getName(), target); + FlushInfo flush(taskId, handler->getName(), target, std::move(priority_flush_token)); _flushing[taskId] = flush; } LOG(debug, "FlushEngine::initFlush(handler='%s', target='%s') => taskId='%d'", @@ -492,6 +511,9 @@ FlushEngine::initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP void FlushEngine::setStrategy(IFlushStrategy::SP strategy) { + std::promise<void> promise; + auto future = promise.get_future(); + auto priority_flush_token = std::make_shared<PriorityFlushToken>(std::move(promise)); std::lock_guard<std::mutex> setStrategyGuard(_setStrategyLock); std::unique_lock<std::mutex> strategyGuard(_strategyLock); if (_closed.load(std::memory_order_relaxed)) { @@ -499,6 +521,7 @@ FlushEngine::setStrategy(IFlushStrategy::SP strategy) } assert(!_priorityStrategy); _priorityStrategy = std::move(strategy); + _priority_flush_token = std::move(priority_flush_token); { std::lock_guard<std::mutex> guard(_lock); _cond.notify_all(); @@ -506,6 +529,22 @@ FlushEngine::setStrategy(IFlushStrategy::SP strategy) while (_priorityStrategy) { _strategyCond.wait(strategyGuard); } + strategyGuard.unlock(); + /* + * Wait for flushes started before the strategy change, for + * flushes initiated by the strategy, and for flush engine to call + * prune() afterwards. + */ + future.wait(); +} + +void +FlushEngine::mark_currently_flushing_tasks(std::shared_ptr<PriorityFlushToken> priority_flush_token) +{ + std::lock_guard<std::mutex> guard(_lock); + for (auto& kv : _flushing) { + kv.second._priority_flush_token = priority_flush_token; + } } } // namespace proton |