diff options
author | Henning Baldersheim <balder@oath.com> | 2018-06-11 23:23:26 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-01-03 09:50:27 +0000 |
commit | e549978610aa127df17492d80d892742de6fd479 (patch) | |
tree | c4c410432aace639c2f83fb77e700c9d7d81597a /searchlib | |
parent | 798f2fd1d9c85febd9bb56ccb4866c37826c3b43 (diff) |
Use a threadsafe and simpler hashing to map tasks to executors.
Diffstat (limited to 'searchlib')
7 files changed, 16 insertions, 56 deletions
diff --git a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp index 91ca91be4cd..513684d3fd5 100644 --- a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp +++ b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp @@ -14,32 +14,16 @@ ForegroundTaskExecutor::ForegroundTaskExecutor() } ForegroundTaskExecutor::ForegroundTaskExecutor(uint32_t threads) - : _threads(threads), - _ids() + : ISequencedTaskExecutor(threads) { } -ForegroundTaskExecutor::~ForegroundTaskExecutor() -{ -} - -ISequencedTaskExecutor::ExecutorId -ForegroundTaskExecutor::getExecutorId(uint64_t componentId) -{ - auto itr = _ids.find(componentId); - if (itr == _ids.end()) { - auto insarg = std::make_pair(componentId, ExecutorId(_ids.size() % _threads)); - auto insres = _ids.insert(insarg); - assert(insres.second); - itr = insres.first; - } - return itr->second; -} +ForegroundTaskExecutor::~ForegroundTaskExecutor() = default; void ForegroundTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP task) { - assert(id.getId() < _threads); + assert(id.getId() < getNumExecutors()); task->run(); } @@ -48,5 +32,4 @@ ForegroundTaskExecutor::sync() { } - } // namespace search diff --git a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h index cfd135d3fa0..2074eda009b 100644 --- a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h +++ b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h @@ -16,8 +16,6 @@ namespace search { */ class ForegroundTaskExecutor : public ISequencedTaskExecutor { - const uint32_t _threads; - vespalib::hash_map<size_t, ExecutorId> _ids; public: using ISequencedTaskExecutor::getExecutorId; @@ -25,8 +23,6 @@ public: ForegroundTaskExecutor(uint32_t threads); ~ForegroundTaskExecutor() override; - uint32_t getNumExecutors() const override { return _threads; } - ExecutorId getExecutorId(uint64_t componentId) override; void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; void sync() override; }; diff --git a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h b/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h index a8b2a722c01..866f9f60423 100644 --- a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h +++ b/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h @@ -25,6 +25,7 @@ public: private: uint32_t _id; }; + ISequencedTaskExecutor(uint32_t numExecutors) : _numExecutors(numExecutors) { } virtual ~ISequencedTaskExecutor() { } /** @@ -34,10 +35,12 @@ public: * @param componentId component id * @return executor id */ - virtual ExecutorId getExecutorId(uint64_t componentId) = 0; - virtual uint32_t getNumExecutors() const = 0; + ExecutorId getExecutorId(uint64_t componentId) const { + return ExecutorId((componentId * 1099511628211ULL ) % _numExecutors); + } + uint32_t getNumExecutors() const { return _numExecutors; } - ExecutorId getExecutorId(vespalib::stringref componentId) { + ExecutorId getExecutorId(vespalib::stringref componentId) const { vespalib::hash<vespalib::stringref> hashfun; return getExecutorId(hashfun(componentId)); } @@ -96,7 +99,8 @@ public: void execute(ExecutorId id, FunctionType &&function) { executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function))); } - +private: + uint32_t _numExecutors; }; } // namespace search diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp index 5306cabba8c..bb779b659ab 100644 --- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp +++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp @@ -16,7 +16,8 @@ constexpr uint32_t stackSize = 128 * 1024; SequencedTaskExecutor::SequencedTaskExecutor(uint32_t threads, uint32_t taskLimit) - : _executors() + : ISequencedTaskExecutor(threads), + _executors() { for (uint32_t id = 0; id < threads; ++id) { auto executor = std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit); @@ -37,19 +38,6 @@ SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit) } } -ISequencedTaskExecutor::ExecutorId -SequencedTaskExecutor::getExecutorId(uint64_t componentId) -{ - auto itr = _ids.find(componentId); - if (itr == _ids.end()) { - auto insarg = std::make_pair(componentId, ExecutorId(_ids.size() % _executors.size())); - auto insres = _ids.insert(insarg); - assert(insres.second); - itr = insres.first; - } - return itr->second; -} - void SequencedTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP task) { @@ -59,7 +47,6 @@ SequencedTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP t assert(!rejectedTask); } - void SequencedTaskExecutor::sync() { diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h index 7551e82e489..2b7e70d69c7 100644 --- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h +++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h @@ -20,7 +20,6 @@ class SequencedTaskExecutor : public ISequencedTaskExecutor { using Stats = vespalib::ExecutorStats; std::vector<std::shared_ptr<vespalib::BlockingThreadStackExecutor>> _executors; - vespalib::hash_map<size_t, ExecutorId> _ids; public: using ISequencedTaskExecutor::getExecutorId; @@ -28,8 +27,6 @@ public: ~SequencedTaskExecutor(); void setTaskLimit(uint32_t taskLimit); - uint32_t getNumExecutors() const override { return _executors.size(); } - ExecutorId getExecutorId(uint64_t componentId) override; void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; void sync() override; Stats getStats(); diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp index b693c976ebe..04504086520 100644 --- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp +++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp @@ -5,7 +5,8 @@ namespace search { SequencedTaskExecutorObserver::SequencedTaskExecutorObserver(ISequencedTaskExecutor &executor) - : _executor(executor), + : ISequencedTaskExecutor(executor.getNumExecutors()), + _executor(executor), _executeCnt(0u), _syncCnt(0u), _executeHistory(), @@ -15,12 +16,6 @@ SequencedTaskExecutorObserver::SequencedTaskExecutorObserver(ISequencedTaskExecu SequencedTaskExecutorObserver::~SequencedTaskExecutorObserver() = default; -ISequencedTaskExecutor::ExecutorId -SequencedTaskExecutorObserver::getExecutorId(uint64_t componentId) -{ - return _executor.getExecutorId(componentId); -} - void SequencedTaskExecutorObserver::executeTask(ExecutorId id, vespalib::Executor::Task::UP task) { diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h index b4561148bca..b2de71f06b3 100644 --- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h +++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h @@ -23,10 +23,8 @@ public: using ISequencedTaskExecutor::getExecutorId; SequencedTaskExecutorObserver(ISequencedTaskExecutor &executor); - virtual ~SequencedTaskExecutorObserver() override; + ~SequencedTaskExecutorObserver() override; - uint32_t getNumExecutors() const override { return _executor.getNumExecutors(); } - ExecutorId getExecutorId(uint64_t componentId) override; void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; void sync() override; |