aboutsummaryrefslogtreecommitdiffstats
path: root/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp')
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp73
1 files changed, 56 insertions, 17 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
index 1916a324c6e..768800ee781 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
@@ -1,4 +1,4 @@
-// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "flushengine.h"
#include "active_flush_stats.h"
@@ -66,16 +66,18 @@ FlushEngine::FlushMeta::~FlushMeta() = default;
FlushEngine::FlushInfo::FlushInfo()
: FlushMeta("", "", 0),
- _target()
+ _target(),
+ _priority_flush_token()
{
}
FlushEngine::FlushInfo::~FlushInfo() = default;
-FlushEngine::FlushInfo::FlushInfo(uint32_t taskId, const vespalib::string& handler_name, const IFlushTarget::SP& target)
+FlushEngine::FlushInfo::FlushInfo(uint32_t taskId, const vespalib::string& handler_name, const IFlushTarget::SP& target, std::shared_ptr<PriorityFlushToken> priority_flush_token)
: FlushMeta(handler_name, target->getName(), taskId),
- _target(target)
+ _target(target),
+ _priority_flush_token(std::move(priority_flush_token))
{
}
@@ -89,6 +91,7 @@ FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> tlsStats
_has_thread(false),
_strategy(std::move(strategy)),
_priorityStrategy(),
+ _priority_flush_token(),
_executor(maxConcurrentTotal(), CpuUsage::wrap(flush_engine_executor, CpuUsage::Category::COMPACT)),
_lock(),
_cond(),
@@ -249,7 +252,7 @@ createName(const IFlushHandler &handler, const vespalib::string &targetName)
bool
FlushEngine::prune()
{
- std::set<IFlushHandler::SP> toPrune;
+ PendingPrunes toPrune;
{
std::lock_guard<std::mutex> guard(_lock);
if (_pendingPrune.empty()) {
@@ -257,7 +260,8 @@ FlushEngine::prune()
}
_pendingPrune.swap(toPrune);
}
- for (const auto &handler : toPrune) {
+ for (const auto& kv : toPrune) {
+ const auto& handler = kv.first;
IFlushTarget::List lst = handler->getFlushTargets();
auto oldestFlushed = findOldestFlushedTarget(lst, *handler);
if (LOG_WOULD_LOG(event)) {
@@ -368,21 +372,21 @@ FlushEngine::initNextFlush(const FlushContext::List &lst)
void
FlushEngine::flushAll(const FlushContext::List &lst)
{
+ mark_currently_flushing_tasks(_priority_flush_token);
LOG(debug, "%ld targets to flush.", lst.size());
for (const FlushContext::SP & ctx : lst) {
if (wait_for_slot(IFlushTarget::Priority::NORMAL)) {
if (ctx->initFlush(get_flush_token(*ctx))) {
logTarget("initiated", *ctx);
- _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx));
+ _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx, _priority_flush_token), *this, ctx));
} else {
logTarget("failed to initiate", *ctx);
}
}
}
- _executor.sync();
- prune();
std::lock_guard<std::mutex> strategyGuard(_strategyLock);
_priorityStrategy.reset();
+ _priority_flush_token.reset();
_strategyCond.notify_all();
}
@@ -404,19 +408,19 @@ FlushEngine::flushNextTarget(const vespalib::string & name, const FlushContext::
name.c_str(), contexts.size());
std::this_thread::sleep_for(100ms);
}
- _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx));
+ _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx, {}), *this, ctx));
return ctx->getName();
}
uint32_t
-FlushEngine::initFlush(const FlushContext &ctx)
+FlushEngine::initFlush(const FlushContext &ctx, std::shared_ptr<PriorityFlushToken> priority_flush_token)
{
if (LOG_WOULD_LOG(event)) {
IFlushTarget::MemoryGain mgain(ctx.getTarget()->getApproxMemoryGain());
EventLogger::flushStart(ctx.getName(), mgain.getBefore(), mgain.getAfter(), mgain.gain(),
ctx.getTarget()->getFlushedSerialNum() + 1, ctx.getHandler()->getCurrentSerialNumber());
}
- return initFlush(ctx.getHandler(), ctx.getTarget());
+ return initFlush(ctx.getHandler(), ctx.getTarget(), std::move(priority_flush_token));
}
void
@@ -434,10 +438,25 @@ FlushEngine::flushDone(const FlushContext &ctx, uint32_t taskId)
}
LOG(debug, "FlushEngine::flushDone(taskId='%d') took '%f' secs", taskId, vespalib::to_s(duration));
std::lock_guard<std::mutex> guard(_lock);
- _flushing.erase(taskId);
+ /*
+ * Hand over any priority flush token for completed flush to
+ * _pendingPrune, to ensure that setStrategy will wait until
+ * flush engine has called prune().
+ */
+ std::shared_ptr<PriorityFlushToken> priority_flush_token;
+ {
+ auto itr = _flushing.find(taskId);
+ if (itr != _flushing.end()) {
+ priority_flush_token = std::move(itr->second._priority_flush_token);
+ _flushing.erase(itr);
+ }
+ }
assert(ctx.getHandler());
if (_handlers.hasHandler(ctx.getHandler())) {
- _pendingPrune.insert(ctx.getHandler());
+ auto ins_res = _pendingPrune.emplace(ctx.getHandler(), PendingPrunes::mapped_type());
+ if (priority_flush_token) {
+ ins_res.first->second = std::move(priority_flush_token);
+ }
}
_cond.notify_all();
}
@@ -450,7 +469,7 @@ FlushEngine::putFlushHandler(const DocTypeName &docTypeName, const IFlushHandler
if (result) {
_pendingPrune.erase(result);
}
- _pendingPrune.insert(flushHandler);
+ _pendingPrune.emplace(flushHandler, PendingPrunes::mapped_type());
return result;
}
@@ -475,13 +494,13 @@ FlushEngine::getCurrentlyFlushingSet() const
}
uint32_t
-FlushEngine::initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP &target)
+FlushEngine::initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP &target, std::shared_ptr<PriorityFlushToken> priority_flush_token)
{
uint32_t taskId;
{
std::lock_guard<std::mutex> guard(_lock);
taskId = _taskId++;
- FlushInfo flush(taskId, handler->getName(), target);
+ FlushInfo flush(taskId, handler->getName(), target, std::move(priority_flush_token));
_flushing[taskId] = flush;
}
LOG(debug, "FlushEngine::initFlush(handler='%s', target='%s') => taskId='%d'",
@@ -492,6 +511,9 @@ FlushEngine::initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP
void
FlushEngine::setStrategy(IFlushStrategy::SP strategy)
{
+ std::promise<void> promise;
+ auto future = promise.get_future();
+ auto priority_flush_token = std::make_shared<PriorityFlushToken>(std::move(promise));
std::lock_guard<std::mutex> setStrategyGuard(_setStrategyLock);
std::unique_lock<std::mutex> strategyGuard(_strategyLock);
if (_closed.load(std::memory_order_relaxed)) {
@@ -499,6 +521,7 @@ FlushEngine::setStrategy(IFlushStrategy::SP strategy)
}
assert(!_priorityStrategy);
_priorityStrategy = std::move(strategy);
+ _priority_flush_token = std::move(priority_flush_token);
{
std::lock_guard<std::mutex> guard(_lock);
_cond.notify_all();
@@ -506,6 +529,22 @@ FlushEngine::setStrategy(IFlushStrategy::SP strategy)
while (_priorityStrategy) {
_strategyCond.wait(strategyGuard);
}
+ strategyGuard.unlock();
+ /*
+ * Wait for flushes started before the strategy change, for
+ * flushes initiated by the strategy, and for flush engine to call
+ * prune() afterwards.
+ */
+ future.wait();
+}
+
+void
+FlushEngine::mark_currently_flushing_tasks(std::shared_ptr<PriorityFlushToken> priority_flush_token)
+{
+ std::lock_guard<std::mutex> guard(_lock);
+ for (auto& kv : _flushing) {
+ kv.second._priority_flush_token = priority_flush_token;
+ }
}
} // namespace proton