From 658955c5600e264eb527ff74871fa41f0de4bd42 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Wed, 10 May 2023 14:38:48 +0000 Subject: Ensure we have enough threads in the flushengine thread pool. --- searchcore/src/tests/proton/flushengine/flushengine_test.cpp | 3 +++ .../src/vespa/searchcore/proton/flushengine/flushengine.cpp | 8 ++++---- searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h | 4 +++- 3 files changed, 10 insertions(+), 5 deletions(-) (limited to 'searchcore/src') diff --git a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp index dd8096e1d46..fd6274e83a6 100644 --- a/searchcore/src/tests/proton/flushengine/flushengine_test.cpp +++ b/searchcore/src/tests/proton/flushengine/flushengine_test.cpp @@ -727,6 +727,9 @@ TEST_F("require that high pri concurrency works", Fixture(2, 1ms)) auto handler = std::make_shared(Targets({target1, target2, target3, target4, target5}), "handler", 9); f.putFlushHandler("handler", handler); f.engine.start(); + EXPECT_EQUAL(2u, f.engine.maxConcurrentNormal()); + EXPECT_EQUAL(3u, f.engine.maxConcurrentTotal()); + EXPECT_EQUAL(f.engine.maxConcurrentTotal(), f.engine.get_executor().getNumThreads()); EXPECT_TRUE(target1->_initDone.await(LONG_TIMEOUT)); EXPECT_TRUE(target2->_initDone.await(LONG_TIMEOUT)); diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp index b9382ffa824..2d47ea8fb4e 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp @@ -82,14 +82,14 @@ FlushEngine::FlushInfo::FlushInfo(uint32_t taskId, const vespalib::string& handl FlushEngine::FlushEngine(std::shared_ptr tlsStatsFactory, IFlushStrategy::SP strategy, uint32_t numThreads, vespalib::duration idleInterval) : _closed(false), - _maxConcurrent(numThreads), + _maxConcurrentNormal(numThreads), _idleInterval(idleInterval), _taskId(0), _thread(), _has_thread(false), _strategy(std::move(strategy)), _priorityStrategy(), - _executor(numThreads, CpuUsage::wrap(flush_engine_executor, CpuUsage::Category::COMPACT)), + _executor(maxConcurrentTotal(), CpuUsage::wrap(flush_engine_executor, CpuUsage::Category::COMPACT)), _lock(), _cond(), _handlers(), @@ -150,9 +150,9 @@ bool FlushEngine::canFlushMore(const std::unique_lock &, IFlushTarget::Priority priority) const { if (priority > IFlushTarget::Priority::NORMAL) { - return (_maxConcurrent + 1) > _flushing.size(); + return maxConcurrentTotal() > _flushing.size(); } else { - return _maxConcurrent > _flushing.size(); + return maxConcurrentNormal() > _flushing.size(); } } diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h index 29d65c0f9fb..67235aeb539 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.h @@ -51,7 +51,7 @@ private: using FlushMap = std::map; using FlushHandlerMap = HandlerMap; std::atomic _closed; - const uint32_t _maxConcurrent; + const uint32_t _maxConcurrentNormal; const vespalib::duration _idleInterval; uint32_t _taskId; std::thread _thread; @@ -179,6 +179,8 @@ public: FlushMetaSet getCurrentlyFlushingSet() const; void setStrategy(IFlushStrategy::SP strategy); + uint32_t maxConcurrentTotal() const { return _maxConcurrentNormal + 1; } + uint32_t maxConcurrentNormal() const { return _maxConcurrentNormal; } }; } // namespace proton -- cgit v1.2.3