diff options
author | Geir Storli <geirst@yahooinc.com> | 2022-05-16 17:04:44 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-16 17:04:44 +0200 |
commit | 4bb588d36eab746d4bc5d210f869b116790ff437 (patch) | |
tree | 3ab47098beae9fde6e9fb5fb95bdf88497ecf42b | |
parent | cc8d97a8e9f56d139700424e1393ea954723ae2d (diff) | |
parent | e6ff27a287c753d5ccbe3c08eb881dc2c78e3e7a (diff) |
Merge pull request #22621 from vespa-engine/geirst/shared-field-writer-cleanup
Remove unused code after the shared field writer is used for all docu…
14 files changed, 74 insertions, 193 deletions
diff --git a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp index 46a6419e924..d4f4e24ba6c 100644 --- a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp +++ b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp @@ -9,7 +9,6 @@ using namespace proton; using vespalib::ISequencedTaskExecutor; using vespalib::SequencedTaskExecutor; -using SharedFieldWriterExecutor = ThreadingServiceConfig::SharedFieldWriterExecutor; VESPA_THREAD_STACK_TAG(my_field_writer_executor) @@ -30,13 +29,13 @@ public: service() { } - void setup(uint32_t indexing_threads, SharedFieldWriterExecutor shared_field_writer) { + void setup(uint32_t indexing_threads) { service = std::make_unique<ExecutorThreadingService>(_transport.shared(), _transport.transport(), _transport.clock(), - field_writer_executor.get(), + *field_writer_executor, nullptr, - ThreadingServiceConfig::make(indexing_threads, shared_field_writer)); + ThreadingServiceConfig::make(indexing_threads)); } SequencedTaskExecutor* index_inverter() { return to_concrete_type(service->indexFieldInverter()); @@ -59,36 +58,9 @@ assert_executor(SequencedTaskExecutor* exec, uint32_t exp_executors, uint32_t ex EXPECT_EQ(exp_task_limit, exec->first_executor()->getTaskLimit()); } -TEST_F(ExecutorThreadingServiceTest, no_shared_field_writer_executor) -{ - setup(4, SharedFieldWriterExecutor::NONE); - EXPECT_NE(index_inverter(), index_writer()); - EXPECT_NE(index_writer(), attribute_writer()); - assert_executor(index_inverter(), 4, 100); - assert_executor(index_writer(), 4, 100); - assert_executor(attribute_writer(), 4, 100); -} - -TEST_F(ExecutorThreadingServiceTest, shared_executor_for_index_field_writers) -{ - setup(4, SharedFieldWriterExecutor::INDEX); - EXPECT_EQ(index_inverter(), index_writer()); - EXPECT_NE(index_inverter(), attribute_writer()); - assert_executor(index_inverter(), 8, 100); - assert_executor(attribute_writer(), 4, 100); -} - -TEST_F(ExecutorThreadingServiceTest, shared_executor_for_index_and_attribute_field_writers) -{ - setup(4, SharedFieldWriterExecutor::INDEX_AND_ATTRIBUTE); - EXPECT_EQ(index_inverter(), index_writer()); - EXPECT_EQ(index_inverter(), attribute_writer()); - assert_executor(index_inverter(), 12, 100); -} - TEST_F(ExecutorThreadingServiceTest, shared_field_writer_specified_from_the_outside) { - setup(4, SharedFieldWriterExecutor::DOCUMENT_DB); + setup(4); EXPECT_EQ(field_writer(), index_inverter()); EXPECT_EQ(field_writer(), index_writer()); EXPECT_EQ(field_writer(), attribute_writer()); @@ -97,7 +69,7 @@ TEST_F(ExecutorThreadingServiceTest, shared_field_writer_specified_from_the_outs TEST_F(ExecutorThreadingServiceTest, tasks_limits_can_be_updated) { - setup(4, SharedFieldWriterExecutor::NONE); + setup(4); service->set_task_limits(5, 7, 11); EXPECT_EQ(5, service->master_task_limit()); EXPECT_EQ(7, service->index().getTaskLimit()); diff --git a/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp b/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp index a15bd8c67c9..948c98b1034 100644 --- a/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp +++ b/searchcore/src/tests/proton/server/shared_threading_service/shared_threading_service_test.cpp @@ -26,7 +26,6 @@ make_proton_config(double concurrency) builder.flush.maxconcurrent = 1; builder.feeding.concurrency = concurrency; - builder.feeding.sharedFieldWriterExecutor = ProtonConfig::Feeding::SharedFieldWriterExecutor::DOCUMENT_DB; builder.indexing.tasklimit = 255; return builder; } @@ -65,7 +64,7 @@ public: transport.transport(), bucket_executor); } SequencedTaskExecutor* field_writer() { - return dynamic_cast<SequencedTaskExecutor*>(service->field_writer()); + return dynamic_cast<SequencedTaskExecutor*>(&service->field_writer()); } }; diff --git a/searchcore/src/vespa/searchcore/config/proton.def b/searchcore/src/vespa/searchcore/config/proton.def index 51536299d51..7cc78178b51 100644 --- a/searchcore/src/vespa/searchcore/config/proton.def +++ b/searchcore/src/vespa/searchcore/config/proton.def @@ -506,16 +506,6 @@ hwinfo.cpu.cores int default = 0 restart ## Deprecated -> Use documentdb.feeding.concurrency feeding.concurrency double default = 0.2 restart -## Whether we should use a shared field writer executor in the document db threading service: -## -## NONE: Don't use a shared executor. -## INDEX: Use a shared executor for index field inverter and index field writer. -## INDEX_AND_ATTRIBUTE: Use a shared executor for index field inverter, index field writer, and attribute field writer. -## DOCUMENT_DB: Use a shared executor for index field inverter, index field writer, and attribute field writer among all document dbs. -## -## TODO: Remove this when a shared executor is the default. -feeding.shared_field_writer_executor enum {NONE, INDEX, INDEX_AND_ATTRIBUTE, DOCUMENT_DB} default = DOCUMENT_DB restart - ## Maximum number of pending tasks for the master thread in each document db. ## ## This limit is only considered when executing tasks for handling external feed operations. diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp index dd735c75d79..eb1c953dcf5 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp @@ -15,7 +15,6 @@ using vespalib::SingleExecutor; using vespalib::SyncableThreadExecutor; using vespalib::steady_time; using OptimizeFor = vespalib::Executor::OptimizeFor; -using SharedFieldWriterExecutor = proton::ThreadingServiceConfig::SharedFieldWriterExecutor; namespace proton { @@ -34,22 +33,21 @@ createExecutorWithOneThread(uint32_t stackSize, uint32_t taskLimit, OptimizeFor VESPA_THREAD_STACK_TAG(master_executor) VESPA_THREAD_STACK_TAG(index_executor) VESPA_THREAD_STACK_TAG(summary_executor) -VESPA_THREAD_STACK_TAG(index_field_inverter_executor) -VESPA_THREAD_STACK_TAG(index_field_writer_executor) -VESPA_THREAD_STACK_TAG(attribute_field_writer_executor) -VESPA_THREAD_STACK_TAG(field_writer_executor) } -ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor &sharedExecutor, FNET_Transport & transport, - const vespalib::Clock & clock, uint32_t num_treads) - : ExecutorThreadingService(sharedExecutor, transport, clock, nullptr, nullptr, ThreadingServiceConfig::make(num_treads)) +ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor& sharedExecutor, + FNET_Transport& transport, + const vespalib::Clock& clock, + vespalib::ISequencedTaskExecutor& field_writer, + uint32_t num_treads) + : ExecutorThreadingService(sharedExecutor, transport, clock, field_writer, nullptr, ThreadingServiceConfig::make(num_treads)) {} ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor & sharedExecutor, FNET_Transport & transport, const vespalib::Clock & clock, - vespalib::ISequencedTaskExecutor * field_writer, + vespalib::ISequencedTaskExecutor& field_writer, vespalib::InvokeService * invokerService, const ThreadingServiceConfig & cfg, uint32_t stackSize) @@ -58,7 +56,6 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor & sharedEx _transport(transport), _clock(clock), _masterExecutor(1, stackSize, CpuUsage::wrap(master_executor, CpuUsage::Category::WRITE)), - _shared_field_writer(cfg.shared_field_writer()), _master_task_limit(cfg.master_task_limit()), _indexExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), CpuUsage::wrap(index_executor, CpuUsage::Category::WRITE))), @@ -66,62 +63,15 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor & sharedEx CpuUsage::wrap(summary_executor, CpuUsage::Category::WRITE))), _masterService(_masterExecutor), _indexService(*_indexExecutor), - _indexFieldInverter(), - _indexFieldWriter(), - _attributeFieldWriter(), - _field_writer(), - _index_field_inverter_ptr(), - _index_field_writer_ptr(), - _attribute_field_writer_ptr(), + _index_field_inverter(field_writer), + _index_field_writer(field_writer), + _attribute_field_writer(field_writer), _invokeRegistrations() { if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) { _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_indexExecutor.get()](){ executor->wakeup();})); _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_summaryExecutor.get()](){ executor->wakeup();})); } - if (_shared_field_writer == SharedFieldWriterExecutor::INDEX) { - _field_writer = SequencedTaskExecutor::create(CpuUsage::wrap(field_writer_executor, CpuUsage::Category::WRITE), - cfg.indexingThreads() * 2, cfg.defaultTaskLimit()); - _attributeFieldWriter = SequencedTaskExecutor::create(CpuUsage::wrap(attribute_field_writer_executor, CpuUsage::Category::WRITE), - cfg.indexingThreads(), cfg.defaultTaskLimit(), - cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark()); - if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) { - _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();})); - } - _index_field_inverter_ptr = _field_writer.get(); - _index_field_writer_ptr = _field_writer.get(); - _attribute_field_writer_ptr = _attributeFieldWriter.get(); - - } else if (_shared_field_writer == SharedFieldWriterExecutor::INDEX_AND_ATTRIBUTE) { - _field_writer = SequencedTaskExecutor::create(CpuUsage::wrap(field_writer_executor, CpuUsage::Category::WRITE), - cfg.indexingThreads() * 3, cfg.defaultTaskLimit(), - cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark()); - if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) { - _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_field_writer.get()](){ executor->wakeup();})); - } - _index_field_inverter_ptr = _field_writer.get(); - _index_field_writer_ptr = _field_writer.get(); - _attribute_field_writer_ptr = _field_writer.get(); - } else if (_shared_field_writer == SharedFieldWriterExecutor::DOCUMENT_DB) { - assert(field_writer != nullptr); - _index_field_inverter_ptr = field_writer; - _index_field_writer_ptr = field_writer; - _attribute_field_writer_ptr = field_writer; - } else { - _indexFieldInverter = SequencedTaskExecutor::create(CpuUsage::wrap(index_field_inverter_executor, CpuUsage::Category::WRITE), - cfg.indexingThreads(), cfg.defaultTaskLimit()); - _indexFieldWriter = SequencedTaskExecutor::create(CpuUsage::wrap(index_field_writer_executor, CpuUsage::Category::WRITE), - cfg.indexingThreads(), cfg.defaultTaskLimit()); - _attributeFieldWriter = SequencedTaskExecutor::create(CpuUsage::wrap(attribute_field_writer_executor, CpuUsage::Category::WRITE), - cfg.indexingThreads(), cfg.defaultTaskLimit(), - cfg.is_task_limit_hard(), cfg.optimize(), cfg.kindOfwatermark()); - if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) { - _invokeRegistrations.push_back(invokerService->registerInvoke([executor=_attributeFieldWriter.get()](){ executor->wakeup();})); - } - _index_field_inverter_ptr = _indexFieldInverter.get(); - _index_field_writer_ptr = _indexFieldWriter.get(); - _attribute_field_writer_ptr = _attributeFieldWriter.get(); - } } ExecutorThreadingService::~ExecutorThreadingService() = default; @@ -140,11 +90,11 @@ void ExecutorThreadingService::shutdown() { _masterExecutor.shutdown().sync(); - _attribute_field_writer_ptr->sync_all(); + _attribute_field_writer.sync_all(); _summaryExecutor->shutdown().sync(); _indexExecutor->shutdown().sync(); - _index_field_inverter_ptr->sync_all(); - _index_field_writer_ptr->sync_all(); + _index_field_inverter.sync_all(); + _index_field_writer.sync_all(); } void @@ -156,9 +106,9 @@ ExecutorThreadingService::set_task_limits(uint32_t master_task_limit, _indexExecutor->setTaskLimit(field_task_limit); _summaryExecutor->setTaskLimit(summary_task_limit); // TODO: Move this to a common place when the field writer is always shared. - _index_field_inverter_ptr->setTaskLimit(field_task_limit); - _index_field_writer_ptr->setTaskLimit(field_task_limit); - _attribute_field_writer_ptr->setTaskLimit(field_task_limit); + _index_field_inverter.setTaskLimit(field_task_limit); + _index_field_writer.setTaskLimit(field_task_limit); + _attribute_field_writer.setTaskLimit(field_task_limit); } ExecutorThreadingServiceStats @@ -167,44 +117,25 @@ ExecutorThreadingService::getStats() auto master_stats = _masterExecutor.getStats(); auto index_stats = _indexExecutor->getStats(); auto summary_stats = _summaryExecutor->getStats(); - if (_shared_field_writer == SharedFieldWriterExecutor::INDEX) { - auto field_writer_stats = _field_writer->getStats(); - return ExecutorThreadingServiceStats(master_stats, index_stats, summary_stats, - field_writer_stats, - field_writer_stats, - _attribute_field_writer_ptr->getStats()); - } else if (_shared_field_writer == SharedFieldWriterExecutor::INDEX_AND_ATTRIBUTE) { - auto field_writer_stats = _field_writer->getStats(); - return ExecutorThreadingServiceStats(master_stats, index_stats, summary_stats, - field_writer_stats, - field_writer_stats, - field_writer_stats); - } else if (_shared_field_writer == SharedFieldWriterExecutor::DOCUMENT_DB) { - vespalib::ExecutorStats empty_stats; - // In this case the field writer stats are reported at a higher level. - return ExecutorThreadingServiceStats(master_stats, index_stats, summary_stats, - empty_stats, empty_stats, empty_stats); - } else { - return ExecutorThreadingServiceStats(master_stats, index_stats, summary_stats, - _index_field_inverter_ptr->getStats(), - _index_field_writer_ptr->getStats(), - _attribute_field_writer_ptr->getStats()); - } + vespalib::ExecutorStats empty_stats; + // In this case the field writer stats are reported at a higher level. + return ExecutorThreadingServiceStats(master_stats, index_stats, summary_stats, + empty_stats, empty_stats, empty_stats); } vespalib::ISequencedTaskExecutor & ExecutorThreadingService::indexFieldInverter() { - return *_index_field_inverter_ptr; + return _index_field_inverter; } vespalib::ISequencedTaskExecutor & ExecutorThreadingService::indexFieldWriter() { - return *_index_field_writer_ptr; + return _index_field_writer; } vespalib::ISequencedTaskExecutor & ExecutorThreadingService::attributeFieldWriter() { - return *_attribute_field_writer_ptr; + return _attribute_field_writer; } } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h index 1179c88ef76..03c3feabd86 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h @@ -24,19 +24,14 @@ private: FNET_Transport & _transport; const vespalib::Clock & _clock; vespalib::ThreadStackExecutor _masterExecutor; - ThreadingServiceConfig::SharedFieldWriterExecutor _shared_field_writer; std::atomic<uint32_t> _master_task_limit; std::unique_ptr<vespalib::SyncableThreadExecutor> _indexExecutor; std::unique_ptr<vespalib::SyncableThreadExecutor> _summaryExecutor; SyncableExecutorThreadService _masterService; ExecutorThreadService _indexService; - std::unique_ptr<vespalib::ISequencedTaskExecutor> _indexFieldInverter; - std::unique_ptr<vespalib::ISequencedTaskExecutor> _indexFieldWriter; - std::unique_ptr<vespalib::ISequencedTaskExecutor> _attributeFieldWriter; - std::unique_ptr<vespalib::ISequencedTaskExecutor> _field_writer; - vespalib::ISequencedTaskExecutor* _index_field_inverter_ptr; - vespalib::ISequencedTaskExecutor* _index_field_writer_ptr; - vespalib::ISequencedTaskExecutor* _attribute_field_writer_ptr; + vespalib::ISequencedTaskExecutor& _index_field_inverter; + vespalib::ISequencedTaskExecutor& _index_field_writer; + vespalib::ISequencedTaskExecutor& _attribute_field_writer; std::vector<Registration> _invokeRegistrations; public: @@ -44,13 +39,16 @@ public: /** * Convenience constructor used in unit tests. */ - ExecutorThreadingService(vespalib::Executor& sharedExecutor, FNET_Transport & transport, - const vespalib::Clock & clock, uint32_t num_treads = 1); + ExecutorThreadingService(vespalib::Executor& sharedExecutor, + FNET_Transport& transport, + const vespalib::Clock& clock, + vespalib::ISequencedTaskExecutor& field_writer, + uint32_t num_treads = 1); ExecutorThreadingService(vespalib::Executor& sharedExecutor, FNET_Transport & transport, const vespalib::Clock & clock, - vespalib::ISequencedTaskExecutor* field_writer, + vespalib::ISequencedTaskExecutor& field_writer, vespalib::InvokeService * invokeService, const ThreadingServiceConfig& cfg, uint32_t stackSize = 128 * 1024); diff --git a/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h index dccea41373f..0bfd874b90f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_shared_threading_service.h @@ -39,11 +39,8 @@ public: /** * Returns the sequenced executor used to write index and attribute fields in a document db. - * - * This is a nullptr if the field writer is not shared across all document dbs. - * TODO: Make this a reference when it is always shared. */ - virtual vespalib::ISequencedTaskExecutor* field_writer() = 0; + virtual vespalib::ISequencedTaskExecutor& field_writer() = 0; /** * Returns an InvokeService intended for regular wakeup calls. diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 8d52ecc0ba7..b5e70a540ec 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -809,9 +809,7 @@ Proton::updateMetrics(const metrics::MetricLockGuard &) if (_shared_service) { metrics.shared.update(_shared_service->shared().getStats()); metrics.warmup.update(_shared_service->warmup().getStats()); - if (_shared_service->field_writer()) { - metrics.field_writer.update(_shared_service->field_writer()->getStats()); - } + metrics.field_writer.update(_shared_service->field_writer().getStats()); } } } @@ -970,7 +968,7 @@ Proton::get_child(vespalib::stringref name) const (_flushEngine) ? &_flushEngine->get_executor() : nullptr, &_executor, (_shared_service) ? &_shared_service->warmup() : nullptr, - (_shared_service) ? _shared_service->field_writer() : nullptr); + (_shared_service) ? &_shared_service->field_writer() : nullptr); } else if (name == HW_INFO) { return std::make_unique<HwInfoExplorer>(_hw_info); diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp index 79827ce81c0..82d7f6c650d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.cpp @@ -16,8 +16,6 @@ VESPA_THREAD_STACK_TAG(proton_warmup_executor) namespace proton { -using SharedFieldWriterExecutor = ThreadingServiceConfig::ProtonConfig::Feeding::SharedFieldWriterExecutor; - SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfig& cfg, FNET_Transport& transport, storage::spi::BucketExecutor& bucket_executor) @@ -35,18 +33,16 @@ SharedThreadingService::SharedThreadingService(const SharedThreadingServiceConfi _clock(_invokeService.nowRef()) { const auto& fw_cfg = cfg.field_writer_config(); - if (fw_cfg.shared_field_writer() == SharedFieldWriterExecutor::DOCUMENT_DB) { - _field_writer = vespalib::SequencedTaskExecutor::create(CpuUsage::wrap(proton_field_writer_executor, CpuUsage::Category::WRITE), - fw_cfg.indexingThreads() * 3, - fw_cfg.defaultTaskLimit(), - fw_cfg.is_task_limit_hard(), - fw_cfg.optimize(), - fw_cfg.kindOfwatermark()); - if (fw_cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT) { - _invokeRegistrations.push_back(_invokeService.registerInvoke([executor = _field_writer.get()]() { - executor->wakeup(); - })); - } + _field_writer = vespalib::SequencedTaskExecutor::create(CpuUsage::wrap(proton_field_writer_executor, CpuUsage::Category::WRITE), + fw_cfg.indexingThreads() * 3, + fw_cfg.defaultTaskLimit(), + fw_cfg.is_task_limit_hard(), + fw_cfg.optimize(), + fw_cfg.kindOfwatermark()); + if (fw_cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT) { + _invokeRegistrations.push_back(_invokeService.registerInvoke([executor = _field_writer.get()]() { + executor->wakeup(); + })); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h index ead16441da0..019b9fe0596 100644 --- a/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h +++ b/searchcore/src/vespa/searchcore/proton/server/shared_threading_service.h @@ -36,7 +36,7 @@ public: vespalib::ThreadExecutor& warmup() override { return *_warmup; } vespalib::ThreadExecutor& shared() override { return *_shared; } - vespalib::ISequencedTaskExecutor* field_writer() override { return _field_writer.get(); } + vespalib::ISequencedTaskExecutor& field_writer() override { return *_field_writer; } vespalib::InvokeService & invokeService() override { return _invokeService; } FNET_Transport & transport() override { return _transport; } storage::spi::BucketExecutor& bucket_executor() override { return _bucket_executor; } diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp index 335d5bab8d0..c129a78b045 100644 --- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.cpp @@ -15,16 +15,14 @@ ThreadingServiceConfig::ThreadingServiceConfig(uint32_t indexingThreads_, int32_t defaultTaskLimit_, OptimizeFor optimize_, uint32_t kindOfWatermark_, - vespalib::duration reactionTime_, - SharedFieldWriterExecutor shared_field_writer_) + vespalib::duration reactionTime_) : _indexingThreads(indexingThreads_), _master_task_limit(master_task_limit_), _defaultTaskLimit(std::abs(defaultTaskLimit_)), _is_task_limit_hard(defaultTaskLimit_ >= 0), _optimize(optimize_), _kindOfWatermark(kindOfWatermark_), - _reactionTime(reactionTime_), - _shared_field_writer(shared_field_writer_) + _reactionTime(reactionTime_) { } @@ -60,13 +58,12 @@ ThreadingServiceConfig::make(const ProtonConfig &cfg, double concurrency, const cfg.indexing.tasklimit, selectOptimization(cfg.indexing.optimize), cfg.indexing.kindOfWatermark, - vespalib::from_s(cfg.indexing.reactiontime), - cfg.feeding.sharedFieldWriterExecutor); + vespalib::from_s(cfg.indexing.reactiontime)); } ThreadingServiceConfig -ThreadingServiceConfig::make(uint32_t indexingThreads, SharedFieldWriterExecutor shared_field_writer_) { - return ThreadingServiceConfig(indexingThreads, 0, 100, OptimizeFor::LATENCY, 0, 10ms, shared_field_writer_); +ThreadingServiceConfig::make(uint32_t indexingThreads) { + return ThreadingServiceConfig(indexingThreads, 0, 100, OptimizeFor::LATENCY, 0, 10ms); } void @@ -85,8 +82,7 @@ ThreadingServiceConfig::operator==(const ThreadingServiceConfig &rhs) const _is_task_limit_hard == rhs._is_task_limit_hard && _optimize == rhs._optimize && _kindOfWatermark == rhs._kindOfWatermark && - _reactionTime == rhs._reactionTime && - _shared_field_writer == rhs._shared_field_writer; + _reactionTime == rhs._reactionTime; } } diff --git a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h index a54c0674263..8c63340a62d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h +++ b/searchcore/src/vespa/searchcore/proton/server/threading_service_config.h @@ -17,7 +17,6 @@ class ThreadingServiceConfig { public: using ProtonConfig = const vespa::config::search::core::internal::InternalProtonType; using OptimizeFor = vespalib::Executor::OptimizeFor; - using SharedFieldWriterExecutor = ProtonConfig::Feeding::SharedFieldWriterExecutor; private: uint32_t _indexingThreads; @@ -27,16 +26,14 @@ private: OptimizeFor _optimize; uint32_t _kindOfWatermark; vespalib::duration _reactionTime; // Maximum reaction time to new tasks - SharedFieldWriterExecutor _shared_field_writer; private: ThreadingServiceConfig(uint32_t indexingThreads_, uint32_t master_task_limit_, int32_t defaultTaskLimit_, - OptimizeFor optimize_, uint32_t kindOfWatermark_, - vespalib::duration reactionTime_, SharedFieldWriterExecutor shared_field_writer_); + OptimizeFor optimize_, uint32_t kindOfWatermark_, vespalib::duration reactionTime_); public: static ThreadingServiceConfig make(const ProtonConfig &cfg, double concurrency, const HwInfo::Cpu &cpuInfo); - static ThreadingServiceConfig make(uint32_t indexingThreads, SharedFieldWriterExecutor shared_field_writer_ = SharedFieldWriterExecutor::NONE); + static ThreadingServiceConfig make(uint32_t indexingThreads); void update(const ThreadingServiceConfig& cfg); uint32_t indexingThreads() const { return _indexingThreads; } uint32_t master_task_limit() const { return _master_task_limit; } @@ -45,7 +42,6 @@ public: OptimizeFor optimize() const { return _optimize; } uint32_t kindOfwatermark() const { return _kindOfWatermark; } vespalib::duration reactionTime() const { return _reactionTime; } - SharedFieldWriterExecutor shared_field_writer() const { return _shared_field_writer; } bool operator==(const ThreadingServiceConfig &rhs) const; }; diff --git a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h index 00ffdc92020..e92d4362f53 100644 --- a/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h +++ b/searchcore/src/vespa/searchcore/proton/test/mock_shared_threading_service.h @@ -26,7 +26,7 @@ public: ~MockSharedThreadingService() override; ThreadExecutor& warmup() override { return _warmup; } ThreadExecutor& shared() override { return _shared; } - vespalib::ISequencedTaskExecutor* field_writer() override { return _field_writer.get(); } + vespalib::ISequencedTaskExecutor& field_writer() override { return *_field_writer; } vespalib::InvokeService & invokeService() override { return _invokeService; } FNET_Transport & transport() override { return _transport.transport(); } storage::spi::BucketExecutor& bucket_executor() override { return _bucket_executor; } diff --git a/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp b/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp index 0731a3429b1..80e2622fa3b 100644 --- a/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp +++ b/searchcore/src/vespa/searchcore/proton/test/transport_helper.cpp @@ -1,12 +1,13 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "transport_helper.h" -#include <vespa/fnet/transport.h> #include <vespa/fastos/thread.h> +#include <vespa/fnet/transport.h> +#include <vespa/searchcore/proton/server/executorthreadingservice.h> +#include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/vespalib/util/size_literals.h> -#include <vespa/vespalib/util/threadstackexecutor.h> #include <vespa/vespalib/util/testclock.h> -#include <vespa/searchcore/proton/server/executorthreadingservice.h> +#include <vespa/vespalib/util/threadstackexecutor.h> namespace proton { @@ -31,9 +32,12 @@ Transport::shutdown() { _transport->ShutDown(true); } +VESPA_THREAD_STACK_TAG(proton_transport_and_executor_field_writer) + TransportAndExecutor::TransportAndExecutor(size_t num_threads) : Transport(), - _sharedExecutor(std::make_unique<vespalib::ThreadStackExecutor>(num_threads, 64_Ki)) + _sharedExecutor(std::make_unique<vespalib::ThreadStackExecutor>(num_threads, 64_Ki)), + _field_writer(vespalib::SequencedTaskExecutor::create(proton_transport_and_executor_field_writer, num_threads)) {} TransportAndExecutor::~TransportAndExecutor() = default; @@ -45,8 +49,9 @@ TransportAndExecutor::shutdown() { TransportAndExecutorService::TransportAndExecutorService(size_t num_threads) : TransportAndExecutor(num_threads), - _writeService(std::make_unique<ExecutorThreadingService>(shared(), transport(), clock())) + _writeService(std::make_unique<ExecutorThreadingService>(shared(), transport(), clock(), field_writer())) {} + TransportAndExecutorService::~TransportAndExecutorService() = default; searchcorespi::index::IThreadingService & diff --git a/searchcore/src/vespa/searchcore/proton/test/transport_helper.h b/searchcore/src/vespa/searchcore/proton/test/transport_helper.h index 8ec4f50e3f0..46ca8131041 100644 --- a/searchcore/src/vespa/searchcore/proton/test/transport_helper.h +++ b/searchcore/src/vespa/searchcore/proton/test/transport_helper.h @@ -33,9 +33,12 @@ public: TransportAndExecutor(size_t num_threads); ~TransportAndExecutor() override; vespalib::Executor & shared() { return *_sharedExecutor; } + vespalib::ISequencedTaskExecutor& field_writer() { return *_field_writer; } void shutdown() override; private: std::unique_ptr<vespalib::Executor> _sharedExecutor; + std::unique_ptr<vespalib::ISequencedTaskExecutor> _field_writer; + }; class TransportAndExecutorService : public TransportAndExecutor { |