From 162162cf26e1d1d51848a4c92195648807bd1458 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Thu, 13 Jan 2022 12:50:28 +0000 Subject: Add an interface that can post a list of task instead of only one at a time. Intention is to make it cheaper to post many small tasks. It requires that the implementation adds support if it find it worthwhile. --- .../sequencedtaskexecutor_test.cpp | 23 ++++++++++++++++++++-- .../vespa/vespalib/util/isequencedtaskexecutor.cpp | 11 +++++++++-- .../vespa/vespalib/util/isequencedtaskexecutor.h | 21 +++++++++++++------- .../util/sequencedtaskexecutorobserver.cpp | 17 ++++++++++++++-- .../vespalib/util/sequencedtaskexecutorobserver.h | 5 +++-- 5 files changed, 62 insertions(+), 15 deletions(-) (limited to 'staging_vespalib') diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp index ef7f8bfb0f6..243935d4013 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp @@ -96,6 +96,23 @@ TEST_F("require that task with same component id are serialized", Fixture) EXPECT_EQUAL(42, tv->_val); } +TEST_F("require that task with same component id are serialized when executed with list", Fixture) +{ + std::shared_ptr tv(std::make_shared()); + EXPECT_EQUAL(0, tv->_val); + ISequencedTaskExecutor::ExecutorId executorId = f._threads->getExecutorId(0); + ISequencedTaskExecutor::TaskList list; + list.template emplace_back(executorId, makeLambdaTask([=]() { usleep(2000); tv->modify(0, 14); })); + list.template emplace_back(executorId, makeLambdaTask([=]() { tv->modify(14, 42); })); + f._threads->executeTasks(std::move(list)); + tv->wait(2); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); + f._threads->sync_all(); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); +} + TEST_F("require that task with different component ids are not serialized", Fixture) { int tryCnt = 0; @@ -136,7 +153,8 @@ TEST_F("require that task with same string component id are serialized", Fixture namespace { -int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int tryLimit) +int +detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int tryLimit) { int tryCnt = 0; for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) { @@ -158,7 +176,8 @@ int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int t return tryCnt; } -vespalib::string makeAltComponentId(Fixture &f) +vespalib::string +makeAltComponentId(Fixture &f) { int tryCnt = 0; char altComponentId[20]; diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp index c54f182891c..b31d72da3b1 100644 --- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp @@ -12,9 +12,16 @@ ISequencedTaskExecutor::ISequencedTaskExecutor(uint32_t numExecutors) ISequencedTaskExecutor::~ISequencedTaskExecutor() = default; +void +ISequencedTaskExecutor::executeTasks(TaskList tasks) { + for (auto & task : tasks) { + executeTask(task.first, std::move(task.second)); + } +} + ISequencedTaskExecutor::ExecutorId -ISequencedTaskExecutor::getExecutorIdFromName(vespalib::stringref componentId) const { - vespalib::hash hashfun; +ISequencedTaskExecutor::getExecutorIdFromName(stringref componentId) const { + hash hashfun; return getExecutorId(hashfun(componentId)); } diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h index 3fe6fb5d678..ff90556e3e4 100644 --- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h @@ -14,7 +14,7 @@ namespace vespalib { * Interface class to run multiple tasks in parallel, but tasks with same * id has to be run in sequence. */ -class ISequencedTaskExecutor : public vespalib::IWakeup +class ISequencedTaskExecutor : public IWakeup { public: class ExecutorId { @@ -28,6 +28,7 @@ public: private: uint32_t _id; }; + using TaskList = std::vector>; ISequencedTaskExecutor(uint32_t numExecutors); virtual ~ISequencedTaskExecutor(); @@ -40,7 +41,7 @@ public: virtual ExecutorId getExecutorId(uint64_t componentId) const = 0; uint32_t getNumExecutors() const { return _numExecutors; } - ExecutorId getExecutorIdFromName(vespalib::stringref componentId) const; + ExecutorId getExecutorIdFromName(stringref componentId) const; /** * Returns an executor id that is NOT equal to the given executor id, @@ -58,7 +59,13 @@ public: * @param id which internal executor to use * @param task unique pointer to the task to be executed */ - virtual void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) = 0; + virtual void executeTask(ExecutorId id, Executor::Task::UP task) = 0; + /** + * Schedule a list of tasks to run after all previously scheduled tasks with + * same id. Default is to just iterate and execute one by one, but implementations + * that can schedule all in one go more efficiently can implement this one. + */ + virtual void executeTasks(TaskList tasks); /** * Call this one to ensure you get the attention of the workers. */ @@ -74,7 +81,7 @@ public: */ template void executeLambda(ExecutorId id, FunctionType &&function) { - executeTask(id, vespalib::makeLambdaTask(std::forward(function))); + executeTask(id, makeLambdaTask(std::forward(function))); } /** * Wait for all scheduled tasks to complete. @@ -83,7 +90,7 @@ public: virtual void setTaskLimit(uint32_t taskLimit) = 0; - virtual vespalib::ExecutorStats getStats() = 0; + virtual ExecutorStats getStats() = 0; /** * Wrap lambda function into a task and schedule it to be run. @@ -96,7 +103,7 @@ public: template void execute(uint64_t componentId, FunctionType &&function) { ExecutorId id = getExecutorId(componentId); - executeTask(id, vespalib::makeLambdaTask(std::forward(function))); + executeTask(id, makeLambdaTask(std::forward(function))); } /** @@ -109,7 +116,7 @@ public: */ template void execute(ExecutorId id, FunctionType &&function) { - executeTask(id, vespalib::makeLambdaTask(std::forward(function))); + executeTask(id, makeLambdaTask(std::forward(function))); } private: diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp index 5ae8e96b606..d81b8ec1db6 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp @@ -17,7 +17,7 @@ SequencedTaskExecutorObserver::SequencedTaskExecutorObserver(ISequencedTaskExecu SequencedTaskExecutorObserver::~SequencedTaskExecutorObserver() = default; void -SequencedTaskExecutorObserver::executeTask(ExecutorId id, vespalib::Executor::Task::UP task) +SequencedTaskExecutorObserver::executeTask(ExecutorId id, Executor::Task::UP task) { ++_executeCnt; { @@ -27,6 +27,19 @@ SequencedTaskExecutorObserver::executeTask(ExecutorId id, vespalib::Executor::Ta _executor.executeTask(id, std::move(task)); } +void +SequencedTaskExecutorObserver::executeTasks(TaskList tasks) +{ + _executeCnt += tasks.size(); + { + std::lock_guard guard(_mutex); + for (const auto & task : tasks) { + _executeHistory.emplace_back(task.first.getId()); + } + } + _executor.executeTasks(std::move(tasks)); +} + void SequencedTaskExecutorObserver::sync_all() { @@ -45,7 +58,7 @@ void SequencedTaskExecutorObserver::setTaskLimit(uint32_t taskLimit) { _executor.setTaskLimit(taskLimit); } -vespalib::ExecutorStats SequencedTaskExecutorObserver::getStats() { +ExecutorStats SequencedTaskExecutorObserver::getStats() { return _executor.getStats(); } diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h index 7e2bf968952..1d54283c393 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h @@ -24,10 +24,11 @@ public: ~SequencedTaskExecutorObserver() override; ExecutorId getExecutorId(uint64_t componentId) const override; - void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; + void executeTask(ExecutorId id, Executor::Task::UP task) override; + void executeTasks(TaskList tasks) override; void sync_all() override; void setTaskLimit(uint32_t taskLimit) override; - vespalib::ExecutorStats getStats() override; + ExecutorStats getStats() override; uint32_t getExecuteCnt() const { return _executeCnt; } uint32_t getSyncCnt() const { return _syncCnt; } -- cgit v1.2.3