summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-01-15 08:08:37 +0100
committerGitHub <noreply@github.com>2022-01-15 08:08:37 +0100
commit33ba95d090430acd6f362f9525705048d74360ba (patch)
tree9d0e32af8a7758216568056fd34545c8cb53842e
parent6eaa34d5ec1e51805ab0819650621bc23c89ddc4 (diff)
parent162162cf26e1d1d51848a4c92195648807bd1458 (diff)
Merge pull request #20800 from vespa-engine/balder/add-an-interface-that-can-accept-a-tasklist
Add an interface that can post a list of task instead of only one at …
-rw-r--r--staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp23
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp11
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h21
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp17
-rw-r--r--staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h5
5 files changed, 62 insertions, 15 deletions
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
index ef7f8bfb0f6..243935d4013 100644
--- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
+++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp
@@ -96,6 +96,23 @@ TEST_F("require that task with same component id are serialized", Fixture)
EXPECT_EQUAL(42, tv->_val);
}
+TEST_F("require that task with same component id are serialized when executed with list", Fixture)
+{
+ std::shared_ptr<TestObj> tv(std::make_shared<TestObj>());
+ EXPECT_EQUAL(0, tv->_val);
+ ISequencedTaskExecutor::ExecutorId executorId = f._threads->getExecutorId(0);
+ ISequencedTaskExecutor::TaskList list;
+ list.template emplace_back(executorId, makeLambdaTask([=]() { usleep(2000); tv->modify(0, 14); }));
+ list.template emplace_back(executorId, makeLambdaTask([=]() { tv->modify(14, 42); }));
+ f._threads->executeTasks(std::move(list));
+ tv->wait(2);
+ EXPECT_EQUAL(0, tv->_fail);
+ EXPECT_EQUAL(42, tv->_val);
+ f._threads->sync_all();
+ EXPECT_EQUAL(0, tv->_fail);
+ EXPECT_EQUAL(42, tv->_val);
+}
+
TEST_F("require that task with different component ids are not serialized", Fixture)
{
int tryCnt = 0;
@@ -136,7 +153,8 @@ TEST_F("require that task with same string component id are serialized", Fixture
namespace {
-int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int tryLimit)
+int
+detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int tryLimit)
{
int tryCnt = 0;
for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) {
@@ -158,7 +176,8 @@ int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int t
return tryCnt;
}
-vespalib::string makeAltComponentId(Fixture &f)
+vespalib::string
+makeAltComponentId(Fixture &f)
{
int tryCnt = 0;
char altComponentId[20];
diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
index c54f182891c..b31d72da3b1 100644
--- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp
@@ -12,9 +12,16 @@ ISequencedTaskExecutor::ISequencedTaskExecutor(uint32_t numExecutors)
ISequencedTaskExecutor::~ISequencedTaskExecutor() = default;
+void
+ISequencedTaskExecutor::executeTasks(TaskList tasks) {
+ for (auto & task : tasks) {
+ executeTask(task.first, std::move(task.second));
+ }
+}
+
ISequencedTaskExecutor::ExecutorId
-ISequencedTaskExecutor::getExecutorIdFromName(vespalib::stringref componentId) const {
- vespalib::hash<vespalib::stringref> hashfun;
+ISequencedTaskExecutor::getExecutorIdFromName(stringref componentId) const {
+ hash<stringref> hashfun;
return getExecutorId(hashfun(componentId));
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
index 3fe6fb5d678..ff90556e3e4 100644
--- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
+++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h
@@ -14,7 +14,7 @@ namespace vespalib {
* Interface class to run multiple tasks in parallel, but tasks with same
* id has to be run in sequence.
*/
-class ISequencedTaskExecutor : public vespalib::IWakeup
+class ISequencedTaskExecutor : public IWakeup
{
public:
class ExecutorId {
@@ -28,6 +28,7 @@ public:
private:
uint32_t _id;
};
+ using TaskList = std::vector<std::pair<ExecutorId, Executor::Task::UP>>;
ISequencedTaskExecutor(uint32_t numExecutors);
virtual ~ISequencedTaskExecutor();
@@ -40,7 +41,7 @@ public:
virtual ExecutorId getExecutorId(uint64_t componentId) const = 0;
uint32_t getNumExecutors() const { return _numExecutors; }
- ExecutorId getExecutorIdFromName(vespalib::stringref componentId) const;
+ ExecutorId getExecutorIdFromName(stringref componentId) const;
/**
* Returns an executor id that is NOT equal to the given executor id,
@@ -58,7 +59,13 @@ public:
* @param id which internal executor to use
* @param task unique pointer to the task to be executed
*/
- virtual void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) = 0;
+ virtual void executeTask(ExecutorId id, Executor::Task::UP task) = 0;
+ /**
+ * Schedule a list of tasks to run after all previously scheduled tasks with
+ * same id. Default is to just iterate and execute one by one, but implementations
+ * that can schedule all in one go more efficiently can implement this one.
+ */
+ virtual void executeTasks(TaskList tasks);
/**
* Call this one to ensure you get the attention of the workers.
*/
@@ -74,7 +81,7 @@ public:
*/
template <class FunctionType>
void executeLambda(ExecutorId id, FunctionType &&function) {
- executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function)));
+ executeTask(id, makeLambdaTask(std::forward<FunctionType>(function)));
}
/**
* Wait for all scheduled tasks to complete.
@@ -83,7 +90,7 @@ public:
virtual void setTaskLimit(uint32_t taskLimit) = 0;
- virtual vespalib::ExecutorStats getStats() = 0;
+ virtual ExecutorStats getStats() = 0;
/**
* Wrap lambda function into a task and schedule it to be run.
@@ -96,7 +103,7 @@ public:
template <class FunctionType>
void execute(uint64_t componentId, FunctionType &&function) {
ExecutorId id = getExecutorId(componentId);
- executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function)));
+ executeTask(id, makeLambdaTask(std::forward<FunctionType>(function)));
}
/**
@@ -109,7 +116,7 @@ public:
*/
template <class FunctionType>
void execute(ExecutorId id, FunctionType &&function) {
- executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function)));
+ executeTask(id, makeLambdaTask(std::forward<FunctionType>(function)));
}
private:
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp
index 5ae8e96b606..d81b8ec1db6 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp
@@ -17,7 +17,7 @@ SequencedTaskExecutorObserver::SequencedTaskExecutorObserver(ISequencedTaskExecu
SequencedTaskExecutorObserver::~SequencedTaskExecutorObserver() = default;
void
-SequencedTaskExecutorObserver::executeTask(ExecutorId id, vespalib::Executor::Task::UP task)
+SequencedTaskExecutorObserver::executeTask(ExecutorId id, Executor::Task::UP task)
{
++_executeCnt;
{
@@ -28,6 +28,19 @@ SequencedTaskExecutorObserver::executeTask(ExecutorId id, vespalib::Executor::Ta
}
void
+SequencedTaskExecutorObserver::executeTasks(TaskList tasks)
+{
+ _executeCnt += tasks.size();
+ {
+ std::lock_guard<std::mutex> guard(_mutex);
+ for (const auto & task : tasks) {
+ _executeHistory.emplace_back(task.first.getId());
+ }
+ }
+ _executor.executeTasks(std::move(tasks));
+}
+
+void
SequencedTaskExecutorObserver::sync_all()
{
++_syncCnt;
@@ -45,7 +58,7 @@ void SequencedTaskExecutorObserver::setTaskLimit(uint32_t taskLimit) {
_executor.setTaskLimit(taskLimit);
}
-vespalib::ExecutorStats SequencedTaskExecutorObserver::getStats() {
+ExecutorStats SequencedTaskExecutorObserver::getStats() {
return _executor.getStats();
}
diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h
index 7e2bf968952..1d54283c393 100644
--- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h
+++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h
@@ -24,10 +24,11 @@ public:
~SequencedTaskExecutorObserver() override;
ExecutorId getExecutorId(uint64_t componentId) const override;
- void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override;
+ void executeTask(ExecutorId id, Executor::Task::UP task) override;
+ void executeTasks(TaskList tasks) override;
void sync_all() override;
void setTaskLimit(uint32_t taskLimit) override;
- vespalib::ExecutorStats getStats() override;
+ ExecutorStats getStats() override;
uint32_t getExecuteCnt() const { return _executeCnt; }
uint32_t getSyncCnt() const { return _syncCnt; }