diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-04 22:20:35 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-04 22:35:17 +0000 |
commit | 416ff1764ce98954b3b15fcae0f6a50d76b38323 (patch) | |
tree | 8974071929be2d3723db0a14567dcbeb2f7a1797 /searchlib | |
parent | 130d4607a359ae2740bdeeb0179a731751f979a0 (diff) |
Move sequenced task executors to staging vespalib
Diffstat (limited to 'searchlib')
31 files changed, 41 insertions, 1668 deletions
diff --git a/searchlib/CMakeLists.txt b/searchlib/CMakeLists.txt index a76baeced04..4237bede9d5 100644 --- a/searchlib/CMakeLists.txt +++ b/searchlib/CMakeLists.txt @@ -100,11 +100,9 @@ vespa_define_module( src/tests/bitvector src/tests/bytecomplens src/tests/common/bitvector - src/tests/common/foregroundtaskexecutor src/tests/common/location src/tests/common/matching_elements src/tests/common/resultset - src/tests/common/sequencedtaskexecutor src/tests/common/struct_field_mapper src/tests/common/summaryfeatures src/tests/diskindex/bitvector diff --git a/searchlib/src/apps/tests/memoryindexstress_test.cpp b/searchlib/src/apps/tests/memoryindexstress_test.cpp index dc950b84508..6bbd93bda84 100644 --- a/searchlib/src/apps/tests/memoryindexstress_test.cpp +++ b/searchlib/src/apps/tests/memoryindexstress_test.cpp @@ -9,7 +9,6 @@ #include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/repo/fixedtyperepo.h> #include <vespa/searchlib/common/scheduletaskcallback.h> -#include <vespa/searchlib/common/sequencedtaskexecutor.h> #include <vespa/searchlib/fef/matchdata.h> #include <vespa/searchlib/fef/matchdatalayout.h> #include <vespa/searchlib/fef/termfieldmatchdata.h> @@ -24,6 +23,7 @@ #include <vespa/searchlib/util/rand48.h> #include <vespa/vespalib/testkit/testapp.h> #include <vespa/vespalib/util/threadstackexecutor.h> +#include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <vespa/log/log.h> LOG_SETUP("memoryindexstress_test"); @@ -195,8 +195,8 @@ struct Fixture { Schema schema; DocumentTypeRepo repo; vespalib::ThreadStackExecutor _executor; - std::unique_ptr<search::ISequencedTaskExecutor> _invertThreads; - std::unique_ptr<search::ISequencedTaskExecutor> _pushThreads; + std::unique_ptr<vespalib::ISequencedTaskExecutor> _invertThreads; + std::unique_ptr<vespalib::ISequencedTaskExecutor> _pushThreads; MemoryIndex index; uint32_t _readThreads; vespalib::ThreadStackExecutor _writer; // 1 write thread @@ -247,8 +247,8 @@ Fixture::Fixture(uint32_t readThreads) : schema(makeSchema()), repo(makeDocTypeRepoConfig()), _executor(1, 128 * 1024), - _invertThreads(search::SequencedTaskExecutor::create(2)), - _pushThreads(search::SequencedTaskExecutor::create(2)), + _invertThreads(vespalib::SequencedTaskExecutor::create(2)), + _pushThreads(vespalib::SequencedTaskExecutor::create(2)), index(schema, MockFieldLengthInspector(), *_invertThreads, *_pushThreads), _readThreads(readThreads), _writer(1, 128 * 1024), diff --git a/searchlib/src/tests/common/foregroundtaskexecutor/.gitignore b/searchlib/src/tests/common/foregroundtaskexecutor/.gitignore deleted file mode 100644 index 0bd7759156b..00000000000 --- a/searchlib/src/tests/common/foregroundtaskexecutor/.gitignore +++ /dev/null @@ -1 +0,0 @@ -searchlib_foregroundtaskexecutor_test_app diff --git a/searchlib/src/tests/common/foregroundtaskexecutor/CMakeLists.txt b/searchlib/src/tests/common/foregroundtaskexecutor/CMakeLists.txt deleted file mode 100644 index 64354d54396..00000000000 --- a/searchlib/src/tests/common/foregroundtaskexecutor/CMakeLists.txt +++ /dev/null @@ -1,8 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_executable(searchlib_foregroundtaskexecutor_test_app TEST - SOURCES - foregroundtaskexecutor_test.cpp - DEPENDS - searchlib -) -vespa_add_test(NAME searchlib_foregroundtaskexecutor_test_app COMMAND searchlib_foregroundtaskexecutor_test_app) diff --git a/searchlib/src/tests/common/foregroundtaskexecutor/foregroundtaskexecutor_test.cpp b/searchlib/src/tests/common/foregroundtaskexecutor/foregroundtaskexecutor_test.cpp deleted file mode 100644 index 0cbd4bd9473..00000000000 --- a/searchlib/src/tests/common/foregroundtaskexecutor/foregroundtaskexecutor_test.cpp +++ /dev/null @@ -1,121 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include <vespa/searchlib/common/foregroundtaskexecutor.h> -#include <vespa/vespalib/testkit/testapp.h> - -#include <mutex> -#include <condition_variable> -#include <unistd.h> - -#include <vespa/log/log.h> -LOG_SETUP("foregroundtaskexecutor_test"); - -namespace search::common { - - -class Fixture -{ -public: - ForegroundTaskExecutor _threads; - - Fixture() - : _threads() - { - } -}; - - -class TestObj -{ -public: - std::mutex _m; - std::condition_variable _cv; - int _done; - int _fail; - int _val; - - TestObj() - : _m(), - _cv(), - _done(0), - _fail(0), - _val(0) - { - } - - void - modify(int oldValue, int newValue) - { - { - std::lock_guard<std::mutex> guard(_m); - if (_val == oldValue) { - _val = newValue; - } else { - ++_fail; - } - ++_done; - } - _cv.notify_all(); - } - - void - wait(int wantDone) - { - std::unique_lock<std::mutex> guard(_m); - _cv.wait(guard, [=] { return this->_done >= wantDone; }); - } -}; - -TEST_F("testExecute", Fixture) { - std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); - EXPECT_EQUAL(0, tv->_val); - f._threads.execute(1, [=]() { tv->modify(0, 42); }); - tv->wait(1); - EXPECT_EQUAL(0, tv->_fail); - EXPECT_EQUAL(42, tv->_val); - f._threads.sync(); - EXPECT_EQUAL(0, tv->_fail); - EXPECT_EQUAL(42, tv->_val); -} - - -TEST_F("require that task with same id are serialized", Fixture) -{ - std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); - EXPECT_EQUAL(0, tv->_val); - f._threads.execute(0, [=]() { usleep(2000); tv->modify(0, 14); }); - f._threads.execute(0, [=]() { tv->modify(14, 42); }); - tv->wait(2); - EXPECT_EQUAL(0, tv->_fail); - EXPECT_EQUAL(42, tv->_val); - f._threads.sync(); - EXPECT_EQUAL(0, tv->_fail); - EXPECT_EQUAL(42, tv->_val); -} - -TEST_F("require that task with different ids are serialized", Fixture) -{ - int tryCnt = 0; - for (tryCnt = 0; tryCnt < 100; ++tryCnt) { - std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); - EXPECT_EQUAL(0, tv->_val); - f._threads.execute(0, [=]() { usleep(2000); tv->modify(0, 14); }); - f._threads.execute(1, [=]() { tv->modify(14, 42); }); - tv->wait(2); - if (tv->_fail != 1) { - continue; - } - EXPECT_EQUAL(1, tv->_fail); - EXPECT_EQUAL(14, tv->_val); - f._threads.sync(); - EXPECT_EQUAL(1, tv->_fail); - EXPECT_EQUAL(14, tv->_val); - break; - } - EXPECT_TRUE(tryCnt >= 100); -} - - -} - -TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/.gitignore b/searchlib/src/tests/common/sequencedtaskexecutor/.gitignore deleted file mode 100644 index 4bd94f124fb..00000000000 --- a/searchlib/src/tests/common/sequencedtaskexecutor/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -searchlib_sequencedtaskexecutor_test_app -searchlib_sequencedtaskexecutor_benchmark_app diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/CMakeLists.txt b/searchlib/src/tests/common/sequencedtaskexecutor/CMakeLists.txt deleted file mode 100644 index fd6cd9efc43..00000000000 --- a/searchlib/src/tests/common/sequencedtaskexecutor/CMakeLists.txt +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -vespa_add_executable(searchlib_sequencedtaskexecutor_benchmark_app TEST - SOURCES - sequencedtaskexecutor_benchmark.cpp - DEPENDS - searchlib -) - -vespa_add_executable(searchlib_sequencedtaskexecutor_test_app TEST - SOURCES - sequencedtaskexecutor_test.cpp - DEPENDS - searchlib -) -vespa_add_test(NAME searchlib_sequencedtaskexecutor_test_app COMMAND searchlib_sequencedtaskexecutor_test_app) - -vespa_add_executable(searchlib_adaptive_sequenced_executor_test_app TEST - SOURCES - adaptive_sequenced_executor_test.cpp - DEPENDS - searchlib -) -vespa_add_test(NAME searchlib_adaptive_sequenced_executor_test_app COMMAND searchlib_adaptive_sequenced_executor_test_app) diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp b/searchlib/src/tests/common/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp deleted file mode 100644 index ba66b28108c..00000000000 --- a/searchlib/src/tests/common/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp +++ /dev/null @@ -1,251 +0,0 @@ -// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include <vespa/searchlib/common/adaptive_sequenced_executor.h> -#include <vespa/vespalib/testkit/testapp.h> -#include <vespa/vespalib/test/insertion_operators.h> - -#include <mutex> -#include <condition_variable> -#include <unistd.h> - -#include <vespa/log/log.h> -LOG_SETUP("adaptive_sequenced_executor_test"); - -namespace search::common { - - -class Fixture -{ -public: - AdaptiveSequencedExecutor _threads; - - Fixture() : _threads(2, 2, 0, 1000) { } -}; - - -class TestObj -{ -public: - std::mutex _m; - std::condition_variable _cv; - int _done; - int _fail; - int _val; - - TestObj() - : _m(), - _cv(), - _done(0), - _fail(0), - _val(0) - { - } - - void - modify(int oldValue, int newValue) - { - { - std::lock_guard<std::mutex> guard(_m); - if (_val == oldValue) { - _val = newValue; - } else { - ++_fail; - } - ++_done; - } - _cv.notify_all(); - } - - void - wait(int wantDone) - { - std::unique_lock<std::mutex> guard(_m); - _cv.wait(guard, [&] { return this->_done >= wantDone; }); - } -}; - -TEST_F("testExecute", Fixture) { - std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); - EXPECT_EQUAL(0, tv->_val); - f._threads.execute(1, [&]() { tv->modify(0, 42); }); - tv->wait(1); - EXPECT_EQUAL(0, tv->_fail); - EXPECT_EQUAL(42, tv->_val); - f._threads.sync(); - EXPECT_EQUAL(0, tv->_fail); - EXPECT_EQUAL(42, tv->_val); -} - - -TEST_F("require that task with same component id are serialized", Fixture) -{ - std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); - EXPECT_EQUAL(0, tv->_val); - f._threads.execute(0, [&]() { usleep(2000); tv->modify(0, 14); }); - f._threads.execute(0, [&]() { tv->modify(14, 42); }); - tv->wait(2); - EXPECT_EQUAL(0, tv->_fail); - EXPECT_EQUAL(42, tv->_val); - f._threads.sync(); - EXPECT_EQUAL(0, tv->_fail); - EXPECT_EQUAL(42, tv->_val); -} - -TEST_F("require that task with different component ids are not serialized", Fixture) -{ - int tryCnt = 0; - for (tryCnt = 0; tryCnt < 100; ++tryCnt) { - std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); - EXPECT_EQUAL(0, tv->_val); - f._threads.execute(0, [&]() { usleep(2000); tv->modify(0, 14); }); - f._threads.execute(2, [&]() { tv->modify(14, 42); }); - tv->wait(2); - if (tv->_fail != 1) { - continue; - } - EXPECT_EQUAL(1, tv->_fail); - EXPECT_EQUAL(14, tv->_val); - f._threads.sync(); - EXPECT_EQUAL(1, tv->_fail); - EXPECT_EQUAL(14, tv->_val); - break; - } - EXPECT_TRUE(tryCnt < 100); -} - - -TEST_F("require that task with same string component id are serialized", Fixture) -{ - std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); - EXPECT_EQUAL(0, tv->_val); - auto test2 = [&]() { tv->modify(14, 42); }; - f._threads.execute(f._threads.getExecutorId("0"), [&]() { usleep(2000); tv->modify(0, 14); }); - f._threads.execute(f._threads.getExecutorId("0"), test2); - tv->wait(2); - EXPECT_EQUAL(0, tv->_fail); - EXPECT_EQUAL(42, tv->_val); - f._threads.sync(); - EXPECT_EQUAL(0, tv->_fail); - EXPECT_EQUAL(42, tv->_val); -} - -namespace { - -int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int tryLimit) -{ - int tryCnt = 0; - for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) { - std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); - EXPECT_EQUAL(0, tv->_val); - f._threads.execute(f._threads.getExecutorId("0"), [&]() { usleep(2000); tv->modify(0, 14); }); - f._threads.execute(f._threads.getExecutorId(altComponentId), [&]() { tv->modify(14, 42); }); - tv->wait(2); - if (tv->_fail != 1) { - continue; - } - EXPECT_EQUAL(1, tv->_fail); - EXPECT_EQUAL(14, tv->_val); - f._threads.sync(); - EXPECT_EQUAL(1, tv->_fail); - EXPECT_EQUAL(14, tv->_val); - break; - } - return tryCnt; -} - -vespalib::string makeAltComponentId(Fixture &f) -{ - int tryCnt = 0; - char altComponentId[20]; - ISequencedTaskExecutor::ExecutorId executorId0 = f._threads.getExecutorId("0"); - for (tryCnt = 1; tryCnt < 100; ++tryCnt) { - sprintf(altComponentId, "%d", tryCnt); - if (f._threads.getExecutorId(altComponentId) == executorId0) { - break; - } - } - EXPECT_TRUE(tryCnt < 100); - return altComponentId; -} - -} - -TEST_F("require that task with different string component ids are not serialized", Fixture) -{ - int tryCnt = detectSerializeFailure(f, "2", 100); - EXPECT_TRUE(tryCnt < 100); -} - - -TEST_F("require that task with different string component ids mapping to the same executor id are serialized", - Fixture) -{ - vespalib::string altComponentId = makeAltComponentId(f); - LOG(info, "second string component id is \"%s\"", altComponentId.c_str()); - int tryCnt = detectSerializeFailure(f, altComponentId, 100); - EXPECT_TRUE(tryCnt == 100); -} - - -TEST_F("require that execute works with const lambda", Fixture) -{ - int i = 5; - std::vector<int> res; - const auto lambda = [i, &res]() mutable - { res.push_back(i--); res.push_back(i--); }; - f._threads.execute(0, lambda); - f._threads.execute(0, lambda); - f._threads.sync(); - std::vector<int> exp({5, 4, 5, 4}); - EXPECT_EQUAL(exp, res); - EXPECT_EQUAL(5, i); -} - -TEST_F("require that execute works with reference to lambda", Fixture) -{ - int i = 5; - std::vector<int> res; - auto lambda = [i, &res]() mutable - { res.push_back(i--); res.push_back(i--); }; - auto &lambdaref = lambda; - f._threads.execute(0, lambdaref); - f._threads.execute(0, lambdaref); - f._threads.sync(); - std::vector<int> exp({5, 4, 5, 4}); - EXPECT_EQUAL(exp, res); - EXPECT_EQUAL(5, i); -} - -TEST_F("require that executeLambda works", Fixture) -{ - int i = 5; - std::vector<int> res; - const auto lambda = [i, &res]() mutable - { res.push_back(i--); res.push_back(i--); }; - f._threads.executeLambda(ISequencedTaskExecutor::ExecutorId(0), lambda); - f._threads.sync(); - std::vector<int> exp({5, 4}); - EXPECT_EQUAL(exp, res); - EXPECT_EQUAL(5, i); -} - -TEST("require that you get correct number of executors") { - AdaptiveSequencedExecutor seven(7, 1, 0, 10); - EXPECT_EQUAL(7u, seven.getNumExecutors()); -} - -TEST("require that you distribute well") { - AdaptiveSequencedExecutor seven(7, 1, 0, 10); - EXPECT_EQUAL(7u, seven.getNumExecutors()); - EXPECT_EQUAL(97u, seven.getComponentHashSize()); - EXPECT_EQUAL(0u, seven.getComponentEffectiveHashSize()); - for (uint32_t id=0; id < 1000; id++) { - EXPECT_EQUAL((id%97)%7, seven.getExecutorId(id).getId()); - } - EXPECT_EQUAL(97u, seven.getComponentHashSize()); - EXPECT_EQUAL(97u, seven.getComponentEffectiveHashSize()); -} - -} - -TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp deleted file mode 100644 index 362dc28d36a..00000000000 --- a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include <vespa/searchlib/common/sequencedtaskexecutor.h> -#include <vespa/searchlib/common/adaptive_sequenced_executor.h> -#include <vespa/vespalib/util/lambdatask.h> -#include <vespa/vespalib/util/time.h> -#include <atomic> - -using search::ISequencedTaskExecutor; -using search::SequencedTaskExecutor; -using search::AdaptiveSequencedExecutor; -using ExecutorId = search::ISequencedTaskExecutor::ExecutorId; - -size_t do_work(size_t size) { - size_t ret = 0; - for (size_t i = 0; i < size; ++i) { - for (size_t j = 0; j < 128; ++j) { - ret = (ret + i) * j; - } - } - return ret; -} - -struct SimpleParams { - int argc; - char **argv; - int idx; - SimpleParams(int argc_in, char **argv_in) : argc(argc_in), argv(argv_in), idx(0) {} - int next(const char *name, int fallback) { - ++idx; - int value = 0; - if (argc > idx) { - value = atoi(argv[idx]); - } else { - value = fallback; - } - fprintf(stderr, "param %s: %d\n", name, value); - return value; - } -}; - -int main(int argc, char **argv) { - SimpleParams params(argc, argv); - bool use_adaptive_executor = params.next("use_adaptive_executor", 0); - bool optimize_for_throughput = params.next("optimize_for_throughput", 0); - size_t num_tasks = params.next("num_tasks", 1000000); - size_t num_strands = params.next("num_strands", 4); - size_t task_limit = params.next("task_limit", 1000); - size_t num_threads = params.next("num_threads", num_strands); - size_t max_waiting = params.next("max_waiting", optimize_for_throughput ? 32 : 0); - size_t work_size = params.next("work_size", 0); - std::atomic<long> counter(0); - std::unique_ptr<ISequencedTaskExecutor> executor; - if (use_adaptive_executor) { - executor = std::make_unique<AdaptiveSequencedExecutor>(num_strands, num_threads, max_waiting, task_limit); - } else { - auto optimize = optimize_for_throughput - ? vespalib::Executor::OptimizeFor::THROUGHPUT - : vespalib::Executor::OptimizeFor::LATENCY; - executor = SequencedTaskExecutor::create(num_strands, task_limit, optimize); - } - vespalib::Timer timer; - for (size_t task_id = 0; task_id < num_tasks; ++task_id) { - executor->executeTask(ExecutorId(task_id % num_strands), - vespalib::makeLambdaTask([&counter,work_size] { (void) do_work(work_size); counter++; })); - } - executor.reset(); - fprintf(stderr, "\ntotal time: %zu ms\n", vespalib::count_ms(timer.elapsed())); - return (size_t(counter) == num_tasks) ? 0 : 1; -} diff --git a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp deleted file mode 100644 index c311a59a56c..00000000000 --- a/searchlib/src/tests/common/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp +++ /dev/null @@ -1,251 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include <vespa/searchlib/common/sequencedtaskexecutor.h> -#include <vespa/vespalib/testkit/testapp.h> -#include <vespa/vespalib/test/insertion_operators.h> - -#include <mutex> -#include <condition_variable> -#include <unistd.h> - -#include <vespa/log/log.h> -LOG_SETUP("sequencedtaskexecutor_test"); - -namespace search::common { - - -class Fixture -{ -public: - std::unique_ptr<ISequencedTaskExecutor> _threads; - - Fixture() : _threads(SequencedTaskExecutor::create(2)) { } -}; - - -class TestObj -{ -public: - std::mutex _m; - std::condition_variable _cv; - int _done; - int _fail; - int _val; - - TestObj() - : _m(), - _cv(), - _done(0), - _fail(0), - _val(0) - { - } - - void - modify(int oldValue, int newValue) - { - { - std::lock_guard<std::mutex> guard(_m); - if (_val == oldValue) { - _val = newValue; - } else { - ++_fail; - } - ++_done; - } - _cv.notify_all(); - } - - void - wait(int wantDone) - { - std::unique_lock<std::mutex> guard(_m); - _cv.wait(guard, [=] { return this->_done >= wantDone; }); - } -}; - -TEST_F("testExecute", Fixture) { - std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); - EXPECT_EQUAL(0, tv->_val); - f._threads->execute(1, [=]() { tv->modify(0, 42); }); - tv->wait(1); - EXPECT_EQUAL(0, tv->_fail); - EXPECT_EQUAL(42, tv->_val); - f._threads->sync(); - EXPECT_EQUAL(0, tv->_fail); - EXPECT_EQUAL(42, tv->_val); -} - - -TEST_F("require that task with same component id are serialized", Fixture) -{ - std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); - EXPECT_EQUAL(0, tv->_val); - f._threads->execute(0, [=]() { usleep(2000); tv->modify(0, 14); }); - f._threads->execute(0, [=]() { tv->modify(14, 42); }); - tv->wait(2); - EXPECT_EQUAL(0, tv->_fail); - EXPECT_EQUAL(42, tv->_val); - f._threads->sync(); - EXPECT_EQUAL(0, tv->_fail); - EXPECT_EQUAL(42, tv->_val); -} - -TEST_F("require that task with different component ids are not serialized", Fixture) -{ - int tryCnt = 0; - for (tryCnt = 0; tryCnt < 100; ++tryCnt) { - std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); - EXPECT_EQUAL(0, tv->_val); - f._threads->execute(0, [=]() { usleep(2000); tv->modify(0, 14); }); - f._threads->execute(2, [=]() { tv->modify(14, 42); }); - tv->wait(2); - if (tv->_fail != 1) { - continue; - } - EXPECT_EQUAL(1, tv->_fail); - EXPECT_EQUAL(14, tv->_val); - f._threads->sync(); - EXPECT_EQUAL(1, tv->_fail); - EXPECT_EQUAL(14, tv->_val); - break; - } - EXPECT_TRUE(tryCnt < 100); -} - - -TEST_F("require that task with same string component id are serialized", Fixture) -{ - std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); - EXPECT_EQUAL(0, tv->_val); - auto test2 = [=]() { tv->modify(14, 42); }; - f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); }); - f._threads->execute(f._threads->getExecutorId("0"), test2); - tv->wait(2); - EXPECT_EQUAL(0, tv->_fail); - EXPECT_EQUAL(42, tv->_val); - f._threads->sync(); - EXPECT_EQUAL(0, tv->_fail); - EXPECT_EQUAL(42, tv->_val); -} - -namespace { - -int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int tryLimit) -{ - int tryCnt = 0; - for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) { - std::shared_ptr<TestObj> tv(std::make_shared<TestObj>()); - EXPECT_EQUAL(0, tv->_val); - f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); }); - f._threads->execute(f._threads->getExecutorId(altComponentId), [=]() { tv->modify(14, 42); }); - tv->wait(2); - if (tv->_fail != 1) { - continue; - } - EXPECT_EQUAL(1, tv->_fail); - EXPECT_EQUAL(14, tv->_val); - f._threads->sync(); - EXPECT_EQUAL(1, tv->_fail); - EXPECT_EQUAL(14, tv->_val); - break; - } - return tryCnt; -} - -vespalib::string makeAltComponentId(Fixture &f) -{ - int tryCnt = 0; - char altComponentId[20]; - ISequencedTaskExecutor::ExecutorId executorId0 = f._threads->getExecutorId("0"); - for (tryCnt = 1; tryCnt < 100; ++tryCnt) { - sprintf(altComponentId, "%d", tryCnt); - if (f._threads->getExecutorId(altComponentId) == executorId0) { - break; - } - } - EXPECT_TRUE(tryCnt < 100); - return altComponentId; -} - -} - -TEST_F("require that task with different string component ids are not serialized", Fixture) -{ - int tryCnt = detectSerializeFailure(f, "2", 100); - EXPECT_TRUE(tryCnt < 100); -} - - -TEST_F("require that task with different string component ids mapping to the same executor id are serialized", - Fixture) -{ - vespalib::string altComponentId = makeAltComponentId(f); - LOG(info, "second string component id is \"%s\"", altComponentId.c_str()); - int tryCnt = detectSerializeFailure(f, altComponentId, 100); - EXPECT_TRUE(tryCnt == 100); -} - - -TEST_F("require that execute works with const lambda", Fixture) -{ - int i = 5; - std::vector<int> res; - const auto lambda = [i, &res]() mutable - { res.push_back(i--); res.push_back(i--); }; - f._threads->execute(0, lambda); - f._threads->execute(0, lambda); - f._threads->sync(); - std::vector<int> exp({5, 4, 5, 4}); - EXPECT_EQUAL(exp, res); - EXPECT_EQUAL(5, i); -} - -TEST_F("require that execute works with reference to lambda", Fixture) -{ - int i = 5; - std::vector<int> res; - auto lambda = [i, &res]() mutable - { res.push_back(i--); res.push_back(i--); }; - auto &lambdaref = lambda; - f._threads->execute(0, lambdaref); - f._threads->execute(0, lambdaref); - f._threads->sync(); - std::vector<int> exp({5, 4, 5, 4}); - EXPECT_EQUAL(exp, res); - EXPECT_EQUAL(5, i); -} - -TEST_F("require that executeLambda works", Fixture) -{ - int i = 5; - std::vector<int> res; - const auto lambda = [i, &res]() mutable - { res.push_back(i--); res.push_back(i--); }; - f._threads->executeLambda(ISequencedTaskExecutor::ExecutorId(0), lambda); - f._threads->sync(); - std::vector<int> exp({5, 4}); - EXPECT_EQUAL(exp, res); - EXPECT_EQUAL(5, i); -} - -TEST("require that you get correct number of executors") { - auto seven = SequencedTaskExecutor::create(7); - EXPECT_EQUAL(7u, seven->getNumExecutors()); -} - -TEST("require that you distribute well") { - auto seven = SequencedTaskExecutor::create(7); - EXPECT_EQUAL(7u, seven->getNumExecutors()); - EXPECT_EQUAL(97u, seven->getComponentHashSize()); - EXPECT_EQUAL(0u, seven->getComponentEffectiveHashSize()); - for (uint32_t id=0; id < 1000; id++) { - EXPECT_EQUAL((id%97)%7, seven->getExecutorId(id).getId()); - } - EXPECT_EQUAL(97u, seven->getComponentHashSize()); - EXPECT_EQUAL(97u, seven->getComponentEffectiveHashSize()); -} - -} - -TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp index 92d0659d984..c2fd048ce3e 100644 --- a/searchlib/src/tests/diskindex/fusion/fusion_test.cpp +++ b/searchlib/src/tests/diskindex/fusion/fusion_test.cpp @@ -1,6 +1,5 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/searchlib/common/sequencedtaskexecutor.h> #include <vespa/searchlib/diskindex/diskindex.h> #include <vespa/searchlib/diskindex/fusion.h> #include <vespa/searchlib/diskindex/indexbuilder.h> @@ -19,6 +18,7 @@ #include <vespa/vespalib/btree/btreeroot.hpp> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/util/threadstackexecutor.h> +#include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <gtest/gtest.h> #include <vespa/log/log.h> @@ -37,6 +37,7 @@ using search::common::FileHeaderContext; using search::index::schema::CollectionType; using search::index::schema::DataType; using search::index::test::MockFieldLengthInspector; +using vespalib::SequencedTaskExecutor; using namespace index; diff --git a/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp b/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp index 60b21699406..58f800dec23 100644 --- a/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp +++ b/searchlib/src/tests/memoryindex/document_inverter/document_inverter_test.cpp @@ -1,6 +1,5 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/searchlib/common/sequencedtaskexecutor.h> #include <vespa/searchlib/index/docbuilder.h> #include <vespa/searchlib/index/field_length_calculator.h> #include <vespa/searchlib/memoryindex/document_inverter.h> @@ -9,6 +8,8 @@ #include <vespa/searchlib/memoryindex/i_field_index_collection.h> #include <vespa/searchlib/memoryindex/word_store.h> #include <vespa/searchlib/test/memoryindex/ordered_field_index_inserter.h> +#include <vespa/vespalib/util/sequencedtaskexecutor.h> + #include <vespa/vespalib/gtest/gtest.h> namespace search { @@ -18,6 +19,8 @@ using index::DocBuilder; using index::Schema; using index::schema::CollectionType; using index::schema::DataType; +using vespalib::SequencedTaskExecutor; +using vespalib::ISequencedTaskExecutor; using namespace index; diff --git a/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp b/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp index 7a0c240dea5..ed6bd7168f2 100644 --- a/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp +++ b/searchlib/src/tests/memoryindex/field_index/field_index_test.cpp @@ -1,6 +1,5 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/searchlib/common/sequencedtaskexecutor.h> #include <vespa/searchlib/diskindex/fusion.h> #include <vespa/searchlib/diskindex/indexbuilder.h> #include <vespa/searchlib/diskindex/zcposoccrandread.h> @@ -19,6 +18,8 @@ #include <vespa/searchlib/test/memoryindex/wrap_inserter.h> #include <vespa/vespalib/btree/btreenodeallocator.hpp> #include <vespa/vespalib/btree/btreeroot.hpp> +#include <vespa/vespalib/util/sequencedtaskexecutor.h> + #include <vespa/vespalib/gtest/gtest.h> #include <vespa/log/log.h> @@ -38,6 +39,9 @@ using search::index::schema::CollectionType; using search::index::schema::DataType; using search::index::test::MockFieldLengthInspector; using vespalib::GenerationHandler; +using vespalib::ISequencedTaskExecutor; +using vespalib::SequencedTaskExecutor; + namespace memoryindex { diff --git a/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp b/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp index 5032ed2c179..6476ac7cf8a 100644 --- a/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp +++ b/searchlib/src/tests/memoryindex/memory_index/memory_index_test.cpp @@ -1,7 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/searchlib/common/scheduletaskcallback.h> -#include <vespa/searchlib/common/sequencedtaskexecutor.h> #include <vespa/searchlib/fef/matchdata.h> #include <vespa/searchlib/fef/matchdatalayout.h> #include <vespa/searchlib/fef/termfieldmatchdata.h> @@ -14,9 +13,10 @@ #include <vespa/searchlib/queryeval/fake_search.h> #include <vespa/searchlib/queryeval/fake_searchable.h> #include <vespa/searchlib/queryeval/searchiterator.h> -#include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/vespalib/util/threadstackexecutor.h> +#include <vespa/vespalib/util/sequencedtaskexecutor.h> +#include <vespa/vespalib/gtest/gtest.h> #include <vespa/log/log.h> LOG_SETUP("memory_index_test"); @@ -31,6 +31,8 @@ using vespalib::makeLambdaTask; using search::query::Node; using search::query::SimplePhrase; using search::query::SimpleStringTerm; +using vespalib::ISequencedTaskExecutor; +using vespalib::SequencedTaskExecutor; using namespace search::fef; using namespace search::index; using namespace search::memoryindex; @@ -64,8 +66,8 @@ struct MySetup : public IFieldLengthInspector { struct Index { Schema schema; vespalib::ThreadStackExecutor _executor; - std::unique_ptr<search::ISequencedTaskExecutor> _invertThreads; - std::unique_ptr<search::ISequencedTaskExecutor> _pushThreads; + std::unique_ptr<ISequencedTaskExecutor> _invertThreads; + std::unique_ptr<ISequencedTaskExecutor> _pushThreads; MemoryIndex index; DocBuilder builder; uint32_t docid; @@ -123,8 +125,8 @@ private: Index::Index(const MySetup &setup) : schema(setup.schema), _executor(1, 128 * 1024), - _invertThreads(search::SequencedTaskExecutor::create(2)), - _pushThreads(search::SequencedTaskExecutor::create(2)), + _invertThreads(SequencedTaskExecutor::create(2)), + _pushThreads(SequencedTaskExecutor::create(2)), index(schema, setup, *_invertThreads, *_pushThreads), builder(schema), docid(1), diff --git a/searchlib/src/vespa/searchlib/common/CMakeLists.txt b/searchlib/src/vespa/searchlib/common/CMakeLists.txt index f4a9e27b79d..36264a2035b 100644 --- a/searchlib/src/vespa/searchlib/common/CMakeLists.txt +++ b/searchlib/src/vespa/searchlib/common/CMakeLists.txt @@ -1,7 +1,6 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(searchlib_common OBJECT SOURCES - adaptive_sequenced_executor.cpp allocatedbitvector.cpp bitvector.cpp bitvectorcache.cpp @@ -12,11 +11,9 @@ vespa_add_library(searchlib_common OBJECT documentsummary.cpp featureset.cpp fileheadercontext.cpp - foregroundtaskexecutor.cpp gatecallback.cpp growablebitvector.cpp indexmetainfo.cpp - isequencedtaskexecutor.cpp location.cpp locationiterators.cpp mapnames.cpp @@ -24,8 +21,6 @@ vespa_add_library(searchlib_common OBJECT packets.cpp partialbitvector.cpp resultset.cpp - sequencedtaskexecutor.cpp - sequencedtaskexecutorobserver.cpp serialnumfileheadercontext.cpp sort.cpp sortdata.cpp diff --git a/searchlib/src/vespa/searchlib/common/adaptive_sequenced_executor.cpp b/searchlib/src/vespa/searchlib/common/adaptive_sequenced_executor.cpp deleted file mode 100644 index f31172b1eba..00000000000 --- a/searchlib/src/vespa/searchlib/common/adaptive_sequenced_executor.cpp +++ /dev/null @@ -1,324 +0,0 @@ -// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "adaptive_sequenced_executor.h" - -namespace search { - -//----------------------------------------------------------------------------- - -AdaptiveSequencedExecutor::Strand::Strand() - : state(State::IDLE), - queue() -{ -} - -AdaptiveSequencedExecutor::Strand::~Strand() -{ - assert(queue.empty()); -} - -//----------------------------------------------------------------------------- - -AdaptiveSequencedExecutor::Worker::Worker() - : cond(), - state(State::RUNNING), - strand(nullptr) -{ -} - -AdaptiveSequencedExecutor::Worker::~Worker() -{ - assert(state == State::DONE); - assert(strand == nullptr); -} - -//----------------------------------------------------------------------------- - -AdaptiveSequencedExecutor::Self::Self() - : cond(), - state(State::OPEN), - waiting_tasks(0), - pending_tasks(0) -{ -} - -AdaptiveSequencedExecutor::Self::~Self() -{ - assert(state == State::CLOSED); - assert(waiting_tasks == 0); - assert(pending_tasks == 0); -} - -//----------------------------------------------------------------------------- - -AdaptiveSequencedExecutor::ThreadTools::ThreadTools(AdaptiveSequencedExecutor &parent_in) - : parent(parent_in), - pool(std::make_unique<FastOS_ThreadPool>(STACK_SIZE)), - allow_worker_exit() -{ -} - -AdaptiveSequencedExecutor::ThreadTools::~ThreadTools() -{ - assert(pool->isClosed()); -} - -void -AdaptiveSequencedExecutor::ThreadTools::Run(FastOS_ThreadInterface *, void *) -{ - parent.worker_main(); -} - -void -AdaptiveSequencedExecutor::ThreadTools::start(size_t num_threads) -{ - for (size_t i = 0; i < num_threads; ++i) { - FastOS_ThreadInterface *thread = pool->NewThread(this); - assert(thread != nullptr); - (void)thread; - } -} - -void -AdaptiveSequencedExecutor::ThreadTools::close() -{ - allow_worker_exit.countDown(); - pool->Close(); -} - -//----------------------------------------------------------------------------- - -void -AdaptiveSequencedExecutor::maybe_block_self(std::unique_lock<std::mutex> &lock) -{ - while (_self.state == Self::State::BLOCKED) { - _self.cond.wait(lock); - } - while ((_self.state == Self::State::OPEN) && (_self.pending_tasks >= _cfg.max_pending)) { - _self.state = Self::State::BLOCKED; - while (_self.state == Self::State::BLOCKED) { - _self.cond.wait(lock); - } - } -} - -bool -AdaptiveSequencedExecutor::maybe_unblock_self(const std::unique_lock<std::mutex> &) -{ - if ((_self.state == Self::State::BLOCKED) && (_self.pending_tasks < _cfg.wakeup_limit)) { - _self.state = Self::State::OPEN; - return true; - } - return false; -} - -AdaptiveSequencedExecutor::Worker * -AdaptiveSequencedExecutor::get_worker_to_wake(const std::unique_lock<std::mutex> &) -{ - if ((_self.waiting_tasks > _cfg.max_waiting) && (!_worker_stack.empty())) { - assert(!_wait_queue.empty()); - Worker *worker = _worker_stack.back(); - _worker_stack.popBack(); - assert(worker->state == Worker::State::BLOCKED); - assert(worker->strand == nullptr); - worker->state = Worker::State::RUNNING; - worker->strand = _wait_queue.front(); - _wait_queue.pop(); - assert(worker->strand->state == Strand::State::WAITING); - assert(!worker->strand->queue.empty()); - worker->strand->state = Strand::State::ACTIVE; - assert(_self.waiting_tasks >= worker->strand->queue.size()); - _self.waiting_tasks -= worker->strand->queue.size(); - return worker; - } - return nullptr; -} - -bool -AdaptiveSequencedExecutor::obtain_strand(Worker &worker, std::unique_lock<std::mutex> &lock) -{ - assert(worker.strand == nullptr); - if (!_wait_queue.empty()) { - worker.strand = _wait_queue.front(); - _wait_queue.pop(); - assert(worker.strand->state == Strand::State::WAITING); - assert(!worker.strand->queue.empty()); - worker.strand->state = Strand::State::ACTIVE; - assert(_self.waiting_tasks >= worker.strand->queue.size()); - _self.waiting_tasks -= worker.strand->queue.size(); - } else if (_self.state == Self::State::CLOSED) { - worker.state = Worker::State::DONE; - } else { - worker.state = Worker::State::BLOCKED; - _worker_stack.push(&worker); - while (worker.state == Worker::State::BLOCKED) { - worker.cond.wait(lock); - } - } - return (worker.state == Worker::State::RUNNING); -} - -bool -AdaptiveSequencedExecutor::exchange_strand(Worker &worker, std::unique_lock<std::mutex> &lock) -{ - if (worker.strand == nullptr) { - return obtain_strand(worker, lock); - } - if (worker.strand->queue.empty()) { - worker.strand->state = Strand::State::IDLE; - worker.strand = nullptr; - return obtain_strand(worker, lock); - } - if (!_wait_queue.empty()) { - worker.strand->state = Strand::State::WAITING; - _self.waiting_tasks += worker.strand->queue.size(); - _wait_queue.push(worker.strand); - worker.strand = nullptr; - return obtain_strand(worker, lock); - } - return true; -} - -AdaptiveSequencedExecutor::Task::UP -AdaptiveSequencedExecutor::next_task(Worker &worker) -{ - Task::UP task; - Worker *worker_to_wake = nullptr; - auto guard = std::unique_lock(_mutex); - if (exchange_strand(worker, guard)) { - assert(worker.state == Worker::State::RUNNING); - assert(worker.strand != nullptr); - assert(!worker.strand->queue.empty()); - task = std::move(worker.strand->queue.front()); - worker.strand->queue.pop(); - _stats.queueSize.add(--_self.pending_tasks); - worker_to_wake = get_worker_to_wake(guard); - } else { - assert(worker.state == Worker::State::DONE); - assert(worker.strand == nullptr); - } - bool signal_self = maybe_unblock_self(guard); - guard.unlock(); // UNLOCK - if (worker_to_wake != nullptr) { - worker_to_wake->cond.notify_one(); - } - if (signal_self) { - _self.cond.notify_all(); - } - return task; -} - -void -AdaptiveSequencedExecutor::worker_main() -{ - Worker worker; - while (Task::UP my_task = next_task(worker)) { - my_task->run(); - } - _thread_tools->allow_worker_exit.await(); -} - -AdaptiveSequencedExecutor::AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads, - size_t max_waiting, size_t max_pending) - : ISequencedTaskExecutor(num_strands), - _thread_tools(std::make_unique<ThreadTools>(*this)), - _mutex(), - _strands(num_strands), - _wait_queue(num_strands), - _worker_stack(num_threads), - _self(), - _stats(), - _cfg(num_threads, max_waiting, max_pending) -{ - _stats.queueSize.add(_self.pending_tasks); - _thread_tools->start(num_threads); -} - -AdaptiveSequencedExecutor::~AdaptiveSequencedExecutor() -{ - sync(); - { - auto guard = std::unique_lock(_mutex); - assert(_self.state == Self::State::OPEN); - _self.state = Self::State::CLOSED; - while (!_worker_stack.empty()) { - Worker *worker = _worker_stack.back(); - _worker_stack.popBack(); - assert(worker->state == Worker::State::BLOCKED); - assert(worker->strand == nullptr); - worker->state = Worker::State::DONE; - worker->cond.notify_one(); - } - _self.cond.notify_all(); - } - _thread_tools->close(); - assert(_wait_queue.empty()); - assert(_worker_stack.empty()); -} - -void -AdaptiveSequencedExecutor::executeTask(ExecutorId id, Task::UP task) -{ - assert(id.getId() < _strands.size()); - Strand &strand = _strands[id.getId()]; - auto guard = std::unique_lock(_mutex); - maybe_block_self(guard); - assert(_self.state != Self::State::CLOSED); - strand.queue.push(std::move(task)); - _stats.queueSize.add(++_self.pending_tasks); - ++_stats.acceptedTasks; - if (strand.state == Strand::State::WAITING) { - ++_self.waiting_tasks; - } else if (strand.state == Strand::State::IDLE) { - if (_worker_stack.size() < _cfg.num_threads) { - strand.state = Strand::State::WAITING; - _wait_queue.push(&strand); - _self.waiting_tasks += strand.queue.size(); - } else { - strand.state = Strand::State::ACTIVE; - assert(_wait_queue.empty()); - Worker *worker = _worker_stack.back(); - _worker_stack.popBack(); - assert(worker->state == Worker::State::BLOCKED); - assert(worker->strand == nullptr); - worker->state = Worker::State::RUNNING; - worker->strand = &strand; - guard.unlock(); // UNLOCK - worker->cond.notify_one(); - } - } -} - -void -AdaptiveSequencedExecutor::sync() -{ - vespalib::CountDownLatch latch(_strands.size()); - for (size_t i = 0; i < _strands.size(); ++i) { - execute(ExecutorId(i), [&](){ latch.countDown(); }); - } - latch.await(); -} - -void -AdaptiveSequencedExecutor::setTaskLimit(uint32_t task_limit) -{ - auto guard = std::unique_lock(_mutex); - _cfg.set_max_pending(task_limit); - bool signal_self = maybe_unblock_self(guard); - guard.unlock(); // UNLOCK - if (signal_self) { - _self.cond.notify_all(); - } -} - -AdaptiveSequencedExecutor::Stats -AdaptiveSequencedExecutor::getStats() -{ - auto guard = std::lock_guard(_mutex); - Stats stats = _stats; - _stats = Stats(); - _stats.queueSize.add(_self.pending_tasks); - return stats; -} - -} diff --git a/searchlib/src/vespa/searchlib/common/adaptive_sequenced_executor.h b/searchlib/src/vespa/searchlib/common/adaptive_sequenced_executor.h deleted file mode 100644 index 3abc095e9df..00000000000 --- a/searchlib/src/vespa/searchlib/common/adaptive_sequenced_executor.h +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#pragma once - -#include "isequencedtaskexecutor.h" -#include <vespa/vespalib/util/arrayqueue.hpp> -#include <vespa/vespalib/util/gate.h> -#include <vespa/fastos/thread.h> -#include <mutex> -#include <condition_variable> -#include <cassert> - -namespace search { - -/** - * Sequenced executor that balances the number of active threads in - * order to optimize for throughput over latency by minimizing the - * number of critical-path wakeups. - **/ -class AdaptiveSequencedExecutor : public ISequencedTaskExecutor -{ -private: - using Stats = vespalib::ExecutorStats; - using Task = vespalib::Executor::Task; - - /** - * Values used to configure the executor. - **/ - struct Config { - size_t num_threads; - size_t max_waiting; - size_t max_pending; - size_t wakeup_limit; - void set_max_pending(size_t max_pending_in) { - max_pending = std::max(1uL, max_pending_in); - wakeup_limit = std::max(1uL, size_t(max_pending * 0.9)); - assert(wakeup_limit > 0); - assert(wakeup_limit <= max_pending); - } - Config(size_t num_threads_in, size_t max_waiting_in, size_t max_pending_in) - : num_threads(num_threads_in), max_waiting(max_waiting_in), max_pending(1000), wakeup_limit(900) - { - assert(num_threads > 0); - set_max_pending(max_pending_in); - } - }; - - /** - * Tasks that need to be sequenced are handled by a single strand. - **/ - struct Strand { - enum class State { IDLE, WAITING, ACTIVE }; - State state; - vespalib::ArrayQueue<Task::UP> queue; - Strand(); - ~Strand(); - }; - - /** - * The state of a single worker thread. - **/ - struct Worker { - enum class State { RUNNING, BLOCKED, DONE }; - std::condition_variable cond; - State state; - Strand *strand; - Worker(); - ~Worker(); - }; - - /** - * State related to the executor itself. - **/ - struct Self { - enum class State { OPEN, BLOCKED, CLOSED }; - std::condition_variable cond; - State state; - size_t waiting_tasks; - size_t pending_tasks; - Self(); - ~Self(); - }; - - /** - * Stuff related to worker thread startup and shutdown. - **/ - struct ThreadTools : FastOS_Runnable { - static constexpr size_t STACK_SIZE = (256 * 1024); - AdaptiveSequencedExecutor &parent; - std::unique_ptr<FastOS_ThreadPool> pool; - vespalib::Gate allow_worker_exit; - ThreadTools(AdaptiveSequencedExecutor &parent_in); - ~ThreadTools(); - void Run(FastOS_ThreadInterface *, void *) override; - void start(size_t num_threads); - void close(); - }; - - std::unique_ptr<ThreadTools> _thread_tools; - std::mutex _mutex; - std::vector<Strand> _strands; - vespalib::ArrayQueue<Strand*> _wait_queue; - vespalib::ArrayQueue<Worker*> _worker_stack; - Self _self; - Stats _stats; - Config _cfg; - - void maybe_block_self(std::unique_lock<std::mutex> &lock); - bool maybe_unblock_self(const std::unique_lock<std::mutex> &lock); - - Worker *get_worker_to_wake(const std::unique_lock<std::mutex> &lock); - bool obtain_strand(Worker &worker, std::unique_lock<std::mutex> &lock); - bool exchange_strand(Worker &worker, std::unique_lock<std::mutex> &lock); - Task::UP next_task(Worker &worker); - void worker_main(); -public: - AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads, - size_t max_waiting, size_t max_pending); - ~AdaptiveSequencedExecutor() override; - void executeTask(ExecutorId id, Task::UP task) override; - void sync() override; - void setTaskLimit(uint32_t task_limit) override; - vespalib::ExecutorStats getStats() override; -}; - -} diff --git a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp deleted file mode 100644 index a93eb1ff4bc..00000000000 --- a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.cpp +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "foregroundtaskexecutor.h" -#include <vespa/vespalib/util/threadstackexecutor.h> -#include <vespa/vespalib/stllike/hash_map.hpp> - -using vespalib::ThreadStackExecutor; - -namespace search { - -ForegroundTaskExecutor::ForegroundTaskExecutor() - : ForegroundTaskExecutor(1) -{ -} - -ForegroundTaskExecutor::ForegroundTaskExecutor(uint32_t threads) - : ISequencedTaskExecutor(threads), - _accepted(0) -{ -} - -ForegroundTaskExecutor::~ForegroundTaskExecutor() = default; - -void -ForegroundTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP task) -{ - assert(id.getId() < getNumExecutors()); - task->run(); - _accepted++; -} - -void -ForegroundTaskExecutor::sync() -{ -} - -void ForegroundTaskExecutor::setTaskLimit(uint32_t) { - -} - -vespalib::ExecutorStats ForegroundTaskExecutor::getStats() { - return vespalib::ExecutorStats(vespalib::ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0); -} - -} // namespace search diff --git a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h b/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h deleted file mode 100644 index 0b604fb140d..00000000000 --- a/searchlib/src/vespa/searchlib/common/foregroundtaskexecutor.h +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include "isequencedtaskexecutor.h" -#include <atomic> - -namespace vespalib { class ThreadStackExecutorBase; } - -namespace search { - -/** - * Class to run multiple tasks in parallel, but tasks with same - * id has to be run in sequence. - * - * Currently, this is a dummy version that runs everything in the foreground. - */ -class ForegroundTaskExecutor : public ISequencedTaskExecutor -{ -public: - using ISequencedTaskExecutor::getExecutorId; - - ForegroundTaskExecutor(); - ForegroundTaskExecutor(uint32_t threads); - ~ForegroundTaskExecutor() override; - - void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; - void sync() override; - - void setTaskLimit(uint32_t taskLimit) override; - - vespalib::ExecutorStats getStats() override; -private: - std::atomic<uint64_t> _accepted; -}; - -} // namespace search diff --git a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.cpp deleted file mode 100644 index 9d1bc99bebb..00000000000 --- a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.cpp +++ /dev/null @@ -1,45 +0,0 @@ -// Copyright 2020 Oath inc.. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "isequencedtaskexecutor.h" -#include <vespa/vespalib/stllike/hash_fun.h> -#include <vespa/vespalib/stllike/hashtable.h> -#include <cassert> - -namespace search { - namespace { - constexpr uint8_t MAGIC = 255; - } - -ISequencedTaskExecutor::ISequencedTaskExecutor(uint32_t numExecutors) - : _component2Id(vespalib::hashtable_base::getModuloStl(numExecutors*8), MAGIC), - _mutex(), - _numExecutors(numExecutors), - _nextId(0) -{ - assert(numExecutors < 256); -} - -ISequencedTaskExecutor::~ISequencedTaskExecutor() = default; - -ISequencedTaskExecutor::ExecutorId -ISequencedTaskExecutor::getExecutorId(vespalib::stringref componentId) const { - vespalib::hash<vespalib::stringref> hashfun; - return getExecutorId(hashfun(componentId)); -} - -ISequencedTaskExecutor::ExecutorId -ISequencedTaskExecutor::getExecutorId(uint64_t componentId) const { - uint32_t shrunkId = componentId % _component2Id.size(); - uint8_t executorId = _component2Id[shrunkId]; - if (executorId == MAGIC) { - std::lock_guard guard(_mutex); - if (_component2Id[shrunkId] == MAGIC) { - _component2Id[shrunkId] = _nextId % getNumExecutors(); - _nextId++; - } - executorId = _component2Id[shrunkId]; - } - return ExecutorId(executorId); -} - -} diff --git a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h b/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h deleted file mode 100644 index 55c26abc3d8..00000000000 --- a/searchlib/src/vespa/searchlib/common/isequencedtaskexecutor.h +++ /dev/null @@ -1,113 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include <vespa/vespalib/util/executor.h> -#include <vespa/vespalib/util/executor_stats.h> -#include <vespa/vespalib/stllike/string.h> -#include <vespa/vespalib/util/lambdatask.h> -#include <vector> -#include <mutex> - -namespace search { - -/** - * Interface class to run multiple tasks in parallel, but tasks with same - * id has to be run in sequence. - */ -class ISequencedTaskExecutor -{ -public: - class ExecutorId { - public: - ExecutorId() : ExecutorId(0) { } - explicit ExecutorId(uint32_t id) : _id(id) { } - uint32_t getId() const { return _id; } - bool operator != (ExecutorId rhs) const { return _id != rhs._id; } - bool operator == (ExecutorId rhs) const { return _id == rhs._id; } - bool operator < (ExecutorId rhs) const { return _id < rhs._id; } - private: - uint32_t _id; - }; - ISequencedTaskExecutor(uint32_t numExecutors); - virtual ~ISequencedTaskExecutor(); - - /** - * Calculate which executor will handle an component. - * - * @param componentId component id - * @return executor id - */ - ExecutorId getExecutorId(uint64_t componentId) const; - uint32_t getNumExecutors() const { return _numExecutors; } - - ExecutorId getExecutorId(vespalib::stringref componentId) const; - - /** - * Schedule a task to run after all previously scheduled tasks with - * same id. - * - * @param id which internal executor to use - * @param task unique pointer to the task to be executed - */ - virtual void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) = 0; - - /** - * Wrap lambda function into a task and schedule it to be run. - * Caller must ensure that pointers and references are valid and - * call sync before tearing down pointed to/referenced data. - * - * @param id which internal executor to use - * @param function function to be wrapped in a task and later executed - */ - template <class FunctionType> - void executeLambda(ExecutorId id, FunctionType &&function) { - executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function))); - } - /** - * Wait for all scheduled tasks to complete. - */ - virtual void sync() = 0; - - virtual void setTaskLimit(uint32_t taskLimit) = 0; - - virtual vespalib::ExecutorStats getStats() = 0; - - /** - * Wrap lambda function into a task and schedule it to be run. - * Caller must ensure that pointers and references are valid and - * call sync before tearing down pointed to/referenced data. - * - * @param componentId component id - * @param function function to be wrapped in a task and later executed - */ - template <class FunctionType> - void execute(uint64_t componentId, FunctionType &&function) { - ExecutorId id = getExecutorId(componentId); - executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function))); - } - - /** - * Wrap lambda function into a task and schedule it to be run. - * Caller must ensure that pointers and references are valid and - * call sync before tearing down pointed to/referenced data. - * - * @param id executor id - * @param function function to be wrapped in a task and later executed - */ - template <class FunctionType> - void execute(ExecutorId id, FunctionType &&function) { - executeTask(id, vespalib::makeLambdaTask(std::forward<FunctionType>(function))); - } - /** - * For testing only - */ - uint32_t getComponentHashSize() const { return _component2Id.size(); } - uint32_t getComponentEffectiveHashSize() const { return _nextId; } -private: - mutable std::vector<uint8_t> _component2Id; - mutable std::mutex _mutex; - uint32_t _numExecutors; - mutable uint32_t _nextId; -}; - -} // namespace search diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp deleted file mode 100644 index c9b4af6cb55..00000000000 --- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.cpp +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "sequencedtaskexecutor.h" -#include <vespa/vespalib/util/blockingthreadstackexecutor.h> -#include <vespa/vespalib/util/singleexecutor.h> - -using vespalib::BlockingThreadStackExecutor; -using vespalib::SingleExecutor; - -namespace search { - -namespace { - -constexpr uint32_t stackSize = 128 * 1024; - -} - - -std::unique_ptr<ISequencedTaskExecutor> -SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize) -{ - auto executors = std::make_unique<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>>(); - executors->reserve(threads); - for (uint32_t id = 0; id < threads; ++id) { - if (optimize == OptimizeFor::THROUGHPUT) { - executors->push_back(std::make_unique<SingleExecutor>(taskLimit)); - } else { - executors->push_back(std::make_unique<BlockingThreadStackExecutor>(1, stackSize, taskLimit)); - } - } - return std::unique_ptr<ISequencedTaskExecutor>(new SequencedTaskExecutor(std::move(executors))); -} - -SequencedTaskExecutor::~SequencedTaskExecutor() -{ - sync(); -} - -SequencedTaskExecutor::SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executors) - : ISequencedTaskExecutor(executors->size()), - _executors(std::move(executors)) -{ -} - -void -SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit) -{ - for (const auto &executor : *_executors) { - executor->setTaskLimit(taskLimit); - } -} - -void -SequencedTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP task) -{ - assert(id.getId() < _executors->size()); - auto rejectedTask = (*_executors)[id.getId()]->execute(std::move(task)); - assert(!rejectedTask); -} - -void -SequencedTaskExecutor::sync() -{ - for (auto &executor : *_executors) { - SingleExecutor * single = dynamic_cast<vespalib::SingleExecutor *>(executor.get()); - if (single) { - //Enforce parallel wakeup of napping executors. - single->startSync(); - } - } - for (auto &executor : *_executors) { - executor->sync(); - } -} - -SequencedTaskExecutor::Stats -SequencedTaskExecutor::getStats() -{ - Stats accumulatedStats; - for (auto &executor :* _executors) { - accumulatedStats += executor->getStats(); - } - return accumulatedStats; -} - -} // namespace search diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h deleted file mode 100644 index 8568901006f..00000000000 --- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutor.h +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include "isequencedtaskexecutor.h" -#include <vector> - -namespace vespalib { - struct ExecutorStats; - class SyncableThreadExecutor; -} - -namespace search { - -/** - * Class to run multiple tasks in parallel, but tasks with same - * id has to be run in sequence. - */ -class SequencedTaskExecutor final : public ISequencedTaskExecutor -{ - using Stats = vespalib::ExecutorStats; - std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> _executors; - - SequencedTaskExecutor(std::unique_ptr<std::vector<std::unique_ptr<vespalib::SyncableThreadExecutor>>> executor); -public: - using ISequencedTaskExecutor::getExecutorId; - using OptimizeFor = vespalib::Executor::OptimizeFor; - - ~SequencedTaskExecutor(); - - void setTaskLimit(uint32_t taskLimit) override; - void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; - void sync() override; - Stats getStats() override; - - /* - * Note that if you choose Optimize::THROUGHPUT, you must ensure only a single producer, or synchronize on the outside. - */ - static std::unique_ptr<ISequencedTaskExecutor> - create(uint32_t threads, uint32_t taskLimit = 1000, OptimizeFor optimize = OptimizeFor::LATENCY); -}; - -} // namespace search diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp deleted file mode 100644 index e553e757c37..00000000000 --- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.cpp +++ /dev/null @@ -1,52 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. - -#include "sequencedtaskexecutorobserver.h" - -namespace search { - -SequencedTaskExecutorObserver::SequencedTaskExecutorObserver(ISequencedTaskExecutor &executor) - : ISequencedTaskExecutor(executor.getNumExecutors()), - _executor(executor), - _executeCnt(0u), - _syncCnt(0u), - _executeHistory(), - _mutex() -{ -} - -SequencedTaskExecutorObserver::~SequencedTaskExecutorObserver() = default; - -void -SequencedTaskExecutorObserver::executeTask(ExecutorId id, vespalib::Executor::Task::UP task) -{ - ++_executeCnt; - { - std::lock_guard<std::mutex> guard(_mutex); - _executeHistory.emplace_back(id.getId()); - } - _executor.executeTask(id, std::move(task)); -} - -void -SequencedTaskExecutorObserver::sync() -{ - ++_syncCnt; - _executor.sync(); -} - -std::vector<uint32_t> -SequencedTaskExecutorObserver::getExecuteHistory() -{ - std::lock_guard<std::mutex> guard(_mutex); - return _executeHistory; -} - -void SequencedTaskExecutorObserver::setTaskLimit(uint32_t taskLimit) { - _executor.setTaskLimit(taskLimit); -} - -vespalib::ExecutorStats SequencedTaskExecutorObserver::getStats() { - return _executor.getStats(); -} - -} // namespace search diff --git a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h b/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h deleted file mode 100644 index dadd4bf59cf..00000000000 --- a/searchlib/src/vespa/searchlib/common/sequencedtaskexecutorobserver.h +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#pragma once - -#include "isequencedtaskexecutor.h" -#include <atomic> - -namespace search { - -/** - * Observer class to observe class to run multiple tasks in parallel, - * but tasks with same id has to be run in sequence. - */ -class SequencedTaskExecutorObserver : public ISequencedTaskExecutor -{ - ISequencedTaskExecutor &_executor; - std::atomic<uint32_t> _executeCnt; - std::atomic<uint32_t> _syncCnt; - std::vector<uint32_t> _executeHistory; - std::mutex _mutex; -public: - using ISequencedTaskExecutor::getExecutorId; - - SequencedTaskExecutorObserver(ISequencedTaskExecutor &executor); - ~SequencedTaskExecutorObserver() override; - - void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; - void sync() override; - - uint32_t getExecuteCnt() const { return _executeCnt; } - uint32_t getSyncCnt() const { return _syncCnt; } - std::vector<uint32_t> getExecuteHistory(); - - void setTaskLimit(uint32_t taskLimit) override; - - vespalib::ExecutorStats getStats() override; -}; - -} // namespace search diff --git a/searchlib/src/vespa/searchlib/common/threaded_compactable_lid_space.cpp b/searchlib/src/vespa/searchlib/common/threaded_compactable_lid_space.cpp index defb537be0e..4b253387f15 100644 --- a/searchlib/src/vespa/searchlib/common/threaded_compactable_lid_space.cpp +++ b/searchlib/src/vespa/searchlib/common/threaded_compactable_lid_space.cpp @@ -1,9 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <cstddef> -#include <cstdint> #include "threaded_compactable_lid_space.h" -#include "isequencedtaskexecutor.h" #include <future> namespace search::common { diff --git a/searchlib/src/vespa/searchlib/common/threaded_compactable_lid_space.h b/searchlib/src/vespa/searchlib/common/threaded_compactable_lid_space.h index 02d54acf666..f22d5946295 100644 --- a/searchlib/src/vespa/searchlib/common/threaded_compactable_lid_space.h +++ b/searchlib/src/vespa/searchlib/common/threaded_compactable_lid_space.h @@ -3,7 +3,7 @@ #pragma once #include "i_compactable_lid_space.h" -#include "isequencedtaskexecutor.h" +#include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <memory> namespace search::common { @@ -15,6 +15,7 @@ namespace search::common { */ class ThreadedCompactableLidSpace : public ICompactableLidSpace { + using ISequencedTaskExecutor = vespalib::ISequencedTaskExecutor; std::shared_ptr<ICompactableLidSpace> _target; ISequencedTaskExecutor &_executor; ISequencedTaskExecutor::ExecutorId _executorId; diff --git a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp index d032f06fc58..c8b23b2256f 100644 --- a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp +++ b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.cpp @@ -8,7 +8,7 @@ #include <vespa/document/annotation/alternatespanlist.h> #include <vespa/document/datatype/urldatatype.h> #include <vespa/document/repo/fixedtyperepo.h> -#include <vespa/searchlib/common/isequencedtaskexecutor.h> +#include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/searchlib/common/sort.h> #include <vespa/searchlib/util/url.h> #include <vespa/vespalib/text/lowercase.h> diff --git a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.h b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.h index ffa9dd0fab8..57f51e7fdb0 100644 --- a/searchlib/src/vespa/searchlib/memoryindex/document_inverter.h +++ b/searchlib/src/vespa/searchlib/memoryindex/document_inverter.h @@ -6,18 +6,21 @@ #include <vespa/searchlib/index/schema_index_fields.h> namespace document { -class DataType; -class Document; -class DocumentType; -class Field; -class FieldValue; + class DataType; + class Document; + class DocumentType; + class Field; + class FieldValue; } namespace search { - class ISequencedTaskExecutor; class IDestructorCallback; } +namespace vespalib { + class ISequencedTaskExecutor; +} + namespace search::memoryindex { class FieldInverter; @@ -31,6 +34,7 @@ class IFieldIndexCollection; */ class DocumentInverter { private: + using ISequencedTaskExecutor = vespalib::ISequencedTaskExecutor; DocumentInverter(const DocumentInverter &) = delete; DocumentInverter &operator=(const DocumentInverter &) = delete; diff --git a/searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp b/searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp index d8e48e84fb7..34c5d65ab23 100644 --- a/searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp +++ b/searchlib/src/vespa/searchlib/memoryindex/memory_index.cpp @@ -5,7 +5,7 @@ #include "memory_index.h" #include <vespa/document/fieldvalue/arrayfieldvalue.h> #include <vespa/document/fieldvalue/document.h> -#include <vespa/searchlib/common/sequencedtaskexecutor.h> +#include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/searchlib/index/field_length_calculator.h> #include <vespa/searchlib/index/schemautil.h> #include <vespa/searchlib/queryeval/create_blueprint_visitor_helper.h> @@ -44,6 +44,7 @@ using queryeval::EmptyBlueprint; using queryeval::FieldSpec; using queryeval::IRequestContext; using queryeval::Searchable; +using vespalib::ISequencedTaskExecutor; } diff --git a/searchlib/src/vespa/searchlib/memoryindex/memory_index.h b/searchlib/src/vespa/searchlib/memoryindex/memory_index.h index 4e20bcf81e3..a9f153f7dd8 100644 --- a/searchlib/src/vespa/searchlib/memoryindex/memory_index.h +++ b/searchlib/src/vespa/searchlib/memoryindex/memory_index.h @@ -14,7 +14,7 @@ namespace search::index { class IndexBuilder; } -namespace search { class ISequencedTaskExecutor; } +namespace vespalib { class ISequencedTaskExecutor; } namespace document { class Document; } @@ -40,6 +40,7 @@ class FieldIndexCollection; */ class MemoryIndex : public queryeval::Searchable { private: + using ISequencedTaskExecutor = vespalib::ISequencedTaskExecutor; index::Schema _schema; ISequencedTaskExecutor &_invertThreads; ISequencedTaskExecutor &_pushThreads; |