diff options
3 files changed, 84 insertions, 32 deletions
diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp index 647bcf9bcee..4dded5d90ce 100644 --- a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp @@ -242,11 +242,16 @@ TEST("require that you get correct number of executors") { TEST("require that you distribute well") { auto seven = SequencedTaskExecutor::create(sequenced_executor, 7); const SequencedTaskExecutor & seq = dynamic_cast<const SequencedTaskExecutor &>(*seven); + const uint32_t NUM_EXACT = 8 * seven->getNumExecutors(); EXPECT_EQUAL(7u, seven->getNumExecutors()); EXPECT_EQUAL(97u, seq.getComponentHashSize()); EXPECT_EQUAL(0u, seq.getComponentEffectiveHashSize()); for (uint32_t id=0; id < 1000; id++) { - EXPECT_EQUAL((id%97)%7, seven->getExecutorId(id).getId()); + if (id < NUM_EXACT) { + EXPECT_EQUAL(id % seven->getNumExecutors(), seven->getExecutorId(id).getId()); + } else { + EXPECT_EQUAL(((id - NUM_EXACT) % 97) % seven->getNumExecutors(), seven->getExecutorId(id).getId()); + } } EXPECT_EQUAL(97u, seq.getComponentHashSize()); EXPECT_EQUAL(97u, seq.getComponentEffectiveHashSize()); @@ -272,8 +277,8 @@ TEST("require that similar names gets 7/8 unique ids with 8 executors") { EXPECT_EQUAL(3u, four->getExecutorIdFromName("f4").getId()); EXPECT_EQUAL(4u, four->getExecutorIdFromName("f5").getId()); EXPECT_EQUAL(5u, four->getExecutorIdFromName("f6").getId()); - EXPECT_EQUAL(2u, four->getExecutorIdFromName("f7").getId()); - EXPECT_EQUAL(6u, four->getExecutorIdFromName("f8").getId()); + EXPECT_EQUAL(6u, four->getExecutorIdFromName("f7").getId()); + EXPECT_EQUAL(7u, four->getExecutorIdFromName("f8").getId()); } TEST("Test creation of different types") { diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp index f92e1655e7d..954a63978f3 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -14,6 +14,8 @@ namespace { constexpr uint32_t stackSize = 128_Ki; constexpr uint8_t MAGIC = 255; +constexpr uint32_t NUM_PERFECT_PER_EXECUTOR = 8; +constexpr uint16_t INVALID_KEY = 0x8000; bool isLazy(const std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> & executors) { @@ -25,6 +27,16 @@ isLazy(const std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> & ex return true; } +ssize_t +find(uint16_t key, const uint16_t values[], size_t numValues) { + for (size_t i(0); i < numValues; i++) { + if (key == values[i]) return i; + if (INVALID_KEY == values[i]) return -1; + } + return -1; +} + + } std::unique_ptr<ISequencedTaskExecutor> @@ -35,14 +47,14 @@ SequencedTaskExecutor::create(vespalib::Runnable::init_fun_t func, uint32_t thre size_t num_strands = std::min(taskLimit, threads*32); return std::make_unique<AdaptiveSequencedExecutor>(num_strands, threads, kindOfWatermark, taskLimit); } else { - auto executors = std::make_unique<std::vector<std::unique_ptr<SyncableThreadExecutor>>>(); - executors->reserve(threads); + auto executors = std::vector<std::unique_ptr<SyncableThreadExecutor>>(); + executors.reserve(threads); for (uint32_t id = 0; id < threads; ++id) { if (optimize == OptimizeFor::THROUGHPUT) { uint32_t watermark = kindOfWatermark == 0 ? taskLimit / 2 : kindOfWatermark; - executors->push_back(std::make_unique<SingleExecutor>(func, taskLimit, watermark, reactionTime)); + executors.push_back(std::make_unique<SingleExecutor>(func, taskLimit, watermark, reactionTime)); } else { - executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func)); + executors.push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit, func)); } } return std::unique_ptr<ISequencedTaskExecutor>(new SequencedTaskExecutor(std::move(executors))); @@ -54,21 +66,26 @@ SequencedTaskExecutor::~SequencedTaskExecutor() sync_all(); } -SequencedTaskExecutor::SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executors) - : ISequencedTaskExecutor(executors->size()), +SequencedTaskExecutor::SequencedTaskExecutor(std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> executors) + : ISequencedTaskExecutor(executors.size()), _executors(std::move(executors)), - _lazyExecutors(isLazy(*_executors)), - _component2Id(vespalib::hashtable_base::getModuloStl(getNumExecutors()*8), MAGIC), + _lazyExecutors(isLazy(_executors)), + _component2IdPerfect(std::make_unique<PerfectKeyT[]>(getNumExecutors()*NUM_PERFECT_PER_EXECUTOR)), + _component2IdImperfect(vespalib::hashtable_base::getModuloStl(getNumExecutors()*NUM_PERFECT_PER_EXECUTOR), MAGIC), _mutex(), _nextId(0) { assert(getNumExecutors() < 256); + + for (size_t i(0); i < getNumExecutors() * NUM_PERFECT_PER_EXECUTOR; i++) { + _component2IdPerfect[i] = INVALID_KEY; + } } void SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit) { - for (const auto &executor : *_executors) { + for (const auto &executor : _executors) { executor->setTaskLimit(taskLimit); } } @@ -76,15 +93,15 @@ SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit) void SequencedTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP task) { - assert(id.getId() < _executors->size()); - auto rejectedTask = (*_executors)[id.getId()]->execute(std::move(task)); + assert(id.getId() < _executors.size()); + auto rejectedTask = _executors[id.getId()]->execute(std::move(task)); assert(!rejectedTask); } void SequencedTaskExecutor::sync_all() { wakeup(); - for (auto &executor : *_executors) { + for (auto &executor : _executors) { executor->sync(); } } @@ -92,7 +109,7 @@ SequencedTaskExecutor::sync_all() { void SequencedTaskExecutor::wakeup() { if (_lazyExecutors) { - for (auto &executor : *_executors) { + for (auto &executor : _executors) { //Enforce parallel wakeup of napping executors. executor->wakeup(); } @@ -103,7 +120,7 @@ ExecutorStats SequencedTaskExecutor::getStats() { ExecutorStats accumulatedStats; - for (auto &executor :* _executors) { + for (auto &executor : _executors) { accumulatedStats.aggregate(executor->getStats()); } return accumulatedStats; @@ -111,15 +128,41 @@ SequencedTaskExecutor::getStats() ISequencedTaskExecutor::ExecutorId SequencedTaskExecutor::getExecutorId(uint64_t componentId) const { - uint32_t shrunkId = componentId % _component2Id.size(); - uint8_t executorId = _component2Id[shrunkId]; + auto id = getExecutorIdPerfect(componentId); + return id ? id.value() : getExecutorIdImPerfect(componentId); +} + +std::optional<ISequencedTaskExecutor::ExecutorId> +SequencedTaskExecutor::getExecutorIdPerfect(uint64_t componentId) const { + PerfectKeyT key = componentId & 0x7fff; + ssize_t pos = find(key, _component2IdPerfect.get(), getNumExecutors() * NUM_PERFECT_PER_EXECUTOR); + if (pos < 0) { + std::unique_lock guard(_mutex); + pos = find(key, _component2IdPerfect.get(), getNumExecutors() * NUM_PERFECT_PER_EXECUTOR); + if (pos < 0) { + pos = find(INVALID_KEY, _component2IdPerfect.get(), getNumExecutors() * NUM_PERFECT_PER_EXECUTOR); + if (pos >= 0) { + _component2IdPerfect[pos] = key; + } else { + // There was a race for the last spots + return std::optional<ISequencedTaskExecutor::ExecutorId>(); + } + } + } + return std::optional<ISequencedTaskExecutor::ExecutorId>(ExecutorId(pos % getNumExecutors())); +} + +ISequencedTaskExecutor::ExecutorId +SequencedTaskExecutor::getExecutorIdImPerfect(uint64_t componentId) const { + uint32_t shrunkId = componentId % _component2IdImperfect.size(); + uint8_t executorId = _component2IdImperfect[shrunkId]; if (executorId == MAGIC) { std::lock_guard guard(_mutex); - if (_component2Id[shrunkId] == MAGIC) { - _component2Id[shrunkId] = _nextId % getNumExecutors(); + if (_component2IdImperfect[shrunkId] == MAGIC) { + _component2IdImperfect[shrunkId] = _nextId % getNumExecutors(); _nextId++; } - executorId = _component2Id[shrunkId]; + executorId = _component2IdImperfect[shrunkId]; } return ExecutorId(executorId); } @@ -127,10 +170,10 @@ SequencedTaskExecutor::getExecutorId(uint64_t componentId) const { const vespalib::SyncableThreadExecutor* SequencedTaskExecutor::first_executor() const { - if (_executors->empty()) { + if (_executors.empty()) { return nullptr; } - return _executors->front().get(); + return _executors.front().get(); } } // namespace search diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h index 245d6d29780..06e7fa65ac2 100644 --- a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h @@ -38,18 +38,22 @@ public: /** * For testing only */ - uint32_t getComponentHashSize() const { return _component2Id.size(); } + uint32_t getComponentHashSize() const { return _component2IdImperfect.size(); } uint32_t getComponentEffectiveHashSize() const { return _nextId; } const vespalib::SyncableThreadExecutor* first_executor() const; private: - explicit SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor); - - std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> _executors; - const bool _lazyExecutors; - mutable std::vector<uint8_t> _component2Id; - mutable std::mutex _mutex; - mutable uint32_t _nextId; + explicit SequencedTaskExecutor(std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> executor); + std::optional<ExecutorId> getExecutorIdPerfect(uint64_t componentId) const; + ExecutorId getExecutorIdImPerfect(uint64_t componentId) const; + + std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>> _executors; + using PerfectKeyT = uint16_t; + const bool _lazyExecutors; + mutable std::unique_ptr<PerfectKeyT[]> _component2IdPerfect; + mutable std::vector<uint8_t> _component2IdImperfect; + mutable std::mutex _mutex; + mutable uint32_t _nextId; }; |