summaryrefslogtreecommitdiffstats
path: root/searchcore/src
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2023-10-19 14:29:51 +0200
committerTor Egge <Tor.Egge@online.no>2023-10-19 14:29:51 +0200
commit68c86b598ecec0ee8a912ecc896f8c86dcfcf1f4 (patch)
tree6de78e8f42f6909faa5406bf0fe74a6039fb4579 /searchcore/src
parent5e02a90eb9a74228e1dcc6728cbb3b2c730c3486 (diff)
Avoid blocking flush engine thread due to priority flush.
Diffstat (limited to 'searchcore/src')
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/CMakeLists.txt1
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp71
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h13
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/priority_flush_token.cpp17
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/priority_flush_token.h21
5 files changed, 103 insertions, 20 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/flushengine/CMakeLists.txt
index 64cbde68416..bcf9848ff66 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/CMakeLists.txt
@@ -13,6 +13,7 @@ vespa_add_library(searchcore_flushengine STATIC
flushtargetproxy.cpp
flushtask.cpp
prepare_restart_flush_strategy.cpp
+ priority_flush_token.cpp
threadedflushtarget.cpp
tls_stats_factory.cpp
tls_stats_map.cpp
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
index fc08d4d8a17..82862ef8318 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
@@ -66,16 +66,18 @@ FlushEngine::FlushMeta::~FlushMeta() = default;
FlushEngine::FlushInfo::FlushInfo()
: FlushMeta("", "", 0),
- _target()
+ _target(),
+ _priority_flush_token()
{
}
FlushEngine::FlushInfo::~FlushInfo() = default;
-FlushEngine::FlushInfo::FlushInfo(uint32_t taskId, const vespalib::string& handler_name, const IFlushTarget::SP& target)
+FlushEngine::FlushInfo::FlushInfo(uint32_t taskId, const vespalib::string& handler_name, const IFlushTarget::SP& target, std::shared_ptr<PriorityFlushToken> priority_flush_token)
: FlushMeta(handler_name, target->getName(), taskId),
- _target(target)
+ _target(target),
+ _priority_flush_token(std::move(priority_flush_token))
{
}
@@ -89,6 +91,7 @@ FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> tlsStats
_has_thread(false),
_strategy(std::move(strategy)),
_priorityStrategy(),
+ _priority_flush_token(),
_executor(maxConcurrentTotal(), CpuUsage::wrap(flush_engine_executor, CpuUsage::Category::COMPACT)),
_lock(),
_cond(),
@@ -249,7 +252,7 @@ createName(const IFlushHandler &handler, const vespalib::string &targetName)
bool
FlushEngine::prune()
{
- std::set<IFlushHandler::SP> toPrune;
+ PendingPrunes toPrune;
{
std::lock_guard<std::mutex> guard(_lock);
if (_pendingPrune.empty()) {
@@ -257,7 +260,8 @@ FlushEngine::prune()
}
_pendingPrune.swap(toPrune);
}
- for (const auto &handler : toPrune) {
+ for (const auto& kv : toPrune) {
+ const auto& handler = kv.first;
IFlushTarget::List lst = handler->getFlushTargets();
auto oldestFlushed = findOldestFlushedTarget(lst, *handler);
if (LOG_WOULD_LOG(event)) {
@@ -368,21 +372,21 @@ FlushEngine::initNextFlush(const FlushContext::List &lst)
void
FlushEngine::flushAll(const FlushContext::List &lst)
{
+ mark_currently_flushing_tasks(_priority_flush_token);
LOG(debug, "%ld targets to flush.", lst.size());
for (const FlushContext::SP & ctx : lst) {
if (wait_for_slot(IFlushTarget::Priority::NORMAL)) {
if (ctx->initFlush(get_flush_token(*ctx))) {
logTarget("initiated", *ctx);
- _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx));
+ _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx, _priority_flush_token), *this, ctx));
} else {
logTarget("failed to initiate", *ctx);
}
}
}
- _executor.sync();
- prune();
std::lock_guard<std::mutex> strategyGuard(_strategyLock);
_priorityStrategy.reset();
+ _priority_flush_token.reset();
_strategyCond.notify_all();
}
@@ -404,19 +408,19 @@ FlushEngine::flushNextTarget(const vespalib::string & name, const FlushContext::
name.c_str(), contexts.size());
std::this_thread::sleep_for(100ms);
}
- _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx));
+ _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx, {}), *this, ctx));
return ctx->getName();
}
uint32_t
-FlushEngine::initFlush(const FlushContext &ctx)
+FlushEngine::initFlush(const FlushContext &ctx, std::shared_ptr<PriorityFlushToken> priority_flush_token)
{
if (LOG_WOULD_LOG(event)) {
IFlushTarget::MemoryGain mgain(ctx.getTarget()->getApproxMemoryGain());
EventLogger::flushStart(ctx.getName(), mgain.getBefore(), mgain.getAfter(), mgain.gain(),
ctx.getTarget()->getFlushedSerialNum() + 1, ctx.getHandler()->getCurrentSerialNumber());
}
- return initFlush(ctx.getHandler(), ctx.getTarget());
+ return initFlush(ctx.getHandler(), ctx.getTarget(), std::move(priority_flush_token));
}
void
@@ -434,10 +438,25 @@ FlushEngine::flushDone(const FlushContext &ctx, uint32_t taskId)
}
LOG(debug, "FlushEngine::flushDone(taskId='%d') took '%f' secs", taskId, vespalib::to_s(duration));
std::lock_guard<std::mutex> guard(_lock);
- _flushing.erase(taskId);
+ /*
+ * Hand over any priority flush token for completed flush to
+ * _pendingPrune, to ensure that setStrategy will wait util
+ * flush engine has called prune().
+ */
+ std::shared_ptr<PriorityFlushToken> priority_flush_token;
+ {
+ auto itr = _flushing.find(taskId);
+ if (itr != _flushing.end()) {
+ priority_flush_token = std::move(itr->second._priority_flush_token);
+ _flushing.erase(itr);
+ }
+ }
assert(ctx.getHandler());
if (_handlers.hasHandler(ctx.getHandler())) {
- _pendingPrune.insert(ctx.getHandler());
+ auto ins_res = _pendingPrune.emplace(ctx.getHandler(), PendingPrunes::mapped_type());
+ if (priority_flush_token) {
+ ins_res.first->second = std::move(priority_flush_token);
+ }
}
_cond.notify_all();
}
@@ -450,7 +469,7 @@ FlushEngine::putFlushHandler(const DocTypeName &docTypeName, const IFlushHandler
if (result) {
_pendingPrune.erase(result);
}
- _pendingPrune.insert(flushHandler);
+ _pendingPrune.emplace(flushHandler, PendingPrunes::mapped_type());
return result;
}
@@ -475,13 +494,13 @@ FlushEngine::getCurrentlyFlushingSet() const
}
uint32_t
-FlushEngine::initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP &target)
+FlushEngine::initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP &target, std::shared_ptr<PriorityFlushToken> priority_flush_token)
{
uint32_t taskId;
{
std::lock_guard<std::mutex> guard(_lock);
taskId = _taskId++;
- FlushInfo flush(taskId, handler->getName(), target);
+ FlushInfo flush(taskId, handler->getName(), target, std::move(priority_flush_token));
_flushing[taskId] = flush;
}
LOG(debug, "FlushEngine::initFlush(handler='%s', target='%s') => taskId='%d'",
@@ -492,6 +511,9 @@ FlushEngine::initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP
void
FlushEngine::setStrategy(IFlushStrategy::SP strategy)
{
+ std::promise<void> promise;
+ auto future = promise.get_future();
+ auto priority_flush_token = std::make_shared<PriorityFlushToken>(std::move(promise));
std::lock_guard<std::mutex> setStrategyGuard(_setStrategyLock);
std::unique_lock<std::mutex> strategyGuard(_strategyLock);
if (_closed.load(std::memory_order_relaxed)) {
@@ -499,6 +521,7 @@ FlushEngine::setStrategy(IFlushStrategy::SP strategy)
}
assert(!_priorityStrategy);
_priorityStrategy = std::move(strategy);
+ _priority_flush_token = std::move(priority_flush_token);
{
std::lock_guard<std::mutex> guard(_lock);
_cond.notify_all();
@@ -506,6 +529,22 @@ FlushEngine::setStrategy(IFlushStrategy::SP strategy)
while (_priorityStrategy) {
_strategyCond.wait(strategyGuard);
}
+ strategyGuard.unlock();
+ /*
+ * Wait for flushes started before the strategy change, for
+ * flushes initiated by the strategy, and for flush engine to call
+ * prune() afterwards.
+ */
+ future.wait();
+}
+
+void
+FlushEngine::mark_currently_flushing_tasks(std::shared_ptr<PriorityFlushToken> priority_flush_token)
+{
+ std::lock_guard<std::mutex> guard(_lock);
+ for (auto& kv : _flushing) {
+ kv.second._priority_flush_token = priority_flush_token;
+ }
}
} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
index ec0f019f6e4..302c4a2499e 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h
@@ -3,6 +3,7 @@
#include "flushcontext.h"
#include "iflushstrategy.h"
+#include "priority_flush_token.h"
#include <vespa/searchcore/proton/common/handlermap.hpp>
#include <vespa/searchcore/proton/common/doctypename.h>
#include <vespa/vespalib/util/threadstackexecutor.h>
@@ -43,13 +44,15 @@ private:
struct FlushInfo : public FlushMeta
{
FlushInfo();
- FlushInfo(uint32_t taskId, const vespalib::string& handler_name, const IFlushTarget::SP &target);
+ FlushInfo(uint32_t taskId, const vespalib::string& handler_name, const IFlushTarget::SP &target, std::shared_ptr<PriorityFlushToken> priority_flush_token);
~FlushInfo();
IFlushTarget::SP _target;
+ std::shared_ptr<PriorityFlushToken> _priority_flush_token;
};
using FlushMap = std::map<uint32_t, FlushInfo>;
using FlushHandlerMap = HandlerMap<IFlushHandler>;
+ using PendingPrunes = std::map<std::shared_ptr<IFlushHandler>, std::shared_ptr<PriorityFlushToken>>;
std::atomic<bool> _closed;
const uint32_t _maxConcurrentNormal;
const vespalib::duration _idleInterval;
@@ -58,6 +61,7 @@ private:
std::atomic<bool> _has_thread;
IFlushStrategy::SP _strategy;
mutable IFlushStrategy::SP _priorityStrategy;
+ mutable std::shared_ptr<PriorityFlushToken> _priority_flush_token;
vespalib::ThreadStackExecutor _executor;
mutable std::mutex _lock;
std::condition_variable _cond;
@@ -67,7 +71,7 @@ private:
std::mutex _strategyLock;
std::condition_variable _strategyCond;
std::shared_ptr<flushengine::ITlsStatsFactory> _tlsStatsFactory;
- std::set<IFlushHandler::SP> _pendingPrune;
+ PendingPrunes _pendingPrune;
std::shared_ptr<search::FlushToken> _normal_flush_token;
std::shared_ptr<search::FlushToken> _gc_flush_token;
@@ -78,8 +82,8 @@ private:
vespalib::string flushNextTarget(const vespalib::string & name, const FlushContext::List & contexts);
void flushAll(const FlushContext::List &lst);
bool prune();
- uint32_t initFlush(const FlushContext &ctx);
- uint32_t initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP &target);
+ uint32_t initFlush(const FlushContext &ctx, std::shared_ptr<PriorityFlushToken> priority_flush_token);
+ uint32_t initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP &target, std::shared_ptr<PriorityFlushToken> priority_flush_token);
void flushDone(const FlushContext &ctx, uint32_t taskId);
bool canFlushMore(const std::unique_lock<std::mutex> &guard, IFlushTarget::Priority priority) const;
void wait_for_slot_or_pending_prune(IFlushTarget::Priority priority);
@@ -179,6 +183,7 @@ public:
FlushMetaSet getCurrentlyFlushingSet() const;
void setStrategy(IFlushStrategy::SP strategy);
+ void mark_currently_flushing_tasks(std::shared_ptr<PriorityFlushToken> priority_flush_token);
uint32_t maxConcurrentTotal() const { return _maxConcurrentNormal + 1; }
uint32_t maxConcurrentNormal() const { return _maxConcurrentNormal; }
};
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/priority_flush_token.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/priority_flush_token.cpp
new file mode 100644
index 00000000000..f031c19d1d8
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/priority_flush_token.cpp
@@ -0,0 +1,17 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "priority_flush_token.h"
+
+namespace proton {
+
+PriorityFlushToken::PriorityFlushToken(std::promise<void> promise)
+ : _promise(std::move(promise))
+{
+}
+
+PriorityFlushToken::~PriorityFlushToken()
+{
+ _promise.set_value();
+}
+
+}
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/priority_flush_token.h b/searchcore/src/vespa/searchcore/proton/flushengine/priority_flush_token.h
new file mode 100644
index 00000000000..82048b3eb3f
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/priority_flush_token.h
@@ -0,0 +1,21 @@
+// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/util/idestructorcallback.h>
+#include <future>
+
+namespace proton {
+
+/*
+ * This token is shared between flushes initiated from a priority flush
+ * strategy (cf. Proton::triggerFLush and Proton::prepareRestart).
+ */
+class PriorityFlushToken : public vespalib::IDestructorCallback {
+ std::promise<void> _promise;
+public:
+ PriorityFlushToken(std::promise<void> promise);
+ ~PriorityFlushToken() override;
+};
+
+}