diff options
author | Tor Egge <Tor.Egge@online.no> | 2023-10-19 14:29:51 +0200 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2023-10-19 14:29:51 +0200 |
commit | 68c86b598ecec0ee8a912ecc896f8c86dcfcf1f4 (patch) | |
tree | 6de78e8f42f6909faa5406bf0fe74a6039fb4579 /searchcore/src | |
parent | 5e02a90eb9a74228e1dcc6728cbb3b2c730c3486 (diff) |
Avoid blocking flush engine thread due to priority flush.
Diffstat (limited to 'searchcore/src')
5 files changed, 103 insertions, 20 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/flushengine/CMakeLists.txt index 64cbde68416..bcf9848ff66 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/flushengine/CMakeLists.txt @@ -13,6 +13,7 @@ vespa_add_library(searchcore_flushengine STATIC flushtargetproxy.cpp flushtask.cpp prepare_restart_flush_strategy.cpp + priority_flush_token.cpp threadedflushtarget.cpp tls_stats_factory.cpp tls_stats_map.cpp diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp index fc08d4d8a17..82862ef8318 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp @@ -66,16 +66,18 @@ FlushEngine::FlushMeta::~FlushMeta() = default; FlushEngine::FlushInfo::FlushInfo() : FlushMeta("", "", 0), - _target() + _target(), + _priority_flush_token() { } FlushEngine::FlushInfo::~FlushInfo() = default; -FlushEngine::FlushInfo::FlushInfo(uint32_t taskId, const vespalib::string& handler_name, const IFlushTarget::SP& target) +FlushEngine::FlushInfo::FlushInfo(uint32_t taskId, const vespalib::string& handler_name, const IFlushTarget::SP& target, std::shared_ptr<PriorityFlushToken> priority_flush_token) : FlushMeta(handler_name, target->getName(), taskId), - _target(target) + _target(target), + _priority_flush_token(std::move(priority_flush_token)) { } @@ -89,6 +91,7 @@ FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> tlsStats _has_thread(false), _strategy(std::move(strategy)), _priorityStrategy(), + _priority_flush_token(), _executor(maxConcurrentTotal(), CpuUsage::wrap(flush_engine_executor, CpuUsage::Category::COMPACT)), _lock(), _cond(), @@ -249,7 +252,7 @@ createName(const IFlushHandler &handler, const vespalib::string &targetName) bool 
FlushEngine::prune() { - std::set<IFlushHandler::SP> toPrune; + PendingPrunes toPrune; { std::lock_guard<std::mutex> guard(_lock); if (_pendingPrune.empty()) { @@ -257,7 +260,8 @@ FlushEngine::prune() } _pendingPrune.swap(toPrune); } - for (const auto &handler : toPrune) { + for (const auto& kv : toPrune) { + const auto& handler = kv.first; IFlushTarget::List lst = handler->getFlushTargets(); auto oldestFlushed = findOldestFlushedTarget(lst, *handler); if (LOG_WOULD_LOG(event)) { @@ -368,21 +372,21 @@ FlushEngine::initNextFlush(const FlushContext::List &lst) void FlushEngine::flushAll(const FlushContext::List &lst) { + mark_currently_flushing_tasks(_priority_flush_token); LOG(debug, "%ld targets to flush.", lst.size()); for (const FlushContext::SP & ctx : lst) { if (wait_for_slot(IFlushTarget::Priority::NORMAL)) { if (ctx->initFlush(get_flush_token(*ctx))) { logTarget("initiated", *ctx); - _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx)); + _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx, _priority_flush_token), *this, ctx)); } else { logTarget("failed to initiate", *ctx); } } } - _executor.sync(); - prune(); std::lock_guard<std::mutex> strategyGuard(_strategyLock); _priorityStrategy.reset(); + _priority_flush_token.reset(); _strategyCond.notify_all(); } @@ -404,19 +408,19 @@ FlushEngine::flushNextTarget(const vespalib::string & name, const FlushContext:: name.c_str(), contexts.size()); std::this_thread::sleep_for(100ms); } - _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx)); + _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx, {}), *this, ctx)); return ctx->getName(); } uint32_t -FlushEngine::initFlush(const FlushContext &ctx) +FlushEngine::initFlush(const FlushContext &ctx, std::shared_ptr<PriorityFlushToken> priority_flush_token) { if (LOG_WOULD_LOG(event)) { IFlushTarget::MemoryGain mgain(ctx.getTarget()->getApproxMemoryGain()); EventLogger::flushStart(ctx.getName(), mgain.getBefore(), 
mgain.getAfter(), mgain.gain(), ctx.getTarget()->getFlushedSerialNum() + 1, ctx.getHandler()->getCurrentSerialNumber()); } - return initFlush(ctx.getHandler(), ctx.getTarget()); + return initFlush(ctx.getHandler(), ctx.getTarget(), std::move(priority_flush_token)); } void @@ -434,10 +438,25 @@ FlushEngine::flushDone(const FlushContext &ctx, uint32_t taskId) } LOG(debug, "FlushEngine::flushDone(taskId='%d') took '%f' secs", taskId, vespalib::to_s(duration)); std::lock_guard<std::mutex> guard(_lock); - _flushing.erase(taskId); + /* + * Hand over any priority flush token for completed flush to + * _pendingPrune, to ensure that setStrategy will wait until + * flush engine has called prune(). + */ + std::shared_ptr<PriorityFlushToken> priority_flush_token; + { + auto itr = _flushing.find(taskId); + if (itr != _flushing.end()) { + priority_flush_token = std::move(itr->second._priority_flush_token); + _flushing.erase(itr); + } + } assert(ctx.getHandler()); if (_handlers.hasHandler(ctx.getHandler())) { - _pendingPrune.insert(ctx.getHandler()); + auto ins_res = _pendingPrune.emplace(ctx.getHandler(), PendingPrunes::mapped_type()); + if (priority_flush_token) { + ins_res.first->second = std::move(priority_flush_token); + } } _cond.notify_all(); } @@ -450,7 +469,7 @@ FlushEngine::putFlushHandler(const DocTypeName &docTypeName, const IFlushHandler if (result) { _pendingPrune.erase(result); } - _pendingPrune.insert(flushHandler); + _pendingPrune.emplace(flushHandler, PendingPrunes::mapped_type()); return result; } @@ -475,13 +494,13 @@ FlushEngine::getCurrentlyFlushingSet() const } uint32_t -FlushEngine::initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP &target) +FlushEngine::initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP &target, std::shared_ptr<PriorityFlushToken> priority_flush_token) { uint32_t taskId; { std::lock_guard<std::mutex> guard(_lock); taskId = _taskId++; - FlushInfo flush(taskId, handler->getName(), target); + FlushInfo 
flush(taskId, handler->getName(), target, std::move(priority_flush_token)); _flushing[taskId] = flush; } LOG(debug, "FlushEngine::initFlush(handler='%s', target='%s') => taskId='%d'", @@ -492,6 +511,9 @@ FlushEngine::initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP void FlushEngine::setStrategy(IFlushStrategy::SP strategy) { + std::promise<void> promise; + auto future = promise.get_future(); + auto priority_flush_token = std::make_shared<PriorityFlushToken>(std::move(promise)); std::lock_guard<std::mutex> setStrategyGuard(_setStrategyLock); std::unique_lock<std::mutex> strategyGuard(_strategyLock); if (_closed.load(std::memory_order_relaxed)) { @@ -499,6 +521,7 @@ FlushEngine::setStrategy(IFlushStrategy::SP strategy) } assert(!_priorityStrategy); _priorityStrategy = std::move(strategy); + _priority_flush_token = std::move(priority_flush_token); { std::lock_guard<std::mutex> guard(_lock); _cond.notify_all(); @@ -506,6 +529,22 @@ FlushEngine::setStrategy(IFlushStrategy::SP strategy) while (_priorityStrategy) { _strategyCond.wait(strategyGuard); } + strategyGuard.unlock(); + /* + * Wait for flushes started before the strategy change, for + * flushes initiated by the strategy, and for flush engine to call + * prune() afterwards. 
+ */ + future.wait(); +} + +void +FlushEngine::mark_currently_flushing_tasks(std::shared_ptr<PriorityFlushToken> priority_flush_token) +{ + std::lock_guard<std::mutex> guard(_lock); + for (auto& kv : _flushing) { + kv.second._priority_flush_token = priority_flush_token; + } } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h index ec0f019f6e4..302c4a2499e 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h @@ -3,6 +3,7 @@ #include "flushcontext.h" #include "iflushstrategy.h" +#include "priority_flush_token.h" #include <vespa/searchcore/proton/common/handlermap.hpp> #include <vespa/searchcore/proton/common/doctypename.h> #include <vespa/vespalib/util/threadstackexecutor.h> @@ -43,13 +44,15 @@ private: struct FlushInfo : public FlushMeta { FlushInfo(); - FlushInfo(uint32_t taskId, const vespalib::string& handler_name, const IFlushTarget::SP &target); + FlushInfo(uint32_t taskId, const vespalib::string& handler_name, const IFlushTarget::SP &target, std::shared_ptr<PriorityFlushToken> priority_flush_token); ~FlushInfo(); IFlushTarget::SP _target; + std::shared_ptr<PriorityFlushToken> _priority_flush_token; }; using FlushMap = std::map<uint32_t, FlushInfo>; using FlushHandlerMap = HandlerMap<IFlushHandler>; + using PendingPrunes = std::map<std::shared_ptr<IFlushHandler>, std::shared_ptr<PriorityFlushToken>>; std::atomic<bool> _closed; const uint32_t _maxConcurrentNormal; const vespalib::duration _idleInterval; @@ -58,6 +61,7 @@ private: std::atomic<bool> _has_thread; IFlushStrategy::SP _strategy; mutable IFlushStrategy::SP _priorityStrategy; + mutable std::shared_ptr<PriorityFlushToken> _priority_flush_token; vespalib::ThreadStackExecutor _executor; mutable std::mutex _lock; std::condition_variable _cond; @@ -67,7 +71,7 @@ private: std::mutex _strategyLock; 
std::condition_variable _strategyCond; std::shared_ptr<flushengine::ITlsStatsFactory> _tlsStatsFactory; - std::set<IFlushHandler::SP> _pendingPrune; + PendingPrunes _pendingPrune; std::shared_ptr<search::FlushToken> _normal_flush_token; std::shared_ptr<search::FlushToken> _gc_flush_token; @@ -78,8 +82,8 @@ private: vespalib::string flushNextTarget(const vespalib::string & name, const FlushContext::List & contexts); void flushAll(const FlushContext::List &lst); bool prune(); - uint32_t initFlush(const FlushContext &ctx); - uint32_t initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP &target); + uint32_t initFlush(const FlushContext &ctx, std::shared_ptr<PriorityFlushToken> priority_flush_token); + uint32_t initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP &target, std::shared_ptr<PriorityFlushToken> priority_flush_token); void flushDone(const FlushContext &ctx, uint32_t taskId); bool canFlushMore(const std::unique_lock<std::mutex> &guard, IFlushTarget::Priority priority) const; void wait_for_slot_or_pending_prune(IFlushTarget::Priority priority); @@ -179,6 +183,7 @@ public: FlushMetaSet getCurrentlyFlushingSet() const; void setStrategy(IFlushStrategy::SP strategy); + void mark_currently_flushing_tasks(std::shared_ptr<PriorityFlushToken> priority_flush_token); uint32_t maxConcurrentTotal() const { return _maxConcurrentNormal + 1; } uint32_t maxConcurrentNormal() const { return _maxConcurrentNormal; } }; diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/priority_flush_token.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/priority_flush_token.cpp new file mode 100644 index 00000000000..f031c19d1d8 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/flushengine/priority_flush_token.cpp @@ -0,0 +1,17 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include "priority_flush_token.h" + +namespace proton { + +PriorityFlushToken::PriorityFlushToken(std::promise<void> promise) + : _promise(std::move(promise)) +{ +} + +PriorityFlushToken::~PriorityFlushToken() +{ + _promise.set_value(); +} + +} diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/priority_flush_token.h b/searchcore/src/vespa/searchcore/proton/flushengine/priority_flush_token.h new file mode 100644 index 00000000000..82048b3eb3f --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/flushengine/priority_flush_token.h @@ -0,0 +1,21 @@ +// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/util/idestructorcallback.h> +#include <future> + +namespace proton { + +/* + * This token is shared between flushes initiated from a priority flush + * strategy (cf. Proton::triggerFlush and Proton::prepareRestart). + */ +class PriorityFlushToken : public vespalib::IDestructorCallback { + std::promise<void> _promise; +public: + PriorityFlushToken(std::promise<void> promise); + ~PriorityFlushToken() override; +}; + +} |