diff options
11 files changed, 105 insertions, 33 deletions
diff --git a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp index 714ffaa16b7..32707f8a69f 100644 --- a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp +++ b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp @@ -75,5 +75,17 @@ TEST_F(ExecutorThreadingServiceTest, shared_executor_for_index_and_attribute_fie assert_executor(index_inverter(), 12, 100); } +TEST_F(ExecutorThreadingServiceTest, tasks_limits_can_be_updated) +{ + setup(4, SharedFieldWriterExecutor::NONE); + service->set_task_limits(5, 7, 11); + EXPECT_EQ(5, service->master_task_limit()); + EXPECT_EQ(7, service->index().getTaskLimit()); + EXPECT_EQ(11, service->summary().getTaskLimit()); + EXPECT_EQ(7, index_inverter()->first_executor()->getTaskLimit()); + EXPECT_EQ(7, index_writer()->first_executor()->getTaskLimit()); + EXPECT_EQ(7, attribute_writer()->first_executor()->getTaskLimit()); +} + GTEST_MAIN_RUN_ALL_TESTS() diff --git a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp index 353ebeb3abc..50a8349b859 100644 --- a/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp +++ b/searchcore/src/tests/proton/documentdb/threading_service_config/threading_service_config_test.cpp @@ -14,15 +14,15 @@ using ProtonConfigBuilder = vespa::config::search::core::ProtonConfigBuilder; struct Fixture { ProtonConfig cfg; - Fixture(uint32_t baseLineIndexingThreads = 2, uint32_t task_limit = 500, uint32_t semi_unbound_task_limit = 50000) - : cfg(makeConfig(baseLineIndexingThreads, task_limit, semi_unbound_task_limit)) + Fixture(uint32_t baseLineIndexingThreads = 2, uint32_t master_task_limit = 2000, uint32_t task_limit = 500) + : cfg(makeConfig(baseLineIndexingThreads, master_task_limit, task_limit)) { } - ProtonConfig makeConfig(uint32_t baseLineIndexingThreads, uint32_t task_limit, uint32_t semi_unbound_task_limit) { + ProtonConfig makeConfig(uint32_t baseLineIndexingThreads, uint32_t master_task_limit, uint32_t task_limit) { ProtonConfigBuilder builder; builder.indexing.threads = baseLineIndexingThreads; builder.indexing.tasklimit = task_limit; - builder.indexing.semiunboundtasklimit = semi_unbound_task_limit; + builder.feeding.masterTaskLimit = master_task_limit; return builder; } ThreadingServiceConfig make(uint32_t cpuCores) { @@ -51,28 +51,32 @@ TEST_F("require that indexing threads is always >= 1", Fixture(0)) TEST_DO(f.assertIndexingThreads(1, 0)); } -TEST_F("require that default task limit is set", Fixture) +TEST_F("require that task limits are set", Fixture) { - EXPECT_EQUAL(500u, f.make(24).defaultTaskLimit()); + auto tcfg = f.make(24); + EXPECT_EQUAL(2000u, tcfg.master_task_limit()); + EXPECT_EQUAL(500u, tcfg.defaultTaskLimit()); } namespace { -void assertConfig(uint32_t exp_indexing_threads, uint32_t exp_default_task_limit, const ThreadingServiceConfig &config) { +void assertConfig(uint32_t exp_indexing_threads, uint32_t exp_master_task_limit, + uint32_t exp_default_task_limit, const ThreadingServiceConfig& config) { EXPECT_EQUAL(exp_indexing_threads, config.indexingThreads()); + EXPECT_EQUAL(exp_master_task_limit, config.master_task_limit()); EXPECT_EQUAL(exp_default_task_limit, config.defaultTaskLimit()); } } -TEST_FF("require that config can be somewhat updated", Fixture(), Fixture(2, 1000, 100000)) +TEST_FF("require that config can be somewhat updated", Fixture(), Fixture(2, 3000, 1000)) { auto cfg1 = f1.make(1); - assertConfig(2u, 500u, cfg1); + assertConfig(2u, 2000, 500u, cfg1); const auto cfg2 = f2.make(13); - assertConfig(3u, 1000u, cfg2); + assertConfig(3u, 3000u, 1000u, cfg2); cfg1.update(cfg2); - assertConfig(2u, 1000u, cfg1); // Indexing threads not changed + assertConfig(2u, 3000u, 1000u, cfg1); // Indexing threads not changed } TEST_MAIN() diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def index 6b85f4a6829..5857fdd4f8d 100644 --- a/searchcore/src/vespa/searchcore/config/proton.def +++ b/searchcore/src/vespa/searchcore/config/proton.def @@ -503,6 +503,13 @@ feeding.concurrency double default = 0.2 restart ## TODO: Remove this when a shared executor is the default. feeding.shared_field_writer_executor enum {NONE, INDEX, INDEX_AND_ATTRIBUTE, DOCUMENT_DB} default = NONE restart +## Maximum number of pending tasks for the master thread in each document db. +## +## This limit is only considered when executing tasks for handling external feed operations. +## In that case the calling thread (persistence thread) is blocked until the master thread has capacity to handle more tasks. +## When this limit is set to 0 it is ignored. +feeding.master_task_limit int default = 0 + ## Adjustment to resource limit when determining if maintenance jobs can run. ## ## Currently used by 'lid_space_compaction' and 'move_buckets' jobs. diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 3f2fb6c4634..427d435aae7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -482,7 +482,9 @@ DocumentDB::applyConfig(DocumentDBConfig::SP configSnapshot, SerialNum serialNum if (_state.getState() >= DDBState::State::APPLY_LIVE_CONFIG) { _writeServiceConfig.update(configSnapshot->get_threading_service_config()); } - _writeService.setTaskLimit(_writeServiceConfig.defaultTaskLimit(), _writeServiceConfig.defaultTaskLimit()); + _writeService.set_task_limits(_writeServiceConfig.master_task_limit(), + _writeServiceConfig.defaultTaskLimit(), + _writeServiceConfig.defaultTaskLimit()); if (params.shouldSubDbsChange()) { applySubDBConfig(*configSnapshot, serialNum, params); if (serialNum < _feedHandler->getSerialNum()) { diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp index 0e9ba7a24c8..7e0a1851bf5 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp @@ -42,11 +42,13 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor &sha : ExecutorThreadingService(sharedExecutor, ThreadingServiceConfig::make(num_treads)) {} -ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor & sharedExecutor, - const ThreadingServiceConfig & cfg, uint32_t stackSize) +ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor, + const ThreadingServiceConfig& cfg, + uint32_t stackSize) : _sharedExecutor(sharedExecutor), _masterExecutor(1, stackSize, master_executor), + _master_task_limit(cfg.master_task_limit()), _indexExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), index_executor)), _summaryExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), summary_executor)), _masterService(_masterExecutor), @@ -97,6 +99,16 @@ ExecutorThreadingService::sync_all_executors() { } void +ExecutorThreadingService::blocking_master_execute(vespalib::Executor::Task::UP task) +{ + uint32_t limit = master_task_limit(); + if (limit > 0) { + _masterExecutor.wait_for_task_count(limit); + } + _masterExecutor.execute(std::move(task)); +} + +void ExecutorThreadingService::syncOnce() { bool isMasterThread = _masterService.isCurrentThread(); if (!isMasterThread) { @@ -127,13 +139,16 @@ ExecutorThreadingService::shutdown() } void -ExecutorThreadingService::setTaskLimit(uint32_t taskLimit, uint32_t summaryTaskLimit) +ExecutorThreadingService::set_task_limits(uint32_t master_task_limit, + uint32_t field_task_limit, + uint32_t summary_task_limit) { - _indexExecutor->setTaskLimit(taskLimit); - _summaryExecutor->setTaskLimit(summaryTaskLimit); - _index_field_inverter_ptr->setTaskLimit(taskLimit); - _index_field_writer_ptr->setTaskLimit(taskLimit); - _attribute_field_writer_ptr->setTaskLimit(taskLimit); + _master_task_limit.store(master_task_limit, std::memory_order_release); + _indexExecutor->setTaskLimit(field_task_limit); + _summaryExecutor->setTaskLimit(summary_task_limit); + _index_field_inverter_ptr->setTaskLimit(field_task_limit); + _index_field_writer_ptr->setTaskLimit(field_task_limit); + _attribute_field_writer_ptr->setTaskLimit(field_task_limit); } ExecutorThreadingServiceStats diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h index e571e205f47..1890ca300e2 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h @@ -19,6 +19,7 @@ class ExecutorThreadingService : public searchcorespi::index::IThreadingService private: vespalib::ThreadExecutor & _sharedExecutor; vespalib::ThreadStackExecutor _masterExecutor; + std::atomic<uint32_t> _master_task_limit; std::unique_ptr<vespalib::SyncableThreadExecutor> _indexExecutor; std::unique_ptr<vespalib::SyncableThreadExecutor> _summaryExecutor; ExecutorThreadService _masterService; @@ -36,21 +37,27 @@ private: public: using OptimizeFor = vespalib::Executor::OptimizeFor; /** - * Constructor. - * - * @stackSize The size of the stack of the underlying executors. - * @cfg config used to set up all executors. + * Convenience constructor used in unit tests. */ - ExecutorThreadingService(vespalib::ThreadExecutor &sharedExecutor, - const ThreadingServiceConfig & cfg, uint32_t stackSize = 128 * 1024); - ExecutorThreadingService(vespalib::ThreadExecutor &sharedExecutor, uint32_t num_treads = 1); + ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor, uint32_t num_treads = 1); + + ExecutorThreadingService(vespalib::ThreadExecutor& sharedExecutor, + const ThreadingServiceConfig& cfg, + uint32_t stackSize = 128 * 1024); ~ExecutorThreadingService() override; void sync_all_executors() override; + void blocking_master_execute(vespalib::Executor::Task::UP task) override; + void shutdown(); - void setTaskLimit(uint32_t taskLimit, uint32_t summaryTaskLimit); + uint32_t master_task_limit() const { + return _master_task_limit.load(std::memory_order_relaxed); + } + void set_task_limits(uint32_t master_task_limit, + uint32_t field_task_limit, + uint32_t summary_task_limit); // Expose the underlying executors for stats fetching and testing. // TOD: Remove - This is only used for casting to check the underlying type diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index c9294150f16..ea63d59c830 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -736,7 +736,13 @@ FeedHandler::performOperation(FeedToken token, FeedOperation::UP op) void FeedHandler::handleOperation(FeedToken token, FeedOperation::UP op) { - _writeService.master().execute(makeLambdaTask([this, token = std::move(token), op = std::move(op)]() mutable { + // This function is only called when handling external feed operations (see PersistenceHandlerProxy), + // and ensures that the calling thread (persistence thread) is blocked until the master thread has capacity to handle more tasks. + // This helps keeping feed operation latencies and memory usage in check. + // NOTE: Tasks that are created and executed from the master thread itself or some of its helpers + // cannot use blocking_master_execute() as that could lead to deadlocks. + // See FeedHandler::initiateCommit() for a concrete example. + _writeService.blocking_master_execute(makeLambdaTask([this, token = std::move(token), op = std::move(op)]() mutable { doHandleOperation(std::move(token), std::move(op)); })); } diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp index 012d91cb49f..ff75a59c41b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp @@ -11,12 +11,14 @@ using OptimizeFor = vespalib::Executor::OptimizeFor; ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_, + uint32_t master_task_limit_, uint32_t defaultTaskLimit_, OptimizeFor optimize_, uint32_t kindOfWatermark_, vespalib::duration reactionTime_, SharedFieldWriterExecutor shared_field_writer_) : _indexingThreads(indexingThreads_), + _master_task_limit(master_task_limit_), _defaultTaskLimit(defaultTaskLimit_), _optimize(optimize_), _kindOfWatermark(kindOfWatermark_), @@ -59,7 +61,9 @@ ThreadingServiceConfig ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo) { uint32_t indexingThreads = calculateIndexingThreads(cfg.indexing, concurrency, cpuInfo); - return ThreadingServiceConfig(indexingThreads, cfg.indexing.tasklimit, + return ThreadingServiceConfig(indexingThreads, + cfg.feeding.masterTaskLimit, + cfg.indexing.tasklimit, selectOptimization(cfg.indexing.optimize), cfg.indexing.kindOfWatermark, vespalib::from_s(cfg.indexing.reactiontime), @@ -68,12 +72,13 @@ ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const ThreadingServiceConfig ThreadingServiceConfig::make(uint32_t indexingThreads, SharedFieldWriterExecutor shared_field_writer_) { - return ThreadingServiceConfig(indexingThreads, 100, OptimizeFor::LATENCY, 0, 10ms, shared_field_writer_); + return ThreadingServiceConfig(indexingThreads, 0, 100, OptimizeFor::LATENCY, 0, 10ms, shared_field_writer_); } void ThreadingServiceConfig::update(const ThreadingServiceConfig& cfg) { + _master_task_limit = cfg._master_task_limit; _defaultTaskLimit = cfg._defaultTaskLimit; } @@ -81,6 +86,7 @@ bool ThreadingServiceConfig::operator==(const ThreadingServiceConfig &rhs) const { return _indexingThreads == rhs._indexingThreads && + _master_task_limit == rhs._master_task_limit && _defaultTaskLimit == rhs._defaultTaskLimit && _optimize == rhs._optimize && _kindOfWatermark == rhs._kindOfWatermark && diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h index 5869eaf9c2e..f1a4f0525d1 100644 --- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h @@ -21,6 +21,7 @@ public: private: uint32_t _indexingThreads; + uint32_t _master_task_limit; uint32_t _defaultTaskLimit; OptimizeFor _optimize; uint32_t _kindOfWatermark; @@ -28,14 +29,16 @@ private: SharedFieldWriterExecutor _shared_field_writer; private: - ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t defaultTaskLimit_, OptimizeFor optimize_, - uint32_t kindOfWatermark_, vespalib::duration reactionTime_, SharedFieldWriterExecutor shared_field_writer_); + ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t master_task_limit_, uint32_t defaultTaskLimit_, + OptimizeFor optimize_, uint32_t kindOfWatermark_, vespalib::duration reactionTime_, + SharedFieldWriterExecutor shared_field_writer_); public: static ThreadingServiceConfig make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo); static ThreadingServiceConfig make(uint32_t indexingThreads, SharedFieldWriterExecutor shared_field_writer_ = SharedFieldWriterExecutor::NONE); void update(const ThreadingServiceConfig& cfg); uint32_t indexingThreads() const { return _indexingThreads; } + uint32_t master_task_limit() const { return _master_task_limit; } uint32_t defaultTaskLimit() const { return _defaultTaskLimit; } OptimizeFor optimize() const { return _optimize; } uint32_t kindOfwatermark() const { return _kindOfWatermark; } diff --git a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h index 1d379f439fa..46527362091 100644 --- a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h +++ b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h @@ -37,6 +37,10 @@ public: _service.sync_all_executors(); } + void blocking_master_execute(vespalib::Executor::Task::UP task) override { + _service.blocking_master_execute(std::move(task)); + } + searchcorespi::index::IThreadService &master() override { return _master; } diff --git a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h index f30aec94d53..0660f3ab495 100644 --- a/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h +++ b/searchcorespi/src/vespa/searchcorespi/index/ithreadingservice.h @@ -65,6 +65,12 @@ struct IThreadingService virtual void sync_all_executors() = 0; + /** + * Block the calling thread until the master thread has capacity to handle more tasks, + * and then execute the given task in the master thread. + */ + virtual void blocking_master_execute(vespalib::Executor::Task::UP task) = 0; + virtual IThreadService &master() = 0; virtual IThreadService &index() = 0; virtual IThreadService &summary() = 0; |