diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-11-12 09:26:11 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-11-12 09:26:11 +0000 |
commit | 434fca0b458910329f63da49b9b1c84de232bf3f (patch) | |
tree | 6060535c3c2b13e40b37867b21230fdbdc7c80ec | |
parent | de23b574462e6931e6afd0906257f0bd7673f1f8 (diff) |
Name the threads so it is easier to see who is doing what.
28 files changed, 138 insertions, 78 deletions
diff --git a/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp b/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp index 19af6a699c4..e284f5ed1d9 100644 --- a/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp +++ b/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp @@ -13,6 +13,7 @@ using namespace search; using namespace vespalib; using ReadGuard = ExclusiveAttributeReadAccessor::Guard; +VESPA_THREAD_STACK_TAG(test_executor) AttributeVector::SP createAttribute() @@ -29,7 +30,7 @@ struct Fixture Fixture() : attribute(createAttribute()), - writer(SequencedTaskExecutor::create(1)), + writer(SequencedTaskExecutor::create(test_executor, 1)), accessor(attribute, *writer) {} }; diff --git a/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp b/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp index 7af9d8d816c..74a2a14dbd6 100644 --- a/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp +++ b/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp @@ -143,14 +143,16 @@ void Test::testSearch(Searchable &source, EXPECT_TRUE(search_iterator->isAtEnd()); } +VESPA_THREAD_STACK_TAG(invert_executor) +VESPA_THREAD_STACK_TAG(write_executor) // Creates a memory index, inserts documents, performs a few // searches, dumps the index to disk, and performs the searches // again. void Test::requireThatMemoryIndexCanBeDumpedAndSearched() { Schema schema = getSchema(); vespalib::ThreadStackExecutor sharedExecutor(2, 0x10000); - auto indexFieldInverter = vespalib::SequencedTaskExecutor::create(2); - auto indexFieldWriter = vespalib::SequencedTaskExecutor::create(2); + auto indexFieldInverter = vespalib::SequencedTaskExecutor::create(invert_executor, 2); + auto indexFieldWriter = vespalib::SequencedTaskExecutor::create(write_executor, 2); MemoryIndex memory_index(schema, MockFieldLengthInspector(), *indexFieldInverter, *indexFieldWriter); DocBuilder doc_builder(schema); diff --git a/searchcore/src/tests/proton/index/indexmanager_test.cpp b/searchcore/src/tests/proton/index/indexmanager_test.cpp index 3f088a497aa..89594821803 100644 --- a/searchcore/src/tests/proton/index/indexmanager_test.cpp +++ b/searchcore/src/tests/proton/index/indexmanager_test.cpp @@ -374,12 +374,15 @@ TEST_F(IndexManagerTest, require_that_source_selector_is_flushed) ASSERT_TRUE(file.OpenReadOnlyExisting()); } +VESPA_THREAD_STACK_TAG(invert_executor) +VESPA_THREAD_STACK_TAG(push_executor) + TEST_F(IndexManagerTest, require_that_flush_stats_are_calculated) { Schema schema(getSchema()); FieldIndexCollection fic(schema, MockFieldLengthInspector()); - auto invertThreads = SequencedTaskExecutor::create(2); - auto pushThreads = SequencedTaskExecutor::create(2); + auto invertThreads = SequencedTaskExecutor::create(invert_executor, 2); + auto pushThreads = SequencedTaskExecutor::create(push_executor, 2); search::memoryindex::DocumentInverter inverter(schema, *invertThreads, *pushThreads, fic); uint64_t fixed_index_size = fic.getMemoryUsage().allocatedBytes(); diff --git a/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp b/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp index bacdd85dcc5..6e0efc9188d 100644 --- a/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp +++ b/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp @@ -188,6 +188,8 @@ asImportedAttribute(const IAttributeVector &attr) return *result; } +VESPA_THREAD_STACK_TAG(attribute_executor) + struct Fixture { MyGidToLidMapperFactory::SP factory; MonitoredRefCount _gidToLidChangeListenerRefCount; @@ -204,7 +206,7 @@ struct Fixture { Fixture() : factory(std::make_shared<MyGidToLidMapperFactory>()), _gidToLidChangeListenerRefCount(), - _attributeFieldWriter(SequencedTaskExecutor::create(1)), + _attributeFieldWriter(SequencedTaskExecutor::create(attribute_executor, 1)), _parentGidToLidChangeHandler(std::make_shared<MockGidToLidChangeHandler>()), _parentGidToLidChangeHandler2(std::make_shared<MockGidToLidChangeHandler>()), parentReference(std::make_shared<MyDocumentDBReference>(factory, _parentGidToLidChangeHandler)), diff --git a/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp b/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp index 3d8cb30eaa0..126a1ead89c 100644 --- a/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp +++ b/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp @@ -33,6 +33,7 @@ GlobalId toGid(vespalib::stringref docId) { vespalib::string doc1("id:test:music::1"); vespalib::string doc2("id:test:music::2"); vespalib::string doc3("id:test:music::3"); +VESPA_THREAD_STACK_TAG(test_executor) struct MyGidToLidMapperFactory : public MockGidToLidMapperFactory { @@ -55,7 +56,7 @@ struct Fixture Fixture() : _attr(std::make_shared<ReferenceAttribute>("test", Config(BasicType::REFERENCE))), - _writer(vespalib::SequencedTaskExecutor::create(1)), + _writer(vespalib::SequencedTaskExecutor::create(test_executor, 1)), _refCount(), _listener() { diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp index 2a0b767a87c..a177aa608e7 100644 --- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp @@ -49,6 +49,8 @@ logTarget(const char * text, const FlushContext & ctx) { ctx.getHandler()->getCurrentSerialNumber()); } +VESPA_THREAD_STACK_TAG(flush_engine_executor) + } FlushEngine::FlushMeta::FlushMeta(const vespalib::string & name, uint32_t id) @@ -82,7 +84,7 @@ FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> tlsStats _threadPool(128 * 1024), _strategy(std::move(strategy)), _priorityStrategy(), - _executor(numThreads, 128 * 1024), + _executor(numThreads, 128 * 1024, flush_engine_executor), _lock(), _cond(), _handlers(), diff --git a/searchcore/src/vespa/searchcore/proton/initializer/task_runner.cpp b/searchcore/src/vespa/searchcore/proton/initializer/task_runner.cpp index 16348dbace6..06d875151cc 100644 --- a/searchcore/src/vespa/searchcore/proton/initializer/task_runner.cpp +++ b/searchcore/src/vespa/searchcore/proton/initializer/task_runner.cpp @@ -9,6 +9,10 @@ using vespalib::makeLambdaTask; namespace proton::initializer { +namespace { + VESPA_THREAD_STACK_TAG(task_runner) +} + TaskRunner::TaskRunner(vespalib::Executor &executor) : _executor(executor), _runningTasks(0u) @@ -89,7 +93,7 @@ TaskRunner::internalRunTasks(const TaskList &taskList, Context::SP context) void TaskRunner::runTask(InitializerTask::SP task) { - vespalib::ThreadStackExecutor executor(1, 128 * 1024); + vespalib::ThreadStackExecutor executor(1, 128 * 1024, task_runner); std::promise<void> promise; auto future = promise.get_future(); runTask(task, executor, makeLambdaTask([&]() { promise.set_value(); })); diff --git a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp index ad47fb98232..91ef4b51a40 100644 --- a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp @@ -34,6 +34,7 @@ public: } }; +VESPA_THREAD_STACK_TAG(match_engine_executor) } // namespace anon @@ -46,7 +47,7 @@ MatchEngine::MatchEngine(size_t numThreads, size_t threadsPerSearch, uint32_t di _distributionKey(distributionKey), _closed(false), _handlers(), - _executor(std::max(size_t(1), numThreads / threadsPerSearch), 256 * 1024), + _executor(std::max(size_t(1), numThreads / threadsPerSearch), 256 * 1024, match_engine_executor), _threadBundlePool(std::max(size_t(1), threadsPerSearch)), _nodeUp(false) { diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp index 0559ac2afba..80c1288903e 100644 --- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp @@ -19,14 +19,22 @@ namespace proton { namespace { std::unique_ptr<SyncableThreadExecutor> -createExecutorWithOneThread(uint32_t stackSize, uint32_t taskLimit, OptimizeFor optimize) { +createExecutorWithOneThread(uint32_t stackSize, uint32_t taskLimit, OptimizeFor optimize, + vespalib::Runnable::init_fun_t init_function) { if (optimize == OptimizeFor::THROUGHPUT) { - return std::make_unique<SingleExecutor>(taskLimit); + return std::make_unique<SingleExecutor>(std::move(init_function), taskLimit); } else { - return std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit); + return std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, std::move(init_function)); } } +VESPA_THREAD_STACK_TAG(master_executor) +VESPA_THREAD_STACK_TAG(index_executor) +VESPA_THREAD_STACK_TAG(summary_executor) +VESPA_THREAD_STACK_TAG(field_inverter_executor) +VESPA_THREAD_STACK_TAG(field_writer_executor) +VESPA_THREAD_STACK_TAG(attribute_executor) + } ExecutorThreadingService::ExecutorThreadingService(vespalib::SyncableThreadExecutor &sharedExecutor, uint32_t num_treads) @@ -37,16 +45,16 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::SyncableThreadExecu const ThreadingServiceConfig & cfg, uint32_t stackSize) : _sharedExecutor(sharedExecutor), - _masterExecutor(1, stackSize), - _indexExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize())), - _summaryExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize())), + _masterExecutor(1, stackSize, master_executor), + _indexExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), index_executor)), + _summaryExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), summary_executor)), _masterService(_masterExecutor), _indexService(*_indexExecutor), _summaryService(*_summaryExecutor), - _indexFieldInverter(SequencedTaskExecutor::create(cfg.indexingThreads(), cfg.defaultTaskLimit())), - _indexFieldWriter(SequencedTaskExecutor::create(cfg.indexingThreads(), cfg.defaultTaskLimit())), - _attributeFieldWriter(SequencedTaskExecutor::create(cfg.indexingThreads(), cfg.defaultTaskLimit(), cfg.optimize(), - cfg.kindOfwatermark(), cfg.reactionTime())) + _indexFieldInverter(SequencedTaskExecutor::create(field_inverter_executor, cfg.indexingThreads(), cfg.defaultTaskLimit())), + _indexFieldWriter(SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit())), + _attributeFieldWriter(SequencedTaskExecutor::create(attribute_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(), + cfg.optimize(), cfg.kindOfwatermark(), cfg.reactionTime())) { } diff --git a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp index 428aba1a6b5..b5efea1de91 100644 --- a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp @@ -29,6 +29,8 @@ struct Pair { Pair::~Pair() = default; +VESPA_THREAD_STACK_TAG(proton_rpc_executor) + } namespace proton { @@ -203,7 +205,7 @@ RPCHooksBase::RPCHooksBase(Params ¶ms) _regAPI(*_orb, slobrok::ConfiguratorFactory(params.slobrok_config)), _stateLock(), _stateCond(), - _executor(48, 128 * 1024) + _executor(48, 128 * 1024, proton_rpc_executor) { } void diff --git a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp index 1189e8a550c..df9dbf99728 100644 --- a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp @@ -42,6 +42,8 @@ uint32_t getNumDocs(const DocsumReply &reply) { } } +VESPA_THREAD_STACK_TAG(suammry_engine_executor) + } // namespace anonymous namespace proton { @@ -60,7 +62,7 @@ SummaryEngine::SummaryEngine(size_t numThreads) : _lock(), _closed(false), _handlers(), - _executor(numThreads, 128 * 1024), + _executor(numThreads, 128 * 1024, suammry_engine_executor), _metrics(std::make_unique<DocsumMetrics>()) { } diff --git a/searchlib/src/apps/tests/memoryindexstress_test.cpp b/searchlib/src/apps/tests/memoryindexstress_test.cpp index 041335b7ed1..3f848b45cc3 100644 --- a/searchlib/src/apps/tests/memoryindexstress_test.cpp +++ b/searchlib/src/apps/tests/memoryindexstress_test.cpp @@ -242,13 +242,15 @@ private: Fixture &operator=(Fixture &&index) = delete; }; +VESPA_THREAD_STACK_TAG(invert_executor) +VESPA_THREAD_STACK_TAG(push_executor) Fixture::Fixture(uint32_t readThreads) : schema(makeSchema()), repo(makeDocTypeRepoConfig()), _executor(1, 128 * 1024), - _invertThreads(vespalib::SequencedTaskExecutor::create(2)), - _pushThreads(vespalib::SequencedTaskExecutor::create(2)), + _invertThreads(vespalib::SequencedTaskExecutor::create(invert_executor, 2)), + _pushThreads(vespalib::SequencedTaskExecutor::create(push_executor, 2)), index(schema, MockFieldLengthInspector(), *_invertThreads, *_pushThreads), _readThreads(readThreads), _writer(1, 128 * 1024), diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp index c2fd048ce3e..c94d19cb3b7 100644 --- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp +++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp @@ -279,6 +279,8 @@ validateDiskIndex(DiskIndex &dw, bool f2HasElements, bool f3HasWeights) } } +VESPA_THREAD_STACK_TAG(invert_executor) +VESPA_THREAD_STACK_TAG(push_executor) void FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap) @@ -315,8 +317,8 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire addField("f4")); FieldIndexCollection fic(schema, MockFieldLengthInspector()); DocBuilder b(schema); - auto invertThreads = SequencedTaskExecutor::create(2); - auto pushThreads = SequencedTaskExecutor::create(2); + auto invertThreads = SequencedTaskExecutor::create(invert_executor, 2); + auto pushThreads = SequencedTaskExecutor::create(push_executor, 2); DocumentInverter inv(schema, *invertThreads, *pushThreads, fic); Document::UP doc; @@ -456,8 +458,8 @@ FusionTest::make_simple_index(const vespalib::string &dump_dir, const IFieldLeng uint32_t numDocs = 20; uint32_t numWords = 1000; DocBuilder b(_schema); - auto invertThreads = SequencedTaskExecutor::create(2); - auto pushThreads = SequencedTaskExecutor::create(2); + auto invertThreads = SequencedTaskExecutor::create(invert_executor, 2); + auto pushThreads = SequencedTaskExecutor::create(push_executor, 2); DocumentInverter inv(_schema, *invertThreads, *pushThreads, fic); inv.invertDocument(10, *make_doc10(b)); diff --git a/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp b/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp index 58f800dec23..0a3a40ec0e7 100644 --- a/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp +++ b/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp @@ -116,6 +116,8 @@ public: } }; +VESPA_THREAD_STACK_TAG(invert_executor) +VESPA_THREAD_STACK_TAG(push_executor) struct DocumentInverterTest : public ::testing::Test { Schema _schema; @@ -141,8 +143,8 @@ struct DocumentInverterTest : public ::testing::Test { DocumentInverterTest() : _schema(makeSchema()), _b(_schema), - _invertThreads(SequencedTaskExecutor::create(2)), - _pushThreads(SequencedTaskExecutor::create(2)), + _invertThreads(SequencedTaskExecutor::create(invert_executor, 2)), + _pushThreads(SequencedTaskExecutor::create(push_executor, 2)), _word_store(), _remover(_word_store), _inserter(), diff --git a/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp b/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp index ee42d68a495..d7b515c29e0 100644 --- a/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp +++ b/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp @@ -901,6 +901,8 @@ TEST_F(FieldIndexCollectionTypeTest, instantiates_field_index_type_based_on_sche expect_field_index_type<FieldIndex<true>>(fic.getFieldIndex(1)); } +VESPA_THREAD_STACK_TAG(invert_executor) +VESPA_THREAD_STACK_TAG(push_executor) class InverterTest : public ::testing::Test { public: @@ -915,8 +917,8 @@ public: : _schema(schema), _fic(_schema, MockFieldLengthInspector()), _b(_schema), - _invertThreads(SequencedTaskExecutor::create(2)), - _pushThreads(SequencedTaskExecutor::create(2)), + _invertThreads(SequencedTaskExecutor::create(invert_executor, 2)), + _pushThreads(SequencedTaskExecutor::create(push_executor, 2)), _inv(_schema, *_invertThreads, *_pushThreads, _fic) { } @@ -1449,14 +1451,15 @@ TEST_F(FieldIndexCollectionTest, require_that_insert_tells_which_word_ref_that_w insertAndAssertTuple("c", 2, 22, fic); } + struct RemoverTest : public FieldIndexCollectionTest { std::unique_ptr<ISequencedTaskExecutor> _invertThreads; std::unique_ptr<ISequencedTaskExecutor> _pushThreads; RemoverTest() : FieldIndexCollectionTest(), - _invertThreads(SequencedTaskExecutor::create(2)), - _pushThreads(SequencedTaskExecutor::create(2)) + _invertThreads(SequencedTaskExecutor::create(invert_executor, 2)), + _pushThreads(SequencedTaskExecutor::create(push_executor, 2)) { } void assertPostingLists(const vespalib::string &e1, diff --git a/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp b/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp index 6476ac7cf8a..6bbaf16380d 100644 --- a/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp +++ b/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp @@ -121,12 +121,14 @@ private: Index &operator=(const Index &index); }; +VESPA_THREAD_STACK_TAG(invert_executor) +VESPA_THREAD_STACK_TAG(push_executor) Index::Index(const MySetup &setup) : schema(setup.schema), _executor(1, 128 * 1024), - _invertThreads(SequencedTaskExecutor::create(2)), - _pushThreads(SequencedTaskExecutor::create(2)), + _invertThreads(SequencedTaskExecutor::create(invert_executor, 2)), + _pushThreads(SequencedTaskExecutor::create(push_executor, 2)), index(schema, setup, *_invertThreads, *_pushThreads), builder(schema), docid(1), diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp index ba82651f1fc..90067d86fc8 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp @@ -40,6 +40,8 @@ struct SimpleParams { } }; +VESPA_THREAD_STACK_TAG(sequenced_executor) + int main(int argc, char **argv) { SimpleParams params(argc, argv); bool use_adaptive_executor = params.next("use_adaptive_executor", 0); @@ -58,7 +60,7 @@ int main(int argc, char **argv) { auto optimize = optimize_for_throughput ? vespalib::Executor::OptimizeFor::THROUGHPUT : vespalib::Executor::OptimizeFor::LATENCY; - executor = SequencedTaskExecutor::create(num_strands, task_limit, optimize); + executor = SequencedTaskExecutor::create(sequenced_executor, num_strands, task_limit, optimize); } vespalib::Timer timer; for (size_t task_id = 0; task_id < num_tasks; ++task_id) { diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp index 29b25cd0471..21674b4e2d0 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp @@ -12,6 +12,8 @@ #include <vespa/log/log.h> LOG_SETUP("sequencedtaskexecutor_test"); +VESPA_THREAD_STACK_TAG(sequenced_executor) + namespace vespalib { @@ -20,7 +22,7 @@ class Fixture public: std::unique_ptr<ISequencedTaskExecutor> _threads; - Fixture() : _threads(SequencedTaskExecutor::create(2)) { } + Fixture() : _threads(SequencedTaskExecutor::create(sequenced_executor, 2)) { } }; @@ -233,12 +235,12 @@ TEST_F("require that executeLambda works", Fixture) } TEST("require that you get correct number of executors") { - auto seven = SequencedTaskExecutor::create(7); + auto seven = SequencedTaskExecutor::create(sequenced_executor, 7); EXPECT_EQUAL(7u, seven->getNumExecutors()); } TEST("require that you distribute well") { - auto seven = SequencedTaskExecutor::create(7); + auto seven = SequencedTaskExecutor::create(sequenced_executor, 7); const SequencedTaskExecutor & seq = dynamic_cast<const SequencedTaskExecutor &>(*seven); EXPECT_EQUAL(7u, seven->getNumExecutors()); EXPECT_EQUAL(97u, seq.getComponentHashSize()); @@ -251,21 +253,21 @@ TEST("require that you distribute well") { } TEST("Test creation of different types") { - auto iseq = SequencedTaskExecutor::create(1); + auto iseq = SequencedTaskExecutor::create(sequenced_executor, 1); EXPECT_EQUAL(1u, iseq->getNumExecutors()); auto * seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get()); ASSERT_TRUE(seq != nullptr); - iseq = SequencedTaskExecutor::create(1, 1000, Executor::OptimizeFor::LATENCY); + iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, Executor::OptimizeFor::LATENCY); seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get()); ASSERT_TRUE(seq != nullptr); - iseq = SequencedTaskExecutor::create(1, 1000, Executor::OptimizeFor::THROUGHPUT); + iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, Executor::OptimizeFor::THROUGHPUT); seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get()); ASSERT_TRUE(seq != nullptr); - iseq = SequencedTaskExecutor::create(1, 1000, Executor::OptimizeFor::ADAPTIVE, 17); + iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, Executor::OptimizeFor::ADAPTIVE, 17); auto aseq = dynamic_cast<AdaptiveSequencedExecutor *>(iseq.get()); ASSERT_TRUE(aseq != nullptr); } diff --git a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp index 622c9b9985f..5cc8862fc05 100644 --- a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp +++ b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp @@ -8,10 +8,12 @@ using namespace vespalib; +VESPA_THREAD_STACK_TAG(sequenced_executor) + TEST("test that all tasks are executed") { std::atomic<uint64_t> counter(0); - SingleExecutor executor(10); + SingleExecutor executor(sequenced_executor, 10); for (uint64_t i(0); i < 10; i++) { executor.execute(makeLambdaTask([&counter] {counter++;})); @@ -32,7 +34,7 @@ void verifyResizeTaskLimit(bool up) { std::condition_variable cond; std::atomic<uint64_t> started(0); std::atomic<uint64_t> allowed(0); - SingleExecutor executor(10); + SingleExecutor executor(sequenced_executor, 10); uint32_t targetTaskLimit = up ? 20 : 5; uint32_t roundedTaskLimit = roundUp2inN(targetTaskLimit); diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index cf385275bfb..d1c6b1aba53 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -27,7 +27,8 @@ isLazy(const std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> & ex } std::unique_ptr<ISequencedTaskExecutor> -SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize, uint32_t kindOfWatermark, duration reactionTime) +SequencedTaskExecutor::create(vespalib::Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit, + OptimizeFor optimize, uint32_t kindOfWatermark, duration reactionTime) { if (optimize == OptimizeFor::ADAPTIVE) { size_t num_strands = std::min(taskLimit, threads*32); @@ -38,9 +39,9 @@ SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor for (uint32_t id = 0; id < threads; ++id) { if (optimize == OptimizeFor::THROUGHPUT) { uint32_t watermark = kindOfWatermark == 0 ? taskLimit / 10 : kindOfWatermark; - executors->push_back(std::make_unique<SingleExecutor>(taskLimit, watermark, reactionTime)); + executors->push_back(std::make_unique<SingleExecutor>(func, taskLimit, watermark, reactionTime)); } else { - executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit)); + executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func)); } } return std::unique_ptr<ISequencedTaskExecutor>(new SequencedTaskExecutor(std::move(executors))); diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h index 180cd1cc6cc..050b00ef011 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h @@ -3,6 +3,7 @@ #include "isequencedtaskexecutor.h" #include <vespa/vespalib/util/time.h> +#include <vespa/vespalib/util/runnable.h> namespace vespalib { @@ -33,7 +34,8 @@ public: * */ static std::unique_ptr<ISequencedTaskExecutor> - create(uint32_t threads, uint32_t taskLimit = 1000, OptimizeFor optimize = OptimizeFor::LATENCY, uint32_t kindOfWatermark = 0, duration reactionTime = 10ms); + create(vespalib::Runnable::init_fun_t, uint32_t threads, uint32_t taskLimit = 1000, + OptimizeFor optimize = OptimizeFor::LATENCY, uint32_t kindOfWatermark = 0, duration reactionTime = 10ms); /** * For testing only */ diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp index 795a7ef1ec3..96d8f267875 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp @@ -5,11 +5,11 @@ namespace vespalib { -SingleExecutor::SingleExecutor(uint32_t taskLimit) - : SingleExecutor(taskLimit, taskLimit/10, 5ms) +SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit) + : SingleExecutor(func, taskLimit, taskLimit/10, 5ms) { } -SingleExecutor::SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration reactionTime) +SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime) : _taskLimit(vespalib::roundUp2inN(taskLimit)), _wantedTaskLimit(_taskLimit.load()), _rp(0), @@ -27,6 +27,7 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration _reactionTime(reactionTime), _closed(false) { + (void) func; //TODO implement similar to ThreadStackExecutor assert(taskLimit >= watermark); _thread.start(); } diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h index 7b8a2741d87..58cec52b2b0 100644 --- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h @@ -18,8 +18,8 @@ namespace vespalib { */ class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable { public: - explicit SingleExecutor(uint32_t taskLimit); - SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration reactionTime); + explicit SingleExecutor(init_fun_t func, uint32_t taskLimit); + SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime); ~SingleExecutor() override; Task::UP execute(Task::UP task) override; void setTaskLimit(uint32_t taskLimit) override; diff --git a/storage/src/tests/common/teststorageapp.cpp b/storage/src/tests/common/teststorageapp.cpp index 1a6201d8aa3..e904f690bde 100644 --- a/storage/src/tests/common/teststorageapp.cpp +++ b/storage/src/tests/common/teststorageapp.cpp @@ -125,14 +125,16 @@ TestStorageApp::waitUntilInitialized( } namespace { - NodeIndex getIndexFromConfig(vespalib::stringref configId) { - if (!configId.empty()) { - config::ConfigUri uri(configId); - return NodeIndex( - config::ConfigGetter<vespa::config::content::core::StorServerConfig>::getConfig(uri.getConfigId(), uri.getContext())->nodeIndex); - } - return NodeIndex(0); +NodeIndex getIndexFromConfig(vespalib::stringref configId) { + if (!configId.empty()) { + config::ConfigUri uri(configId); + return NodeIndex( + config::ConfigGetter<vespa::config::content::core::StorServerConfig>::getConfig(uri.getConfigId(), uri.getContext())->nodeIndex); } + return NodeIndex(0); +} + +VESPA_THREAD_STACK_TAG(test_executor) } TestServiceLayerApp::TestServiceLayerApp(vespalib::stringref configId) @@ -140,7 +142,7 @@ TestServiceLayerApp::TestServiceLayerApp(vespalib::stringref configId) lib::NodeType::STORAGE, getIndexFromConfig(configId), configId), _compReg(dynamic_cast<ServiceLayerComponentRegisterImpl&>(TestStorageApp::getComponentRegister())), _persistenceProvider(), - _executor(vespalib::SequencedTaskExecutor::create(1)) + _executor(vespalib::SequencedTaskExecutor::create(test_executor, 1)) { lib::NodeState ns(*_nodeStateUpdater.getReportedNodeState()); _nodeStateUpdater.setReportedNodeState(ns); @@ -152,7 +154,7 @@ TestServiceLayerApp::TestServiceLayerApp(NodeIndex index, lib::NodeType::STORAGE, index, configId), _compReg(dynamic_cast<ServiceLayerComponentRegisterImpl&>(TestStorageApp::getComponentRegister())), _persistenceProvider(), - _executor(vespalib::SequencedTaskExecutor::create(1)) + _executor(vespalib::SequencedTaskExecutor::create(test_executor, 1)) { lib::NodeState ns(*_nodeStateUpdater.getReportedNodeState()); _nodeStateUpdater.setReportedNodeState(ns); diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index 489f58fbcc0..47e516a1dcc 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -88,9 +88,11 @@ PersistenceTestUtils::dumpBucket(const document::BucketId& bid) { return dynamic_cast<spi::dummy::DummyPersistence&>(_env->_node.getPersistenceProvider()).dumpBucket(makeSpiBucket(bid)); } +VESPA_THREAD_STACK_TAG(test_executor) + void PersistenceTestUtils::setupExecutor(uint32_t numThreads) { - _sequenceTaskExecutor = vespalib::SequencedTaskExecutor::create(numThreads, 1000, vespalib::Executor::OptimizeFor::ADAPTIVE); + _sequenceTaskExecutor = vespalib::SequencedTaskExecutor::create(test_executor, numThreads, 1000, vespalib::Executor::OptimizeFor::ADAPTIVE); } document::Document::SP diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 8e1ee6cbb3a..0655a8fde7a 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -29,6 +29,11 @@ using std::shared_ptr; using document::BucketSpace; using vespalib::make_string_short::fmt; +namespace { + +VESPA_THREAD_STACK_TAG(response_executor) + +} namespace storage { FileStorManager:: @@ -164,7 +169,8 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC _filestorHandler = std::make_unique<FileStorHandlerImpl>(numThreads, numStripes, *this, *_metrics, _compReg); uint32_t numResponseThreads = computeNumResponseThreads(_config->numResponseThreads); - _sequencedExecutor = vespalib::SequencedTaskExecutor::create(numResponseThreads, 10000, selectSequencer(_config->responseSequencerType)); + _sequencedExecutor = vespalib::SequencedTaskExecutor::create(response_executor, numResponseThreads, 10000, + selectSequencer(_config->responseSequencerType)); assert(_sequencedExecutor); LOG(spam, "Setting up the disk"); for (uint32_t i = 0; i < numThreads; i++) { diff --git a/vespalib/src/vespa/vespalib/util/runnable.h b/vespalib/src/vespa/vespalib/util/runnable.h index 3b387a2067c..9d6481fbb19 100644 --- a/vespalib/src/vespa/vespalib/util/runnable.h +++ b/vespalib/src/vespa/vespalib/util/runnable.h @@ -3,14 +3,26 @@ #pragma once #include <memory> +#include <functional> namespace vespalib { +// Convenience macro used to create a function that can be used as an +// init function when creating an executor to inject a frame with the +// given name into the stack of all worker threads. + +#define VESPA_THREAD_STACK_TAG(name) \ + int name(::vespalib::Runnable &worker) { \ + worker.run(); \ + return 1; \ + } + /** * Interface implemented in order to be run by a Thread. **/ struct Runnable { - typedef std::unique_ptr<Runnable> UP; + using UP = std::unique_ptr<Runnable>; + using init_fun_t = std::function<int(Runnable&)>; /** * Entry point called by the running thread diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h index e5cc87c1937..c86597ca153 100644 --- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h +++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h @@ -16,16 +16,6 @@ namespace vespalib { namespace thread { struct ThreadInit; } -// Convenience macro used to create a function that can be used as an -// init function when creating an executor to inject a frame with the -// given name into the stack of all worker threads. - -#define VESPA_THREAD_STACK_TAG(name) \ - int name(::vespalib::Runnable &worker) { \ - worker.run(); \ - return 1; \ - } - /** * An executor service that executes tasks in multiple threads. **/ @@ -33,7 +23,6 @@ class ThreadStackExecutorBase : public SyncableThreadExecutor, public Runnable { public: - using init_fun_t = std::function<int(Runnable&)>; using unique_lock = std::unique_lock<std::mutex>; private: |