From d6c99a96fc8d3d2ced331998a81a26beb6acc825 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Thu, 11 Nov 2021 11:39:30 +0000 Subject: Add a fixed size table of 8 * num_executors with 16 bit entries. Use this for mapping the first components exactly. With more components than 8x we fall back to using a shrunk id of 8 bits as before. This enables perfect distribution for the first 8x and then 'good enough' for the rest. The more components there are, the less impact the imperfect distribution will have. --- .../sequencedtaskexecutor_test.cpp | 11 +++- .../vespa/vespalib/util/sequencedtaskexecutor.cpp | 69 +++++++++++++++------- .../vespa/vespalib/util/sequencedtaskexecutor.h | 20 ++++--- 3 files changed, 68 insertions(+), 32 deletions(-) (limited to 'staging_vespalib') diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp index 647bcf9bcee..4dded5d90ce 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp @@ -242,11 +242,16 @@ TEST("require that you get correct number of executors") { TEST("require that you distribute well") { auto seven = SequencedTaskExecutor::create(sequenced_executor, 7); const SequencedTaskExecutor & seq = dynamic_cast(*seven); + const uint32_t NUM_EXACT = 8 * seven->getNumExecutors(); EXPECT_EQUAL(7u, seven->getNumExecutors()); EXPECT_EQUAL(97u, seq.getComponentHashSize()); EXPECT_EQUAL(0u, seq.getComponentEffectiveHashSize()); for (uint32_t id=0; id < 1000; id++) { - EXPECT_EQUAL((id%97)%7, seven->getExecutorId(id).getId()); + if (id < NUM_EXACT) { + EXPECT_EQUAL(id % seven->getNumExecutors(), seven->getExecutorId(id).getId()); + } else { + EXPECT_EQUAL(((id - NUM_EXACT) % 97) % seven->getNumExecutors(), seven->getExecutorId(id).getId()); + } } EXPECT_EQUAL(97u, seq.getComponentHashSize()); EXPECT_EQUAL(97u, 
seq.getComponentEffectiveHashSize()); @@ -272,8 +277,8 @@ TEST("require that similar names gets 7/8 unique ids with 8 executors") { EXPECT_EQUAL(3u, four->getExecutorIdFromName("f4").getId()); EXPECT_EQUAL(4u, four->getExecutorIdFromName("f5").getId()); EXPECT_EQUAL(5u, four->getExecutorIdFromName("f6").getId()); - EXPECT_EQUAL(2u, four->getExecutorIdFromName("f7").getId()); - EXPECT_EQUAL(6u, four->getExecutorIdFromName("f8").getId()); + EXPECT_EQUAL(6u, four->getExecutorIdFromName("f7").getId()); + EXPECT_EQUAL(7u, four->getExecutorIdFromName("f8").getId()); } TEST("Test creation of different types") { diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index f92e1655e7d..685f670b164 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -35,14 +35,14 @@ SequencedTaskExecutor::create(vespalib::Runnable::init_fun_t func, uint32_t thre size_t num_strands = std::min(taskLimit, threads*32); return std::make_unique(num_strands, threads, kindOfWatermark, taskLimit); } else { - auto executors = std::make_unique>>(); - executors->reserve(threads); + auto executors = std::vector>(); + executors.reserve(threads); for (uint32_t id = 0; id < threads; ++id) { if (optimize == OptimizeFor::THROUGHPUT) { uint32_t watermark = kindOfWatermark == 0 ? 
taskLimit / 2 : kindOfWatermark; - executors->push_back(std::make_unique(func, taskLimit, watermark, reactionTime)); + executors.push_back(std::make_unique(func, taskLimit, watermark, reactionTime)); } else { - executors->push_back(std::make_unique(1, stackSize, taskLimit, func)); + executors.push_back(std::make_unique(1, stackSize, taskLimit, func)); } } return std::unique_ptr(new SequencedTaskExecutor(std::move(executors))); @@ -54,21 +54,23 @@ SequencedTaskExecutor::~SequencedTaskExecutor() sync_all(); } -SequencedTaskExecutor::SequencedTaskExecutor(std::unique_ptr>> executors) - : ISequencedTaskExecutor(executors->size()), +SequencedTaskExecutor::SequencedTaskExecutor(std::vector> executors) + : ISequencedTaskExecutor(executors.size()), _executors(std::move(executors)), - _lazyExecutors(isLazy(*_executors)), - _component2Id(vespalib::hashtable_base::getModuloStl(getNumExecutors()*8), MAGIC), + _lazyExecutors(isLazy(_executors)), + _component2IdPerfect(), + _component2IdImperfect(vespalib::hashtable_base::getModuloStl(getNumExecutors()*8), MAGIC), _mutex(), _nextId(0) { assert(getNumExecutors() < 256); + _component2IdPerfect.reserve(getNumExecutors()*8); } void SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit) { - for (const auto &executor : *_executors) { + for (const auto &executor : _executors) { executor->setTaskLimit(taskLimit); } } @@ -76,15 +78,15 @@ SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit) void SequencedTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP task) { - assert(id.getId() < _executors->size()); - auto rejectedTask = (*_executors)[id.getId()]->execute(std::move(task)); + assert(id.getId() < _executors.size()); + auto rejectedTask = _executors[id.getId()]->execute(std::move(task)); assert(!rejectedTask); } void SequencedTaskExecutor::sync_all() { wakeup(); - for (auto &executor : *_executors) { + for (auto &executor : _executors) { executor->sync(); } } @@ -92,7 +94,7 @@ SequencedTaskExecutor::sync_all() { 
void SequencedTaskExecutor::wakeup() { if (_lazyExecutors) { - for (auto &executor : *_executors) { + for (auto &executor : _executors) { //Enforce parallel wakeup of napping executors. executor->wakeup(); } @@ -103,7 +105,7 @@ ExecutorStats SequencedTaskExecutor::getStats() { ExecutorStats accumulatedStats; - for (auto &executor :* _executors) { + for (auto &executor : _executors) { accumulatedStats.aggregate(executor->getStats()); } return accumulatedStats; @@ -111,15 +113,40 @@ SequencedTaskExecutor::getStats() ISequencedTaskExecutor::ExecutorId SequencedTaskExecutor::getExecutorId(uint64_t componentId) const { - uint32_t shrunkId = componentId % _component2Id.size(); - uint8_t executorId = _component2Id[shrunkId]; + PerfectKeyT key = componentId; + auto found = std::find(_component2IdPerfect.begin(), _component2IdPerfect.end(), key); + if (found != _component2IdPerfect.end()) { + return ExecutorId((found - _component2IdPerfect.begin()) % getNumExecutors()); + } else if (_component2IdPerfect.size() < _component2IdPerfect.capacity()) { + return getExecutorIdPerfect(componentId); + } else { + return getExecutorIdImPerfect(componentId); + } +} + +ISequencedTaskExecutor::ExecutorId +SequencedTaskExecutor::getExecutorIdPerfect(uint64_t componentId) const { + PerfectKeyT key = componentId; + std::lock_guard guard(_mutex); + auto found = std::find(_component2IdPerfect.begin(), _component2IdPerfect.end(), key); + if (found == _component2IdPerfect.end()) { + _component2IdPerfect.push_back(key); + found = _component2IdPerfect.end() - 1; + } + return ExecutorId((found - _component2IdPerfect.begin()) % getNumExecutors()); +} + +ISequencedTaskExecutor::ExecutorId +SequencedTaskExecutor::getExecutorIdImPerfect(uint64_t componentId) const { + uint32_t shrunkId = componentId % _component2IdImperfect.size(); + uint8_t executorId = _component2IdImperfect[shrunkId]; if (executorId == MAGIC) { std::lock_guard guard(_mutex); - if (_component2Id[shrunkId] == MAGIC) { - 
_component2Id[shrunkId] = _nextId % getNumExecutors(); + if (_component2IdImperfect[shrunkId] == MAGIC) { + _component2IdImperfect[shrunkId] = _nextId % getNumExecutors(); _nextId++; } - executorId = _component2Id[shrunkId]; + executorId = _component2IdImperfect[shrunkId]; } return ExecutorId(executorId); } @@ -127,10 +154,10 @@ SequencedTaskExecutor::getExecutorId(uint64_t componentId) const { const vespalib::SyncableThreadExecutor* SequencedTaskExecutor::first_executor() const { - if (_executors->empty()) { + if (_executors.empty()) { return nullptr; } - return _executors->front().get(); + return _executors.front().get(); } } // namespace search diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h index 245d6d29780..660bc74472b 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h @@ -38,18 +38,22 @@ public: /** * For testing only */ - uint32_t getComponentHashSize() const { return _component2Id.size(); } + uint32_t getComponentHashSize() const { return _component2IdImperfect.size(); } uint32_t getComponentEffectiveHashSize() const { return _nextId; } const vespalib::SyncableThreadExecutor* first_executor() const; private: - explicit SequencedTaskExecutor(std::unique_ptr>> executor); - - std::unique_ptr>> _executors; - const bool _lazyExecutors; - mutable std::vector _component2Id; - mutable std::mutex _mutex; - mutable uint32_t _nextId; + explicit SequencedTaskExecutor(std::vector> executor); + ExecutorId getExecutorIdPerfect(uint64_t componentId) const; + ExecutorId getExecutorIdImPerfect(uint64_t componentId) const; + + std::vector> _executors; + using PerfectKeyT = uint16_t; + const bool _lazyExecutors; + mutable std::vector _component2IdPerfect; + mutable std::vector _component2IdImperfect; + mutable std::mutex _mutex; + mutable uint32_t _nextId; }; -- cgit v1.2.3