diff options
author | Geir Storli <geirstorli@yahoo.no> | 2017-05-09 15:44:01 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-05-09 15:44:01 +0200 |
commit | c7efb4ecdde276e192e813bf52956cf6a0ca9c9c (patch) | |
tree | 0ce748833cd76f6fef9bffe4a479f3ac86da560f /searchlib | |
parent | 39c689d83745455d7105da09500eb7232fde45cd (diff) | |
parent | e97f8c7eef48f3e5c206ef01e745601ceff8bd28 (diff) |
Merge pull request #2377 from yahoo/toregge/replace-task-id-with-executor-id-and-component-id-in-sequenced-task-executor
Toregge/replace task id with executor id and component id in sequenced task executor
Diffstat (limited to 'searchlib')
9 files changed, 205 insertions, 56 deletions
diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp index 98436364ea0..e8183319fd4 100644 --- a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp +++ b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp @@ -82,7 +82,7 @@ TEST_F("testExecute", Fixture) { } -TEST_F("require that task with same id are serialized", Fixture) +TEST_F("require that task with same component id are serialized", Fixture) { std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); @@ -96,7 +96,7 @@ TEST_F("require that task with same id are serialized", Fixture) EXPECT_EQUAL(42, tv->_val); } -TEST_F("require that task with different ids are not serialized", Fixture) +TEST_F("require that task with different component ids are not serialized", Fixture) { int tryCnt = 0; for (tryCnt = 0; tryCnt < 100; ++tryCnt) { @@ -119,7 +119,7 @@ TEST_F("require that task with different ids are not serialized", Fixture) } -TEST_F("require that task with same string id are serialized", Fixture) +TEST_F("require that task with same string component id are serialized", Fixture) { std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); @@ -134,15 +134,17 @@ TEST_F("require that task with same string id are serialized", Fixture) EXPECT_EQUAL(42, tv->_val); } -TEST_F("require that task with different string ids are not serialized", - Fixture) +namespace +{ + +int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int tryLimit) { int tryCnt = 0; - for (tryCnt = 0; tryCnt < 100; ++tryCnt) { + for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) { std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); EXPECT_EQUAL(0, tv->_val); f._threads.execute("0", [=]() { usleep(2000); tv->modify(0, 14); }); - f._threads.execute("2", [=]() { tv->modify(14, 42); }); + f._threads.execute(altComponentId, [=]() { tv->modify(14, 42); }); tv->wait(2); if (tv->_fail != 1) { continue; @@ -154,10 +156,44 @@ TEST_F("require that task with different string ids are not serialized", EXPECT_EQUAL(14, tv->_val); break; } + return tryCnt; +} + +vespalib::string makeAltComponentId(Fixture &f) +{ + int tryCnt = 0; + char altComponentId[20]; + uint32_t executorId0 = f._threads.getExecutorId("0"); + for (tryCnt = 1; tryCnt < 100; ++tryCnt) { + sprintf(altComponentId, "%d", tryCnt); + if (f._threads.getExecutorId(altComponentId) == executorId0) { + break; + } + } + EXPECT_TRUE(tryCnt < 100); + return altComponentId; +} + +} + +TEST_F("require that task with different string component ids are not serialized", + Fixture) +{ + int tryCnt = detectSerializeFailure(f, "2", 100); EXPECT_TRUE(tryCnt < 100); } +TEST_F("require that task with different string component ids mapping to the same executor id are serialized", + Fixture) +{ + vespalib::string altComponentId = makeAltComponentId(f); + LOG(info, "second string component id is \"%s\"", altComponentId.c_str()); + int tryCnt = detectSerializeFailure(f, altComponentId, 100); + EXPECT_TRUE(tryCnt == 100); +} + + TEST_F("require that execute works with const lambda", Fixture) { int i = 5; @@ -187,6 +223,19 @@ TEST_F("require that execute works with reference to lambda", Fixture) EXPECT_EQUAL(5, i); } +TEST_F("require that executeLambda works", Fixture) +{ + int i = 5; + std::vector<int> res; + const auto lambda = [i, &res]() mutable + { res.push_back(i--); res.push_back(i--); }; + f._threads.executeLambda(0, lambda); + f._threads.sync(); + std::vector<int> exp({5, 4}); + EXPECT_EQUAL(exp, res); + EXPECT_EQUAL(5, i); +} + } // namespace common } // namespace search diff --git a/searchlib/src/vespa/searchlib/common/CMakeLists.txt b/searchlib/src/vespa/searchlib/common/CMakeLists.txt index 0399e0ddc5c..fd369581a80 100644 --- a/searchlib/src/vespa/searchlib/common/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/common/CMakeLists.txt @@ -23,6 +23,7 @@ vespa_add_library(searchlib_common OBJECT rcuvector.cpp resultset.cpp sequencedtaskexecutor.cpp + sequencedtaskexecutorobserver.cpp serialnumfileheadercontext.cpp sort.cpp sortdata.cpp diff --git a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp index 2c50f20df30..595c91f9637 100644 --- a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp +++ b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp @@ -12,15 +12,14 @@ using vespalib::ThreadStackExecutor; namespace search { -namespace +ForegroundTaskExecutor::ForegroundTaskExecutor() + : ForegroundTaskExecutor(1) { - -constexpr uint32_t stackSize = 128 * 1024; - } - -ForegroundTaskExecutor::ForegroundTaskExecutor() +ForegroundTaskExecutor::ForegroundTaskExecutor(uint32_t threads) + : _threads(threads), + _ids() { } @@ -28,12 +27,23 @@ ForegroundTaskExecutor::~ForegroundTaskExecutor() { } +uint32_t +ForegroundTaskExecutor::getExecutorId(uint64_t componentId) +{ + auto itr = _ids.find(componentId); + if (itr == _ids.end()) { + auto insarg = std::make_pair(componentId, _ids.size() % _threads); + auto insres = _ids.insert(insarg); + assert(insres.second); + itr = insres.first; + } + return itr->second; +} void -ForegroundTaskExecutor::executeTask(uint64_t id, - vespalib::Executor::Task::UP task) +ForegroundTaskExecutor::executeTask(uint32_t executorId, vespalib::Executor::Task::UP task) { - (void) id; + assert(executorId < _threads); task->run(); } diff --git a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h index ee481f5e496..cb3745de501 100644 --- a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h +++ b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h @@ -2,6 +2,7 @@ #pragma once #include "isequencedtaskexecutor.h" +#include <vespa/vespalib/stllike/hash_map.h> namespace vespalib { @@ -21,13 +22,19 @@ namespace search */ class ForegroundTaskExecutor : public ISequencedTaskExecutor { + const uint32_t _threads; + vespalib::hash_map<size_t, uint32_t> _ids; public: + using ISequencedTaskExecutor::getExecutorId; + ForegroundTaskExecutor(); + ForegroundTaskExecutor(uint32_t threads); ~ForegroundTaskExecutor(); - virtual void executeTask(uint64_t id, - vespalib::Executor::Task::UP task) override; + virtual uint32_t getExecutorId(uint64_t componentId) override; + + virtual void executeTask(uint32_t executorId, vespalib::Executor::Task::UP task) override; virtual void sync() override; }; diff --git a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h b/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h index f978cb30ff5..f1c0073dd45 100644 --- a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h +++ b/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h @@ -18,16 +18,41 @@ public: virtual ~ISequencedTaskExecutor() { } /** + * Calculate which executor will handle an component. All callers + * must be in the same thread. + * + * @param componentId component id + * @return executor id + */ + virtual uint32_t getExecutorId(uint64_t componentId) = 0; + + inline uint32_t getExecutorId(vespalib::stringref componentId) { + vespalib::hash<vespalib::stringref> hashfun; + return getExecutorId(hashfun(componentId)); + } + + /** * Schedule a task to run after all previously scheduled tasks with - * same id. All tasks must be scheduled from same thread. + * same id. * - * @param id task id. + * @param executorId which internal executor to use * @param task unique pointer to the task to be executed */ - virtual void executeTask(uint64_t id, - vespalib::Executor::Task::UP task) = 0; + virtual void executeTask(uint32_t exeucutorId, vespalib::Executor::Task::UP task) = 0; /** + * Wrap lambda function into a task and schedule it to be run. + * Caller must ensure that pointers and references are valid and + * call sync before tearing down pointed to/referenced data. + * + * @param executorId which internal executor to use + * @param function function to be wrapped in a task and later executed + */ + template <class FunctionType> + inline void executeLambda(uint32_t executorId, FunctionType &&function) { + executeTask(executorId, makeLambdaTask(std::forward<FunctionType>(function))); + } + /** * Wait for all scheduled tasks to complete. */ virtual void sync() = 0; @@ -38,12 +63,13 @@ public: * call sync before tearing down pointed to/referenced data. * All tasks must be scheduled from same thread. * - * @param id task id. - * @param function function to be wrapped in a task and later executed + * @param componentId component id + * @param function function to be wrapped in a task and later executed */ template <class FunctionType> - inline void execute(uint64_t id, FunctionType &&function) { - executeTask(id, makeLambdaTask(std::forward<FunctionType>(function))); + inline void execute(uint64_t componentId, FunctionType &&function) { + uint32_t executorId = getExecutorId(componentId); + executeTask(executorId, makeLambdaTask(std::forward<FunctionType>(function))); } /** @@ -52,13 +78,13 @@ public: * call sync before tearing down pointed to/referenced data. * All tasks must be scheduled from same thread. * - * @param id task id. - * @param function function to be wrapped in a task and later executed + * @param componentId component id + * @param function function to be wrapped in a task and later executed */ template <class FunctionType> - inline void execute(const vespalib::stringref id, FunctionType &&function) { - vespalib::hash<vespalib::stringref> hashfun; - executeTask(hashfun(id), + inline void execute(vespalib::stringref componentId, FunctionType &&function) { + uint32_t executorId = getExecutorId(componentId); + executeTask(executorId, makeLambdaTask(std::forward<FunctionType>(function))); } }; diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp index 00398ef684d..062853f1275 100644 --- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp +++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp @@ -35,18 +35,23 @@ SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit) } } -void -SequencedTaskExecutor::executeTask(uint64_t id, - vespalib::Executor::Task::UP task) +uint32_t +SequencedTaskExecutor::getExecutorId(uint64_t componentId) { - auto itr = _ids.find(id); + auto itr = _ids.find(componentId); if (itr == _ids.end()) { - auto insarg = std::make_pair(id, _ids.size() % _executors.size()); + auto insarg = std::make_pair(componentId, _ids.size() % _executors.size()); auto insres = _ids.insert(insarg); assert(insres.second); itr = insres.first; } - size_t executorId = itr->second; + return itr->second; +} + +void +SequencedTaskExecutor::executeTask(uint32_t executorId, vespalib::Executor::Task::UP task) +{ + assert(executorId < _executors.size()); vespalib::ThreadStackExecutorBase &executor(*_executors[executorId]); auto rejectedTask = executor.execute(std::move(task)); assert(!rejectedTask); diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h index 4f20c5b14a4..acfec837309 100644 --- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h +++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h @@ -24,14 +24,17 @@ class SequencedTaskExecutor : public ISequencedTaskExecutor std::vector<std::shared_ptr<vespalib::BlockingThreadStackExecutor>> _executors; vespalib::hash_map<size_t, size_t> _ids; public: + using ISequencedTaskExecutor::getExecutorId; + SequencedTaskExecutor(uint32_t threads, uint32_t taskLimit = 1000); ~SequencedTaskExecutor(); void setTaskLimit(uint32_t taskLimit); - virtual void executeTask(uint64_t id, - vespalib::Executor::Task::UP task) override; + virtual uint32_t getExecutorId(uint64_t componentId) override; + + virtual void executeTask(uint32_t executorId, vespalib::Executor::Task::UP task) override; virtual void sync() override; }; diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp new file mode 100644 index 00000000000..b0a8fb05e41 --- /dev/null +++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp @@ -0,0 +1,53 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "sequencedtaskexecutorobserver.h" + +namespace search +{ + +SequencedTaskExecutorObserver::SequencedTaskExecutorObserver(ISequencedTaskExecutor &executor) + : _executor(executor), + _executeCnt(0u), + _syncCnt(0u), + _executeHistory(), + _mutex() +{ +} + +SequencedTaskExecutorObserver::~SequencedTaskExecutorObserver() +{ +} + +uint32_t +SequencedTaskExecutorObserver::getExecutorId(uint64_t componentId) +{ + return _executor.getExecutorId(componentId); +} + +void +SequencedTaskExecutorObserver::executeTask(uint32_t executorId, + vespalib::Executor::Task::UP task) +{ + ++_executeCnt; + { + std::lock_guard<std::mutex> guard(_mutex); + _executeHistory.emplace_back(executorId); + } + _executor.executeTask(executorId, std::move(task)); +} + +void +SequencedTaskExecutorObserver::sync() +{ + ++_syncCnt; + _executor.sync(); +} + +std::vector<uint32_t> +SequencedTaskExecutorObserver::getExecuteHistory() +{ + std::lock_guard<std::mutex> guard(_mutex); + return _executeHistory; +} + +} // namespace search diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h index ffc6ba7f55b..12a84d0bd81 100644 --- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h +++ b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h @@ -3,6 +3,8 @@ #include "isequencedtaskexecutor.h" #include <atomic> +#include <vector> +#include <mutex> namespace search { @@ -16,29 +18,22 @@ class SequencedTaskExecutorObserver : public ISequencedTaskExecutor ISequencedTaskExecutor &_executor; std::atomic<uint32_t> _executeCnt; std::atomic<uint32_t> _syncCnt; + std::vector<uint32_t> _executeHistory; + std::mutex _mutex; public: - SequencedTaskExecutorObserver(ISequencedTaskExecutor &executor) - : _executor(executor), - _executeCnt(0u), - _syncCnt(0u) - { - } + using ISequencedTaskExecutor::getExecutorId; - virtual ~SequencedTaskExecutorObserver() { } + SequencedTaskExecutorObserver(ISequencedTaskExecutor &executor); - virtual void executeTask(uint64_t id, - vespalib::Executor::Task::UP task) override { - ++_executeCnt; - _executor.executeTask(id, std::move(task)); - } - - virtual void sync() override { - ++_syncCnt; - _executor.sync(); - } + virtual ~SequencedTaskExecutorObserver() override; + virtual uint32_t getExecutorId(uint64_t componentId) override; + virtual void executeTask(uint32_t executorId, + vespalib::Executor::Task::UP task) override; + virtual void sync() override; uint32_t getExecuteCnt() const { return _executeCnt; } uint32_t getSyncCnt() const { return _syncCnt; } + std::vector<uint32_t> getExecuteHistory(); }; } // namespace search |