diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-06-25 09:29:39 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-06-25 09:34:25 +0000 |
commit | 8004167e964b1eb12a67777ec9d70d57a5dbbfed (patch) | |
tree | b7282fdac9b62926324ca0f3c19c8d21dd75319f /staging_vespalib/src | |
parent | 0680bf96a4bf17aec0b9fde98ac5369c0991f0fb (diff) |
Let the executor create an executor id to its liking.
Taske full advantage over strands instead of being limited to threads.
Diffstat (limited to 'staging_vespalib/src')
7 files changed, 27 insertions, 16 deletions
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp index 10f3f6089e3..9c06ecd3c8f 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp @@ -63,6 +63,8 @@ public: } }; +vespalib::stringref ZERO("0"); + TEST_F("testExecute", Fixture) { std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); @@ -118,8 +120,8 @@ TEST_F("require that task with same string component id are serialized", Fixture std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); auto test2 = [&]() { tv->modify(14, 42); }; - f._threads.execute(f._threads.getExecutorId("0"), [&]() { usleep(2000); tv->modify(0, 14); }); - f._threads.execute(f._threads.getExecutorId("0"), test2); + f._threads.execute(f._threads.getExecutorIdFromName(ZERO), [&]() { usleep(2000); tv->modify(0, 14); }); + f._threads.execute(f._threads.getExecutorIdFromName(ZERO), test2); tv->wait(2); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); @@ -136,8 +138,8 @@ int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int t for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) { std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); - f._threads.execute(f._threads.getExecutorId("0"), [&]() { usleep(2000); tv->modify(0, 14); }); - f._threads.execute(f._threads.getExecutorId(altComponentId), [&]() { tv->modify(14, 42); }); + f._threads.execute(f._threads.getExecutorIdFromName(ZERO), [&]() { usleep(2000); tv->modify(0, 14); }); + f._threads.execute(f._threads.getExecutorIdFromName(altComponentId), [&]() { tv->modify(14, 42); }); tv->wait(2); if (tv->_fail != 1) { continue; @@ -156,10 +158,10 @@ vespalib::string makeAltComponentId(Fixture &f) { int tryCnt = 0; char altComponentId[20]; - ISequencedTaskExecutor::ExecutorId executorId0 = f._threads.getExecutorId("0"); + ISequencedTaskExecutor::ExecutorId executorId0 = f._threads.getExecutorIdFromName(ZERO); for (tryCnt = 1; tryCnt < 100; ++tryCnt) { sprintf(altComponentId, "%d", tryCnt); - if (f._threads.getExecutorId(altComponentId) == executorId0) { + if (f._threads.getExecutorIdFromName(altComponentId) == executorId0) { break; } } diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp index 70d0f1c743d..6128386837d 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp @@ -65,6 +65,8 @@ public: } }; +vespalib::stringref ZERO("0"); + TEST_F("testExecute", Fixture) { std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); @@ -120,8 +122,8 @@ TEST_F("require that task with same string component id are serialized", Fixture std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); auto test2 = [=]() { tv->modify(14, 42); }; - f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); }); - f._threads->execute(f._threads->getExecutorId("0"), test2); + f._threads->execute(f._threads->getExecutorIdFromName(ZERO), [=]() { usleep(2000); tv->modify(0, 14); }); + f._threads->execute(f._threads->getExecutorIdFromName(ZERO), test2); tv->wait(2); EXPECT_EQUAL(0, tv->_fail); EXPECT_EQUAL(42, tv->_val); @@ -138,8 +140,8 @@ int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int t for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) { std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); - f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); }); - f._threads->execute(f._threads->getExecutorId(altComponentId), [=]() { tv->modify(14, 42); }); + f._threads->execute(f._threads->getExecutorIdFromName(ZERO), [=]() { usleep(2000); tv->modify(0, 14); }); + f._threads->execute(f._threads->getExecutorIdFromName(altComponentId), [=]() { tv->modify(14, 42); }); tv->wait(2); if (tv->_fail != 1) { continue; @@ -158,10 +160,10 @@ vespalib::string makeAltComponentId(Fixture &f) { int tryCnt = 0; char altComponentId[20]; - ISequencedTaskExecutor::ExecutorId executorId0 = f._threads->getExecutorId("0"); + ISequencedTaskExecutor::ExecutorId executorId0 = f._threads->getExecutorIdFromName(ZERO); for (tryCnt = 1; tryCnt < 100; ++tryCnt) { sprintf(altComponentId, "%d", tryCnt); - if (f._threads->getExecutorId(altComponentId) == executorId0) { + if (f._threads->getExecutorIdFromName(altComponentId) == executorId0) { break; } } diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp index 50bc3b020a8..3e87749c794 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp @@ -256,6 +256,11 @@ AdaptiveSequencedExecutor::~AdaptiveSequencedExecutor() assert(_worker_stack.empty()); } +ISequencedTaskExecutor::ExecutorId +AdaptiveSequencedExecutor::getExecutorId(uint64_t component) const { + return ExecutorId(component % _strands.size()); +} + void AdaptiveSequencedExecutor::executeTask(ExecutorId id, Task::UP task) { diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h index bc3457a72ef..a4d3ac97758 100644 --- a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h @@ -117,6 +117,7 @@ public: AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads, size_t max_waiting, size_t max_pending); ~AdaptiveSequencedExecutor() override; + ExecutorId getExecutorId(uint64_t component) const override; void executeTask(ExecutorId id, Task::UP task) override; void sync() override; void setTaskLimit(uint32_t task_limit) override; diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp index d05702cc85b..f8f1f64fac5 100644 --- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp @@ -23,7 +23,7 @@ ISequencedTaskExecutor::ISequencedTaskExecutor(uint32_t numExecutors) ISequencedTaskExecutor::~ISequencedTaskExecutor() = default; ISequencedTaskExecutor::ExecutorId -ISequencedTaskExecutor::getExecutorId(vespalib::stringref componentId) const { +ISequencedTaskExecutor::getExecutorIdFromName(vespalib::stringref componentId) const { vespalib::hash<vespalib::stringref> hashfun; return getExecutorId(hashfun(componentId)); } diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h index cd2a6c6f0d8..034e1520b8d 100644 --- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h @@ -37,10 +37,10 @@ public: * @param componentId component id * @return executor id */ - ExecutorId getExecutorId(uint64_t componentId) const; + virtual ExecutorId getExecutorId(uint64_t componentId) const; uint32_t getNumExecutors() const { return _numExecutors; } - ExecutorId getExecutorId(vespalib::stringref componentId) const; + ExecutorId getExecutorIdFromName(vespalib::stringref componentId) const; /** * Schedule a task to run after all previously scheduled tasks with diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index a0c2f0ac237..0fd78d8dcf6 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -18,7 +18,8 @@ std::unique_ptr<ISequencedTaskExecutor> SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize, uint32_t kindOfWatermark, duration reactionTime) { if (optimize == OptimizeFor::ADAPTIVE) { - return std::make_unique<AdaptiveSequencedExecutor>(threads, threads, kindOfWatermark, taskLimit); + size_t num_strands = std::min(taskLimit, threads*32); + return std::make_unique<AdaptiveSequencedExecutor>(num_strands, threads, kindOfWatermark, taskLimit); } else { auto executors = std::make_unique<std::vector<std::unique_ptr<SyncableThreadExecutor>>>(); executors->reserve(threads); |