summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp3
-rw-r--r--searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp6
-rw-r--r--searchcore/src/tests/proton/index/indexmanager_test.cpp7
-rw-r--r--searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp4
-rw-r--r--searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/initializer/task_runner.cpp6
-rw-r--r--searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp3
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp28
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp4
-rw-r--r--searchlib/src/apps/tests/memoryindexstress_test.cpp6
-rw-r--r--searchlib/src/tests/diskindex/fusion/fusion_test.cpp10
-rw-r--r--searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp6
-rw-r--r--searchlib/src/tests/memoryindex/field_index/field_index_test.cpp11
-rw-r--r--searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp6
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp4
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp16
-rw-r--r--staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp6
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp7
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h4
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp7
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/singleexecutor.h4
-rw-r--r--storage/src/tests/common/teststorageapp.cpp20
-rw-r--r--storage/src/tests/persistence/persistencetestutils.cpp4
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp8
-rw-r--r--vespalib/src/vespa/vespalib/util/runnable.h14
-rw-r--r--vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h11
28 files changed, 138 insertions, 78 deletions
diff --git a/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp b/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp
index 19af6a699c4..e284f5ed1d9 100644
--- a/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp
+++ b/searchcore/src/tests/proton/attribute/exclusive_attribute_read_accessor/exclusive_attribute_read_accessor_test.cpp
@@ -13,6 +13,7 @@ using namespace search;
using namespace vespalib;
using ReadGuard = ExclusiveAttributeReadAccessor::Guard;
+VESPA_THREAD_STACK_TAG(test_executor)
AttributeVector::SP
createAttribute()
@@ -29,7 +30,7 @@ struct Fixture
Fixture()
: attribute(createAttribute()),
- writer(SequencedTaskExecutor::create(1)),
+ writer(SequencedTaskExecutor::create(test_executor, 1)),
accessor(attribute, *writer)
{}
};
diff --git a/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp b/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp
index 7af9d8d816c..74a2a14dbd6 100644
--- a/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp
+++ b/searchcore/src/tests/proton/feed_and_search/feed_and_search.cpp
@@ -143,14 +143,16 @@ void Test::testSearch(Searchable &source,
EXPECT_TRUE(search_iterator->isAtEnd());
}
+VESPA_THREAD_STACK_TAG(invert_executor)
+VESPA_THREAD_STACK_TAG(write_executor)
// Creates a memory index, inserts documents, performs a few
// searches, dumps the index to disk, and performs the searches
// again.
void Test::requireThatMemoryIndexCanBeDumpedAndSearched() {
Schema schema = getSchema();
vespalib::ThreadStackExecutor sharedExecutor(2, 0x10000);
- auto indexFieldInverter = vespalib::SequencedTaskExecutor::create(2);
- auto indexFieldWriter = vespalib::SequencedTaskExecutor::create(2);
+ auto indexFieldInverter = vespalib::SequencedTaskExecutor::create(invert_executor, 2);
+ auto indexFieldWriter = vespalib::SequencedTaskExecutor::create(write_executor, 2);
MemoryIndex memory_index(schema, MockFieldLengthInspector(), *indexFieldInverter, *indexFieldWriter);
DocBuilder doc_builder(schema);
diff --git a/searchcore/src/tests/proton/index/indexmanager_test.cpp b/searchcore/src/tests/proton/index/indexmanager_test.cpp
index 3f088a497aa..89594821803 100644
--- a/searchcore/src/tests/proton/index/indexmanager_test.cpp
+++ b/searchcore/src/tests/proton/index/indexmanager_test.cpp
@@ -374,12 +374,15 @@ TEST_F(IndexManagerTest, require_that_source_selector_is_flushed)
ASSERT_TRUE(file.OpenReadOnlyExisting());
}
+VESPA_THREAD_STACK_TAG(invert_executor)
+VESPA_THREAD_STACK_TAG(push_executor)
+
TEST_F(IndexManagerTest, require_that_flush_stats_are_calculated)
{
Schema schema(getSchema());
FieldIndexCollection fic(schema, MockFieldLengthInspector());
- auto invertThreads = SequencedTaskExecutor::create(2);
- auto pushThreads = SequencedTaskExecutor::create(2);
+ auto invertThreads = SequencedTaskExecutor::create(invert_executor, 2);
+ auto pushThreads = SequencedTaskExecutor::create(push_executor, 2);
search::memoryindex::DocumentInverter inverter(schema, *invertThreads, *pushThreads, fic);
uint64_t fixed_index_size = fic.getMemoryUsage().allocatedBytes();
diff --git a/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp b/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp
index bacdd85dcc5..6e0efc9188d 100644
--- a/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp
+++ b/searchcore/src/tests/proton/reference/document_db_reference_resolver/document_db_reference_resolver_test.cpp
@@ -188,6 +188,8 @@ asImportedAttribute(const IAttributeVector &attr)
return *result;
}
+VESPA_THREAD_STACK_TAG(attribute_executor)
+
struct Fixture {
MyGidToLidMapperFactory::SP factory;
MonitoredRefCount _gidToLidChangeListenerRefCount;
@@ -204,7 +206,7 @@ struct Fixture {
Fixture() :
factory(std::make_shared<MyGidToLidMapperFactory>()),
_gidToLidChangeListenerRefCount(),
- _attributeFieldWriter(SequencedTaskExecutor::create(1)),
+ _attributeFieldWriter(SequencedTaskExecutor::create(attribute_executor, 1)),
_parentGidToLidChangeHandler(std::make_shared<MockGidToLidChangeHandler>()),
_parentGidToLidChangeHandler2(std::make_shared<MockGidToLidChangeHandler>()),
parentReference(std::make_shared<MyDocumentDBReference>(factory, _parentGidToLidChangeHandler)),
diff --git a/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp b/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp
index 3d8cb30eaa0..126a1ead89c 100644
--- a/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp
+++ b/searchcore/src/tests/proton/reference/gid_to_lid_change_listener/gid_to_lid_change_listener_test.cpp
@@ -33,6 +33,7 @@ GlobalId toGid(vespalib::stringref docId) {
vespalib::string doc1("id:test:music::1");
vespalib::string doc2("id:test:music::2");
vespalib::string doc3("id:test:music::3");
+VESPA_THREAD_STACK_TAG(test_executor)
struct MyGidToLidMapperFactory : public MockGidToLidMapperFactory
{
@@ -55,7 +56,7 @@ struct Fixture
Fixture()
: _attr(std::make_shared<ReferenceAttribute>("test", Config(BasicType::REFERENCE))),
- _writer(vespalib::SequencedTaskExecutor::create(1)),
+ _writer(vespalib::SequencedTaskExecutor::create(test_executor, 1)),
_refCount(),
_listener()
{
diff --git a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
index 2a0b767a87c..a177aa608e7 100644
--- a/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/flushengine/flushengine.cpp
@@ -49,6 +49,8 @@ logTarget(const char * text, const FlushContext & ctx) {
ctx.getHandler()->getCurrentSerialNumber());
}
+VESPA_THREAD_STACK_TAG(flush_engine_executor)
+
}
FlushEngine::FlushMeta::FlushMeta(const vespalib::string & name, uint32_t id)
@@ -82,7 +84,7 @@ FlushEngine::FlushEngine(std::shared_ptr<flushengine::ITlsStatsFactory> tlsStats
_threadPool(128 * 1024),
_strategy(std::move(strategy)),
_priorityStrategy(),
- _executor(numThreads, 128 * 1024),
+ _executor(numThreads, 128 * 1024, flush_engine_executor),
_lock(),
_cond(),
_handlers(),
diff --git a/searchcore/src/vespa/searchcore/proton/initializer/task_runner.cpp b/searchcore/src/vespa/searchcore/proton/initializer/task_runner.cpp
index 16348dbace6..06d875151cc 100644
--- a/searchcore/src/vespa/searchcore/proton/initializer/task_runner.cpp
+++ b/searchcore/src/vespa/searchcore/proton/initializer/task_runner.cpp
@@ -9,6 +9,10 @@ using vespalib::makeLambdaTask;
namespace proton::initializer {
+namespace {
+ VESPA_THREAD_STACK_TAG(task_runner)
+}
+
TaskRunner::TaskRunner(vespalib::Executor &executor)
: _executor(executor),
_runningTasks(0u)
@@ -89,7 +93,7 @@ TaskRunner::internalRunTasks(const TaskList &taskList, Context::SP context)
void
TaskRunner::runTask(InitializerTask::SP task)
{
- vespalib::ThreadStackExecutor executor(1, 128 * 1024);
+ vespalib::ThreadStackExecutor executor(1, 128 * 1024, task_runner);
std::promise<void> promise;
auto future = promise.get_future();
runTask(task, executor, makeLambdaTask([&]() { promise.set_value(); }));
diff --git a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp
index ad47fb98232..91ef4b51a40 100644
--- a/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/matchengine/matchengine.cpp
@@ -34,6 +34,7 @@ public:
}
};
+VESPA_THREAD_STACK_TAG(match_engine_executor)
} // namespace anon
@@ -46,7 +47,7 @@ MatchEngine::MatchEngine(size_t numThreads, size_t threadsPerSearch, uint32_t di
_distributionKey(distributionKey),
_closed(false),
_handlers(),
- _executor(std::max(size_t(1), numThreads / threadsPerSearch), 256 * 1024),
+ _executor(std::max(size_t(1), numThreads / threadsPerSearch), 256 * 1024, match_engine_executor),
_threadBundlePool(std::max(size_t(1), threadsPerSearch)),
_nodeUp(false)
{
diff --git a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
index 0559ac2afba..80c1288903e 100644
--- a/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/executorthreadingservice.cpp
@@ -19,14 +19,22 @@ namespace proton {
namespace {
std::unique_ptr<SyncableThreadExecutor>
-createExecutorWithOneThread(uint32_t stackSize, uint32_t taskLimit, OptimizeFor optimize) {
+createExecutorWithOneThread(uint32_t stackSize, uint32_t taskLimit, OptimizeFor optimize,
+ vespalib::Runnable::init_fun_t init_function) {
if (optimize == OptimizeFor::THROUGHPUT) {
- return std::make_unique<SingleExecutor>(taskLimit);
+ return std::make_unique<SingleExecutor>(std::move(init_function), taskLimit);
} else {
- return std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit);
+ return std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, std::move(init_function));
}
}
+VESPA_THREAD_STACK_TAG(master_executor)
+VESPA_THREAD_STACK_TAG(index_executor)
+VESPA_THREAD_STACK_TAG(summary_executor)
+VESPA_THREAD_STACK_TAG(field_inverter_executor)
+VESPA_THREAD_STACK_TAG(field_writer_executor)
+VESPA_THREAD_STACK_TAG(attribute_executor)
+
}
ExecutorThreadingService::ExecutorThreadingService(vespalib::SyncableThreadExecutor &sharedExecutor, uint32_t num_treads)
@@ -37,16 +45,16 @@ ExecutorThreadingService::ExecutorThreadingService(vespalib::SyncableThreadExecu
const ThreadingServiceConfig & cfg, uint32_t stackSize)
: _sharedExecutor(sharedExecutor),
- _masterExecutor(1, stackSize),
- _indexExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize())),
- _summaryExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize())),
+ _masterExecutor(1, stackSize, master_executor),
+ _indexExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), index_executor)),
+ _summaryExecutor(createExecutorWithOneThread(stackSize, cfg.defaultTaskLimit(), cfg.optimize(), summary_executor)),
_masterService(_masterExecutor),
_indexService(*_indexExecutor),
_summaryService(*_summaryExecutor),
- _indexFieldInverter(SequencedTaskExecutor::create(cfg.indexingThreads(), cfg.defaultTaskLimit())),
- _indexFieldWriter(SequencedTaskExecutor::create(cfg.indexingThreads(), cfg.defaultTaskLimit())),
- _attributeFieldWriter(SequencedTaskExecutor::create(cfg.indexingThreads(), cfg.defaultTaskLimit(), cfg.optimize(),
- cfg.kindOfwatermark(), cfg.reactionTime()))
+ _indexFieldInverter(SequencedTaskExecutor::create(field_inverter_executor, cfg.indexingThreads(), cfg.defaultTaskLimit())),
+ _indexFieldWriter(SequencedTaskExecutor::create(field_writer_executor, cfg.indexingThreads(), cfg.defaultTaskLimit())),
+ _attributeFieldWriter(SequencedTaskExecutor::create(attribute_executor, cfg.indexingThreads(), cfg.defaultTaskLimit(),
+ cfg.optimize(), cfg.kindOfwatermark(), cfg.reactionTime()))
{
}
diff --git a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp
index 428aba1a6b5..b5efea1de91 100644
--- a/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp
+++ b/searchcore/src/vespa/searchcore/proton/server/rpc_hooks.cpp
@@ -29,6 +29,8 @@ struct Pair {
Pair::~Pair() = default;
+VESPA_THREAD_STACK_TAG(proton_rpc_executor)
+
}
namespace proton {
@@ -203,7 +205,7 @@ RPCHooksBase::RPCHooksBase(Params &params)
_regAPI(*_orb, slobrok::ConfiguratorFactory(params.slobrok_config)),
_stateLock(),
_stateCond(),
- _executor(48, 128 * 1024)
+ _executor(48, 128 * 1024, proton_rpc_executor)
{ }
void
diff --git a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp
index 1189e8a550c..df9dbf99728 100644
--- a/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/summaryengine/summaryengine.cpp
@@ -42,6 +42,8 @@ uint32_t getNumDocs(const DocsumReply &reply) {
}
}
+VESPA_THREAD_STACK_TAG(suammry_engine_executor)
+
} // namespace anonymous
namespace proton {
@@ -60,7 +62,7 @@ SummaryEngine::SummaryEngine(size_t numThreads)
: _lock(),
_closed(false),
_handlers(),
- _executor(numThreads, 128 * 1024),
+ _executor(numThreads, 128 * 1024, suammry_engine_executor),
_metrics(std::make_unique<DocsumMetrics>())
{ }
diff --git a/searchlib/src/apps/tests/memoryindexstress_test.cpp b/searchlib/src/apps/tests/memoryindexstress_test.cpp
index 041335b7ed1..3f848b45cc3 100644
--- a/searchlib/src/apps/tests/memoryindexstress_test.cpp
+++ b/searchlib/src/apps/tests/memoryindexstress_test.cpp
@@ -242,13 +242,15 @@ private:
Fixture &operator=(Fixture &&index) = delete;
};
+VESPA_THREAD_STACK_TAG(invert_executor)
+VESPA_THREAD_STACK_TAG(push_executor)
Fixture::Fixture(uint32_t readThreads)
: schema(makeSchema()),
repo(makeDocTypeRepoConfig()),
_executor(1, 128 * 1024),
- _invertThreads(vespalib::SequencedTaskExecutor::create(2)),
- _pushThreads(vespalib::SequencedTaskExecutor::create(2)),
+ _invertThreads(vespalib::SequencedTaskExecutor::create(invert_executor, 2)),
+ _pushThreads(vespalib::SequencedTaskExecutor::create(push_executor, 2)),
index(schema, MockFieldLengthInspector(), *_invertThreads, *_pushThreads),
_readThreads(readThreads),
_writer(1, 128 * 1024),
diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
index c2fd048ce3e..c94d19cb3b7 100644
--- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
+++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp
@@ -279,6 +279,8 @@ validateDiskIndex(DiskIndex &dw, bool f2HasElements, bool f3HasWeights)
}
}
+VESPA_THREAD_STACK_TAG(invert_executor)
+VESPA_THREAD_STACK_TAG(push_executor)
void
FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool directio, bool readmmap)
@@ -315,8 +317,8 @@ FusionTest::requireThatFusionIsWorking(const vespalib::string &prefix, bool dire
addField("f4"));
FieldIndexCollection fic(schema, MockFieldLengthInspector());
DocBuilder b(schema);
- auto invertThreads = SequencedTaskExecutor::create(2);
- auto pushThreads = SequencedTaskExecutor::create(2);
+ auto invertThreads = SequencedTaskExecutor::create(invert_executor, 2);
+ auto pushThreads = SequencedTaskExecutor::create(push_executor, 2);
DocumentInverter inv(schema, *invertThreads, *pushThreads, fic);
Document::UP doc;
@@ -456,8 +458,8 @@ FusionTest::make_simple_index(const vespalib::string &dump_dir, const IFieldLeng
uint32_t numDocs = 20;
uint32_t numWords = 1000;
DocBuilder b(_schema);
- auto invertThreads = SequencedTaskExecutor::create(2);
- auto pushThreads = SequencedTaskExecutor::create(2);
+ auto invertThreads = SequencedTaskExecutor::create(invert_executor, 2);
+ auto pushThreads = SequencedTaskExecutor::create(push_executor, 2);
DocumentInverter inv(_schema, *invertThreads, *pushThreads, fic);
inv.invertDocument(10, *make_doc10(b));
diff --git a/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp b/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp
index 58f800dec23..0a3a40ec0e7 100644
--- a/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp
+++ b/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp
@@ -116,6 +116,8 @@ public:
}
};
+VESPA_THREAD_STACK_TAG(invert_executor)
+VESPA_THREAD_STACK_TAG(push_executor)
struct DocumentInverterTest : public ::testing::Test {
Schema _schema;
@@ -141,8 +143,8 @@ struct DocumentInverterTest : public ::testing::Test {
DocumentInverterTest()
: _schema(makeSchema()),
_b(_schema),
- _invertThreads(SequencedTaskExecutor::create(2)),
- _pushThreads(SequencedTaskExecutor::create(2)),
+ _invertThreads(SequencedTaskExecutor::create(invert_executor, 2)),
+ _pushThreads(SequencedTaskExecutor::create(push_executor, 2)),
_word_store(),
_remover(_word_store),
_inserter(),
diff --git a/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp b/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp
index ee42d68a495..d7b515c29e0 100644
--- a/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp
+++ b/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp
@@ -901,6 +901,8 @@ TEST_F(FieldIndexCollectionTypeTest, instantiates_field_index_type_based_on_sche
expect_field_index_type<FieldIndex<true>>(fic.getFieldIndex(1));
}
+VESPA_THREAD_STACK_TAG(invert_executor)
+VESPA_THREAD_STACK_TAG(push_executor)
class InverterTest : public ::testing::Test {
public:
@@ -915,8 +917,8 @@ public:
: _schema(schema),
_fic(_schema, MockFieldLengthInspector()),
_b(_schema),
- _invertThreads(SequencedTaskExecutor::create(2)),
- _pushThreads(SequencedTaskExecutor::create(2)),
+ _invertThreads(SequencedTaskExecutor::create(invert_executor, 2)),
+ _pushThreads(SequencedTaskExecutor::create(push_executor, 2)),
_inv(_schema, *_invertThreads, *_pushThreads, _fic)
{
}
@@ -1449,14 +1451,15 @@ TEST_F(FieldIndexCollectionTest, require_that_insert_tells_which_word_ref_that_w
insertAndAssertTuple("c", 2, 22, fic);
}
+
struct RemoverTest : public FieldIndexCollectionTest {
std::unique_ptr<ISequencedTaskExecutor> _invertThreads;
std::unique_ptr<ISequencedTaskExecutor> _pushThreads;
RemoverTest()
: FieldIndexCollectionTest(),
- _invertThreads(SequencedTaskExecutor::create(2)),
- _pushThreads(SequencedTaskExecutor::create(2))
+ _invertThreads(SequencedTaskExecutor::create(invert_executor, 2)),
+ _pushThreads(SequencedTaskExecutor::create(push_executor, 2))
{
}
void assertPostingLists(const vespalib::string &e1,
diff --git a/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp b/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp
index 6476ac7cf8a..6bbaf16380d 100644
--- a/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp
+++ b/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp
@@ -121,12 +121,14 @@ private:
Index &operator=(const Index &index);
};
+VESPA_THREAD_STACK_TAG(invert_executor)
+VESPA_THREAD_STACK_TAG(push_executor)
Index::Index(const MySetup &setup)
: schema(setup.schema),
_executor(1, 128 * 1024),
- _invertThreads(SequencedTaskExecutor::create(2)),
- _pushThreads(SequencedTaskExecutor::create(2)),
+ _invertThreads(SequencedTaskExecutor::create(invert_executor, 2)),
+ _pushThreads(SequencedTaskExecutor::create(push_executor, 2)),
index(schema, setup, *_invertThreads, *_pushThreads),
builder(schema),
docid(1),
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
index ba82651f1fc..90067d86fc8 100644
--- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp
@@ -40,6 +40,8 @@ struct SimpleParams {
}
};
+VESPA_THREAD_STACK_TAG(sequenced_executor)
+
int main(int argc, char **argv) {
SimpleParams params(argc, argv);
bool use_adaptive_executor = params.next("use_adaptive_executor", 0);
@@ -58,7 +60,7 @@ int main(int argc, char **argv) {
auto optimize = optimize_for_throughput
? vespalib::Executor::OptimizeFor::THROUGHPUT
: vespalib::Executor::OptimizeFor::LATENCY;
- executor = SequencedTaskExecutor::create(num_strands, task_limit, optimize);
+ executor = SequencedTaskExecutor::create(sequenced_executor, num_strands, task_limit, optimize);
}
vespalib::Timer timer;
for (size_t task_id = 0; task_id < num_tasks; ++task_id) {
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
index 29b25cd0471..21674b4e2d0 100644
--- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
@@ -12,6 +12,8 @@
#include <vespa/log/log.h>
LOG_SETUP("sequencedtaskexecutor_test");
+VESPA_THREAD_STACK_TAG(sequenced_executor)
+
namespace vespalib {
@@ -20,7 +22,7 @@ class Fixture
public:
std::unique_ptr<ISequencedTaskExecutor> _threads;
- Fixture() : _threads(SequencedTaskExecutor::create(2)) { }
+ Fixture() : _threads(SequencedTaskExecutor::create(sequenced_executor, 2)) { }
};
@@ -233,12 +235,12 @@ TEST_F("require that executeLambda works", Fixture)
}
TEST("require that you get correct number of executors") {
- auto seven = SequencedTaskExecutor::create(7);
+ auto seven = SequencedTaskExecutor::create(sequenced_executor, 7);
EXPECT_EQUAL(7u, seven->getNumExecutors());
}
TEST("require that you distribute well") {
- auto seven = SequencedTaskExecutor::create(7);
+ auto seven = SequencedTaskExecutor::create(sequenced_executor, 7);
const SequencedTaskExecutor & seq = dynamic_cast<const SequencedTaskExecutor &>(*seven);
EXPECT_EQUAL(7u, seven->getNumExecutors());
EXPECT_EQUAL(97u, seq.getComponentHashSize());
@@ -251,21 +253,21 @@ TEST("require that you distribute well") {
}
TEST("Test creation of different types") {
- auto iseq = SequencedTaskExecutor::create(1);
+ auto iseq = SequencedTaskExecutor::create(sequenced_executor, 1);
EXPECT_EQUAL(1u, iseq->getNumExecutors());
auto * seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get());
ASSERT_TRUE(seq != nullptr);
- iseq = SequencedTaskExecutor::create(1, 1000, Executor::OptimizeFor::LATENCY);
+ iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, Executor::OptimizeFor::LATENCY);
seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get());
ASSERT_TRUE(seq != nullptr);
- iseq = SequencedTaskExecutor::create(1, 1000, Executor::OptimizeFor::THROUGHPUT);
+ iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, Executor::OptimizeFor::THROUGHPUT);
seq = dynamic_cast<SequencedTaskExecutor *>(iseq.get());
ASSERT_TRUE(seq != nullptr);
- iseq = SequencedTaskExecutor::create(1, 1000, Executor::OptimizeFor::ADAPTIVE, 17);
+ iseq = SequencedTaskExecutor::create(sequenced_executor, 1, 1000, Executor::OptimizeFor::ADAPTIVE, 17);
auto aseq = dynamic_cast<AdaptiveSequencedExecutor *>(iseq.get());
ASSERT_TRUE(aseq != nullptr);
}
diff --git a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
index 622c9b9985f..5cc8862fc05 100644
--- a/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
+++ b/staging_vespalib/src/tests/singleexecutor/singleexecutor_test.cpp
@@ -8,10 +8,12 @@
using namespace vespalib;
+VESPA_THREAD_STACK_TAG(sequenced_executor)
+
TEST("test that all tasks are executed") {
std::atomic<uint64_t> counter(0);
- SingleExecutor executor(10);
+ SingleExecutor executor(sequenced_executor, 10);
for (uint64_t i(0); i < 10; i++) {
executor.execute(makeLambdaTask([&counter] {counter++;}));
@@ -32,7 +34,7 @@ void verifyResizeTaskLimit(bool up) {
std::condition_variable cond;
std::atomic<uint64_t> started(0);
std::atomic<uint64_t> allowed(0);
- SingleExecutor executor(10);
+ SingleExecutor executor(sequenced_executor, 10);
uint32_t targetTaskLimit = up ? 20 : 5;
uint32_t roundedTaskLimit = roundUp2inN(targetTaskLimit);
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
index cf385275bfb..d1c6b1aba53 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp
@@ -27,7 +27,8 @@ isLazy(const std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> & ex
}
std::unique_ptr<ISequencedTaskExecutor>
-SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize, uint32_t kindOfWatermark, duration reactionTime)
+SequencedTaskExecutor::create(vespalib::Runnable::init_fun_t func, uint32_t threads, uint32_t taskLimit,
+ OptimizeFor optimize, uint32_t kindOfWatermark, duration reactionTime)
{
if (optimize == OptimizeFor::ADAPTIVE) {
size_t num_strands = std::min(taskLimit, threads*32);
@@ -38,9 +39,9 @@ SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor
for (uint32_t id = 0; id < threads; ++id) {
if (optimize == OptimizeFor::THROUGHPUT) {
uint32_t watermark = kindOfWatermark == 0 ? taskLimit / 10 : kindOfWatermark;
- executors->push_back(std::make_unique<SingleExecutor>(taskLimit, watermark, reactionTime));
+ executors->push_back(std::make_unique<SingleExecutor>(func, taskLimit, watermark, reactionTime));
} else {
- executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit));
+ executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func));
}
}
return std::unique_ptr<ISequencedTaskExecutor>(new SequencedTaskExecutor(std::move(executors)));
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
index 180cd1cc6cc..050b00ef011 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h
@@ -3,6 +3,7 @@
#include "isequencedtaskexecutor.h"
#include <vespa/vespalib/util/time.h>
+#include <vespa/vespalib/util/runnable.h>
namespace vespalib {
@@ -33,7 +34,8 @@ public:
*
*/
static std::unique_ptr<ISequencedTaskExecutor>
- create(uint32_t threads, uint32_t taskLimit = 1000, OptimizeFor optimize = OptimizeFor::LATENCY, uint32_t kindOfWatermark = 0, duration reactionTime = 10ms);
+ create(vespalib::Runnable::init_fun_t, uint32_t threads, uint32_t taskLimit = 1000,
+ OptimizeFor optimize = OptimizeFor::LATENCY, uint32_t kindOfWatermark = 0, duration reactionTime = 10ms);
/**
* For testing only
*/
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
index 795a7ef1ec3..96d8f267875 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.cpp
@@ -5,11 +5,11 @@
namespace vespalib {
-SingleExecutor::SingleExecutor(uint32_t taskLimit)
- : SingleExecutor(taskLimit, taskLimit/10, 5ms)
+SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit)
+ : SingleExecutor(func, taskLimit, taskLimit/10, 5ms)
{ }
-SingleExecutor::SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration reactionTime)
+SingleExecutor::SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime)
: _taskLimit(vespalib::roundUp2inN(taskLimit)),
_wantedTaskLimit(_taskLimit.load()),
_rp(0),
@@ -27,6 +27,7 @@ SingleExecutor::SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration
_reactionTime(reactionTime),
_closed(false)
{
+ (void) func; //TODO implement similar to ThreadStackExecutor
assert(taskLimit >= watermark);
_thread.start();
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
index 7b8a2741d87..58cec52b2b0 100644
--- a/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/singleexecutor.h
@@ -18,8 +18,8 @@ namespace vespalib {
*/
class SingleExecutor final : public vespalib::SyncableThreadExecutor, vespalib::Runnable {
public:
- explicit SingleExecutor(uint32_t taskLimit);
- SingleExecutor(uint32_t taskLimit, uint32_t watermark, duration reactionTime);
+ explicit SingleExecutor(init_fun_t func, uint32_t taskLimit);
+ SingleExecutor(init_fun_t func, uint32_t taskLimit, uint32_t watermark, duration reactionTime);
~SingleExecutor() override;
Task::UP execute(Task::UP task) override;
void setTaskLimit(uint32_t taskLimit) override;
diff --git a/storage/src/tests/common/teststorageapp.cpp b/storage/src/tests/common/teststorageapp.cpp
index 1a6201d8aa3..e904f690bde 100644
--- a/storage/src/tests/common/teststorageapp.cpp
+++ b/storage/src/tests/common/teststorageapp.cpp
@@ -125,14 +125,16 @@ TestStorageApp::waitUntilInitialized(
}
namespace {
- NodeIndex getIndexFromConfig(vespalib::stringref configId) {
- if (!configId.empty()) {
- config::ConfigUri uri(configId);
- return NodeIndex(
- config::ConfigGetter<vespa::config::content::core::StorServerConfig>::getConfig(uri.getConfigId(), uri.getContext())->nodeIndex);
- }
- return NodeIndex(0);
+NodeIndex getIndexFromConfig(vespalib::stringref configId) {
+ if (!configId.empty()) {
+ config::ConfigUri uri(configId);
+ return NodeIndex(
+ config::ConfigGetter<vespa::config::content::core::StorServerConfig>::getConfig(uri.getConfigId(), uri.getContext())->nodeIndex);
}
+ return NodeIndex(0);
+}
+
+VESPA_THREAD_STACK_TAG(test_executor)
}
TestServiceLayerApp::TestServiceLayerApp(vespalib::stringref configId)
@@ -140,7 +142,7 @@ TestServiceLayerApp::TestServiceLayerApp(vespalib::stringref configId)
lib::NodeType::STORAGE, getIndexFromConfig(configId), configId),
_compReg(dynamic_cast<ServiceLayerComponentRegisterImpl&>(TestStorageApp::getComponentRegister())),
_persistenceProvider(),
- _executor(vespalib::SequencedTaskExecutor::create(1))
+ _executor(vespalib::SequencedTaskExecutor::create(test_executor, 1))
{
lib::NodeState ns(*_nodeStateUpdater.getReportedNodeState());
_nodeStateUpdater.setReportedNodeState(ns);
@@ -152,7 +154,7 @@ TestServiceLayerApp::TestServiceLayerApp(NodeIndex index,
lib::NodeType::STORAGE, index, configId),
_compReg(dynamic_cast<ServiceLayerComponentRegisterImpl&>(TestStorageApp::getComponentRegister())),
_persistenceProvider(),
- _executor(vespalib::SequencedTaskExecutor::create(1))
+ _executor(vespalib::SequencedTaskExecutor::create(test_executor, 1))
{
lib::NodeState ns(*_nodeStateUpdater.getReportedNodeState());
_nodeStateUpdater.setReportedNodeState(ns);
diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp
index 489f58fbcc0..47e516a1dcc 100644
--- a/storage/src/tests/persistence/persistencetestutils.cpp
+++ b/storage/src/tests/persistence/persistencetestutils.cpp
@@ -88,9 +88,11 @@ PersistenceTestUtils::dumpBucket(const document::BucketId& bid) {
return dynamic_cast<spi::dummy::DummyPersistence&>(_env->_node.getPersistenceProvider()).dumpBucket(makeSpiBucket(bid));
}
+VESPA_THREAD_STACK_TAG(test_executor)
+
void
PersistenceTestUtils::setupExecutor(uint32_t numThreads) {
- _sequenceTaskExecutor = vespalib::SequencedTaskExecutor::create(numThreads, 1000, vespalib::Executor::OptimizeFor::ADAPTIVE);
+ _sequenceTaskExecutor = vespalib::SequencedTaskExecutor::create(test_executor, numThreads, 1000, vespalib::Executor::OptimizeFor::ADAPTIVE);
}
document::Document::SP
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 8e1ee6cbb3a..0655a8fde7a 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -29,6 +29,11 @@ using std::shared_ptr;
using document::BucketSpace;
using vespalib::make_string_short::fmt;
+namespace {
+
+VESPA_THREAD_STACK_TAG(response_executor)
+
+}
namespace storage {
FileStorManager::
@@ -164,7 +169,8 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
_filestorHandler = std::make_unique<FileStorHandlerImpl>(numThreads, numStripes, *this, *_metrics, _compReg);
uint32_t numResponseThreads = computeNumResponseThreads(_config->numResponseThreads);
- _sequencedExecutor = vespalib::SequencedTaskExecutor::create(numResponseThreads, 10000, selectSequencer(_config->responseSequencerType));
+ _sequencedExecutor = vespalib::SequencedTaskExecutor::create(response_executor, numResponseThreads, 10000,
+ selectSequencer(_config->responseSequencerType));
assert(_sequencedExecutor);
LOG(spam, "Setting up the disk");
for (uint32_t i = 0; i < numThreads; i++) {
diff --git a/vespalib/src/vespa/vespalib/util/runnable.h b/vespalib/src/vespa/vespalib/util/runnable.h
index 3b387a2067c..9d6481fbb19 100644
--- a/vespalib/src/vespa/vespalib/util/runnable.h
+++ b/vespalib/src/vespa/vespalib/util/runnable.h
@@ -3,14 +3,26 @@
#pragma once
#include <memory>
+#include <functional>
namespace vespalib {
+// Convenience macro used to create a function that can be used as an
+// init function when creating an executor to inject a frame with the
+// given name into the stack of all worker threads.
+
+#define VESPA_THREAD_STACK_TAG(name) \
+ int name(::vespalib::Runnable &worker) { \
+ worker.run(); \
+ return 1; \
+ }
+
/**
* Interface implemented in order to be run by a Thread.
**/
struct Runnable {
- typedef std::unique_ptr<Runnable> UP;
+ using UP = std::unique_ptr<Runnable>;
+ using init_fun_t = std::function<int(Runnable&)>;
/**
* Entry point called by the running thread
diff --git a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
index e5cc87c1937..c86597ca153 100644
--- a/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
+++ b/vespalib/src/vespa/vespalib/util/threadstackexecutorbase.h
@@ -16,16 +16,6 @@ namespace vespalib {
namespace thread { struct ThreadInit; }
-// Convenience macro used to create a function that can be used as an
-// init function when creating an executor to inject a frame with the
-// given name into the stack of all worker threads.
-
-#define VESPA_THREAD_STACK_TAG(name) \
- int name(::vespalib::Runnable &worker) { \
- worker.run(); \
- return 1; \
- }
-
/**
* An executor service that executes tasks in multiple threads.
**/
@@ -33,7 +23,6 @@ class ThreadStackExecutorBase : public SyncableThreadExecutor,
public Runnable
{
public:
- using init_fun_t = std::function<int(Runnable&)>;
using unique_lock = std::unique_lock<std::mutex>;
private: