diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-02-27 16:43:55 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-02-27 16:49:17 +0000 |
commit | 752ced912014457c04d2c3670fd7d8b9eda49fa7 (patch) | |
tree | 404a21f60b4df37a1d4e718096eed971bf7522bc | |
parent | dda41637e506d6f0fdf88f875350adab2743bd1d (diff) |
Add getStats and setTaskLimit to interface to make it easy to swap implementation.
Also make do with ISequenceHandlerInterface.
35 files changed, 312 insertions, 272 deletions
diff --git a/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp b/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp index 420e18db5af..d2eba28bc9b 100644 --- a/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp +++ b/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp @@ -24,13 +24,13 @@ createAttribute() struct Fixture { AttributeVector::SP attribute; - SequencedTaskExecutor writer; + std::unique_ptr<ISequencedTaskExecutor> writer; ExclusiveAttributeReadAccessor accessor; Fixture() : attribute(createAttribute()), - writer(1), - accessor(attribute, writer) + writer(SequencedTaskExecutor::create(1)), + accessor(attribute, *writer) {} }; @@ -38,7 +38,7 @@ TEST_F("require that attribute write thread is blocked while guard is held", Fix { ReadGuard::UP guard = f.accessor.takeGuard(); Gate gate; - f.writer.execute(f.writer.getExecutorId(f.attribute->getNamePrefix()), [&gate]() { gate.countDown(); }); + f.writer->execute(f.writer->getExecutorId(f.attribute->getNamePrefix()), [&gate]() { gate.countDown(); }); bool reachedZero = gate.await(100); EXPECT_FALSE(reachedZero); EXPECT_EQUAL(1u, gate.getCount()); diff --git a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp index f380ce03152..6a6b05be7f0 100644 --- a/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedview/feedview_test.cpp @@ -692,15 +692,15 @@ struct FixtureBase FixtureBase::FixtureBase(vespalib::duration visibilityDelay) : _tracer(), sc(), - iw(new MyIndexWriter(_tracer)), - sa(new MySummaryAdapter(*sc._builder->getDocumentTypeRepo())), - aw(new MyAttributeWriter(_tracer)), + iw(std::make_shared<MyIndexWriter>(_tracer)), + sa(std::make_shared<MySummaryAdapter>(*sc._builder->getDocumentTypeRepo())), + aw(std::make_shared<MyAttributeWriter>(_tracer)), miw(static_cast<MyIndexWriter&>(*iw)), msa(static_cast<MySummaryAdapter&>(*sa)), maw(static_cast<MyAttributeWriter&>(*aw)), _docIdLimit(0u), - _dmscReal(new DocumentMetaStoreContext(std::make_shared<BucketDBOwner>())), - _dmsc(new test::DocumentMetaStoreContextObserver(*_dmscReal)), + _dmscReal(std::make_shared<DocumentMetaStoreContext>(std::make_shared<BucketDBOwner>())), + _dmsc(std::make_shared<test::DocumentMetaStoreContextObserver>(*_dmscReal)), pc(sc._builder->getDocumentType().getName(), "fileconfig_test"), _sharedExecutor(1, 0x10000), _writeServiceReal(_sharedExecutor), diff --git a/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/CMakeLists.txt b/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/CMakeLists.txt index 0d226268d1a..49f929e0127 100644 --- a/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/CMakeLists.txt +++ b/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/CMakeLists.txt @@ -5,5 +5,6 @@ vespa_add_executable(searchcore_lidreusedelayer_test_app TEST DEPENDS searchcore_server searchcore_documentmetastore + searchcore_test ) vespa_add_test(NAME searchcore_lidreusedelayer_test_app COMMAND searchcore_lidreusedelayer_test_app) diff --git a/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp b/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp index d305751f7c2..25668f56753 100644 --- a/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp +++ b/searchcore/src/tests/proton/documentmetastore/lidreusedelayer/lidreusedelayer_test.cpp @@ -1,6 +1,5 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/log/log.h> -LOG_SETUP("lidreusedelayer_test"); + #include <vespa/vespalib/testkit/testapp.h> #include <vespa/searchcore/proton/documentmetastore/i_store.h> #include <vespa/searchcore/proton/documentmetastore/lidreusedelayer.h> @@ -9,12 +8,14 @@ LOG_SETUP("lidreusedelayer_test"); #include <vespa/searchcore/proton/test/threading_service_observer.h> #include <vespa/vespalib/util/lambdatask.h> +#include <vespa/log/log.h> +LOG_SETUP("lidreusedelayer_test"); + using vespalib::makeLambdaTask; namespace proton { -namespace -{ +namespace { bool assertThreadObserver(uint32_t masterExecuteCnt, @@ -55,69 +56,52 @@ public: { } - virtual ~MyMetaStore() { } + ~MyMetaStore() override = default; - virtual Result inspectExisting(const GlobalId &) const override - { + Result inspectExisting(const GlobalId &) const override { return Result(); } - virtual Result inspect(const GlobalId &) override - { + Result inspect(const GlobalId &) override { return Result(); } - virtual Result put(const GlobalId &, const BucketId &, const Timestamp &, - uint32_t, DocId) override - { + Result put(const GlobalId &, const BucketId &, const Timestamp &, uint32_t, DocId) override { return Result(); } - virtual bool updateMetaData(DocId, const BucketId &, - const Timestamp &) override - { + bool updateMetaData(DocId, const BucketId &, const Timestamp &) override { return true; } - virtual bool remove(DocId) override - { + bool remove(DocId) override { return true; } - virtual void removeComplete(DocId) override - { + void removeComplete(DocId) override { ++_removeCompleteCount; ++_removeCompleteLids; } - virtual void move(DocId, DocId) override - { + void move(DocId, DocId) override { } - virtual bool validLid(DocId) const override - { + bool validLid(DocId) const override { return true; } - virtual void removeBatch(const std::vector<DocId> &, - const DocId) override - { - } + void removeBatch(const std::vector<DocId> &, const DocId) override {} - virtual void - removeBatchComplete(const std::vector<DocId> &lidsToRemove) override - { + void removeBatchComplete(const std::vector<DocId> &lidsToRemove) override{ ++_removeBatchCompleteCount; _removeCompleteLids += lidsToRemove.size(); } - virtual const RawDocumentMetaData &getRawMetaData(DocId) const override - { + const RawDocumentMetaData &getRawMetaData(DocId) const override { LOG_ABORT("should not be reached"); } - virtual bool getFreeListActive() const override - { + bool getFreeListActive() const override { return _freeListActive; } diff --git a/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp b/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp index ca45108b698..991cb2e519d 100644 --- a/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp +++ b/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp @@ -149,9 +149,9 @@ void Test::testSearch(Searchable &source, void Test::requireThatMemoryIndexCanBeDumpedAndSearched() { Schema schema = getSchema(); vespalib::ThreadStackExecutor sharedExecutor(2, 0x10000); - search::SequencedTaskExecutor indexFieldInverter(2); - search::SequencedTaskExecutor indexFieldWriter(2); - MemoryIndex memory_index(schema, MockFieldLengthInspector(), indexFieldInverter, indexFieldWriter); + auto indexFieldInverter = search::SequencedTaskExecutor::create(2); + auto indexFieldWriter = search::SequencedTaskExecutor::create(2); + MemoryIndex memory_index(schema, MockFieldLengthInspector(), *indexFieldInverter, *indexFieldWriter); DocBuilder doc_builder(schema); Document::UP doc = buildDocument(doc_builder, doc_id1, word1); @@ -160,7 +160,7 @@ void Test::requireThatMemoryIndexCanBeDumpedAndSearched() { doc = buildDocument(doc_builder, doc_id2, word2); memory_index.insertDocument(doc_id2, *doc.get()); memory_index.commit(std::shared_ptr<search::IDestructorCallback>()); - indexFieldWriter.sync(); + indexFieldWriter->sync(); testSearch(memory_index, word1, doc_id1); testSearch(memory_index, word2, doc_id2); diff --git a/searchcore/src/tests/proton/index/indexmanager_test.cpp b/searchcore/src/tests/proton/index/indexmanager_test.cpp index 7542ef24c52..264cf6d8cfa 100644 --- a/searchcore/src/tests/proton/index/indexmanager_test.cpp +++ b/searchcore/src/tests/proton/index/indexmanager_test.cpp @@ -379,10 +379,9 @@ TEST_F(IndexManagerTest, require_that_flush_stats_are_calculated) { Schema schema(getSchema()); FieldIndexCollection fic(schema, MockFieldLengthInspector()); - SequencedTaskExecutor invertThreads(2); - SequencedTaskExecutor pushThreads(2); - search::memoryindex::DocumentInverter inverter(schema, invertThreads, - pushThreads, fic); + auto invertThreads = SequencedTaskExecutor::create(2); + auto pushThreads = SequencedTaskExecutor::create(2); + search::memoryindex::DocumentInverter inverter(schema, *invertThreads, *pushThreads, fic); uint64_t fixed_index_size = fic.getMemoryUsage().allocatedBytes(); uint64_t index_size = fic.getMemoryUsage().allocatedBytes() - fixed_index_size; @@ -395,9 +394,9 @@ TEST_F(IndexManagerTest, require_that_flush_stats_are_calculated) Document::UP doc = addDocument(docid); inverter.invertDocument(docid, *doc); - invertThreads.sync(); + invertThreads->sync(); inverter.pushDocuments(std::shared_ptr<search::IDestructorCallback>()); - pushThreads.sync(); + pushThreads->sync(); index_size = fic.getMemoryUsage().allocatedBytes() - fixed_index_size; /// Must account for both docid 0 being reserved and the extra after. @@ -414,9 +413,9 @@ TEST_F(IndexManagerTest, require_that_flush_stats_are_calculated) inverter.invertDocument(docid + 10, *doc); doc = addDocument(docid + 100); inverter.invertDocument(docid + 100, *doc); - invertThreads.sync(); + invertThreads->sync(); inverter.pushDocuments(std::shared_ptr<search::IDestructorCallback>()); - pushThreads.sync(); + pushThreads->sync(); index_size = fic.getMemoryUsage().allocatedBytes() - fixed_index_size; /// Must account for both docid 0 being reserved and the extra after. selector_size = (docid + 100 + 1) * sizeof(Source); diff --git a/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp b/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp index b77d9b3f5ab..02e9f3e8687 100644 --- a/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp +++ b/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp @@ -42,7 +42,7 @@ const ReferenceAttribute *getReferenceAttribute(const IGidToLidChangeListener &l struct MyGidToLidMapperFactory : public IGidToLidMapperFactory { using SP = std::shared_ptr<MyGidToLidMapperFactory>; - virtual std::unique_ptr<IGidToLidMapper> getMapper() const override { + std::unique_ptr<IGidToLidMapper> getMapper() const override { return std::unique_ptr<IGidToLidMapper>(); } }; @@ -60,14 +60,14 @@ struct MyDocumentDBReference : public MockDocumentDBReference { MyDocumentDBReference(MyGidToLidMapperFactory::SP factory_, std::shared_ptr<MockGidToLidChangeHandler> gidToLidChangeHandler) - : factory(factory_), + : factory(std::move(factory_)), _gidToLidChangeHandler(std::move(gidToLidChangeHandler)) { } - virtual IGidToLidMapperFactory::SP getGidToLidMapperFactory() override { + IGidToLidMapperFactory::SP getGidToLidMapperFactory() override { return factory; } - virtual std::shared_ptr<search::attribute::ReadableAttributeVector> getAttribute(vespalib::stringref name) override { + std::shared_ptr<search::attribute::ReadableAttributeVector> getAttribute(vespalib::stringref name) override { auto itr = attributes.find(name); if (itr != attributes.end()) { return itr->second; @@ -78,7 +78,7 @@ struct MyDocumentDBReference : public MockDocumentDBReference { void addIntAttribute(vespalib::stringref name) { attributes[name] = AttributeFactory::createAttribute(name, Config(BasicType::INT32)); } - virtual std::unique_ptr<GidToLidChangeRegistrator> makeGidToLidChangeRegistrator(const vespalib::string &docTypeName) override { + std::unique_ptr<GidToLidChangeRegistrator> makeGidToLidChangeRegistrator(const vespalib::string &docTypeName) override { return std::make_unique<GidToLidChangeRegistrator>(_gidToLidChangeHandler, docTypeName); } @@ -93,12 +93,12 @@ struct MyDocumentDBReference : public MockDocumentDBReference { struct MyReferenceRegistry : public IDocumentDBReferenceRegistry { using ReferenceMap = std::map<vespalib::string, IDocumentDBReference::SP>; ReferenceMap map; - virtual IDocumentDBReference::SP get(vespalib::stringref name) const override { + IDocumentDBReference::SP get(vespalib::stringref name) const override { auto itr = map.find(name); ASSERT_TRUE(itr != map.end()); return itr->second; } - virtual IDocumentDBReference::SP tryGet(vespalib::stringref name) const override { + IDocumentDBReference::SP tryGet(vespalib::stringref name) const override { auto itr = map.find(name); if (itr != map.end()) { return itr->second; @@ -106,10 +106,10 @@ struct MyReferenceRegistry : public IDocumentDBReferenceRegistry { return IDocumentDBReference::SP(); } } - virtual void add(vespalib::stringref name, IDocumentDBReference::SP reference) override { + void add(vespalib::stringref name, IDocumentDBReference::SP reference) override { map[name] = reference; } - virtual void remove(vespalib::stringref) override {} + void remove(vespalib::stringref) override {} }; struct MyAttributeManager : public MockAttributeManager { @@ -121,7 +121,7 @@ struct MyAttributeManager : public MockAttributeManager { } const ReferenceAttribute *getReferenceAttribute(const vespalib::string &name) const { AttributeGuard::UP guard = getAttribute(name); - const ReferenceAttribute *result = dynamic_cast<const ReferenceAttribute *>(guard->get()); + auto *result = dynamic_cast<const ReferenceAttribute *>(guard->get()); ASSERT_TRUE(result != nullptr); return result; } @@ -155,8 +155,7 @@ struct DocumentModel { } }; -DocumentModel::~DocumentModel() { -} +DocumentModel::~DocumentModel() = default; void set(const vespalib::string &name, @@ -182,7 +181,7 @@ createImportedFieldsConfig() const ImportedAttributeVector & asImportedAttribute(const IAttributeVector &attr) { - const ImportedAttributeVector *result = dynamic_cast<const ImportedAttributeVector *>(&attr); + auto *result = dynamic_cast<const ImportedAttributeVector *>(&attr); ASSERT_TRUE(result != nullptr); return *result; } @@ -199,7 +198,7 @@ struct Fixture { MyAttributeManager oldAttrMgr; DocumentModel docModel; ImportedFieldsConfig importedFieldsCfg; - SequencedTaskExecutor _attributeFieldWriter; + std::unique_ptr<ISequencedTaskExecutor> _attributeFieldWriter; Fixture() : factory(std::make_shared<MyGidToLidMapperFactory>()), _gidToLidChangeListenerRefCount(), @@ -211,7 +210,7 @@ struct Fixture { attrMgr(), docModel(), importedFieldsCfg(createImportedFieldsConfig()), - _attributeFieldWriter(1) + _attributeFieldWriter(SequencedTaskExecutor::create(1)) { registry.add("parent", parentReference); @@ -231,7 +230,7 @@ struct Fixture { oldAttrMgr.addReferenceAttribute("parent3_ref"); } ImportedAttributesRepo::UP resolve(vespalib::duration visibilityDelay, bool useReferences) { - DocumentDBReferenceResolver resolver(registry, docModel.childDocType, importedFieldsCfg, docModel.childDocType, _gidToLidChangeListenerRefCount, _attributeFieldWriter, useReferences); + DocumentDBReferenceResolver resolver(registry, docModel.childDocType, importedFieldsCfg, docModel.childDocType, _gidToLidChangeListenerRefCount, *_attributeFieldWriter, useReferences); return resolver.resolve(attrMgr, oldAttrMgr, std::shared_ptr<search::IDocumentMetaStoreContext>(), visibilityDelay); } ImportedAttributesRepo::UP resolve(vespalib::duration visibilityDelay) { @@ -244,7 +243,7 @@ struct Fixture { return resolve(vespalib::duration::zero()); } void teardown() { - DocumentDBReferenceResolver resolver(registry, docModel.childDocType, importedFieldsCfg, docModel.childDocType, _gidToLidChangeListenerRefCount, _attributeFieldWriter, false); + DocumentDBReferenceResolver resolver(registry, docModel.childDocType, importedFieldsCfg, docModel.childDocType, _gidToLidChangeListenerRefCount, *_attributeFieldWriter, false); resolver.teardown(attrMgr); } const IGidToLidMapperFactory *getMapperFactoryPtr(const vespalib::string &attrName) { @@ -254,7 +253,7 @@ struct Fixture { const vespalib::string &referenceField, const vespalib::string &targetField, bool useSearchCache, - ImportedAttributeVector::SP attr) { + const ImportedAttributeVector::SP & attr) { ASSERT_TRUE(attr.get()); EXPECT_EQUAL(name, attr->getName()); EXPECT_EQUAL(attrMgr.getReferenceAttribute(referenceField), attr->getReferenceAttribute().get()); diff --git a/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp b/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp index 159ce17ef45..a023ff175d6 100644 --- a/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp +++ b/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp @@ -49,13 +49,13 @@ struct MyGidToLidMapperFactory : public MockGidToLidMapperFactory struct Fixture { std::shared_ptr<ReferenceAttribute> _attr; - search::SequencedTaskExecutor _writer; + std::unique_ptr<search::ISequencedTaskExecutor> _writer; MonitoredRefCount _refCount; std::unique_ptr<GidToLidChangeListener> _listener; Fixture() : _attr(std::make_shared<ReferenceAttribute>("test", Config(BasicType::REFERENCE))), - _writer(1), + _writer(search::SequencedTaskExecutor::create(1)), _refCount(), _listener() { @@ -91,7 +91,7 @@ struct Fixture } void allocListener() { - _listener = std::make_unique<GidToLidChangeListener>(_writer, _attr, _refCount, "test", "testdoc"); + _listener = std::make_unique<GidToLidChangeListener>(*_writer, _attr, _refCount, "test", "testdoc"); } void notifyPutDone(const GlobalId &gid, uint32_t referencedDoc) { diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp index af8e16f657b..6ca385711b0 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.cpp @@ -1,11 +1,10 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "executor_thread_service.h" -#include <vespa/vespalib/util/closuretask.h> +#include <vespa/vespalib/util/lambdatask.h> #include <vespa/fastos/thread.h> -using vespalib::makeClosure; -using vespalib::makeTask; +using vespalib::makeLambdaTask; using vespalib::Executor; using vespalib::Gate; using vespalib::Runnable; @@ -32,7 +31,7 @@ std::unique_ptr<internal::ThreadId> getThreadId(ThreadStackExecutorBase &executor) { std::unique_ptr<internal::ThreadId> id = std::make_unique<internal::ThreadId>(); - executor.execute(makeTask(makeClosure(&sampleThreadId, &id->_id))); + executor.execute(makeLambdaTask([threadId=&id->_id] { sampleThreadId(threadId);})); executor.sync(); return id; } @@ -52,7 +51,7 @@ ExecutorThreadService::ExecutorThreadService(ThreadStackExecutorBase &executor) { } -ExecutorThreadService::~ExecutorThreadService() {} +ExecutorThreadService::~ExecutorThreadService() = default; void ExecutorThreadService::run(Runnable &runnable) @@ -61,7 +60,7 @@ ExecutorThreadService::run(Runnable &runnable) runnable.run(); } else { Gate gate; - _executor.execute(makeTask(makeClosure(&runRunnable, &runnable, &gate))); + _executor.execute(makeLambdaTask([runnablePtr=&runnable, gatePtr=&gate] { runRunnable(runnablePtr, gatePtr); })); gate.await(); } } @@ -73,4 +72,12 @@ ExecutorThreadService::isCurrentThread() const return FastOS_Thread::CompareThreadIds(_threadId->_id, currentThreadId); } +vespalib::ThreadExecutor::Stats ExecutorThreadService::getStats() { + return _executor.getStats(); +} + +void ExecutorThreadService::setTaskLimit(uint32_t taskLimit) { + _executor.setTaskLimit(taskLimit); +} + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h index 521565358d9..ccdfb6b72cd 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h +++ b/searchcore/src/vespa/searchcore/proton/server/executor_thread_service.h @@ -21,9 +21,8 @@ public: ExecutorThreadService(vespalib::ThreadStackExecutorBase &executor); ~ExecutorThreadService(); - /** - * Implements IThreadService - */ + Stats getStats() override; + vespalib::Executor::Task::UP execute(vespalib::Executor::Task::UP task) override { return _executor.execute(std::move(task)); } @@ -34,6 +33,8 @@ public: } bool isCurrentThread() const override; size_t getNumThreads() const override { return _executor.getNumThreads(); } + + void setTaskLimit(uint32_t taskLimit) override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp index 262363f4250..058197f0271 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp @@ -19,9 +19,9 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::ThreadStackExecutor _masterService(_masterExecutor), _indexService(_indexExecutor), _summaryService(_summaryExecutor), - _indexFieldInverter(std::make_unique<SequencedTaskExecutor>(threads, taskLimit)), - _indexFieldWriter(std::make_unique<SequencedTaskExecutor>(threads, taskLimit)), - _attributeFieldWriter(std::make_unique<SequencedTaskExecutor>(threads, taskLimit)) + _indexFieldInverter(SequencedTaskExecutor::create(threads, taskLimit)), + _indexFieldWriter(SequencedTaskExecutor::create(threads, taskLimit)), + _attributeFieldWriter(SequencedTaskExecutor::create(threads, taskLimit)) { } diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h index 9ab6eba39e1..7bbe1cb162a 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.h @@ -6,7 +6,6 @@ #include <vespa/vespalib/util/blockingthreadstackexecutor.h> #include <vespa/vespalib/util/threadstackexecutor.h> -namespace search { class SequencedTaskExecutor; } namespace proton { class ExecutorThreadingServiceStats; @@ -18,16 +17,16 @@ class ExecutorThreadingServiceStats; class ExecutorThreadingService : public searchcorespi::index::IThreadingService { private: - vespalib::ThreadStackExecutorBase & _sharedExecutor; - vespalib::ThreadStackExecutor _masterExecutor; - vespalib::BlockingThreadStackExecutor _indexExecutor; - vespalib::BlockingThreadStackExecutor _summaryExecutor; - ExecutorThreadService _masterService; - ExecutorThreadService _indexService; - ExecutorThreadService _summaryService; - std::unique_ptr<search::SequencedTaskExecutor> _indexFieldInverter; - std::unique_ptr<search::SequencedTaskExecutor> _indexFieldWriter; - std::unique_ptr<search::SequencedTaskExecutor> _attributeFieldWriter; + vespalib::ThreadStackExecutorBase & _sharedExecutor; + vespalib::ThreadStackExecutor _masterExecutor; + vespalib::BlockingThreadStackExecutor _indexExecutor; + vespalib::BlockingThreadStackExecutor _summaryExecutor; + ExecutorThreadService _masterService; + ExecutorThreadService _indexService; + ExecutorThreadService _summaryService; + std::unique_ptr<search::ISequencedTaskExecutor> _indexFieldInverter; + std::unique_ptr<search::ISequencedTaskExecutor> _indexFieldWriter; + std::unique_ptr<search::ISequencedTaskExecutor> _attributeFieldWriter; public: /** diff --git a/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt index ad5ae54f6e1..488ac1041f1 100644 --- a/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/test/CMakeLists.txt @@ -8,6 +8,7 @@ vespa_add_library(searchcore_test STATIC documentdb_config_builder.cpp dummy_feed_view.cpp userdocumentsbuilder.cpp + threading_service_observer.cpp DEPENDS searchcore_fconfig ) diff --git a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h index c8d0f8968c9..766bdeeefb0 100644 --- a/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h +++ b/searchcore/src/vespa/searchcore/proton/test/thread_service_observer.h @@ -20,9 +20,6 @@ public: uint32_t getExecuteCnt() const { return _executeCnt; } - /** - * Implements IThreadService - */ vespalib::Executor::Task::UP execute(vespalib::Executor::Task::UP task) override { ++_executeCnt; return _service.execute(std::move(task)); @@ -39,6 +36,14 @@ public: } size_t getNumThreads() const override { return _service.getNumThreads(); } + Stats getStats() override { + return _service.getStats(); + } + + void setTaskLimit(uint32_t taskLimit) override { + _service.setTaskLimit(taskLimit); + } + }; } diff --git a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.cpp b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.cpp new file mode 100644 index 00000000000..30a6334d764 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.cpp @@ -0,0 +1,22 @@ +// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "threading_service_observer.h" + +namespace proton::test { + +ThreadingServiceObserver::ThreadingServiceObserver(searchcorespi::index::IThreadingService &service) + : _service(service), + _master(_service.master()), + _index(service.index()), + _summary(service.summary()), + _shared(service.shared()), + _indexFieldInverter(_service.indexFieldInverter()), + _indexFieldWriter(_service.indexFieldWriter()), + _attributeFieldWriter(_service.attributeFieldWriter()) +{ +} + +ThreadingServiceObserver::~ThreadingServiceObserver() = default; + +} + diff --git a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h index e1a4433ed4b..7ac9c0c68f2 100644 --- a/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h +++ b/searchcore/src/vespa/searchcore/proton/test/threading_service_observer.h @@ -21,18 +21,8 @@ private: search::SequencedTaskExecutorObserver _attributeFieldWriter; public: - ThreadingServiceObserver(searchcorespi::index::IThreadingService &service) - : _service(service), - _master(_service.master()), - _index(service.index()), - _summary(service.summary()), - _shared(service.shared()), - _indexFieldInverter(_service.indexFieldInverter()), - _indexFieldWriter(_service.indexFieldWriter()), - _attributeFieldWriter(_service.attributeFieldWriter()) - { - } - ~ThreadingServiceObserver() override { } + ThreadingServiceObserver(searchcorespi::index::IThreadingService &service); + ~ThreadingServiceObserver() override; const ThreadServiceObserver &masterObserver() const { return _master; } @@ -53,16 +43,10 @@ public: return _attributeFieldWriter; } - /** - * Implements vespalib::Syncable - */ vespalib::Syncable &sync() override { return _service.sync(); } - /** - * Implements IThreadingService - */ searchcorespi::index::IThreadService &master() override { return _master; } diff --git a/searchlib/src/apps/tests/memoryindexstress_test.cpp b/searchlib/src/apps/tests/memoryindexstress_test.cpp index 60f3a6b7664..dc950b84508 100644 --- a/searchlib/src/apps/tests/memoryindexstress_test.cpp +++ b/searchlib/src/apps/tests/memoryindexstress_test.cpp @@ -195,8 +195,8 @@ struct Fixture { Schema schema; DocumentTypeRepo repo; vespalib::ThreadStackExecutor _executor; - search::SequencedTaskExecutor _invertThreads; - search::SequencedTaskExecutor _pushThreads; + std::unique_ptr<search::ISequencedTaskExecutor> _invertThreads; + std::unique_ptr<search::ISequencedTaskExecutor> _pushThreads; MemoryIndex index; uint32_t _readThreads; vespalib::ThreadStackExecutor _writer; // 1 write thread @@ -247,9 +247,9 @@ Fixture::Fixture(uint32_t readThreads) : schema(makeSchema()), repo(makeDocTypeRepoConfig()), _executor(1, 128 * 1024), - _invertThreads(2), - _pushThreads(2), - index(schema, MockFieldLengthInspector(), _invertThreads, _pushThreads), + _invertThreads(search::SequencedTaskExecutor::create(2)), + _pushThreads(search::SequencedTaskExecutor::create(2)), + index(schema, MockFieldLengthInspector(), *_invertThreads, *_pushThreads), _readThreads(readThreads), _writer(1, 128 * 1024), _readers(readThreads, 128 * 1024), diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp index a51becfbf13..b2b15ded274 100644 --- a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp +++ b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp @@ -16,9 +16,9 @@ int main(int argc, char *argv[]) { if (argc > 2) numThreads = atoi(argv[2]); - SequencedTaskExecutor executor(numThreads); + auto executor = SequencedTaskExecutor::create(numThreads); for (unsigned long tid(0); tid < numTasks; tid++) { - executor.executeTask(ExecutorId(tid%numThreads), vespalib::makeLambdaTask([&counter] { counter++; })); + executor->executeTask(ExecutorId(tid%numThreads), vespalib::makeLambdaTask([&counter] { counter++; })); } return 0; } diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp index fcc7fd7300d..c311a59a56c 100644 --- a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp +++ b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp @@ -17,9 +17,9 @@ namespace search::common { class Fixture { public: - SequencedTaskExecutor _threads; + std::unique_ptr<ISequencedTaskExecutor> _threads; - Fixture() : _threads(2) { } + Fixture() : _threads(SequencedTaskExecutor::create(2)) { } }; @@ -67,11 +67,11 @@ public: TEST_F("testExecute", Fixture) { std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); - f._threads.execute(1, [=]() { tv->modify(0, 42); }); + f._threads->execute(1, [=]() { tv->modify(0, 42); }); tv->wait(1); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); - f._threads.sync(); + f._threads->sync(); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); } @@ -81,12 +81,12 @@ TEST_F("require that task with same component id are serialized", Fixture) { std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); - f._threads.execute(0, [=]() { usleep(2000); tv->modify(0, 14); }); - f._threads.execute(0, [=]() { tv->modify(14, 42); }); + f._threads->execute(0, [=]() { usleep(2000); tv->modify(0, 14); }); + f._threads->execute(0, [=]() { tv->modify(14, 42); }); tv->wait(2); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); - f._threads.sync(); + f._threads->sync(); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); } @@ -97,15 +97,15 @@ TEST_F("require that task with different component ids are not serialized", Fixt for (tryCnt = 0; tryCnt < 100; ++tryCnt) { std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); - f._threads.execute(0, [=]() { usleep(2000); tv->modify(0, 14); }); - f._threads.execute(2, [=]() { tv->modify(14, 42); }); + f._threads->execute(0, [=]() { usleep(2000); tv->modify(0, 14); }); + f._threads->execute(2, [=]() { tv->modify(14, 42); }); tv->wait(2); if (tv->_fail != 1) { continue; } EXPECT_EQUAL(1, tv->_fail); EXPECT_EQUAL(14, tv->_val); - f._threads.sync(); + f._threads->sync(); EXPECT_EQUAL(1, tv->_fail); EXPECT_EQUAL(14, tv->_val); break; @@ -119,12 +119,12 @@ TEST_F("require that task with same string component id are serialized", Fixture std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); auto test2 = [=]() { tv->modify(14, 42); }; - f._threads.execute(f._threads.getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); }); - f._threads.execute(f._threads.getExecutorId("0"), test2); + f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); }); + f._threads->execute(f._threads->getExecutorId("0"), test2); tv->wait(2); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); - f._threads.sync(); + f._threads->sync(); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); } @@ -137,15 +137,15 @@ int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int t for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) { std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); - f._threads.execute(f._threads.getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); }); - f._threads.execute(f._threads.getExecutorId(altComponentId), [=]() { tv->modify(14, 42); }); + f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); }); + f._threads->execute(f._threads->getExecutorId(altComponentId), [=]() { tv->modify(14, 42); }); tv->wait(2); if (tv->_fail != 1) { continue; } EXPECT_EQUAL(1, tv->_fail); EXPECT_EQUAL(14, tv->_val); - f._threads.sync(); + f._threads->sync(); EXPECT_EQUAL(1, tv->_fail); EXPECT_EQUAL(14, tv->_val); break; @@ -157,10 +157,10 @@ vespalib::string makeAltComponentId(Fixture &f) { int tryCnt = 0; char altComponentId[20]; - ISequencedTaskExecutor::ExecutorId executorId0 = f._threads.getExecutorId("0"); + ISequencedTaskExecutor::ExecutorId executorId0 = f._threads->getExecutorId("0"); for (tryCnt = 1; tryCnt < 100; ++tryCnt) { sprintf(altComponentId, "%d", tryCnt); - if (f._threads.getExecutorId(altComponentId) == executorId0) { + if (f._threads->getExecutorId(altComponentId) == executorId0) { break; } } @@ -193,9 +193,9 @@ TEST_F("require that execute works with const lambda", Fixture) std::vector<int> res; const auto lambda = [i, &res]() mutable { res.push_back(i--); res.push_back(i--); }; - f._threads.execute(0, lambda); - f._threads.execute(0, lambda); - f._threads.sync(); + f._threads->execute(0, lambda); + f._threads->execute(0, lambda); + f._threads->sync(); std::vector<int> exp({5, 4, 5, 4}); EXPECT_EQUAL(exp, res); EXPECT_EQUAL(5, i); @@ -208,9 +208,9 @@ TEST_F("require that execute works with reference to lambda", Fixture) auto lambda = [i, &res]() mutable { res.push_back(i--); res.push_back(i--); }; auto &lambdaref = lambda; - f._threads.execute(0, lambdaref); - f._threads.execute(0, lambdaref); - f._threads.sync(); + f._threads->execute(0, lambdaref); + f._threads->execute(0, lambdaref); + f._threads->sync(); std::vector<int> exp({5, 4, 5, 4}); EXPECT_EQUAL(exp, res); EXPECT_EQUAL(5, i); @@ -222,28 +222,28 @@ TEST_F("require that executeLambda works", Fixture) std::vector<int> res; const auto lambda = [i, &res]() mutable { res.push_back(i--); res.push_back(i--); }; - f._threads.executeLambda(ISequencedTaskExecutor::ExecutorId(0), lambda); - f._threads.sync(); + f._threads->executeLambda(ISequencedTaskExecutor::ExecutorId(0), lambda); + f._threads->sync(); std::vector<int> exp({5, 4}); EXPECT_EQUAL(exp, res); EXPECT_EQUAL(5, i); } TEST("require that you get correct number of executors") { - SequencedTaskExecutor seven(7); - EXPECT_EQUAL(7u, seven.getNumExecutors()); + auto seven = SequencedTaskExecutor::create(7); + EXPECT_EQUAL(7u, seven->getNumExecutors()); } TEST("require that you distribute well") { - SequencedTaskExecutor seven(7); - EXPECT_EQUAL(7u, seven.getNumExecutors()); - EXPECT_EQUAL(97u, seven.getComponentHashSize()); - EXPECT_EQUAL(0u, seven.getComponentEffectiveHashSize()); + auto seven = SequencedTaskExecutor::create(7); + EXPECT_EQUAL(7u, seven->getNumExecutors()); + EXPECT_EQUAL(97u, seven->getComponentHashSize()); + EXPECT_EQUAL(0u, seven->getComponentEffectiveHashSize()); for (uint32_t id=0; id < 1000; id++) { - EXPECT_EQUAL((id%97)%7, seven.getExecutorId(id).getId()); + EXPECT_EQUAL((id%97)%7, seven->getExecutorId(id).getId()); } - EXPECT_EQUAL(97u, seven.getComponentHashSize()); - EXPECT_EQUAL(97u, seven.getComponentEffectiveHashSize()); + EXPECT_EQUAL(97u, seven->getComponentHashSize()); + EXPECT_EQUAL(97u, seven->getComponentEffectiveHashSize()); } } diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp index 1825c00ceda..92d0659d984 100644 --- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp +++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp @@ -314,16 +314,16 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire addField("f4")); FieldIndexCollection fic(schema, MockFieldLengthInspector()); DocBuilder b(schema); - SequencedTaskExecutor invertThreads(2); - SequencedTaskExecutor pushThreads(2); - DocumentInverter inv(schema, invertThreads, pushThreads, fic); + auto invertThreads = SequencedTaskExecutor::create(2); + auto pushThreads = SequencedTaskExecutor::create(2); + DocumentInverter inv(schema, *invertThreads, *pushThreads, fic); Document::UP doc; doc = make_doc10(b); inv.invertDocument(10, *doc); - invertThreads.sync(); + invertThreads->sync(); myPushDocument(inv); - pushThreads.sync(); + pushThreads->sync(); b.startDocument("id:ns:searchdocument::11"). startIndexField("f3"). @@ -331,9 +331,9 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire endField(); doc = b.endDocument(); inv.invertDocument(11, *doc); - invertThreads.sync(); + invertThreads->sync(); myPushDocument(inv); - pushThreads.sync(); + pushThreads->sync(); b.startDocument("id:ns:searchdocument::12"). startIndexField("f3"). @@ -341,9 +341,9 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire endField(); doc = b.endDocument(); inv.invertDocument(12, *doc); - invertThreads.sync(); + invertThreads->sync(); myPushDocument(inv); - pushThreads.sync(); + pushThreads->sync(); IndexBuilder ib(schema); vespalib::string dump2dir = prefix + "dump2"; @@ -455,14 +455,14 @@ FusionTest::make_simple_index(const vespalib::string &dump_dir, const IFieldLeng uint32_t numDocs = 20; uint32_t numWords = 1000; DocBuilder b(_schema); - SequencedTaskExecutor invertThreads(2); - SequencedTaskExecutor pushThreads(2); - DocumentInverter inv(_schema, invertThreads, pushThreads, fic); + auto invertThreads = SequencedTaskExecutor::create(2); + auto pushThreads = SequencedTaskExecutor::create(2); + DocumentInverter inv(_schema, *invertThreads, *pushThreads, fic); inv.invertDocument(10, *make_doc10(b)); - invertThreads.sync(); + invertThreads->sync(); myPushDocument(inv); - pushThreads.sync(); + pushThreads->sync(); IndexBuilder ib(_schema); TuneFileIndexing tuneFileIndexing; diff --git a/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp b/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp index 3f798df3c05..60b21699406 100644 --- a/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp +++ b/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp @@ -117,8 +117,8 @@ public: struct DocumentInverterTest : public ::testing::Test { Schema _schema; DocBuilder _b; - SequencedTaskExecutor _invertThreads; - SequencedTaskExecutor _pushThreads; + std::unique_ptr<ISequencedTaskExecutor> _invertThreads; + std::unique_ptr<ISequencedTaskExecutor> _pushThreads; WordStore _word_store; FieldIndexRemover _remover; test::OrderedFieldIndexInserter _inserter; @@ -138,26 +138,26 @@ struct DocumentInverterTest : public ::testing::Test { DocumentInverterTest() : _schema(makeSchema()), _b(_schema), - _invertThreads(2), - _pushThreads(2), + _invertThreads(SequencedTaskExecutor::create(2)), + _pushThreads(SequencedTaskExecutor::create(2)), _word_store(), _remover(_word_store), _inserter(), _calculator(), _fic(_remover, _inserter, _calculator), - _inv(_schema, _invertThreads, _pushThreads, _fic) + _inv(_schema, *_invertThreads, *_pushThreads, _fic) { } void pushDocuments() { - _invertThreads.sync(); + _invertThreads->sync(); uint32_t fieldId = 0; for (auto &inverter : _inv.getInverters()) { _inserter.setFieldId(fieldId); inverter->pushDocuments(); ++fieldId; } - _pushThreads.sync(); + _pushThreads->sync(); } }; diff --git a/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp b/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp index 512e1bd2051..c562c0cf29c 100644 --- a/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp +++ b/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp @@ -872,7 +872,7 @@ struct FieldIndexCollectionTypeTest : public ::testing::Test { fic(schema, MockFieldLengthInspector()) { } - Schema make_schema() { + static Schema make_schema() { Schema result; result.addIndexField(Schema::IndexField("normal", DataType::STRING)); Schema::IndexField interleaved("interleaved", DataType::STRING); @@ -902,17 +902,17 @@ public: Schema _schema; FieldIndexCollection _fic; DocBuilder _b; - SequencedTaskExecutor _invertThreads; - SequencedTaskExecutor _pushThreads; + std::unique_ptr<ISequencedTaskExecutor> _invertThreads; + std::unique_ptr<ISequencedTaskExecutor> _pushThreads; DocumentInverter _inv; InverterTest(const Schema& schema) : _schema(schema), _fic(_schema, MockFieldLengthInspector()), _b(_schema), - _invertThreads(2), - _pushThreads(2), - _inv(_schema, _invertThreads, _pushThreads, _fic) + _invertThreads(SequencedTaskExecutor::create(2)), + _pushThreads(SequencedTaskExecutor::create(2)), + _inv(_schema, *_invertThreads, *_pushThreads, _fic) { } NormalFieldIndex::PostingList::Iterator find(const vespalib::stringref word, uint32_t field_id) const { @@ -943,9 +943,9 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working) endField(); doc = _b.endDocument(); _inv.invertDocument(10, *doc); - _invertThreads.sync(); + _invertThreads->sync(); myPushDocument(_inv); - _pushThreads.sync(); + _pushThreads->sync(); _b.startDocument("id:ns:searchdocument::20"); _b.startIndexField("f0"). @@ -953,9 +953,9 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working) endField(); doc = _b.endDocument(); _inv.invertDocument(20, *doc); - _invertThreads.sync(); + _invertThreads->sync(); myPushDocument(_inv); - _pushThreads.sync(); + _pushThreads->sync(); _b.startDocument("id:ns:searchdocument::30"); _b.startIndexField("f0"). @@ -984,9 +984,9 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working) endField(); doc = _b.endDocument(); _inv.invertDocument(30, *doc); - _invertThreads.sync(); + _invertThreads->sync(); myPushDocument(_inv); - _pushThreads.sync(); + _pushThreads->sync(); _b.startDocument("id:ns:searchdocument::40"); _b.startIndexField("f0"). @@ -995,9 +995,9 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working) endField(); doc = _b.endDocument(); _inv.invertDocument(40, *doc); - _invertThreads.sync(); + _invertThreads->sync(); myPushDocument(_inv); - _pushThreads.sync(); + _pushThreads->sync(); _b.startDocument("id:ns:searchdocument::999"); _b.startIndexField("f0"). @@ -1025,12 +1025,12 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working) doc = _b.endDocument(); for (uint32_t docId = 10000; docId < 20000; ++docId) { _inv.invertDocument(docId, *doc); - _invertThreads.sync(); + _invertThreads->sync(); myPushDocument(_inv); - _pushThreads.sync(); + _pushThreads->sync(); } - _pushThreads.sync(); + _pushThreads->sync(); DataStoreBase::MemStats beforeStats = getFeatureStoreMemStats(_fic); LOG(info, "Before feature compaction: allocElems=%zu, usedElems=%zu" @@ -1044,13 +1044,13 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working) beforeStats._freeBuffers, beforeStats._activeBuffers, beforeStats._holdBuffers); - myCompactFeatures(_fic, _pushThreads); + myCompactFeatures(_fic, *_pushThreads); std::vector<std::unique_ptr<GenerationHandler::Guard>> guards; for (auto &fieldIndex : _fic.getFieldIndexes()) { guards.push_back(std::make_unique<GenerationHandler::Guard> (fieldIndex->takeGenerationGuard())); } - myCommit(_fic, _pushThreads); + myCommit(_fic, *_pushThreads); DataStoreBase::MemStats duringStats = getFeatureStoreMemStats(_fic); LOG(info, "During feature compaction: allocElems=%zu, usedElems=%zu" @@ -1065,7 +1065,7 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working) duringStats._activeBuffers, duringStats._holdBuffers); guards.clear(); - myCommit(_fic, _pushThreads); + myCommit(_fic, *_pushThreads); DataStoreBase::MemStats afterStats = getFeatureStoreMemStats(_fic); LOG(info, "After feature compaction: allocElems=%zu, usedElems=%zu" @@ -1142,17 +1142,17 @@ TEST_F(BasicInverterTest, require_that_inverter_handles_remove_via_document_remo _b.startIndexField("f1").addStr("a").addStr("c").endField(); Document::UP doc1 = _b.endDocument(); _inv.invertDocument(1, *doc1.get()); - _invertThreads.sync(); + _invertThreads->sync(); myPushDocument(_inv); - _pushThreads.sync(); + _pushThreads->sync(); _b.startDocument("id:ns:searchdocument::2"); _b.startIndexField("f0").addStr("b").addStr("c").endField(); Document::UP doc2 = _b.endDocument(); _inv.invertDocument(2, *doc2.get()); - _invertThreads.sync(); + _invertThreads->sync(); myPushDocument(_inv); - _pushThreads.sync(); + _pushThreads->sync(); EXPECT_TRUE(assertPostingList("[1]", find("a", 0))); EXPECT_TRUE(assertPostingList("[1,2]", find("b", 0))); @@ -1160,8 +1160,8 @@ TEST_F(BasicInverterTest, require_that_inverter_handles_remove_via_document_remo EXPECT_TRUE(assertPostingList("[1]", find("a", 1))); EXPECT_TRUE(assertPostingList("[1]", find("c", 1))); - myremove(1, _inv, _invertThreads); - _pushThreads.sync(); + myremove(1, _inv, *_invertThreads); + _pushThreads->sync(); EXPECT_TRUE(assertPostingList("[]", find("a", 0))); EXPECT_TRUE(assertPostingList("[2]", find("b", 0))); @@ -1311,10 +1311,10 @@ TEST_F(UriInverterTest, require_that_uri_indexing_is_working) endField(); doc = _b.endDocument(); _inv.invertDocument(10, *doc); - _invertThreads.sync(); + _invertThreads->sync(); myPushDocument(_inv); - _pushThreads.sync(); + _pushThreads->sync(); SimpleMatchData match_data; { @@ -1387,10 +1387,10 @@ TEST_F(CjkInverterTest, require_that_cjk_indexing_is_working) endField(); doc = _b.endDocument(); _inv.invertDocument(10, *doc); - _invertThreads.sync(); + _invertThreads->sync(); myPushDocument(_inv); - _pushThreads.sync(); + _pushThreads->sync(); SimpleMatchData match_data; uint32_t fieldId = _schema.getIndexFieldId("f0"); @@ -1445,13 +1445,13 @@ TEST_F(FieldIndexCollectionTest, require_that_insert_tells_which_word_ref_that_w } struct RemoverTest : public FieldIndexCollectionTest { - SequencedTaskExecutor _invertThreads; - SequencedTaskExecutor _pushThreads; + std::unique_ptr<ISequencedTaskExecutor> _invertThreads; + std::unique_ptr<ISequencedTaskExecutor> _pushThreads; RemoverTest() : FieldIndexCollectionTest(), - _invertThreads(2), - _pushThreads(2) + _invertThreads(SequencedTaskExecutor::create(2)), + _pushThreads(SequencedTaskExecutor::create(2)) { } void assertPostingLists(const vespalib::string &e1, @@ -1462,9 +1462,9 @@ struct RemoverTest : public FieldIndexCollectionTest { EXPECT_TRUE(assertPostingList(e3, find("b", 1))); } void remove(uint32_t docId) { - DocumentInverter inv(schema, _invertThreads, _pushThreads, fic); - myremove(docId, inv, _invertThreads); - _pushThreads.sync(); + DocumentInverter inv(schema, *_invertThreads, *_pushThreads, fic); + myremove(docId, inv, *_invertThreads); + _pushThreads->sync(); EXPECT_FALSE(fic.getFieldIndex(0u)->getDocumentRemover(). getStore().get(docId).valid()); } diff --git a/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp b/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp index 4bb0f91659a..5032ed2c179 100644 --- a/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp +++ b/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp @@ -64,8 +64,8 @@ struct MySetup : public IFieldLengthInspector { struct Index { Schema schema; vespalib::ThreadStackExecutor _executor; - search::SequencedTaskExecutor _invertThreads; - search::SequencedTaskExecutor _pushThreads; + std::unique_ptr<search::ISequencedTaskExecutor> _invertThreads; + std::unique_ptr<search::ISequencedTaskExecutor> _pushThreads; MemoryIndex index; DocBuilder builder; uint32_t docid; @@ -123,15 +123,15 @@ private: Index::Index(const MySetup &setup) : schema(setup.schema), _executor(1, 128 * 1024), - _invertThreads(2), - _pushThreads(2), - index(schema, setup, _invertThreads, _pushThreads), + _invertThreads(search::SequencedTaskExecutor::create(2)), + _pushThreads(search::SequencedTaskExecutor::create(2)), + index(schema, setup, *_invertThreads, *_pushThreads), builder(schema), docid(1), currentField() { } -Index::~Index() {} +Index::~Index() = default; //----------------------------------------------------------------------------- std::string toString(SearchIterator & search) diff --git a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp index 513684d3fd5..4c501defeea 100644 --- a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp +++ b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp @@ -14,7 +14,8 @@ ForegroundTaskExecutor::ForegroundTaskExecutor() } ForegroundTaskExecutor::ForegroundTaskExecutor(uint32_t threads) - : ISequencedTaskExecutor(threads) + : ISequencedTaskExecutor(threads), + _accepted(0) { } @@ -25,6 +26,7 @@ ForegroundTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP { assert(id.getId() < getNumExecutors()); task->run(); + _accepted++; } void @@ -32,4 +34,12 @@ ForegroundTaskExecutor::sync() { } +void ForegroundTaskExecutor::setTaskLimit(uint32_t) { + +} + +vespalib::ExecutorStats ForegroundTaskExecutor::getStats() { + return vespalib::ExecutorStats(0, _accepted.load(std::memory_order_relaxed), 0); +} + } // namespace search diff --git a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h index 2074eda009b..0b604fb140d 100644 --- a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h +++ b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h @@ -2,7 +2,7 @@ #pragma once #include "isequencedtaskexecutor.h" -#include <vespa/vespalib/stllike/hash_map.h> +#include <atomic> namespace vespalib { class ThreadStackExecutorBase; } @@ -25,6 +25,12 @@ public: void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; void sync() override; + + void setTaskLimit(uint32_t taskLimit) override; + + vespalib::ExecutorStats getStats() override; +private: + std::atomic<uint64_t> _accepted; }; } // namespace search diff --git a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h b/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h index 109e8319148..55c26abc3d8 100644 --- a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h +++ b/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h @@ -2,6 +2,7 @@ #pragma once #include <vespa/vespalib/util/executor.h> +#include <vespa/vespalib/util/executor_stats.h> #include <vespa/vespalib/stllike/string.h> #include <vespa/vespalib/util/lambdatask.h> #include <vector> @@ -67,6 +68,10 @@ public: */ virtual void sync() = 0; + virtual void setTaskLimit(uint32_t taskLimit) = 0; + + virtual vespalib::ExecutorStats getStats() = 0; + /** * Wrap lambda function into a task and schedule it to be run. * Caller must ensure that pointers and references are valid and diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp index 184ff55220f..30723a6eb2a 100644 --- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp +++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp @@ -14,14 +14,15 @@ constexpr uint32_t stackSize = 128 * 1024; } -SequencedTaskExecutor::SequencedTaskExecutor(uint32_t threads, uint32_t taskLimit) - : ISequencedTaskExecutor(threads), - _executors() +std::unique_ptr<ISequencedTaskExecutor> +SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit) { + auto executors = std::make_unique<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>>(); + executors->reserve(threads); for (uint32_t id = 0; id < threads; ++id) { - auto executor = std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit); - _executors.push_back(std::move(executor)); + executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit)); } + return std::unique_ptr<ISequencedTaskExecutor>(new SequencedTaskExecutor(std::move(executors))); } SequencedTaskExecutor::~SequencedTaskExecutor() @@ -29,10 +30,16 @@ SequencedTaskExecutor::~SequencedTaskExecutor() sync(); } +SequencedTaskExecutor::SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executors) + : ISequencedTaskExecutor(executors->size()), + _executors(std::move(executors)) +{ +} + void SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit) { - for (const auto &executor : _executors) { + for (const auto &executor : *_executors) { executor->setTaskLimit(taskLimit); } } @@ -40,16 +47,15 @@ SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit) void SequencedTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP task) { - assert(id.getId() < _executors.size()); - vespalib::ThreadStackExecutorBase &executor(*_executors[id.getId()]); - auto rejectedTask = executor.execute(std::move(task)); + assert(id.getId() < _executors->size()); + auto rejectedTask = (*_executors)[id.getId()]->execute(std::move(task)); assert(!rejectedTask); } void SequencedTaskExecutor::sync() { - for (auto &executor : _executors) { + for (auto &executor : *_executors) { executor->sync(); } } @@ -58,7 +64,7 @@ SequencedTaskExecutor::Stats SequencedTaskExecutor::getStats() { Stats accumulatedStats; - for (auto &executor : _executors) { + for (auto &executor :* _executors) { accumulatedStats += executor->getStats(); } return accumulatedStats; diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h index 9337f393150..a29e3d5226c 100644 --- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h +++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h @@ -6,7 +6,7 @@ namespace vespalib { struct ExecutorStats; - class BlockingThreadStackExecutor; + class SyncableThreadExecutor; } namespace search { @@ -18,17 +18,19 @@ namespace search { class SequencedTaskExecutor final : public ISequencedTaskExecutor { using Stats = vespalib::ExecutorStats; - std::vector<std::shared_ptr<vespalib::BlockingThreadStackExecutor>> _executors; + std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> _executors; + + SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor); public: using ISequencedTaskExecutor::getExecutorId; - SequencedTaskExecutor(uint32_t threads, uint32_t taskLimit = 1000); ~SequencedTaskExecutor(); - void setTaskLimit(uint32_t taskLimit); + void setTaskLimit(uint32_t taskLimit) override; void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; void sync() override; - Stats getStats(); + Stats getStats() override; + static std::unique_ptr<ISequencedTaskExecutor> create(uint32_t threads, uint32_t taskLimit = 1000); }; } // namespace search diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp index 04504086520..e553e757c37 100644 --- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp +++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp @@ -41,4 +41,12 @@ SequencedTaskExecutorObserver::getExecuteHistory() return _executeHistory; } +void SequencedTaskExecutorObserver::setTaskLimit(uint32_t taskLimit) { + _executor.setTaskLimit(taskLimit); +} + +vespalib::ExecutorStats SequencedTaskExecutorObserver::getStats() { + return _executor.getStats(); +} + } // namespace search diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h index b2de71f06b3..dadd4bf59cf 100644 --- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h +++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h @@ -3,8 +3,6 @@ #include "isequencedtaskexecutor.h" #include <atomic> -#include <vector> -#include <mutex> namespace search { @@ -31,6 +29,10 @@ public: uint32_t getExecuteCnt() const { return _executeCnt; } uint32_t getSyncCnt() const { return _syncCnt; } std::vector<uint32_t> getExecuteHistory(); + + void setTaskLimit(uint32_t taskLimit) override; + + vespalib::ExecutorStats getStats() override; }; } // namespace search diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp index 33d6e46a244..b5cf16b4f80 100644 --- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp +++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.cpp @@ -39,10 +39,4 @@ BlockingThreadStackExecutor::~BlockingThreadStackExecutor() cleanup(); } -void -BlockingThreadStackExecutor::setTaskLimit(uint32_t taskLimit) -{ - internalSetTaskLimit(taskLimit); -} - } // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h index 73a142ae42c..df3f9408e0a 100644 --- a/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h +++ b/vespalib/src/vespa/vespalib/util/blockingthreadstackexecutor.h @@ -33,11 +33,6 @@ public: init_fun_t init_function); ~BlockingThreadStackExecutor(); - - /** - * Sets a new upper limit for accepted number of tasks. - */ - void setTaskLimit(uint32_t taskLimit); }; } // namespace vespalib diff --git a/vespalib/src/vespa/vespalib/util/threadexecutor.h b/vespalib/src/vespa/vespalib/util/threadexecutor.h index 2dcbb595bb3..202e516bc60 100644 --- a/vespalib/src/vespa/vespalib/util/threadexecutor.h +++ b/vespalib/src/vespa/vespalib/util/threadexecutor.h @@ -4,6 +4,7 @@ #include "executor.h" #include "syncable.h" +#include "executor_stats.h" namespace vespalib { @@ -11,10 +12,26 @@ class ThreadExecutor : public Executor { public: /** + * Internal stats that we want to observe externally. Note that + * all stats are reset each time they are observed. + **/ + using Stats = ExecutorStats; + /** * Get number of threads in the executor pool. * @return number of threads in the pool */ virtual size_t getNumThreads() const = 0; + + /** + * Observe and reset stats for this object. + * @return stats + **/ + virtual Stats getStats() = 0; + + /** + * Sets a new upper limit for accepted number of tasks. + */ + virtual void setTaskLimit(uint32_t taskLimit) = 0; }; /** diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp index e933477bfb9..9ab465841d6 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.cpp @@ -168,6 +168,12 @@ size_t ThreadStackExecutorBase::getNumThreads() const { } void +ThreadStackExecutorBase::setTaskLimit(uint32_t taskLimit) +{ + internalSetTaskLimit(taskLimit); +} + +void ThreadStackExecutorBase::internalSetTaskLimit(uint32_t taskLimit) { MonitorGuard monitor(_monitor); diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h index 8718b04d2d3..2c0bc56d6df 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h @@ -8,10 +8,8 @@ #include "sync.h" #include "gate.h" #include "runnable.h" -#include <memory> #include <vector> #include <functional> -#include "executor_stats.h" class FastOS_ThreadPool; @@ -36,12 +34,6 @@ class ThreadStackExecutorBase : public SyncableThreadExecutor, public Runnable { public: - /** - * Internal stats that we want to observe externally. Note that - * all stats are reset each time they are observed. - **/ - using Stats = ExecutorStats; - using init_fun_t = std::function<int(Runnable&)>; private: @@ -204,14 +196,8 @@ public: **/ size_t num_idle_workers() const; - /** - * Observe and reset stats for this object. - * - * @return stats - **/ - Stats getStats(); + Stats getStats() override; - // inherited from Executor Task::UP execute(Task::UP task) override; /** @@ -232,6 +218,7 @@ public: void wait_for_task_count(uint32_t task_count); size_t getNumThreads() const override; + void setTaskLimit(uint32_t taskLimit) override; /** * Shut down this executor. This will make this executor reject |