diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-08-31 16:32:42 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-08-31 16:32:42 +0200 |
commit | 46716a0e53ad64ec284228ba6c435330b878cc7f (patch) | |
tree | 2475f46aed46ca964b58fdea546c10e56829113c /searchcore | |
parent | 0cdcbd82624b78e47990f7b447217f0e758237a4 (diff) | |
parent | 1d5c8305e80887c3bd86e7e71952565f81243018 (diff) |
Merge pull request #28334 from vespa-engine/geirst/simplify-threading-service
Simplify IThreadingService as there only is one field writer executor.
Diffstat (limited to 'searchcore')
15 files changed, 38 insertions, 116 deletions
diff --git a/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp b/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp index 4af98801e92..28ec0482074 100644 --- a/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp +++ b/searchcore/src/tests/proton/documentdb/configurer/configurer_test.cpp @@ -218,7 +218,7 @@ Fixture::initViewSet(ViewSet &views) TuneFileIndexManager(), TuneFileAttributes(), views._fileHeaderContext); auto attrMgr = make_shared<AttributeManager>(BASE_DIR, "test.subdb", TuneFileAttributes(), views._fileHeaderContext, std::make_shared<search::attribute::Interlock>(), - views._service.write().attributeFieldWriter(), views._service.write().shared(), views._hwInfo); + views._service.write().field_writer(), views._service.write().shared(), views._hwInfo); auto summaryMgr = make_shared<SummaryManager> (_summaryExecutor, search::LogDocumentStore::Config(), search::GrowStrategy(), BASE_DIR, TuneFileSummary(), views._fileHeaderContext,views._noTlSyncer, search::IBucketizer::SP()); @@ -318,7 +318,7 @@ struct MyFastAccessFeedView StoreOnlyFeedView::PersistentParams params(1, 1, DocTypeName(DOC_TYPE), 0, SubDbType::NOTREADY); auto mgr = make_shared<AttributeManager>(BASE_DIR, "test.subdb", TuneFileAttributes(), _fileHeaderContext, std::make_shared<search::attribute::Interlock>(), - _writeService.attributeFieldWriter(), _writeService.shared(), _hwInfo); + _writeService.field_writer(), _writeService.shared(), _hwInfo); auto writer = std::make_shared<AttributeWriter>(mgr); FastAccessFeedView::Context fastUpdateCtx(writer, _docIdLimit); _feedView.set(std::make_shared<FastAccessFeedView>(std::move(storeOnlyCtx), params, fastUpdateCtx)); diff --git a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp index bc02f460b4e..9bba8c12ee7 100644 --- a/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp +++ b/searchcore/src/tests/proton/documentdb/executor_threading_service/executor_threading_service_test.cpp @@ -34,15 +34,6 @@ public: ThreadingServiceConfig::make())) { } - SequencedTaskExecutor* index_inverter() { - return to_concrete_type(service->indexFieldInverter()); - } - SequencedTaskExecutor* index_writer() { - return to_concrete_type(service->indexFieldWriter()); - } - SequencedTaskExecutor* attribute_writer() { - return to_concrete_type(service->attributeFieldWriter()); - } SequencedTaskExecutor* field_writer() { return to_concrete_type(*field_writer_executor); } @@ -57,9 +48,7 @@ assert_executor(SequencedTaskExecutor* exec, uint32_t exp_executors, uint32_t ex TEST_F(ExecutorThreadingServiceTest, shared_field_writer_specified_from_the_outside) { - EXPECT_EQ(field_writer(), index_inverter()); - EXPECT_EQ(field_writer(), index_writer()); - EXPECT_EQ(field_writer(), attribute_writer()); + EXPECT_EQ(field_writer(), &service->field_writer()); assert_executor(field_writer(), 3, 200); } @@ -69,9 +58,7 @@ TEST_F(ExecutorThreadingServiceTest, tasks_limits_can_be_updated) EXPECT_EQ(5, service->master_task_limit()); EXPECT_EQ(7, service->index().getTaskLimit()); EXPECT_EQ(11, service->summary().getTaskLimit()); - EXPECT_EQ(7, index_inverter()->first_executor()->getTaskLimit()); - EXPECT_EQ(7, index_writer()->first_executor()->getTaskLimit()); - EXPECT_EQ(7, attribute_writer()->first_executor()->getTaskLimit()); + EXPECT_EQ(7, field_writer()->first_executor()->getTaskLimit()); } GTEST_MAIN_RUN_ALL_TESTS() diff --git a/searchcore/src/tests/proton/index/fusionrunner_test.cpp b/searchcore/src/tests/proton/index/fusionrunner_test.cpp index 9052d024871..e57b1b6a61d 100644 --- a/searchcore/src/tests/proton/index/fusionrunner_test.cpp +++ b/searchcore/src/tests/proton/index/fusionrunner_test.cpp @@ -192,8 +192,8 @@ void Test::createIndex(const string &dir, uint32_t id, bool fusion) { DocBuilder doc_builder(add_fields); auto schema = SchemaBuilder(doc_builder).add_all_indexes().build(); MemoryIndex memory_index(schema, MockFieldLengthInspector(), - _service.write().indexFieldInverter(), - _service.write().indexFieldWriter()); + _service.write().field_writer(), + _service.write().field_writer()); addDocument(doc_builder, memory_index, *_selector, id, id + 0, term); addDocument(doc_builder, memory_index, *_selector, id, id + 1, "bar"); addDocument(doc_builder, memory_index, *_selector, id, id + 2, "baz"); diff --git a/searchcore/src/vespa/searchcore/proton/index/memoryindexwrapper.cpp b/searchcore/src/vespa/searchcore/proton/index/memoryindexwrapper.cpp index 219283dce04..e4c47d1deae 100644 --- a/searchcore/src/vespa/searchcore/proton/index/memoryindexwrapper.cpp +++ b/searchcore/src/vespa/searchcore/proton/index/memoryindexwrapper.cpp @@ -23,8 +23,8 @@ MemoryIndexWrapper::MemoryIndexWrapper(const search::index::Schema& schema, const TuneFileIndexing& tuneFileIndexing, searchcorespi::index::IThreadingService& threadingService, search::SerialNum serialNum) - : _index(schema, inspector, threadingService.indexFieldInverter(), - threadingService.indexFieldWriter()), + : _index(schema, inspector, threadingService.field_writer(), + threadingService.field_writer()), _serialNum(serialNum), _fileHeaderContext(fileHeaderContext), _tuneFileIndexing(tuneFileIndexing) diff --git a/searchcore/src/vespa/searchcore/proton/metrics/executor_threading_service_metrics.cpp b/searchcore/src/vespa/searchcore/proton/metrics/executor_threading_service_metrics.cpp index 030dba4d826..43b38c8d812 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/executor_threading_service_metrics.cpp +++ b/searchcore/src/vespa/searchcore/proton/metrics/executor_threading_service_metrics.cpp @@ -24,9 +24,10 @@ ExecutorThreadingServiceMetrics::update(const ExecutorThreadingServiceStats &sta master.update(stats.getMasterExecutorStats()); index.update(stats.getIndexExecutorStats()); summary.update(stats.getSummaryExecutorStats()); - indexFieldInverter.update(stats.getIndexFieldInverterExecutorStats()); - indexFieldWriter.update(stats.getIndexFieldWriterExecutorStats()); - attributeFieldWriter.update(stats.getAttributeFieldWriterExecutorStats()); + vespalib::ExecutorStats empty_stats; + indexFieldInverter.update(empty_stats); + indexFieldWriter.update(empty_stats); + attributeFieldWriter.update(empty_stats); } } diff --git a/searchcore/src/vespa/searchcore/proton/metrics/executor_threading_service_stats.cpp b/searchcore/src/vespa/searchcore/proton/metrics/executor_threading_service_stats.cpp index 63644e5c7ab..9d22344eb2b 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/executor_threading_service_stats.cpp +++ b/searchcore/src/vespa/searchcore/proton/metrics/executor_threading_service_stats.cpp @@ -6,16 +6,10 @@ namespace proton { ExecutorThreadingServiceStats::ExecutorThreadingServiceStats(Stats masterExecutorStats, Stats indexExecutorStats, - Stats summaryExecutorStats, - Stats indexFieldInverterExecutorStats, - Stats indexFieldWriterExecutorStats, - Stats attributeFieldWriterExecutorStats) + Stats summaryExecutorStats) : _masterExecutorStats(masterExecutorStats), _indexExecutorStats(indexExecutorStats), - _summaryExecutorStats(summaryExecutorStats), - _indexFieldInverterExecutorStats(indexFieldInverterExecutorStats), - _indexFieldWriterExecutorStats(indexFieldWriterExecutorStats), - _attributeFieldWriterExecutorStats(attributeFieldWriterExecutorStats) + _summaryExecutorStats(summaryExecutorStats) { } diff --git a/searchcore/src/vespa/searchcore/proton/metrics/executor_threading_service_stats.h b/searchcore/src/vespa/searchcore/proton/metrics/executor_threading_service_stats.h index 8015ec83ae9..121ca6038b6 100644 --- a/searchcore/src/vespa/searchcore/proton/metrics/executor_threading_service_stats.h +++ b/searchcore/src/vespa/searchcore/proton/metrics/executor_threading_service_stats.h @@ -16,24 +16,15 @@ private: Stats _masterExecutorStats; Stats _indexExecutorStats; Stats _summaryExecutorStats; - Stats _indexFieldInverterExecutorStats; - Stats _indexFieldWriterExecutorStats; - Stats _attributeFieldWriterExecutorStats; public: ExecutorThreadingServiceStats(Stats masterExecutorStats, Stats indexExecutorStats, - Stats summaryExecutorStats, - Stats indexFieldInverterExecutorStats, - Stats indexFieldWriterExecutorStats, - Stats attributeFieldWriterExecutorStats); + Stats summaryExecutorStats); ~ExecutorThreadingServiceStats(); const Stats &getMasterExecutorStats() const { return _masterExecutorStats; } const Stats &getIndexExecutorStats() const { return _indexExecutorStats; } const Stats &getSummaryExecutorStats() const { return _summaryExecutorStats; } - const Stats &getIndexFieldInverterExecutorStats() const { return _indexFieldInverterExecutorStats; } - const Stats &getIndexFieldWriterExecutorStats() const { return _indexFieldWriterExecutorStats; } - const Stats &getAttributeFieldWriterExecutorStats() const { return _attributeFieldWriterExecutorStats; } }; } diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 3f28f75c521..b0d168fab03 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -413,7 +413,7 @@ DocumentDB::applySubDBConfig(const DocumentDBConfig &newConfigSnapshot, auto newDocType = newRepo->getDocumentType(_docTypeName.getName()); assert(newDocType != nullptr); DocumentDBReferenceResolver resolver(*registry, *newDocType, newConfigSnapshot.getImportedFieldsConfig(), *oldDocType, - _refCount, _writeService.attributeFieldWriter(), _state.getAllowReconfig()); + _refCount, _writeService.field_writer(), _state.getAllowReconfig()); _subDBs.applyConfig(newConfigSnapshot, *_activeConfigSnapshot, serialNum, params, resolver, prepared_reconfig); } @@ -535,7 +535,7 @@ DocumentDB::tearDownReferences() auto docType = repo->getDocumentType(_docTypeName.getName()); assert(docType != nullptr); DocumentDBReferenceResolver resolver(*registry, *docType, activeConfig->getImportedFieldsConfig(), *docType, - _refCount, _writeService.attributeFieldWriter(), false); + _refCount, _writeService.field_writer(), false); _subDBs.tearDownReferences(resolver); registry->remove(_docTypeName.getName()); } diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.cpp b/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.cpp index 9dfebaa825d..9bd60af81ca 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executor_threading_service_explorer.cpp @@ -24,9 +24,7 @@ ExecutorThreadingServiceExplorer::get_state(const vespalib::slime::Inserter& ins convert_executor_to_slime(&_service.master(), object.setObject("master")); convert_executor_to_slime(&_service.index(), object.setObject("index")); convert_executor_to_slime(&_service.summary(), object.setObject("summary")); - convert_executor_to_slime(&_service.indexFieldInverter(), object.setObject("index_field_inverter")); - convert_executor_to_slime(&_service.indexFieldWriter(), object.setObject("index_field_writer")); - convert_executor_to_slime(&_service.attributeFieldWriter(), object.setObject("attribute_field_writer")); + convert_executor_to_slime(&_service.field_writer(), object.setObject("field_writer")); } } diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp index 4798ebd8fbc..01fc0ff3600 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp @@ -65,9 +65,7 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::Executor & sharedEx _summaryExecutor(createExecutorWithOneThread(cfg, CpuUsage::wrap(summary_executor, CpuUsage::Category::WRITE))), _masterService(_masterExecutor), _indexService(*_indexExecutor), - _index_field_inverter(field_writer), - _index_field_writer(field_writer), - _attribute_field_writer(field_writer), + _field_writer(field_writer), _invokeRegistrations() { if (cfg.optimize() == vespalib::Executor::OptimizeFor::THROUGHPUT && invokerService) { @@ -92,11 +90,11 @@ void ExecutorThreadingService::shutdown() { _masterExecutor.shutdown().sync(); - _attribute_field_writer.sync_all(); + _field_writer.sync_all(); _summaryExecutor->shutdown().sync(); _indexExecutor->shutdown().sync(); - _index_field_inverter.sync_all(); - _index_field_writer.sync_all(); + _field_writer.sync_all(); + _field_writer.sync_all(); } void @@ -107,10 +105,7 @@ ExecutorThreadingService::set_task_limits(uint32_t master_task_limit, _master_task_limit.store(master_task_limit, std::memory_order_release); _indexExecutor->setTaskLimit(field_task_limit); _summaryExecutor->setTaskLimit(summary_task_limit); - // TODO: Move this to a common place when the field writer is always shared. - _index_field_inverter.setTaskLimit(field_task_limit); - _index_field_writer.setTaskLimit(field_task_limit); - _attribute_field_writer.setTaskLimit(field_task_limit); + _field_writer.setTaskLimit(field_task_limit); } ExecutorThreadingServiceStats @@ -119,26 +114,13 @@ ExecutorThreadingService::getStats() auto master_stats = _masterExecutor.getStats(); auto index_stats = _indexExecutor->getStats(); auto summary_stats = _summaryExecutor->getStats(); - vespalib::ExecutorStats empty_stats; - // In this case the field writer stats are reported at a higher level. - return ExecutorThreadingServiceStats(master_stats, index_stats, summary_stats, - empty_stats, empty_stats, empty_stats); + return ExecutorThreadingServiceStats(master_stats, index_stats, summary_stats); } vespalib::ISequencedTaskExecutor & -ExecutorThreadingService::indexFieldInverter() { - return _index_field_inverter; +ExecutorThreadingService::field_writer() { + return _field_writer; } -vespalib::ISequencedTaskExecutor & -ExecutorThreadingService::indexFieldWriter() { - return _index_field_writer; } -vespalib::ISequencedTaskExecutor & -ExecutorThreadingService::attributeFieldWriter() { - return _attribute_field_writer; -} - -} // namespace proton - diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h index 9da4348c619..ad630c6b1e7 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h @@ -29,9 +29,7 @@ private: std::unique_ptr<vespalib::SyncableThreadExecutor> _summaryExecutor; SyncableExecutorThreadService _masterService; ExecutorThreadService _indexService; - vespalib::ISequencedTaskExecutor& _index_field_inverter; - vespalib::ISequencedTaskExecutor& _index_field_writer; - vespalib::ISequencedTaskExecutor& _attribute_field_writer; + vespalib::ISequencedTaskExecutor& _field_writer; std::vector<Registration> _invokeRegistrations; public: @@ -77,9 +75,7 @@ public: return _sharedExecutor; } - vespalib::ISequencedTaskExecutor &indexFieldInverter() override; - vespalib::ISequencedTaskExecutor &indexFieldWriter() override; - vespalib::ISequencedTaskExecutor &attributeFieldWriter() override; + vespalib::ISequencedTaskExecutor &field_writer() override; FNET_Transport &transport() override { return _transport; } const vespalib::Clock &clock() const override { return _clock; } ExecutorThreadingServiceStats getStats(); diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.cpp b/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.cpp index 8a3de22656b..47dd8c03706 100644 --- a/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_doc_subdb.cpp @@ -70,7 +70,7 @@ FastAccessDocSubDB::createAttributeManagerInitializer(const DocumentDBConfig &co configSnapshot.getTuneFileDocumentDBSP()->_attr, _fileHeaderContext, _attribute_interlock, - _writeService.attributeFieldWriter(), + _writeService.field_writer(), _writeService.shared(), attrFactory, _hwInfo); diff --git a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.cpp b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.cpp index 4112564632a..2790c0962b4 100644 --- a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.cpp +++ b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.cpp @@ -10,9 +10,7 @@ ThreadingServiceObserver::ThreadingServiceObserver(searchcorespi::index::IThread _index(service.index()), _summary(service.summary()), _shared(service.shared()), - _indexFieldInverter(_service.indexFieldInverter()), - _indexFieldWriter(_service.indexFieldWriter()), - _attributeFieldWriter(_service.attributeFieldWriter()) + _field_writer(_service.field_writer()) { } diff --git a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h index 16b466a2275..56b5fea293c 100644 --- a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h +++ b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h @@ -16,9 +16,7 @@ private: ThreadServiceObserver _index; ThreadExecutorObserver _summary; vespalib::Executor & _shared; - vespalib::SequencedTaskExecutorObserver _indexFieldInverter; - vespalib::SequencedTaskExecutorObserver _indexFieldWriter; - vespalib::SequencedTaskExecutorObserver _attributeFieldWriter; + vespalib::SequencedTaskExecutorObserver _field_writer; public: ThreadingServiceObserver(searchcorespi::index::IThreadingService &service); @@ -51,17 +49,9 @@ public: } FNET_Transport & transport() override { return _service.transport(); } const vespalib::Clock & clock() const override { return _service.clock(); } - vespalib::ISequencedTaskExecutor &indexFieldInverter() override { - return _indexFieldInverter; + vespalib::ISequencedTaskExecutor &field_writer() override { + return _field_writer; } - vespalib::ISequencedTaskExecutor &indexFieldWriter() override { - return _indexFieldWriter; - } - - vespalib::ISequencedTaskExecutor &attributeFieldWriter() override { - return _attributeFieldWriter; - } - }; } diff --git a/searchcore/src/vespa/searchcorespi/index/ithreadingservice.h b/searchcore/src/vespa/searchcorespi/index/ithreadingservice.h index c325d5ded11..4fce6f85a2b 100644 --- a/searchcore/src/vespa/searchcorespi/index/ithreadingservice.h +++ b/searchcore/src/vespa/searchcorespi/index/ithreadingservice.h @@ -20,19 +20,19 @@ namespace searchcorespi::index { * * 2. The "index" write thread used for doing changes to the memory * index, either directly (for data not bound to a field) or via - * index field inverter executor or index field writer executor. + * field writer executor (index field inverter / index field writer). * * 3. The "summary" thread is used for doing changes to the document store. * - * 4. The "index field inverter" executor is used to populate field + * 4. The field writer executor ("index field inverter") is used to populate field * inverters with data from document fields. Scheduled tasks for * the same field are executed in sequence. * - * 5. The "index field writer" executor is used to sort data in field + * 5. The field writer executor ("index field writer") is used to sort data in field * inverters before pushing the data to the memory field indexes. * Scheduled tasks for the same field are executed in sequence. * - * 6. The "attribute field writer" executor is used to write data to attribute vectors. + * 6. The field writer executor ("attribute field writer") is used to write data to attribute vectors. * Each attribute is always handled by the same thread, * and scheduled tasks for the same attribute are executed in sequence. * @@ -47,19 +47,6 @@ namespace searchcorespi::index { * task to the index field inverter executor and the index field * writer executor. * - * The index field inverter executor and index field writer executor - * are separate to allow for double buffering, i.e. populate one set - * of field inverters using the index field inverter executor while - * another set of field inverters are handled by the index field - * writer executor. - * - * We might decide to allow index field inverter tasks to schedule - * tasks to the index field writer executor, so draining logic needs - * to sync index field inverter executor before syncing index field - * writer executor. - * - * TODO: * indexFieldInverter and indexFieldWriter can be collapsed to one. Both need sequencing, - * but they sequence on different things so efficiency will be the same and just depends on #threads */ struct IThreadingService { @@ -80,9 +67,7 @@ struct IThreadingService virtual vespalib::Executor &shared() = 0; virtual FNET_Transport &transport() = 0; virtual const vespalib::Clock &clock() const = 0; - virtual vespalib::ISequencedTaskExecutor &indexFieldInverter() = 0; - virtual vespalib::ISequencedTaskExecutor &indexFieldWriter() = 0; - virtual vespalib::ISequencedTaskExecutor &attributeFieldWriter() = 0; + virtual vespalib::ISequencedTaskExecutor &field_writer() = 0; }; } |