diff options
Diffstat (limited to 'searchlib')
14 files changed, 165 insertions, 126 deletions
diff --git a/searchlib/src/apps/tests/memoryindexstress_test.cpp b/searchlib/src/apps/tests/memoryindexstress_test.cpp index 60f3a6b7664..dc950b84508 100644 --- a/searchlib/src/apps/tests/memoryindexstress_test.cpp +++ b/searchlib/src/apps/tests/memoryindexstress_test.cpp @@ -195,8 +195,8 @@ struct Fixture { Schema schema; DocumentTypeRepo repo; vespalib::ThreadStackExecutor _executor; - search::SequencedTaskExecutor _invertThreads; - search::SequencedTaskExecutor _pushThreads; + std::unique_ptr<search::ISequencedTaskExecutor> _invertThreads; + std::unique_ptr<search::ISequencedTaskExecutor> _pushThreads; MemoryIndex index; uint32_t _readThreads; vespalib::ThreadStackExecutor _writer; // 1 write thread @@ -247,9 +247,9 @@ Fixture::Fixture(uint32_t readThreads) : schema(makeSchema()), repo(makeDocTypeRepoConfig()), _executor(1, 128 * 1024), - _invertThreads(2), - _pushThreads(2), - index(schema, MockFieldLengthInspector(), _invertThreads, _pushThreads), + _invertThreads(search::SequencedTaskExecutor::create(2)), + _pushThreads(search::SequencedTaskExecutor::create(2)), + index(schema, MockFieldLengthInspector(), *_invertThreads, *_pushThreads), _readThreads(readThreads), _writer(1, 128 * 1024), _readers(readThreads, 128 * 1024), diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp index a51becfbf13..b2b15ded274 100644 --- a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp +++ b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp @@ -16,9 +16,9 @@ int main(int argc, char *argv[]) { if (argc > 2) numThreads = atoi(argv[2]); - SequencedTaskExecutor executor(numThreads); + auto executor = SequencedTaskExecutor::create(numThreads); for (unsigned long tid(0); tid < numTasks; tid++) { - executor.executeTask(ExecutorId(tid%numThreads), vespalib::makeLambdaTask([&counter] { counter++; })); + executor->executeTask(ExecutorId(tid%numThreads), vespalib::makeLambdaTask([&counter] { counter++; })); } return 0; } diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp index fcc7fd7300d..c311a59a56c 100644 --- a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp +++ b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp @@ -17,9 +17,9 @@ namespace search::common { class Fixture { public: - SequencedTaskExecutor _threads; + std::unique_ptr<ISequencedTaskExecutor> _threads; - Fixture() : _threads(2) { } + Fixture() : _threads(SequencedTaskExecutor::create(2)) { } }; @@ -67,11 +67,11 @@ public: TEST_F("testExecute", Fixture) { std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); - f._threads.execute(1, [=]() { tv->modify(0, 42); }); + f._threads->execute(1, [=]() { tv->modify(0, 42); }); tv->wait(1); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); - f._threads.sync(); + f._threads->sync(); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); } @@ -81,12 +81,12 @@ TEST_F("require that task with same component id are serialized", Fixture) { std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); - f._threads.execute(0, [=]() { usleep(2000); tv->modify(0, 14); }); - f._threads.execute(0, [=]() { tv->modify(14, 42); }); + f._threads->execute(0, [=]() { usleep(2000); tv->modify(0, 14); }); + f._threads->execute(0, [=]() { tv->modify(14, 42); }); tv->wait(2); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); - f._threads.sync(); + f._threads->sync(); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); } @@ -97,15 +97,15 @@ TEST_F("require that task with different component ids are not serialized", Fixt for (tryCnt = 0; tryCnt < 100; ++tryCnt) { std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); - f._threads.execute(0, [=]() { usleep(2000); tv->modify(0, 14); }); - f._threads.execute(2, [=]() { tv->modify(14, 42); }); + f._threads->execute(0, [=]() { usleep(2000); tv->modify(0, 14); }); + f._threads->execute(2, [=]() { tv->modify(14, 42); }); tv->wait(2); if (tv->_fail != 1) { continue; } EXPECT_EQUAL(1, tv->_fail); EXPECT_EQUAL(14, tv->_val); - f._threads.sync(); + f._threads->sync(); EXPECT_EQUAL(1, tv->_fail); EXPECT_EQUAL(14, tv->_val); break; @@ -119,12 +119,12 @@ TEST_F("require that task with same string component id are serialized", Fixture std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); auto test2 = [=]() { tv->modify(14, 42); }; - f._threads.execute(f._threads.getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); }); - f._threads.execute(f._threads.getExecutorId("0"), test2); + f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); }); + f._threads->execute(f._threads->getExecutorId("0"), test2); tv->wait(2); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); - f._threads.sync(); + f._threads->sync(); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); } @@ -137,15 +137,15 @@ int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int t for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) { std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); - f._threads.execute(f._threads.getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); }); - f._threads.execute(f._threads.getExecutorId(altComponentId), [=]() { tv->modify(14, 42); }); + f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); }); + f._threads->execute(f._threads->getExecutorId(altComponentId), [=]() { tv->modify(14, 42); }); tv->wait(2); if (tv->_fail != 1) { continue; } EXPECT_EQUAL(1, tv->_fail); EXPECT_EQUAL(14, tv->_val); - f._threads.sync(); + f._threads->sync(); EXPECT_EQUAL(1, tv->_fail); EXPECT_EQUAL(14, tv->_val); break; @@ -157,10 +157,10 @@ vespalib::string makeAltComponentId(Fixture &f) { int tryCnt = 0; char altComponentId[20]; - ISequencedTaskExecutor::ExecutorId executorId0 = f._threads.getExecutorId("0"); + ISequencedTaskExecutor::ExecutorId executorId0 = f._threads->getExecutorId("0"); for (tryCnt = 1; tryCnt < 100; ++tryCnt) { sprintf(altComponentId, "%d", tryCnt); - if (f._threads.getExecutorId(altComponentId) == executorId0) { + if (f._threads->getExecutorId(altComponentId) == executorId0) { break; } } @@ -193,9 +193,9 @@ TEST_F("require that execute works with const lambda", Fixture) std::vector<int> res; const auto lambda = [i, &res]() mutable { res.push_back(i--); res.push_back(i--); }; - f._threads.execute(0, lambda); - f._threads.execute(0, lambda); - f._threads.sync(); + f._threads->execute(0, lambda); + f._threads->execute(0, lambda); + f._threads->sync(); std::vector<int> exp({5, 4, 5, 4}); EXPECT_EQUAL(exp, res); EXPECT_EQUAL(5, i); @@ -208,9 +208,9 @@ TEST_F("require that execute works with reference to lambda", Fixture) auto lambda = [i, &res]() mutable { res.push_back(i--); res.push_back(i--); }; auto &lambdaref = lambda; - f._threads.execute(0, lambdaref); - f._threads.execute(0, lambdaref); - f._threads.sync(); + f._threads->execute(0, lambdaref); + f._threads->execute(0, lambdaref); + f._threads->sync(); std::vector<int> exp({5, 4, 5, 4}); EXPECT_EQUAL(exp, res); EXPECT_EQUAL(5, i); @@ -222,28 +222,28 @@ TEST_F("require that executeLambda works", Fixture) std::vector<int> res; const auto lambda = [i, &res]() mutable { res.push_back(i--); res.push_back(i--); }; - f._threads.executeLambda(ISequencedTaskExecutor::ExecutorId(0), lambda); - f._threads.sync(); + f._threads->executeLambda(ISequencedTaskExecutor::ExecutorId(0), lambda); + f._threads->sync(); std::vector<int> exp({5, 4}); EXPECT_EQUAL(exp, res); EXPECT_EQUAL(5, i); } TEST("require that you get correct number of executors") { - SequencedTaskExecutor seven(7); - EXPECT_EQUAL(7u, seven.getNumExecutors()); + auto seven = SequencedTaskExecutor::create(7); + EXPECT_EQUAL(7u, seven->getNumExecutors()); } TEST("require that you distribute well") { - SequencedTaskExecutor seven(7); - EXPECT_EQUAL(7u, seven.getNumExecutors()); - EXPECT_EQUAL(97u, seven.getComponentHashSize()); - EXPECT_EQUAL(0u, seven.getComponentEffectiveHashSize()); + auto seven = SequencedTaskExecutor::create(7); + EXPECT_EQUAL(7u, seven->getNumExecutors()); + EXPECT_EQUAL(97u, seven->getComponentHashSize()); + EXPECT_EQUAL(0u, seven->getComponentEffectiveHashSize()); for (uint32_t id=0; id < 1000; id++) { - EXPECT_EQUAL((id%97)%7, seven.getExecutorId(id).getId()); + EXPECT_EQUAL((id%97)%7, seven->getExecutorId(id).getId()); } - EXPECT_EQUAL(97u, seven.getComponentHashSize()); - EXPECT_EQUAL(97u, seven.getComponentEffectiveHashSize()); + EXPECT_EQUAL(97u, seven->getComponentHashSize()); + EXPECT_EQUAL(97u, seven->getComponentEffectiveHashSize()); } } diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp index 1825c00ceda..92d0659d984 100644 --- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp +++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp @@ -314,16 +314,16 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire addField("f4")); FieldIndexCollection fic(schema, MockFieldLengthInspector()); DocBuilder b(schema); - SequencedTaskExecutor invertThreads(2); - SequencedTaskExecutor pushThreads(2); - DocumentInverter inv(schema, invertThreads, pushThreads, fic); + auto invertThreads = SequencedTaskExecutor::create(2); + auto pushThreads = SequencedTaskExecutor::create(2); + DocumentInverter inv(schema, *invertThreads, *pushThreads, fic); Document::UP doc; doc = make_doc10(b); inv.invertDocument(10, *doc); - invertThreads.sync(); + invertThreads->sync(); myPushDocument(inv); - pushThreads.sync(); + pushThreads->sync(); b.startDocument("id:ns:searchdocument::11"). startIndexField("f3"). @@ -331,9 +331,9 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire endField(); doc = b.endDocument(); inv.invertDocument(11, *doc); - invertThreads.sync(); + invertThreads->sync(); myPushDocument(inv); - pushThreads.sync(); + pushThreads->sync(); b.startDocument("id:ns:searchdocument::12"). startIndexField("f3"). @@ -341,9 +341,9 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire endField(); doc = b.endDocument(); inv.invertDocument(12, *doc); - invertThreads.sync(); + invertThreads->sync(); myPushDocument(inv); - pushThreads.sync(); + pushThreads->sync(); IndexBuilder ib(schema); vespalib::string dump2dir = prefix + "dump2"; @@ -455,14 +455,14 @@ FusionTest::make_simple_index(const vespalib::string &dump_dir, const IFieldLeng uint32_t numDocs = 20; uint32_t numWords = 1000; DocBuilder b(_schema); - SequencedTaskExecutor invertThreads(2); - SequencedTaskExecutor pushThreads(2); - DocumentInverter inv(_schema, invertThreads, pushThreads, fic); + auto invertThreads = SequencedTaskExecutor::create(2); + auto pushThreads = SequencedTaskExecutor::create(2); + DocumentInverter inv(_schema, *invertThreads, *pushThreads, fic); inv.invertDocument(10, *make_doc10(b)); - invertThreads.sync(); + invertThreads->sync(); myPushDocument(inv); - pushThreads.sync(); + pushThreads->sync(); IndexBuilder ib(_schema); TuneFileIndexing tuneFileIndexing; diff --git a/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp b/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp index 3f798df3c05..60b21699406 100644 --- a/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp +++ b/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp @@ -117,8 +117,8 @@ public: struct DocumentInverterTest : public ::testing::Test { Schema _schema; DocBuilder _b; - SequencedTaskExecutor _invertThreads; - SequencedTaskExecutor _pushThreads; + std::unique_ptr<ISequencedTaskExecutor> _invertThreads; + std::unique_ptr<ISequencedTaskExecutor> _pushThreads; WordStore _word_store; FieldIndexRemover _remover; test::OrderedFieldIndexInserter _inserter; @@ -138,26 +138,26 @@ struct DocumentInverterTest : public ::testing::Test { DocumentInverterTest() : _schema(makeSchema()), _b(_schema), - _invertThreads(2), - _pushThreads(2), + _invertThreads(SequencedTaskExecutor::create(2)), + _pushThreads(SequencedTaskExecutor::create(2)), _word_store(), _remover(_word_store), _inserter(), _calculator(), _fic(_remover, _inserter, _calculator), - _inv(_schema, _invertThreads, _pushThreads, _fic) + _inv(_schema, *_invertThreads, *_pushThreads, _fic) { } void pushDocuments() { - _invertThreads.sync(); + _invertThreads->sync(); uint32_t fieldId = 0; for (auto &inverter : _inv.getInverters()) { _inserter.setFieldId(fieldId); inverter->pushDocuments(); ++fieldId; } - _pushThreads.sync(); + _pushThreads->sync(); } }; diff --git a/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp b/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp index 512e1bd2051..c562c0cf29c 100644 --- a/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp +++ b/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp @@ -872,7 +872,7 @@ struct FieldIndexCollectionTypeTest : public ::testing::Test { fic(schema, MockFieldLengthInspector()) { } - Schema make_schema() { + static Schema make_schema() { Schema result; result.addIndexField(Schema::IndexField("normal", DataType::STRING)); Schema::IndexField interleaved("interleaved", DataType::STRING); @@ -902,17 +902,17 @@ public: Schema _schema; FieldIndexCollection _fic; DocBuilder _b; - SequencedTaskExecutor _invertThreads; - SequencedTaskExecutor _pushThreads; + std::unique_ptr<ISequencedTaskExecutor> _invertThreads; + std::unique_ptr<ISequencedTaskExecutor> _pushThreads; DocumentInverter _inv; InverterTest(const Schema& schema) : _schema(schema), _fic(_schema, MockFieldLengthInspector()), _b(_schema), - _invertThreads(2), - _pushThreads(2), - _inv(_schema, _invertThreads, _pushThreads, _fic) + _invertThreads(SequencedTaskExecutor::create(2)), + _pushThreads(SequencedTaskExecutor::create(2)), + _inv(_schema, *_invertThreads, *_pushThreads, _fic) { } NormalFieldIndex::PostingList::Iterator find(const vespalib::stringref word, uint32_t field_id) const { @@ -943,9 +943,9 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working) endField(); doc = _b.endDocument(); _inv.invertDocument(10, *doc); - _invertThreads.sync(); + _invertThreads->sync(); myPushDocument(_inv); - _pushThreads.sync(); + _pushThreads->sync(); _b.startDocument("id:ns:searchdocument::20"); _b.startIndexField("f0"). @@ -953,9 +953,9 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working) endField(); doc = _b.endDocument(); _inv.invertDocument(20, *doc); - _invertThreads.sync(); + _invertThreads->sync(); myPushDocument(_inv); - _pushThreads.sync(); + _pushThreads->sync(); _b.startDocument("id:ns:searchdocument::30"); _b.startIndexField("f0"). @@ -984,9 +984,9 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working) endField(); doc = _b.endDocument(); _inv.invertDocument(30, *doc); - _invertThreads.sync(); + _invertThreads->sync(); myPushDocument(_inv); - _pushThreads.sync(); + _pushThreads->sync(); _b.startDocument("id:ns:searchdocument::40"); _b.startIndexField("f0"). @@ -995,9 +995,9 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working) endField(); doc = _b.endDocument(); _inv.invertDocument(40, *doc); - _invertThreads.sync(); + _invertThreads->sync(); myPushDocument(_inv); - _pushThreads.sync(); + _pushThreads->sync(); _b.startDocument("id:ns:searchdocument::999"); _b.startIndexField("f0"). @@ -1025,12 +1025,12 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working) doc = _b.endDocument(); for (uint32_t docId = 10000; docId < 20000; ++docId) { _inv.invertDocument(docId, *doc); - _invertThreads.sync(); + _invertThreads->sync(); myPushDocument(_inv); - _pushThreads.sync(); + _pushThreads->sync(); } - _pushThreads.sync(); + _pushThreads->sync(); DataStoreBase::MemStats beforeStats = getFeatureStoreMemStats(_fic); LOG(info, "Before feature compaction: allocElems=%zu, usedElems=%zu" @@ -1044,13 +1044,13 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working) beforeStats._freeBuffers, beforeStats._activeBuffers, beforeStats._holdBuffers); - myCompactFeatures(_fic, _pushThreads); + myCompactFeatures(_fic, *_pushThreads); std::vector<std::unique_ptr<GenerationHandler::Guard>> guards; for (auto &fieldIndex : _fic.getFieldIndexes()) { guards.push_back(std::make_unique<GenerationHandler::Guard> (fieldIndex->takeGenerationGuard())); } - myCommit(_fic, _pushThreads); + myCommit(_fic, *_pushThreads); DataStoreBase::MemStats duringStats = getFeatureStoreMemStats(_fic); LOG(info, "During feature compaction: allocElems=%zu, usedElems=%zu" @@ -1065,7 +1065,7 @@ TEST_F(BasicInverterTest, require_that_inversion_is_working) duringStats._activeBuffers, duringStats._holdBuffers); guards.clear(); - myCommit(_fic, _pushThreads); + myCommit(_fic, *_pushThreads); DataStoreBase::MemStats afterStats = getFeatureStoreMemStats(_fic); LOG(info, "After feature compaction: allocElems=%zu, usedElems=%zu" @@ -1142,17 +1142,17 @@ TEST_F(BasicInverterTest, require_that_inverter_handles_remove_via_document_remo _b.startIndexField("f1").addStr("a").addStr("c").endField(); Document::UP doc1 = _b.endDocument(); _inv.invertDocument(1, *doc1.get()); - _invertThreads.sync(); + _invertThreads->sync(); myPushDocument(_inv); - _pushThreads.sync(); + _pushThreads->sync(); _b.startDocument("id:ns:searchdocument::2"); _b.startIndexField("f0").addStr("b").addStr("c").endField(); Document::UP doc2 = _b.endDocument(); _inv.invertDocument(2, *doc2.get()); - _invertThreads.sync(); + _invertThreads->sync(); myPushDocument(_inv); - _pushThreads.sync(); + _pushThreads->sync(); EXPECT_TRUE(assertPostingList("[1]", find("a", 0))); EXPECT_TRUE(assertPostingList("[1,2]", find("b", 0))); @@ -1160,8 +1160,8 @@ TEST_F(BasicInverterTest, require_that_inverter_handles_remove_via_document_remo EXPECT_TRUE(assertPostingList("[1]", find("a", 1))); EXPECT_TRUE(assertPostingList("[1]", find("c", 1))); - myremove(1, _inv, _invertThreads); - _pushThreads.sync(); + myremove(1, _inv, *_invertThreads); + _pushThreads->sync(); EXPECT_TRUE(assertPostingList("[]", find("a", 0))); EXPECT_TRUE(assertPostingList("[2]", find("b", 0))); @@ -1311,10 +1311,10 @@ TEST_F(UriInverterTest, require_that_uri_indexing_is_working) endField(); doc = _b.endDocument(); _inv.invertDocument(10, *doc); - _invertThreads.sync(); + _invertThreads->sync(); myPushDocument(_inv); - _pushThreads.sync(); + _pushThreads->sync(); SimpleMatchData match_data; { @@ -1387,10 +1387,10 @@ TEST_F(CjkInverterTest, require_that_cjk_indexing_is_working) endField(); doc = _b.endDocument(); _inv.invertDocument(10, *doc); - _invertThreads.sync(); + _invertThreads->sync(); myPushDocument(_inv); - _pushThreads.sync(); + _pushThreads->sync(); SimpleMatchData match_data; uint32_t fieldId = _schema.getIndexFieldId("f0"); @@ -1445,13 +1445,13 @@ TEST_F(FieldIndexCollectionTest, require_that_insert_tells_which_word_ref_that_w } struct RemoverTest : public FieldIndexCollectionTest { - SequencedTaskExecutor _invertThreads; - SequencedTaskExecutor _pushThreads; + std::unique_ptr<ISequencedTaskExecutor> _invertThreads; + std::unique_ptr<ISequencedTaskExecutor> _pushThreads; RemoverTest() : FieldIndexCollectionTest(), - _invertThreads(2), - _pushThreads(2) + _invertThreads(SequencedTaskExecutor::create(2)), + _pushThreads(SequencedTaskExecutor::create(2)) { } void assertPostingLists(const vespalib::string &e1, @@ -1462,9 +1462,9 @@ struct RemoverTest : public FieldIndexCollectionTest { EXPECT_TRUE(assertPostingList(e3, find("b", 1))); } void remove(uint32_t docId) { - DocumentInverter inv(schema, _invertThreads, _pushThreads, fic); - myremove(docId, inv, _invertThreads); - _pushThreads.sync(); + DocumentInverter inv(schema, *_invertThreads, *_pushThreads, fic); + myremove(docId, inv, *_invertThreads); + _pushThreads->sync(); EXPECT_FALSE(fic.getFieldIndex(0u)->getDocumentRemover(). getStore().get(docId).valid()); } diff --git a/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp b/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp index 4bb0f91659a..5032ed2c179 100644 --- a/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp +++ b/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp @@ -64,8 +64,8 @@ struct MySetup : public IFieldLengthInspector { struct Index { Schema schema; vespalib::ThreadStackExecutor _executor; - search::SequencedTaskExecutor _invertThreads; - search::SequencedTaskExecutor _pushThreads; + std::unique_ptr<search::ISequencedTaskExecutor> _invertThreads; + std::unique_ptr<search::ISequencedTaskExecutor> _pushThreads; MemoryIndex index; DocBuilder builder; uint32_t docid; @@ -123,15 +123,15 @@ private: Index::Index(const MySetup &setup) : schema(setup.schema), _executor(1, 128 * 1024), - _invertThreads(2), - _pushThreads(2), - index(schema, setup, _invertThreads, _pushThreads), + _invertThreads(search::SequencedTaskExecutor::create(2)), + _pushThreads(search::SequencedTaskExecutor::create(2)), + index(schema, setup, *_invertThreads, *_pushThreads), builder(schema), docid(1), currentField() { } -Index::~Index() {} +Index::~Index() = default; //----------------------------------------------------------------------------- std::string toString(SearchIterator & search) diff --git a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp index 513684d3fd5..4c501defeea 100644 --- a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp +++ b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp @@ -14,7 +14,8 @@ ForegroundTaskExecutor::ForegroundTaskExecutor() } ForegroundTaskExecutor::ForegroundTaskExecutor(uint32_t threads) - : ISequencedTaskExecutor(threads) + : ISequencedTaskExecutor(threads), + _accepted(0) { } @@ -25,6 +26,7 @@ ForegroundTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP { assert(id.getId() < getNumExecutors()); task->run(); + _accepted++; } void @@ -32,4 +34,12 @@ ForegroundTaskExecutor::sync() { } +void ForegroundTaskExecutor::setTaskLimit(uint32_t) { + +} + +vespalib::ExecutorStats ForegroundTaskExecutor::getStats() { + return vespalib::ExecutorStats(0, _accepted.load(std::memory_order_relaxed), 0); +} + } // namespace search diff --git a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h index 2074eda009b..0b604fb140d 100644 --- a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h +++ b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h @@ -2,7 +2,7 @@ #pragma once #include "isequencedtaskexecutor.h" -#include <vespa/vespalib/stllike/hash_map.h> +#include <atomic> namespace vespalib { class ThreadStackExecutorBase; } @@ -25,6 +25,12 @@ public: void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; void sync() override; + + void setTaskLimit(uint32_t taskLimit) override; + + vespalib::ExecutorStats getStats() override; +private: + std::atomic<uint64_t> _accepted; }; } // namespace search diff --git a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h b/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h index 109e8319148..55c26abc3d8 100644 --- a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h +++ b/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h @@ -2,6 +2,7 @@ #pragma once #include <vespa/vespalib/util/executor.h> +#include <vespa/vespalib/util/executor_stats.h> #include <vespa/vespalib/stllike/string.h> #include <vespa/vespalib/util/lambdatask.h> #include <vector> @@ -67,6 +68,10 @@ public: */ virtual void sync() = 0; + virtual void setTaskLimit(uint32_t taskLimit) = 0; + + virtual vespalib::ExecutorStats getStats() = 0; + /** * Wrap lambda function into a task and schedule it to be run. * Caller must ensure that pointers and references are valid and diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp index 184ff55220f..30723a6eb2a 100644 --- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp +++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp @@ -14,14 +14,15 @@ constexpr uint32_t stackSize = 128 * 1024; } -SequencedTaskExecutor::SequencedTaskExecutor(uint32_t threads, uint32_t taskLimit) - : ISequencedTaskExecutor(threads), - _executors() +std::unique_ptr<ISequencedTaskExecutor> +SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit) { + auto executors = std::make_unique<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>>(); + executors->reserve(threads); for (uint32_t id = 0; id < threads; ++id) { - auto executor = std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit); - _executors.push_back(std::move(executor)); + executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit)); } + return std::unique_ptr<ISequencedTaskExecutor>(new SequencedTaskExecutor(std::move(executors))); } SequencedTaskExecutor::~SequencedTaskExecutor() @@ -29,10 +30,16 @@ SequencedTaskExecutor::~SequencedTaskExecutor() sync(); } +SequencedTaskExecutor::SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executors) + : ISequencedTaskExecutor(executors->size()), + _executors(std::move(executors)) +{ +} + void SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit) { - for (const auto &executor : _executors) { + for (const auto &executor : *_executors) { executor->setTaskLimit(taskLimit); } } @@ -40,16 +47,15 @@ SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit) void SequencedTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP task) { - assert(id.getId() < _executors.size()); - vespalib::ThreadStackExecutorBase &executor(*_executors[id.getId()]); - auto rejectedTask = executor.execute(std::move(task)); + assert(id.getId() < _executors->size()); + auto rejectedTask = (*_executors)[id.getId()]->execute(std::move(task)); assert(!rejectedTask); } void SequencedTaskExecutor::sync() { - for (auto &executor : _executors) { + for (auto &executor : *_executors) { executor->sync(); } } @@ -58,7 +64,7 @@ SequencedTaskExecutor::Stats SequencedTaskExecutor::getStats() { Stats accumulatedStats; - for (auto &executor : _executors) { + for (auto &executor :* _executors) { accumulatedStats += executor->getStats(); } return accumulatedStats; diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h index 9337f393150..a29e3d5226c 100644 --- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h +++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h @@ -6,7 +6,7 @@ namespace vespalib { struct ExecutorStats; - class BlockingThreadStackExecutor; + class SyncableThreadExecutor; } namespace search { @@ -18,17 +18,19 @@ namespace search { class SequencedTaskExecutor final : public ISequencedTaskExecutor { using Stats = vespalib::ExecutorStats; - std::vector<std::shared_ptr<vespalib::BlockingThreadStackExecutor>> _executors; + std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> _executors; + + SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor); public: using ISequencedTaskExecutor::getExecutorId; - SequencedTaskExecutor(uint32_t threads, uint32_t taskLimit = 1000); ~SequencedTaskExecutor(); - void setTaskLimit(uint32_t taskLimit); + void setTaskLimit(uint32_t taskLimit) override; void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; void sync() override; - Stats getStats(); + Stats getStats() override; + static std::unique_ptr<ISequencedTaskExecutor> create(uint32_t threads, uint32_t taskLimit = 1000); }; } // namespace search diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp index 04504086520..e553e757c37 100644 --- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp +++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp @@ -41,4 +41,12 @@ SequencedTaskExecutorObserver::getExecuteHistory() return _executeHistory; } +void SequencedTaskExecutorObserver::setTaskLimit(uint32_t taskLimit) { + _executor.setTaskLimit(taskLimit); +} + +vespalib::ExecutorStats SequencedTaskExecutorObserver::getStats() { + return _executor.getStats(); +} + } // namespace search diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h index b2de71f06b3..dadd4bf59cf 100644 --- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h +++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h @@ -3,8 +3,6 @@ #include "isequencedtaskexecutor.h" #include <atomic> -#include <vector> -#include <mutex> namespace search { @@ -31,6 +29,10 @@ public: uint32_t getExecuteCnt() const { return _executeCnt; } uint32_t getSyncCnt() const { return _syncCnt; } std::vector<uint32_t> getExecuteHistory(); + + void setTaskLimit(uint32_t taskLimit) override; + + vespalib::ExecutorStats getStats() override; }; } // namespace search |