diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-05-09 14:35:54 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-09 14:35:54 +0200 |
commit | 09898990dd5ba74d09da74a56f137e2fff505be5 (patch) | |
tree | a16caf61353c61edcd42ec158f258c9c5616acd2 /searchcore | |
parent | 73bbf8d0de7277be2c8a8bad9d1033e29c87d518 (diff) | |
parent | 7ec8c50b97dcb8fb382aaea21eb1246a1fac7d60 (diff) |
Merge pull request #27029 from vespa-engine/balder/always-keep-room-for-one-extra-high-priority-flush
Leave at least one slot available for high priority flush targets.
Diffstat (limited to 'searchcore')
3 files changed, 91 insertions, 21 deletions
diff --git a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp index 3fdc5a8ce9f..dd8096e1d46 100644 --- a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp +++ b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp @@ -235,6 +235,7 @@ protected: SimpleTarget(const std::string &name, const Type &type, search::SerialNum flushedSerial = 0, bool proceedImmediately = true) : test::DummyFlushTarget(name, type, Component::OTHER), _flushedSerial(flushedSerial), + _currentSerial(0), _proceed(), _initDone(), _taskStart(), @@ -266,7 +267,7 @@ public: { } SimpleTarget(const std::string &name, search::SerialNum flushedSerial = 0, bool proceedImmediately = true) - : SimpleTarget(name, Type::OTHER, flushedSerial, proceedImmediately) + : SimpleTarget(name, Type::OTHER, flushedSerial, proceedImmediately) { } Time getLastFlushTime() const override { return vespalib::system_clock::now(); } @@ -288,10 +289,21 @@ public: class GCTarget : public SimpleTarget { public: GCTarget(const vespalib::string &name, search::SerialNum flushedSerial) - : SimpleTarget(name, Type::GC, flushedSerial) + : SimpleTarget(name, Type::GC, flushedSerial) {} }; +class HighPriorityTarget : public SimpleTarget { +public: + HighPriorityTarget(const vespalib::string &name, search::SerialNum flushedSerial, bool proceed) + : SimpleTarget(name, Type::OTHER, flushedSerial, proceed) + {} + + Priority getPriority() const override { + return Priority::HIGH; + } +}; + class AssertedTarget : public SimpleTarget { public: mutable bool _mgain; @@ -705,6 +717,44 @@ TEST_F("require that concurrency works", Fixture(2, 1ms)) target2->_proceed.countDown(); } +TEST_F("require that high pri concurrency works", Fixture(2, 1ms)) +{ + auto target1 = std::make_shared<SimpleTarget>("target1", 1, false); + auto target2 = std::make_shared<SimpleTarget>("target2", 2, false); + auto target3 = std::make_shared<SimpleTarget>("target3", 2, false); + auto target4 = std::make_shared<HighPriorityTarget>("target4", 3, false); + auto target5 = std::make_shared<HighPriorityTarget>("target5", 5, false); + auto handler = std::make_shared<SimpleHandler>(Targets({target1, target2, target3, target4, target5}), "handler", 9); + f.putFlushHandler("handler", handler); + f.engine.start(); + + EXPECT_TRUE(target1->_initDone.await(LONG_TIMEOUT)); + EXPECT_TRUE(target2->_initDone.await(LONG_TIMEOUT)); + EXPECT_TRUE(target4->_initDone.await(LONG_TIMEOUT)); + EXPECT_FALSE(target3->_initDone.await(SHORT_TIMEOUT)); + EXPECT_FALSE(target5->_initDone.await(SHORT_TIMEOUT)); + assertThatHandlersInCurrentSet(f.engine, {"handler.target1", "handler.target2", "handler.target4"}); + target1->_proceed.countDown(); + EXPECT_TRUE(target1->_taskDone.await(LONG_TIMEOUT)); + EXPECT_TRUE(target5->_initDone.await(LONG_TIMEOUT)); + EXPECT_FALSE(target3->_initDone.await(SHORT_TIMEOUT)); + assertThatHandlersInCurrentSet(f.engine, {"handler.target2", "handler.target4", "handler.target5"}); + target2->_proceed.countDown(); + EXPECT_TRUE(target2->_taskDone.await(LONG_TIMEOUT)); + EXPECT_FALSE(target3->_initDone.await(SHORT_TIMEOUT)); + assertThatHandlersInCurrentSet(f.engine, {"handler.target4", "handler.target5"}); + target4->_proceed.countDown(); + EXPECT_TRUE(target4->_taskDone.await(LONG_TIMEOUT)); + EXPECT_TRUE(target3->_initDone.await(LONG_TIMEOUT)); + assertThatHandlersInCurrentSet(f.engine, {"handler.target5", "handler.target3"}); + target3->_proceed.countDown(); + EXPECT_TRUE(target3->_taskDone.await(LONG_TIMEOUT)); + assertThatHandlersInCurrentSet(f.engine, {"handler.target5"}); + target5->_proceed.countDown(); + EXPECT_TRUE(target5->_taskDone.await(LONG_TIMEOUT)); + assertThatHandlersInCurrentSet(f.engine, {}); +} + TEST_F("require that concurrency works with triggerFlush", Fixture(2, 1ms)) { auto target1 = std::make_shared<SimpleTarget>("target1", 1, false); diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp index 51713f26307..b9382ffa824 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp @@ -147,20 +147,26 @@ FlushEngine::kick() } bool -FlushEngine::canFlushMore(const std::unique_lock<std::mutex> &guard) const +FlushEngine::canFlushMore(const std::unique_lock<std::mutex> &, IFlushTarget::Priority priority) const { - (void) guard; - return _maxConcurrent > _flushing.size(); + if (priority > IFlushTarget::Priority::NORMAL) { + return (_maxConcurrent + 1) > _flushing.size(); + } else { + return _maxConcurrent > _flushing.size(); + } +} + +void +FlushEngine::idle_wait(vespalib::duration minimumWaitTimeIfReady) { + std::unique_lock<std::mutex> guard(_lock); + _cond.wait_for(guard, minimumWaitTimeIfReady); } bool -FlushEngine::wait(vespalib::duration minimumWaitTimeIfReady, bool ignorePendingPrune) +FlushEngine::wait_for_slot(IFlushTarget::Priority priority) { std::unique_lock<std::mutex> guard(_lock); - if (canFlushMore(guard) && _pendingPrune.empty()) { - _cond.wait_for(guard, minimumWaitTimeIfReady); - } - while ( ! canFlushMore(guard) && ( ignorePendingPrune || _pendingPrune.empty())) { + while ( ! canFlushMore(guard, priority)) { _cond.wait_for(guard, 1s); // broadcast when flush done } return !_closed.load(std::memory_order_relaxed); @@ -169,13 +175,19 @@ FlushEngine::wait(vespalib::duration minimumWaitTimeIfReady, bool ignorePendingP void FlushEngine::wait_for_slot_or_pending_prune(IFlushTarget::Priority priority) { - (void) priority; std::unique_lock<std::mutex> guard(_lock); - while ( ! canFlushMore(guard) && _pendingPrune.empty()) { + while ( ! canFlushMore(guard, priority) && _pendingPrune.empty()) { _cond.wait_for(guard, 1s); // broadcast when flush done } } +bool +FlushEngine::has_slot(IFlushTarget::Priority priority) +{ + std::unique_lock<std::mutex> guard(_lock); + return canFlushMore(guard, priority); +} + vespalib::string FlushEngine::checkAndFlush(vespalib::string prev) { std::pair<FlushContext::List, bool> lst = getSortedTargetList(); @@ -183,8 +195,17 @@ FlushEngine::checkAndFlush(vespalib::string prev) { // Everything returned from a priority strategy should be flushed flushAll(lst.first); } else if ( ! lst.first.empty()) { - wait_for_slot_or_pending_prune(lst.first[0]->getTarget()->getPriority()); - prev = flushNextTarget(prev, lst.first); + if (has_slot(IFlushTarget::Priority::NORMAL)) { + prev = flushNextTarget(prev, lst.first); + } else { + FlushContext::List highPri; + for (const FlushContext::SP & ctx : lst.first) { + if (ctx->getTarget()->getPriority() > IFlushTarget::Priority::NORMAL) { + highPri.push_back(ctx); + } + } + prev = flushNextTarget(prev, highPri); + } if (!prev.empty()) { // Sleep 1 ms after a successful flush in order to avoid busy loop in case // of strategy or target error. @@ -208,7 +229,7 @@ FlushEngine::run() } else { prevFlushName = checkAndFlush(prevFlushName); if (prevFlushName.empty()) { - wait(idleInterval); + idle_wait(idleInterval); } } } @@ -351,7 +372,7 @@ FlushEngine::flushAll(const FlushContext::List &lst) { LOG(debug, "%ld targets to flush.", lst.size()); for (const FlushContext::SP & ctx : lst) { - if (wait(vespalib::duration::zero(), true)) { + if (wait_for_slot(IFlushTarget::Priority::NORMAL)) { if (ctx->initFlush(get_flush_token(*ctx))) { logTarget("initiated", *ctx); _executor.execute(std::make_unique<FlushTask>(initFlush(*ctx), *this, ctx)); diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h index 0be086729fa..29d65c0f9fb 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h @@ -81,12 +81,11 @@ private: uint32_t initFlush(const FlushContext &ctx); uint32_t initFlush(const IFlushHandler::SP &handler, const IFlushTarget::SP &target); void flushDone(const FlushContext &ctx, uint32_t taskId); - bool canFlushMore(const std::unique_lock<std::mutex> &guard) const; + bool canFlushMore(const std::unique_lock<std::mutex> &guard, IFlushTarget::Priority priority) const; void wait_for_slot_or_pending_prune(IFlushTarget::Priority priority); - bool wait(vespalib::duration minimumWaitTimeIfReady, bool ignorePendingPrune); - void wait(vespalib::duration minimumWaitTimeIfReady) { - wait(minimumWaitTimeIfReady, false); - } + void idle_wait(vespalib::duration minimumWaitTimeIfReady); + bool wait_for_slot(IFlushTarget::Priority priority); + bool has_slot(IFlushTarget::Priority priority); bool isFlushing(const std::lock_guard<std::mutex> &guard, const vespalib::string & name) const; vespalib::string checkAndFlush(vespalib::string prev); |