diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-06-26 10:13:45 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-06-26 10:13:45 +0000 |
commit | 7e20842be3eec0f0881749098a2c944e5a1a258e (patch) | |
tree | 304bb5e1f5bcd785d33ad6fbf89d20632454b691 /staging_vespalib | |
parent | bb367946be112361611f62fb6803c5060cfe9dde (diff) |
Let getExecutorId be a pure interface.
Diffstat (limited to 'staging_vespalib')
9 files changed, 60 insertions, 50 deletions
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp index 6128386837d..df94e70f9d6 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp @@ -239,14 +239,15 @@ TEST("require that you get correct number of executors") { TEST("require that you distribute well") { auto seven = SequencedTaskExecutor::create(7); + const SequencedTaskExecutor & seq = dynamic_cast<const SequencedTaskExecutor &>(*seven); EXPECT_EQUAL(7u, seven->getNumExecutors()); - EXPECT_EQUAL(97u, seven->getComponentHashSize()); - EXPECT_EQUAL(0u, seven->getComponentEffectiveHashSize()); + EXPECT_EQUAL(97u, seq.getComponentHashSize()); + EXPECT_EQUAL(0u, seq.getComponentEffectiveHashSize()); for (uint32_t id=0; id < 1000; id++) { EXPECT_EQUAL((id%97)%7, seven->getExecutorId(id).getId()); } - EXPECT_EQUAL(97u, seven->getComponentHashSize()); - EXPECT_EQUAL(97u, seven->getComponentEffectiveHashSize()); + EXPECT_EQUAL(97u, seq.getComponentHashSize()); + EXPECT_EQUAL(97u, seq.getComponentEffectiveHashSize()); } TEST("Test creation of different types") { diff --git a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp index b45ada1c58c..f295d2b30c1 100644 --- a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp @@ -39,4 +39,9 @@ vespalib::ExecutorStats ForegroundTaskExecutor::getStats() { return vespalib::ExecutorStats(vespalib::ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0); } +ISequencedTaskExecutor::ExecutorId +ForegroundTaskExecutor::getExecutorId(uint64_t componentId) const { + return ExecutorId(componentId%getNumExecutors()); +} + } // namespace search diff --git a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h index d9a348ed012..f7b3ff8eab0 100644 --- a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h @@ -21,11 +21,10 @@ public: ForegroundTaskExecutor(uint32_t threads); ~ForegroundTaskExecutor() override; + ExecutorId getExecutorId(uint64_t componentId) const override; void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; void sync() override; - void setTaskLimit(uint32_t taskLimit) override; - vespalib::ExecutorStats getStats() override; private: std::atomic<uint64_t> _accepted; diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp index f8f1f64fac5..af3ce5fe64f 100644 --- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp @@ -2,22 +2,12 @@ #include "isequencedtaskexecutor.h" #include <vespa/vespalib/stllike/hash_fun.h> -#include <vespa/vespalib/stllike/hashtable.h> -#include <cassert> namespace vespalib { -namespace { - constexpr uint8_t MAGIC = 255; -} - ISequencedTaskExecutor::ISequencedTaskExecutor(uint32_t numExecutors) - : _component2Id(vespalib::hashtable_base::getModuloStl(numExecutors*8), MAGIC), - _mutex(), - _numExecutors(numExecutors), - _nextId(0) + : _numExecutors(numExecutors) { - assert(numExecutors < 256); } ISequencedTaskExecutor::~ISequencedTaskExecutor() = default; @@ -28,19 +18,4 @@ ISequencedTaskExecutor::getExecutorIdFromName(vespalib::stringref componentId) c return getExecutorId(hashfun(componentId)); } -ISequencedTaskExecutor::ExecutorId -ISequencedTaskExecutor::getExecutorId(uint64_t componentId) const { - uint32_t shrunkId = componentId % _component2Id.size(); - uint8_t executorId = _component2Id[shrunkId]; - if (executorId == MAGIC) { - std::lock_guard guard(_mutex); - if (_component2Id[shrunkId] == MAGIC) { - _component2Id[shrunkId] = _nextId % getNumExecutors(); - _nextId++; - } - executorId = _component2Id[shrunkId]; - } - return ExecutorId(executorId); -} - } diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h index 034e1520b8d..d457de26f54 100644 --- a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h @@ -37,7 +37,7 @@ public: * @param componentId component id * @return executor id */ - virtual ExecutorId getExecutorId(uint64_t componentId) const; + virtual ExecutorId getExecutorId(uint64_t componentId) const = 0; uint32_t getNumExecutors() const { return _numExecutors; } ExecutorId getExecutorIdFromName(vespalib::stringref componentId) const; @@ -98,16 +98,9 @@ public: void execute(ExecutorId id, FunctionType &&function) { executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function))); } - /** - * For testing only - */ - uint32_t getComponentHashSize() const { return _component2Id.size(); } - uint32_t getComponentEffectiveHashSize() const { return _nextId; } + private: - mutable std::vector<uint8_t> _component2Id; - mutable std::mutex _mutex; uint32_t _numExecutors; - mutable uint32_t _nextId; }; } diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index 0fd78d8dcf6..963264a62e7 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -4,12 +4,15 @@ #include "adaptive_sequenced_executor.h" #include "singleexecutor.h" #include <vespa/vespalib/util/blockingthreadstackexecutor.h> +#include <vespa/vespalib/stllike/hashtable.h> +#include <cassert> namespace vespalib { namespace { constexpr uint32_t stackSize = 128 * 1024; +constexpr uint8_t MAGIC = 255; } @@ -42,8 +45,12 @@ SequencedTaskExecutor::~SequencedTaskExecutor() SequencedTaskExecutor::SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executors) : ISequencedTaskExecutor(executors->size()), - _executors(std::move(executors)) + _executors(std::move(executors)), + _component2Id(vespalib::hashtable_base::getModuloStl(getNumExecutors()*8), MAGIC), + _mutex(), + _nextId(0) { + assert(getNumExecutors() < 256); } void @@ -87,4 +94,19 @@ SequencedTaskExecutor::getStats() return accumulatedStats; } +ISequencedTaskExecutor::ExecutorId +SequencedTaskExecutor::getExecutorId(uint64_t componentId) const { + uint32_t shrunkId = componentId % _component2Id.size(); + uint8_t executorId = _component2Id[shrunkId]; + if (executorId == MAGIC) { + std::lock_guard guard(_mutex); + if (_component2Id[shrunkId] == MAGIC) { + _component2Id[shrunkId] = _nextId % getNumExecutors(); + _nextId++; + } + executorId = _component2Id[shrunkId]; + } + return ExecutorId(executorId); +} + } // namespace search diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h index b3dd400478a..c37bd2eecf4 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h @@ -14,11 +14,8 @@ class SyncableThreadExecutor; */ class SequencedTaskExecutor final : public ISequencedTaskExecutor { - using Stats = vespalib::ExecutorStats; - std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> _executors; - - SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor); public: + using Stats = vespalib::ExecutorStats; using ISequencedTaskExecutor::getExecutorId; using OptimizeFor = vespalib::Executor::OptimizeFor; @@ -26,6 +23,7 @@ public: void setTaskLimit(uint32_t taskLimit) override; void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; + ExecutorId getExecutorId(uint64_t componentId) const override; void sync() override; Stats getStats() override; @@ -35,6 +33,19 @@ public: */ static std::unique_ptr<ISequencedTaskExecutor> create(uint32_t threads, uint32_t taskLimit = 1000, OptimizeFor optimize = OptimizeFor::LATENCY, uint32_t kindOfWatermark = 0, duration reactionTime = 10ms); + /** + * For testing only + */ + uint32_t getComponentHashSize() const { return _component2Id.size(); } + uint32_t getComponentEffectiveHashSize() const { return _nextId; } +private: + SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor); + + std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> _executors; + mutable std::vector<uint8_t> _component2Id; + mutable std::mutex _mutex; + mutable uint32_t _nextId; + }; } // namespace search diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp index 3d9ed4e21f4..d6a89117d68 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp @@ -49,4 +49,9 @@ vespalib::ExecutorStats SequencedTaskExecutorObserver::getStats() { return _executor.getStats(); } +ISequencedTaskExecutor::ExecutorId +SequencedTaskExecutorObserver::getExecutorId(uint64_t componentId) const { + return _executor.getExecutorId(componentId); +} + } // namespace search diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h index 9307a7ddb37..6bcdf08ae5c 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h @@ -23,16 +23,15 @@ public: SequencedTaskExecutorObserver(ISequencedTaskExecutor &executor); ~SequencedTaskExecutorObserver() override; + ExecutorId getExecutorId(uint64_t componentId) const override; void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; void sync() override; + void setTaskLimit(uint32_t taskLimit) override; + vespalib::ExecutorStats getStats() override; uint32_t getExecuteCnt() const { return _executeCnt; } uint32_t getSyncCnt() const { return _syncCnt; } std::vector<uint32_t> getExecuteHistory(); - - void setTaskLimit(uint32_t taskLimit) override; - - vespalib::ExecutorStats getStats() override; }; } // namespace search |