From 416ff1764ce98954b3b15fcae0f6a50d76b38323 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Sat, 4 Apr 2020 22:20:35 +0000 Subject: Move sequenced task executors to staging vespalib --- staging_vespalib/CMakeLists.txt | 1 + .../src/tests/sequencedtaskexecutor/.gitignore | 4 + .../src/tests/sequencedtaskexecutor/CMakeLists.txt | 31 ++ .../adaptive_sequenced_executor_test.cpp | 250 ++++++++++++++++ .../foregroundtaskexecutor_test.cpp | 120 ++++++++ .../sequencedtaskexecutor_benchmark.cpp | 70 +++++ .../sequencedtaskexecutor_test.cpp | 250 ++++++++++++++++ .../src/vespa/vespalib/util/CMakeLists.txt | 5 + .../vespalib/util/adaptive_sequenced_executor.cpp | 324 +++++++++++++++++++++ .../vespalib/util/adaptive_sequenced_executor.h | 126 ++++++++ .../vespa/vespalib/util/foregroundtaskexecutor.cpp | 42 +++ .../vespa/vespalib/util/foregroundtaskexecutor.h | 34 +++ .../vespa/vespalib/util/isequencedtaskexecutor.cpp | 46 +++ .../vespa/vespalib/util/isequencedtaskexecutor.h | 113 +++++++ .../vespa/vespalib/util/sequencedtaskexecutor.cpp | 83 ++++++ .../vespa/vespalib/util/sequencedtaskexecutor.h | 38 +++ .../util/sequencedtaskexecutorobserver.cpp | 52 ++++ .../vespalib/util/sequencedtaskexecutorobserver.h | 38 +++ 18 files changed, 1627 insertions(+) create mode 100644 staging_vespalib/src/tests/sequencedtaskexecutor/.gitignore create mode 100644 staging_vespalib/src/tests/sequencedtaskexecutor/CMakeLists.txt create mode 100644 staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp create mode 100644 staging_vespalib/src/tests/sequencedtaskexecutor/foregroundtaskexecutor_test.cpp create mode 100644 staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp create mode 100644 staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp create mode 100644 staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp create mode 100644 
staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h create mode 100644 staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp create mode 100644 staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h create mode 100644 staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp create mode 100644 staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h create mode 100644 staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp create mode 100644 staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h create mode 100644 staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp create mode 100644 staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h (limited to 'staging_vespalib') diff --git a/staging_vespalib/CMakeLists.txt b/staging_vespalib/CMakeLists.txt index 9fb48cbd4fc..e76d3078630 100644 --- a/staging_vespalib/CMakeLists.txt +++ b/staging_vespalib/CMakeLists.txt @@ -30,6 +30,7 @@ vespa_define_module( src/tests/shutdownguard src/tests/state_server src/tests/stllike + src/tests/sequencedtaskexecutor src/tests/singleexecutor src/tests/timer src/tests/util/process_memory_stats diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/.gitignore b/staging_vespalib/src/tests/sequencedtaskexecutor/.gitignore new file mode 100644 index 00000000000..523cfe5e3e1 --- /dev/null +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/.gitignore @@ -0,0 +1,4 @@ +staging_vespalib_sequencedtaskexecutor_test_app +staging_vespalib_sequencedtaskexecutor_benchmark_app +staging_vespalib_adaptive_sequenced_executor_test_app +staging_vespalib_foregroundtaskexecutor_test_app diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/CMakeLists.txt b/staging_vespalib/src/tests/sequencedtaskexecutor/CMakeLists.txt new file mode 100644 index 00000000000..6895eafd94a --- /dev/null +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/CMakeLists.txt @@ -0,0 +1,31 @@ 
+# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(staging_vespalib_sequencedtaskexecutor_benchmark_app TEST + SOURCES + sequencedtaskexecutor_benchmark.cpp + DEPENDS + staging_vespalib +) + +vespa_add_executable(staging_vespalib_sequencedtaskexecutor_test_app TEST + SOURCES + sequencedtaskexecutor_test.cpp + DEPENDS + staging_vespalib +) +vespa_add_test(NAME staging_vespalib_sequencedtaskexecutor_test_app COMMAND staging_vespalib_sequencedtaskexecutor_test_app) + +vespa_add_executable(staging_vespalib_adaptive_sequenced_executor_test_app TEST + SOURCES + adaptive_sequenced_executor_test.cpp + DEPENDS + staging_vespalib +) +vespa_add_test(NAME staging_vespalib_adaptive_sequenced_executor_test_app COMMAND staging_vespalib_adaptive_sequenced_executor_test_app) + +vespa_add_executable(staging_vespalib_foregroundtaskexecutor_test_app TEST + SOURCES + foregroundtaskexecutor_test.cpp + DEPENDS + staging_vespalib +) +vespa_add_test(NAME staging_vespalib_foregroundtaskexecutor_test_app COMMAND staging_vespalib_foregroundtaskexecutor_test_app) diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp new file mode 100644 index 00000000000..10f3f6089e3 --- /dev/null +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/adaptive_sequenced_executor_test.cpp @@ -0,0 +1,250 @@ +// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include +#include +#include + +#include +#include + +#include +LOG_SETUP("adaptive_sequenced_executor_test"); + +namespace vespalib { + + +class Fixture +{ +public: + AdaptiveSequencedExecutor _threads; + + Fixture() : _threads(2, 2, 0, 1000) { } +}; + + +class TestObj +{ +public: + std::mutex _m; + std::condition_variable _cv; + int _done; + int _fail; + int _val; + + TestObj() + : _m(), + _cv(), + _done(0), + _fail(0), + _val(0) + { + } + + void + modify(int oldValue, int newValue) + { + { + std::lock_guard guard(_m); + if (_val == oldValue) { + _val = newValue; + } else { + ++_fail; + } + ++_done; + } + _cv.notify_all(); + } + + void + wait(int wantDone) + { + std::unique_lock guard(_m); + _cv.wait(guard, [&] { return this->_done >= wantDone; }); + } +}; + +TEST_F("testExecute", Fixture) { + std::shared_ptr tv(std::make_shared()); + EXPECT_EQUAL(0, tv->_val); + f._threads.execute(1, [&]() { tv->modify(0, 42); }); + tv->wait(1); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); + f._threads.sync(); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); +} + + +TEST_F("require that task with same component id are serialized", Fixture) +{ + std::shared_ptr tv(std::make_shared()); + EXPECT_EQUAL(0, tv->_val); + f._threads.execute(0, [&]() { usleep(2000); tv->modify(0, 14); }); + f._threads.execute(0, [&]() { tv->modify(14, 42); }); + tv->wait(2); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); + f._threads.sync(); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); +} + +TEST_F("require that task with different component ids are not serialized", Fixture) +{ + int tryCnt = 0; + for (tryCnt = 0; tryCnt < 100; ++tryCnt) { + std::shared_ptr tv(std::make_shared()); + EXPECT_EQUAL(0, tv->_val); + f._threads.execute(0, [&]() { usleep(2000); tv->modify(0, 14); }); + f._threads.execute(2, [&]() { tv->modify(14, 42); }); + tv->wait(2); + if (tv->_fail != 1) { + continue; + } + EXPECT_EQUAL(1, tv->_fail); + EXPECT_EQUAL(14, tv->_val); + 
f._threads.sync(); + EXPECT_EQUAL(1, tv->_fail); + EXPECT_EQUAL(14, tv->_val); + break; + } + EXPECT_TRUE(tryCnt < 100); +} + + +TEST_F("require that task with same string component id are serialized", Fixture) +{ + std::shared_ptr tv(std::make_shared()); + EXPECT_EQUAL(0, tv->_val); + auto test2 = [&]() { tv->modify(14, 42); }; + f._threads.execute(f._threads.getExecutorId("0"), [&]() { usleep(2000); tv->modify(0, 14); }); + f._threads.execute(f._threads.getExecutorId("0"), test2); + tv->wait(2); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); + f._threads.sync(); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); +} + +namespace { + +int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int tryLimit) +{ + int tryCnt = 0; + for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) { + std::shared_ptr tv(std::make_shared()); + EXPECT_EQUAL(0, tv->_val); + f._threads.execute(f._threads.getExecutorId("0"), [&]() { usleep(2000); tv->modify(0, 14); }); + f._threads.execute(f._threads.getExecutorId(altComponentId), [&]() { tv->modify(14, 42); }); + tv->wait(2); + if (tv->_fail != 1) { + continue; + } + EXPECT_EQUAL(1, tv->_fail); + EXPECT_EQUAL(14, tv->_val); + f._threads.sync(); + EXPECT_EQUAL(1, tv->_fail); + EXPECT_EQUAL(14, tv->_val); + break; + } + return tryCnt; +} + +vespalib::string makeAltComponentId(Fixture &f) +{ + int tryCnt = 0; + char altComponentId[20]; + ISequencedTaskExecutor::ExecutorId executorId0 = f._threads.getExecutorId("0"); + for (tryCnt = 1; tryCnt < 100; ++tryCnt) { + sprintf(altComponentId, "%d", tryCnt); + if (f._threads.getExecutorId(altComponentId) == executorId0) { + break; + } + } + EXPECT_TRUE(tryCnt < 100); + return altComponentId; +} + +} + +TEST_F("require that task with different string component ids are not serialized", Fixture) +{ + int tryCnt = detectSerializeFailure(f, "2", 100); + EXPECT_TRUE(tryCnt < 100); +} + + +TEST_F("require that task with different string component ids mapping to the same 
executor id are serialized", + Fixture) +{ + vespalib::string altComponentId = makeAltComponentId(f); + LOG(info, "second string component id is \"%s\"", altComponentId.c_str()); + int tryCnt = detectSerializeFailure(f, altComponentId, 100); + EXPECT_TRUE(tryCnt == 100); +} + + +TEST_F("require that execute works with const lambda", Fixture) +{ + int i = 5; + std::vector res; + const auto lambda = [i, &res]() mutable + { res.push_back(i--); res.push_back(i--); }; + f._threads.execute(0, lambda); + f._threads.execute(0, lambda); + f._threads.sync(); + std::vector exp({5, 4, 5, 4}); + EXPECT_EQUAL(exp, res); + EXPECT_EQUAL(5, i); +} + +TEST_F("require that execute works with reference to lambda", Fixture) +{ + int i = 5; + std::vector res; + auto lambda = [i, &res]() mutable + { res.push_back(i--); res.push_back(i--); }; + auto &lambdaref = lambda; + f._threads.execute(0, lambdaref); + f._threads.execute(0, lambdaref); + f._threads.sync(); + std::vector exp({5, 4, 5, 4}); + EXPECT_EQUAL(exp, res); + EXPECT_EQUAL(5, i); +} + +TEST_F("require that executeLambda works", Fixture) +{ + int i = 5; + std::vector res; + const auto lambda = [i, &res]() mutable + { res.push_back(i--); res.push_back(i--); }; + f._threads.executeLambda(ISequencedTaskExecutor::ExecutorId(0), lambda); + f._threads.sync(); + std::vector exp({5, 4}); + EXPECT_EQUAL(exp, res); + EXPECT_EQUAL(5, i); +} + +TEST("require that you get correct number of executors") { + AdaptiveSequencedExecutor seven(7, 1, 0, 10); + EXPECT_EQUAL(7u, seven.getNumExecutors()); +} + +TEST("require that you distribute well") { + AdaptiveSequencedExecutor seven(7, 1, 0, 10); + EXPECT_EQUAL(7u, seven.getNumExecutors()); + EXPECT_EQUAL(97u, seven.getComponentHashSize()); + EXPECT_EQUAL(0u, seven.getComponentEffectiveHashSize()); + for (uint32_t id=0; id < 1000; id++) { + EXPECT_EQUAL((id%97)%7, seven.getExecutorId(id).getId()); + } + EXPECT_EQUAL(97u, seven.getComponentHashSize()); + EXPECT_EQUAL(97u, 
seven.getComponentEffectiveHashSize()); +} + +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/foregroundtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/foregroundtaskexecutor_test.cpp new file mode 100644 index 00000000000..a2671bb81a7 --- /dev/null +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/foregroundtaskexecutor_test.cpp @@ -0,0 +1,120 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include +#include + +#include +#include + +#include +LOG_SETUP("foregroundtaskexecutor_test"); + +namespace vespalib { + + +class Fixture +{ +public: + ForegroundTaskExecutor _threads; + + Fixture() + : _threads() + { + } +}; + + +class TestObj +{ +public: + std::mutex _m; + std::condition_variable _cv; + int _done; + int _fail; + int _val; + + TestObj() + : _m(), + _cv(), + _done(0), + _fail(0), + _val(0) + { + } + + void + modify(int oldValue, int newValue) + { + { + std::lock_guard guard(_m); + if (_val == oldValue) { + _val = newValue; + } else { + ++_fail; + } + ++_done; + } + _cv.notify_all(); + } + + void + wait(int wantDone) + { + std::unique_lock guard(_m); + _cv.wait(guard, [=] { return this->_done >= wantDone; }); + } +}; + +TEST_F("testExecute", Fixture) { + std::shared_ptr tv(std::make_shared()); + EXPECT_EQUAL(0, tv->_val); + f._threads.execute(1, [=]() { tv->modify(0, 42); }); + tv->wait(1); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); + f._threads.sync(); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); +} + + +TEST_F("require that task with same id are serialized", Fixture) +{ + std::shared_ptr tv(std::make_shared()); + EXPECT_EQUAL(0, tv->_val); + f._threads.execute(0, [=]() { usleep(2000); tv->modify(0, 14); }); + f._threads.execute(0, [=]() { tv->modify(14, 42); }); + tv->wait(2); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); + f._threads.sync(); + 
EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); +} + +TEST_F("require that task with different ids are serialized", Fixture) +{ + int tryCnt = 0; + for (tryCnt = 0; tryCnt < 100; ++tryCnt) { + std::shared_ptr tv(std::make_shared()); + EXPECT_EQUAL(0, tv->_val); + f._threads.execute(0, [=]() { usleep(2000); tv->modify(0, 14); }); + f._threads.execute(1, [=]() { tv->modify(14, 42); }); + tv->wait(2); + if (tv->_fail != 1) { + continue; + } + EXPECT_EQUAL(1, tv->_fail); + EXPECT_EQUAL(14, tv->_val); + f._threads.sync(); + EXPECT_EQUAL(1, tv->_fail); + EXPECT_EQUAL(14, tv->_val); + break; + } + EXPECT_TRUE(tryCnt >= 100); +} + + +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp new file mode 100644 index 00000000000..042408d439f --- /dev/null +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_benchmark.cpp @@ -0,0 +1,70 @@ +// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include +#include +#include +#include +#include + +using vespalib::ISequencedTaskExecutor; +using vespalib::SequencedTaskExecutor; +using vespalib::AdaptiveSequencedExecutor; +using ExecutorId = vespalib::ISequencedTaskExecutor::ExecutorId; + +size_t do_work(size_t size) { + size_t ret = 0; + for (size_t i = 0; i < size; ++i) { + for (size_t j = 0; j < 128; ++j) { + ret = (ret + i) * j; + } + } + return ret; +} + +struct SimpleParams { + int argc; + char **argv; + int idx; + SimpleParams(int argc_in, char **argv_in) : argc(argc_in), argv(argv_in), idx(0) {} + int next(const char *name, int fallback) { + ++idx; + int value = 0; + if (argc > idx) { + value = atoi(argv[idx]); + } else { + value = fallback; + } + fprintf(stderr, "param %s: %d\n", name, value); + return value; + } +}; + +int main(int argc, char **argv) { + SimpleParams params(argc, argv); + bool use_adaptive_executor = params.next("use_adaptive_executor", 0); + bool optimize_for_throughput = params.next("optimize_for_throughput", 0); + size_t num_tasks = params.next("num_tasks", 1000000); + size_t num_strands = params.next("num_strands", 4); + size_t task_limit = params.next("task_limit", 1000); + size_t num_threads = params.next("num_threads", num_strands); + size_t max_waiting = params.next("max_waiting", optimize_for_throughput ? 32 : 0); + size_t work_size = params.next("work_size", 0); + std::atomic counter(0); + std::unique_ptr executor; + if (use_adaptive_executor) { + executor = std::make_unique(num_strands, num_threads, max_waiting, task_limit); + } else { + auto optimize = optimize_for_throughput + ? 
vespalib::Executor::OptimizeFor::THROUGHPUT + : vespalib::Executor::OptimizeFor::LATENCY; + executor = SequencedTaskExecutor::create(num_strands, task_limit, optimize); + } + vespalib::Timer timer; + for (size_t task_id = 0; task_id < num_tasks; ++task_id) { + executor->executeTask(ExecutorId(task_id % num_strands), + vespalib::makeLambdaTask([&counter,work_size] { (void) do_work(work_size); counter++; })); + } + executor.reset(); + fprintf(stderr, "\ntotal time: %zu ms\n", vespalib::count_ms(timer.elapsed())); + return (size_t(counter) == num_tasks) ? 0 : 1; +} diff --git a/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp new file mode 100644 index 00000000000..f5f04738e92 --- /dev/null +++ b/staging_vespalib/src/tests/sequencedtaskexecutor/sequencedtaskexecutor_test.cpp @@ -0,0 +1,250 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include +#include +#include + +#include +#include + +#include +LOG_SETUP("sequencedtaskexecutor_test"); + +namespace vespalib { + + +class Fixture +{ +public: + std::unique_ptr _threads; + + Fixture() : _threads(SequencedTaskExecutor::create(2)) { } +}; + + +class TestObj +{ +public: + std::mutex _m; + std::condition_variable _cv; + int _done; + int _fail; + int _val; + + TestObj() + : _m(), + _cv(), + _done(0), + _fail(0), + _val(0) + { + } + + void + modify(int oldValue, int newValue) + { + { + std::lock_guard guard(_m); + if (_val == oldValue) { + _val = newValue; + } else { + ++_fail; + } + ++_done; + } + _cv.notify_all(); + } + + void + wait(int wantDone) + { + std::unique_lock guard(_m); + _cv.wait(guard, [=] { return this->_done >= wantDone; }); + } +}; + +TEST_F("testExecute", Fixture) { + std::shared_ptr tv(std::make_shared()); + EXPECT_EQUAL(0, tv->_val); + f._threads->execute(1, [=]() { tv->modify(0, 42); }); + tv->wait(1); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); + f._threads->sync(); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); +} + + +TEST_F("require that task with same component id are serialized", Fixture) +{ + std::shared_ptr tv(std::make_shared()); + EXPECT_EQUAL(0, tv->_val); + f._threads->execute(0, [=]() { usleep(2000); tv->modify(0, 14); }); + f._threads->execute(0, [=]() { tv->modify(14, 42); }); + tv->wait(2); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); + f._threads->sync(); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); +} + +TEST_F("require that task with different component ids are not serialized", Fixture) +{ + int tryCnt = 0; + for (tryCnt = 0; tryCnt < 100; ++tryCnt) { + std::shared_ptr tv(std::make_shared()); + EXPECT_EQUAL(0, tv->_val); + f._threads->execute(0, [=]() { usleep(2000); tv->modify(0, 14); }); + f._threads->execute(2, [=]() { tv->modify(14, 42); }); + tv->wait(2); + if (tv->_fail != 1) { + continue; + } + EXPECT_EQUAL(1, tv->_fail); + EXPECT_EQUAL(14, 
tv->_val); + f._threads->sync(); + EXPECT_EQUAL(1, tv->_fail); + EXPECT_EQUAL(14, tv->_val); + break; + } + EXPECT_TRUE(tryCnt < 100); +} + + +TEST_F("require that task with same string component id are serialized", Fixture) +{ + std::shared_ptr tv(std::make_shared()); + EXPECT_EQUAL(0, tv->_val); + auto test2 = [=]() { tv->modify(14, 42); }; + f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); }); + f._threads->execute(f._threads->getExecutorId("0"), test2); + tv->wait(2); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); + f._threads->sync(); + EXPECT_EQUAL(0, tv->_fail); + EXPECT_EQUAL(42, tv->_val); +} + +namespace { + +int detectSerializeFailure(Fixture &f, vespalib::stringref altComponentId, int tryLimit) +{ + int tryCnt = 0; + for (tryCnt = 0; tryCnt < tryLimit; ++tryCnt) { + std::shared_ptr tv(std::make_shared()); + EXPECT_EQUAL(0, tv->_val); + f._threads->execute(f._threads->getExecutorId("0"), [=]() { usleep(2000); tv->modify(0, 14); }); + f._threads->execute(f._threads->getExecutorId(altComponentId), [=]() { tv->modify(14, 42); }); + tv->wait(2); + if (tv->_fail != 1) { + continue; + } + EXPECT_EQUAL(1, tv->_fail); + EXPECT_EQUAL(14, tv->_val); + f._threads->sync(); + EXPECT_EQUAL(1, tv->_fail); + EXPECT_EQUAL(14, tv->_val); + break; + } + return tryCnt; +} + +vespalib::string makeAltComponentId(Fixture &f) +{ + int tryCnt = 0; + char altComponentId[20]; + ISequencedTaskExecutor::ExecutorId executorId0 = f._threads->getExecutorId("0"); + for (tryCnt = 1; tryCnt < 100; ++tryCnt) { + sprintf(altComponentId, "%d", tryCnt); + if (f._threads->getExecutorId(altComponentId) == executorId0) { + break; + } + } + EXPECT_TRUE(tryCnt < 100); + return altComponentId; +} + +} + +TEST_F("require that task with different string component ids are not serialized", Fixture) +{ + int tryCnt = detectSerializeFailure(f, "2", 100); + EXPECT_TRUE(tryCnt < 100); +} + + +TEST_F("require that task with different string component 
ids mapping to the same executor id are serialized", + Fixture) +{ + vespalib::string altComponentId = makeAltComponentId(f); + LOG(info, "second string component id is \"%s\"", altComponentId.c_str()); + int tryCnt = detectSerializeFailure(f, altComponentId, 100); + EXPECT_TRUE(tryCnt == 100); +} + + +TEST_F("require that execute works with const lambda", Fixture) +{ + int i = 5; + std::vector res; + const auto lambda = [i, &res]() mutable + { res.push_back(i--); res.push_back(i--); }; + f._threads->execute(0, lambda); + f._threads->execute(0, lambda); + f._threads->sync(); + std::vector exp({5, 4, 5, 4}); + EXPECT_EQUAL(exp, res); + EXPECT_EQUAL(5, i); +} + +TEST_F("require that execute works with reference to lambda", Fixture) +{ + int i = 5; + std::vector res; + auto lambda = [i, &res]() mutable + { res.push_back(i--); res.push_back(i--); }; + auto &lambdaref = lambda; + f._threads->execute(0, lambdaref); + f._threads->execute(0, lambdaref); + f._threads->sync(); + std::vector exp({5, 4, 5, 4}); + EXPECT_EQUAL(exp, res); + EXPECT_EQUAL(5, i); +} + +TEST_F("require that executeLambda works", Fixture) +{ + int i = 5; + std::vector res; + const auto lambda = [i, &res]() mutable + { res.push_back(i--); res.push_back(i--); }; + f._threads->executeLambda(ISequencedTaskExecutor::ExecutorId(0), lambda); + f._threads->sync(); + std::vector exp({5, 4}); + EXPECT_EQUAL(exp, res); + EXPECT_EQUAL(5, i); +} + +TEST("require that you get correct number of executors") { + auto seven = SequencedTaskExecutor::create(7); + EXPECT_EQUAL(7u, seven->getNumExecutors()); +} + +TEST("require that you distribute well") { + auto seven = SequencedTaskExecutor::create(7); + EXPECT_EQUAL(7u, seven->getNumExecutors()); + EXPECT_EQUAL(97u, seven->getComponentHashSize()); + EXPECT_EQUAL(0u, seven->getComponentEffectiveHashSize()); + for (uint32_t id=0; id < 1000; id++) { + EXPECT_EQUAL((id%97)%7, seven->getExecutorId(id).getId()); + } + EXPECT_EQUAL(97u, seven->getComponentHashSize()); + 
EXPECT_EQUAL(97u, seven->getComponentEffectiveHashSize()); +} + +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/staging_vespalib/src/vespa/vespalib/util/CMakeLists.txt b/staging_vespalib/src/vespa/vespalib/util/CMakeLists.txt index ba03b77c941..586e06396e7 100644 --- a/staging_vespalib/src/vespa/vespalib/util/CMakeLists.txt +++ b/staging_vespalib/src/vespa/vespalib/util/CMakeLists.txt @@ -1,11 +1,14 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(staging_vespalib_vespalib_util OBJECT SOURCES + adaptive_sequenced_executor.cpp bits.cpp clock.cpp crc.cpp doom.cpp + foregroundtaskexecutor.cpp growablebytebuffer.cpp + isequencedtaskexecutor.cpp jsonexception.cpp jsonstream.cpp jsonwriter.cpp @@ -15,6 +18,8 @@ vespa_add_library(staging_vespalib_vespalib_util OBJECT programoptions_testutils.cpp document_runnable.cpp rusage.cpp + sequencedtaskexecutor.cpp + sequencedtaskexecutorobserver.cpp shutdownguard.cpp scheduledexecutor.cpp singleexecutor.cpp diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp new file mode 100644 index 00000000000..50bc3b020a8 --- /dev/null +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.cpp @@ -0,0 +1,324 @@ +// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include "adaptive_sequenced_executor.h" + +namespace vespalib { + +//----------------------------------------------------------------------------- + +AdaptiveSequencedExecutor::Strand::Strand() + : state(State::IDLE), + queue() +{ +} + +AdaptiveSequencedExecutor::Strand::~Strand() +{ + assert(queue.empty()); +} + +//----------------------------------------------------------------------------- + +AdaptiveSequencedExecutor::Worker::Worker() + : cond(), + state(State::RUNNING), + strand(nullptr) +{ +} + +AdaptiveSequencedExecutor::Worker::~Worker() +{ + assert(state == State::DONE); + assert(strand == nullptr); +} + +//----------------------------------------------------------------------------- + +AdaptiveSequencedExecutor::Self::Self() + : cond(), + state(State::OPEN), + waiting_tasks(0), + pending_tasks(0) +{ +} + +AdaptiveSequencedExecutor::Self::~Self() +{ + assert(state == State::CLOSED); + assert(waiting_tasks == 0); + assert(pending_tasks == 0); +} + +//----------------------------------------------------------------------------- + +AdaptiveSequencedExecutor::ThreadTools::ThreadTools(AdaptiveSequencedExecutor &parent_in) + : parent(parent_in), + pool(std::make_unique(STACK_SIZE)), + allow_worker_exit() +{ +} + +AdaptiveSequencedExecutor::ThreadTools::~ThreadTools() +{ + assert(pool->isClosed()); +} + +void +AdaptiveSequencedExecutor::ThreadTools::Run(FastOS_ThreadInterface *, void *) +{ + parent.worker_main(); +} + +void +AdaptiveSequencedExecutor::ThreadTools::start(size_t num_threads) +{ + for (size_t i = 0; i < num_threads; ++i) { + FastOS_ThreadInterface *thread = pool->NewThread(this); + assert(thread != nullptr); + (void)thread; + } +} + +void +AdaptiveSequencedExecutor::ThreadTools::close() +{ + allow_worker_exit.countDown(); + pool->Close(); +} + +//----------------------------------------------------------------------------- + +void +AdaptiveSequencedExecutor::maybe_block_self(std::unique_lock &lock) +{ + while (_self.state == 
Self::State::BLOCKED) { + _self.cond.wait(lock); + } + while ((_self.state == Self::State::OPEN) && (_self.pending_tasks >= _cfg.max_pending)) { + _self.state = Self::State::BLOCKED; + while (_self.state == Self::State::BLOCKED) { + _self.cond.wait(lock); + } + } +} + +bool +AdaptiveSequencedExecutor::maybe_unblock_self(const std::unique_lock &) +{ + if ((_self.state == Self::State::BLOCKED) && (_self.pending_tasks < _cfg.wakeup_limit)) { + _self.state = Self::State::OPEN; + return true; + } + return false; +} + +AdaptiveSequencedExecutor::Worker * +AdaptiveSequencedExecutor::get_worker_to_wake(const std::unique_lock &) +{ + if ((_self.waiting_tasks > _cfg.max_waiting) && (!_worker_stack.empty())) { + assert(!_wait_queue.empty()); + Worker *worker = _worker_stack.back(); + _worker_stack.popBack(); + assert(worker->state == Worker::State::BLOCKED); + assert(worker->strand == nullptr); + worker->state = Worker::State::RUNNING; + worker->strand = _wait_queue.front(); + _wait_queue.pop(); + assert(worker->strand->state == Strand::State::WAITING); + assert(!worker->strand->queue.empty()); + worker->strand->state = Strand::State::ACTIVE; + assert(_self.waiting_tasks >= worker->strand->queue.size()); + _self.waiting_tasks -= worker->strand->queue.size(); + return worker; + } + return nullptr; +} + +bool +AdaptiveSequencedExecutor::obtain_strand(Worker &worker, std::unique_lock &lock) +{ + assert(worker.strand == nullptr); + if (!_wait_queue.empty()) { + worker.strand = _wait_queue.front(); + _wait_queue.pop(); + assert(worker.strand->state == Strand::State::WAITING); + assert(!worker.strand->queue.empty()); + worker.strand->state = Strand::State::ACTIVE; + assert(_self.waiting_tasks >= worker.strand->queue.size()); + _self.waiting_tasks -= worker.strand->queue.size(); + } else if (_self.state == Self::State::CLOSED) { + worker.state = Worker::State::DONE; + } else { + worker.state = Worker::State::BLOCKED; + _worker_stack.push(&worker); + while (worker.state == 
Worker::State::BLOCKED) { + worker.cond.wait(lock); + } + } + return (worker.state == Worker::State::RUNNING); +} + +bool +AdaptiveSequencedExecutor::exchange_strand(Worker &worker, std::unique_lock &lock) +{ + if (worker.strand == nullptr) { + return obtain_strand(worker, lock); + } + if (worker.strand->queue.empty()) { + worker.strand->state = Strand::State::IDLE; + worker.strand = nullptr; + return obtain_strand(worker, lock); + } + if (!_wait_queue.empty()) { + worker.strand->state = Strand::State::WAITING; + _self.waiting_tasks += worker.strand->queue.size(); + _wait_queue.push(worker.strand); + worker.strand = nullptr; + return obtain_strand(worker, lock); + } + return true; +} + +AdaptiveSequencedExecutor::Task::UP +AdaptiveSequencedExecutor::next_task(Worker &worker) +{ + Task::UP task; + Worker *worker_to_wake = nullptr; + auto guard = std::unique_lock(_mutex); + if (exchange_strand(worker, guard)) { + assert(worker.state == Worker::State::RUNNING); + assert(worker.strand != nullptr); + assert(!worker.strand->queue.empty()); + task = std::move(worker.strand->queue.front()); + worker.strand->queue.pop(); + _stats.queueSize.add(--_self.pending_tasks); + worker_to_wake = get_worker_to_wake(guard); + } else { + assert(worker.state == Worker::State::DONE); + assert(worker.strand == nullptr); + } + bool signal_self = maybe_unblock_self(guard); + guard.unlock(); // UNLOCK + if (worker_to_wake != nullptr) { + worker_to_wake->cond.notify_one(); + } + if (signal_self) { + _self.cond.notify_all(); + } + return task; +} + +void +AdaptiveSequencedExecutor::worker_main() +{ + Worker worker; + while (Task::UP my_task = next_task(worker)) { + my_task->run(); + } + _thread_tools->allow_worker_exit.await(); +} + +AdaptiveSequencedExecutor::AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads, + size_t max_waiting, size_t max_pending) + : ISequencedTaskExecutor(num_strands), + _thread_tools(std::make_unique(*this)), + _mutex(), + _strands(num_strands), + 
_wait_queue(num_strands), + _worker_stack(num_threads), + _self(), + _stats(), + _cfg(num_threads, max_waiting, max_pending) +{ + _stats.queueSize.add(_self.pending_tasks); + _thread_tools->start(num_threads); +} + +AdaptiveSequencedExecutor::~AdaptiveSequencedExecutor() +{ + sync(); + { + auto guard = std::unique_lock(_mutex); + assert(_self.state == Self::State::OPEN); + _self.state = Self::State::CLOSED; + while (!_worker_stack.empty()) { + Worker *worker = _worker_stack.back(); + _worker_stack.popBack(); + assert(worker->state == Worker::State::BLOCKED); + assert(worker->strand == nullptr); + worker->state = Worker::State::DONE; + worker->cond.notify_one(); + } + _self.cond.notify_all(); + } + _thread_tools->close(); + assert(_wait_queue.empty()); + assert(_worker_stack.empty()); +} + +void +AdaptiveSequencedExecutor::executeTask(ExecutorId id, Task::UP task) +{ + assert(id.getId() < _strands.size()); + Strand &strand = _strands[id.getId()]; + auto guard = std::unique_lock(_mutex); + maybe_block_self(guard); + assert(_self.state != Self::State::CLOSED); + strand.queue.push(std::move(task)); + _stats.queueSize.add(++_self.pending_tasks); + ++_stats.acceptedTasks; + if (strand.state == Strand::State::WAITING) { + ++_self.waiting_tasks; + } else if (strand.state == Strand::State::IDLE) { + if (_worker_stack.size() < _cfg.num_threads) { + strand.state = Strand::State::WAITING; + _wait_queue.push(&strand); + _self.waiting_tasks += strand.queue.size(); + } else { + strand.state = Strand::State::ACTIVE; + assert(_wait_queue.empty()); + Worker *worker = _worker_stack.back(); + _worker_stack.popBack(); + assert(worker->state == Worker::State::BLOCKED); + assert(worker->strand == nullptr); + worker->state = Worker::State::RUNNING; + worker->strand = &strand; + guard.unlock(); // UNLOCK + worker->cond.notify_one(); + } + } +} + +void +AdaptiveSequencedExecutor::sync() +{ + vespalib::CountDownLatch latch(_strands.size()); + for (size_t i = 0; i < _strands.size(); ++i) { + 
execute(ExecutorId(i), [&](){ latch.countDown(); }); + } + latch.await(); +} + +void +AdaptiveSequencedExecutor::setTaskLimit(uint32_t task_limit) +{ + auto guard = std::unique_lock(_mutex); + _cfg.set_max_pending(task_limit); + bool signal_self = maybe_unblock_self(guard); + guard.unlock(); // UNLOCK + if (signal_self) { + _self.cond.notify_all(); + } +} + +AdaptiveSequencedExecutor::Stats +AdaptiveSequencedExecutor::getStats() +{ + auto guard = std::lock_guard(_mutex); + Stats stats = _stats; + _stats = Stats(); + _stats.queueSize.add(_self.pending_tasks); + return stats; +} + +} diff --git a/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h new file mode 100644 index 00000000000..bc3457a72ef --- /dev/null +++ b/staging_vespalib/src/vespa/vespalib/util/adaptive_sequenced_executor.h @@ -0,0 +1,126 @@ +// Copyright 2020 Oath Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "isequencedtaskexecutor.h" +#include +#include +#include +#include +#include +#include + +namespace vespalib { + +/** + * Sequenced executor that balances the number of active threads in + * order to optimize for throughput over latency by minimizing the + * number of critical-path wakeups. + **/ +class AdaptiveSequencedExecutor : public ISequencedTaskExecutor +{ +private: + using Stats = vespalib::ExecutorStats; + using Task = vespalib::Executor::Task; + + /** + * Values used to configure the executor. 
+ **/ + struct Config { + size_t num_threads; + size_t max_waiting; + size_t max_pending; + size_t wakeup_limit; + void set_max_pending(size_t max_pending_in) { + max_pending = std::max(1uL, max_pending_in); + wakeup_limit = std::max(1uL, size_t(max_pending * 0.9)); + assert(wakeup_limit > 0); + assert(wakeup_limit <= max_pending); + } + Config(size_t num_threads_in, size_t max_waiting_in, size_t max_pending_in) + : num_threads(num_threads_in), max_waiting(max_waiting_in), max_pending(1000), wakeup_limit(900) + { + assert(num_threads > 0); + set_max_pending(max_pending_in); + } + }; + + /** + * Tasks that need to be sequenced are handled by a single strand. + **/ + struct Strand { + enum class State { IDLE, WAITING, ACTIVE }; + State state; + vespalib::ArrayQueue queue; + Strand(); + ~Strand(); + }; + + /** + * The state of a single worker thread. + **/ + struct Worker { + enum class State { RUNNING, BLOCKED, DONE }; + std::condition_variable cond; + State state; + Strand *strand; + Worker(); + ~Worker(); + }; + + /** + * State related to the executor itself. + **/ + struct Self { + enum class State { OPEN, BLOCKED, CLOSED }; + std::condition_variable cond; + State state; + size_t waiting_tasks; + size_t pending_tasks; + Self(); + ~Self(); + }; + + /** + * Stuff related to worker thread startup and shutdown. 
+ **/ + struct ThreadTools : FastOS_Runnable { + static constexpr size_t STACK_SIZE = (256 * 1024); + AdaptiveSequencedExecutor &parent; + std::unique_ptr pool; + vespalib::Gate allow_worker_exit; + ThreadTools(AdaptiveSequencedExecutor &parent_in); + ~ThreadTools(); + void Run(FastOS_ThreadInterface *, void *) override; + void start(size_t num_threads); + void close(); + }; + + std::unique_ptr _thread_tools; + std::mutex _mutex; + std::vector _strands; + vespalib::ArrayQueue _wait_queue; + vespalib::ArrayQueue _worker_stack; + Self _self; + Stats _stats; + Config _cfg; + + void maybe_block_self(std::unique_lock &lock); + bool maybe_unblock_self(const std::unique_lock &lock); + + Worker *get_worker_to_wake(const std::unique_lock &lock); + bool obtain_strand(Worker &worker, std::unique_lock &lock); + bool exchange_strand(Worker &worker, std::unique_lock &lock); + Task::UP next_task(Worker &worker); + void worker_main(); +public: + AdaptiveSequencedExecutor(size_t num_strands, size_t num_threads, + size_t max_waiting, size_t max_pending); + ~AdaptiveSequencedExecutor() override; + void executeTask(ExecutorId id, Task::UP task) override; + void sync() override; + void setTaskLimit(uint32_t task_limit) override; + vespalib::ExecutorStats getStats() override; +}; + +} diff --git a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp new file mode 100644 index 00000000000..b45ada1c58c --- /dev/null +++ b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.cpp @@ -0,0 +1,42 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include "foregroundtaskexecutor.h" +#include + +namespace vespalib { + +ForegroundTaskExecutor::ForegroundTaskExecutor() + : ForegroundTaskExecutor(1) +{ +} + +ForegroundTaskExecutor::ForegroundTaskExecutor(uint32_t threads) + : ISequencedTaskExecutor(threads), + _accepted(0) +{ +} + +ForegroundTaskExecutor::~ForegroundTaskExecutor() = default; + +void +ForegroundTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP task) +{ + assert(id.getId() < getNumExecutors()); + task->run(); + _accepted++; +} + +void +ForegroundTaskExecutor::sync() +{ +} + +void ForegroundTaskExecutor::setTaskLimit(uint32_t) { + +} + +vespalib::ExecutorStats ForegroundTaskExecutor::getStats() { + return vespalib::ExecutorStats(vespalib::ExecutorStats::QueueSizeT(0) , _accepted.load(std::memory_order_relaxed), 0); +} + +} // namespace vespalib diff --git a/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h new file mode 100644 index 00000000000..d9a348ed012 --- /dev/null +++ b/staging_vespalib/src/vespa/vespalib/util/foregroundtaskexecutor.h @@ -0,0 +1,34 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "isequencedtaskexecutor.h" +#include + +namespace vespalib { + +/** + * Class to run multiple tasks in parallel, but tasks with the same + * id have to be run in sequence. + * + * Currently, this is a dummy version that runs everything in the foreground. 
+ */ +class ForegroundTaskExecutor : public ISequencedTaskExecutor +{ +public: + using ISequencedTaskExecutor::getExecutorId; + + ForegroundTaskExecutor(); + ForegroundTaskExecutor(uint32_t threads); + ~ForegroundTaskExecutor() override; + + void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; + void sync() override; + + void setTaskLimit(uint32_t taskLimit) override; + + vespalib::ExecutorStats getStats() override; +private: + std::atomic _accepted; +}; + +} // namespace vespalib diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp new file mode 100644 index 00000000000..d05702cc85b --- /dev/null +++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.cpp @@ -0,0 +1,46 @@ +// Copyright 2020 Oath inc.. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "isequencedtaskexecutor.h" +#include +#include +#include + +namespace vespalib { + +namespace { + constexpr uint8_t MAGIC = 255; +} + +ISequencedTaskExecutor::ISequencedTaskExecutor(uint32_t numExecutors) + : _component2Id(vespalib::hashtable_base::getModuloStl(numExecutors*8), MAGIC), + _mutex(), + _numExecutors(numExecutors), + _nextId(0) +{ + assert(numExecutors < 256); +} + +ISequencedTaskExecutor::~ISequencedTaskExecutor() = default; + +ISequencedTaskExecutor::ExecutorId +ISequencedTaskExecutor::getExecutorId(vespalib::stringref componentId) const { + vespalib::hash hashfun; + return getExecutorId(hashfun(componentId)); +} + +ISequencedTaskExecutor::ExecutorId +ISequencedTaskExecutor::getExecutorId(uint64_t componentId) const { + uint32_t shrunkId = componentId % _component2Id.size(); + uint8_t executorId = _component2Id[shrunkId]; + if (executorId == MAGIC) { + std::lock_guard guard(_mutex); + if (_component2Id[shrunkId] == MAGIC) { + _component2Id[shrunkId] = _nextId % getNumExecutors(); + _nextId++; + } + executorId = 
_component2Id[shrunkId]; + } + return ExecutorId(executorId); +} + +} diff --git a/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h new file mode 100644 index 00000000000..cd2a6c6f0d8 --- /dev/null +++ b/staging_vespalib/src/vespa/vespalib/util/isequencedtaskexecutor.h @@ -0,0 +1,113 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace vespalib { + +/** + * Interface class to run multiple tasks in parallel, but tasks with the same + * id have to be run in sequence. + */ +class ISequencedTaskExecutor +{ +public: + class ExecutorId { + public: + ExecutorId() : ExecutorId(0) { } + explicit ExecutorId(uint32_t id) : _id(id) { } + uint32_t getId() const { return _id; } + bool operator != (ExecutorId rhs) const { return _id != rhs._id; } + bool operator == (ExecutorId rhs) const { return _id == rhs._id; } + bool operator < (ExecutorId rhs) const { return _id < rhs._id; } + private: + uint32_t _id; + }; + ISequencedTaskExecutor(uint32_t numExecutors); + virtual ~ISequencedTaskExecutor(); + + /** + * Calculate which executor will handle a component. + * + * @param componentId component id + * @return executor id + */ + ExecutorId getExecutorId(uint64_t componentId) const; + uint32_t getNumExecutors() const { return _numExecutors; } + + ExecutorId getExecutorId(vespalib::stringref componentId) const; + + /** + * Schedule a task to run after all previously scheduled tasks with + * the same id. + * + * @param id which internal executor to use + * @param task unique pointer to the task to be executed + */ + virtual void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) = 0; + + /** + * Wrap lambda function into a task and schedule it to be run. 
+ * Caller must ensure that pointers and references are valid and + * call sync before tearing down pointed to/referenced data. + * + * @param id which internal executor to use + * @param function function to be wrapped in a task and later executed + */ + template + void executeLambda(ExecutorId id, FunctionType &&function) { + executeTask(id, vespalib::makeLambdaTask(std::forward(function))); + } + /** + * Wait for all scheduled tasks to complete. + */ + virtual void sync() = 0; + + virtual void setTaskLimit(uint32_t taskLimit) = 0; + + virtual vespalib::ExecutorStats getStats() = 0; + + /** + * Wrap lambda function into a task and schedule it to be run. + * Caller must ensure that pointers and references are valid and + * call sync before tearing down pointed to/referenced data. + * + * @param componentId component id + * @param function function to be wrapped in a task and later executed + */ + template + void execute(uint64_t componentId, FunctionType &&function) { + ExecutorId id = getExecutorId(componentId); + executeTask(id, vespalib::makeLambdaTask(std::forward(function))); + } + + /** + * Wrap lambda function into a task and schedule it to be run. + * Caller must ensure that pointers and references are valid and + * call sync before tearing down pointed to/referenced data. 
+ * + * @param id executor id + * @param function function to be wrapped in a task and later executed + */ + template + void execute(ExecutorId id, FunctionType &&function) { + executeTask(id, vespalib::makeLambdaTask(std::forward(function))); + } + /** + * For testing only + */ + uint32_t getComponentHashSize() const { return _component2Id.size(); } + uint32_t getComponentEffectiveHashSize() const { return _nextId; } +private: + mutable std::vector _component2Id; + mutable std::mutex _mutex; + uint32_t _numExecutors; + mutable uint32_t _nextId; +}; + +} diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp new file mode 100644 index 00000000000..aa43bfaae7d --- /dev/null +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.cpp @@ -0,0 +1,83 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "sequencedtaskexecutor.h" +#include "singleexecutor.h" +#include + +namespace vespalib { + +namespace { + +constexpr uint32_t stackSize = 128 * 1024; + +} + + +std::unique_ptr +SequencedTaskExecutor::create(uint32_t threads, uint32_t taskLimit, OptimizeFor optimize) +{ + auto executors = std::make_unique>>(); + executors->reserve(threads); + for (uint32_t id = 0; id < threads; ++id) { + if (optimize == OptimizeFor::THROUGHPUT) { + executors->push_back(std::make_unique(taskLimit)); + } else { + executors->push_back(std::make_unique(1, stackSize, taskLimit)); + } + } + return std::unique_ptr(new SequencedTaskExecutor(std::move(executors))); +} + +SequencedTaskExecutor::~SequencedTaskExecutor() +{ + sync(); +} + +SequencedTaskExecutor::SequencedTaskExecutor(std::unique_ptr>> executors) + : ISequencedTaskExecutor(executors->size()), + _executors(std::move(executors)) +{ +} + +void +SequencedTaskExecutor::setTaskLimit(uint32_t taskLimit) +{ + for (const auto &executor : *_executors) { + 
executor->setTaskLimit(taskLimit); + } +} + +void +SequencedTaskExecutor::executeTask(ExecutorId id, vespalib::Executor::Task::UP task) +{ + assert(id.getId() < _executors->size()); + auto rejectedTask = (*_executors)[id.getId()]->execute(std::move(task)); + assert(!rejectedTask); +} + +void +SequencedTaskExecutor::sync() +{ + for (auto &executor : *_executors) { + SingleExecutor * single = dynamic_cast(executor.get()); + if (single) { + //Enforce parallel wakeup of napping executors. + single->startSync(); + } + } + for (auto &executor : *_executors) { + executor->sync(); + } +} + +SequencedTaskExecutor::Stats +SequencedTaskExecutor::getStats() +{ + Stats accumulatedStats; + for (auto &executor :* _executors) { + accumulatedStats += executor->getStats(); + } + return accumulatedStats; +} + +} // namespace vespalib diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h new file mode 100644 index 00000000000..14127970403 --- /dev/null +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutor.h @@ -0,0 +1,38 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "isequencedtaskexecutor.h" + +namespace vespalib { + +class SyncableThreadExecutor; + +/** + * Class to run multiple tasks in parallel, but tasks with the same + * id have to be run in sequence. 
+ */ +class SequencedTaskExecutor final : public ISequencedTaskExecutor +{ + using Stats = vespalib::ExecutorStats; + std::unique_ptr>> _executors; + + SequencedTaskExecutor(std::unique_ptr>> executor); +public: + using ISequencedTaskExecutor::getExecutorId; + using OptimizeFor = vespalib::Executor::OptimizeFor; + + ~SequencedTaskExecutor(); + + void setTaskLimit(uint32_t taskLimit) override; + void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; + void sync() override; + Stats getStats() override; + + /* + * Note that if you choose OptimizeFor::THROUGHPUT, you must ensure only a single producer, or synchronize on the outside. + */ + static std::unique_ptr + create(uint32_t threads, uint32_t taskLimit = 1000, OptimizeFor optimize = OptimizeFor::LATENCY); +}; + +} // namespace vespalib diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp new file mode 100644 index 00000000000..3d9ed4e21f4 --- /dev/null +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.cpp @@ -0,0 +1,52 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
+ +#include "sequencedtaskexecutorobserver.h" + +namespace vespalib { + +SequencedTaskExecutorObserver::SequencedTaskExecutorObserver(ISequencedTaskExecutor &executor) + : ISequencedTaskExecutor(executor.getNumExecutors()), + _executor(executor), + _executeCnt(0u), + _syncCnt(0u), + _executeHistory(), + _mutex() +{ +} + +SequencedTaskExecutorObserver::~SequencedTaskExecutorObserver() = default; + +void +SequencedTaskExecutorObserver::executeTask(ExecutorId id, vespalib::Executor::Task::UP task) +{ + ++_executeCnt; + { + std::lock_guard guard(_mutex); + _executeHistory.emplace_back(id.getId()); + } + _executor.executeTask(id, std::move(task)); +} + +void +SequencedTaskExecutorObserver::sync() +{ + ++_syncCnt; + _executor.sync(); +} + +std::vector +SequencedTaskExecutorObserver::getExecuteHistory() +{ + std::lock_guard guard(_mutex); + return _executeHistory; +} + +void SequencedTaskExecutorObserver::setTaskLimit(uint32_t taskLimit) { + _executor.setTaskLimit(taskLimit); +} + +vespalib::ExecutorStats SequencedTaskExecutorObserver::getStats() { + return _executor.getStats(); +} + +} // namespace vespalib diff --git a/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h new file mode 100644 index 00000000000..9307a7ddb37 --- /dev/null +++ b/staging_vespalib/src/vespa/vespalib/util/sequencedtaskexecutorobserver.h @@ -0,0 +1,38 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "isequencedtaskexecutor.h" +#include + +namespace vespalib { + +/** + * Observer class for observing a class that runs multiple tasks in parallel, + * where tasks with the same id have to be run in sequence. 
+ */ +class SequencedTaskExecutorObserver : public ISequencedTaskExecutor +{ + ISequencedTaskExecutor &_executor; + std::atomic _executeCnt; + std::atomic _syncCnt; + std::vector _executeHistory; + std::mutex _mutex; +public: + using ISequencedTaskExecutor::getExecutorId; + + SequencedTaskExecutorObserver(ISequencedTaskExecutor &executor); + ~SequencedTaskExecutorObserver() override; + + void executeTask(ExecutorId id, vespalib::Executor::Task::UP task) override; + void sync() override; + + uint32_t getExecuteCnt() const { return _executeCnt; } + uint32_t getSyncCnt() const { return _syncCnt; } + std::vector getExecuteHistory(); + + void setTaskLimit(uint32_t taskLimit) override; + + vespalib::ExecutorStats getStats() override; +}; + +} // namespace vespalib -- cgit v1.2.3