From 2556fff42957d5e0072fbcd13b329aab7e3231a0 Mon Sep 17 00:00:00 2001 From: HÃ¥vard Pettersen Date: Tue, 14 Mar 2023 14:23:52 +0000 Subject: enable running sub-co-routines concurrently --- vespalib/CMakeLists.txt | 1 + vespalib/src/tests/coro/active_work/CMakeLists.txt | 9 +++ .../tests/coro/active_work/active_work_test.cpp | 66 ++++++++++++++++++++++ vespalib/src/vespa/vespalib/coro/CMakeLists.txt | 1 + vespalib/src/vespa/vespalib/coro/active_work.cpp | 21 +++++++ vespalib/src/vespa/vespalib/coro/active_work.h | 42 ++++++++++++++ vespalib/src/vespa/vespalib/coro/lazy.h | 4 ++ 7 files changed, 144 insertions(+) create mode 100644 vespalib/src/tests/coro/active_work/CMakeLists.txt create mode 100644 vespalib/src/tests/coro/active_work/active_work_test.cpp create mode 100644 vespalib/src/vespa/vespalib/coro/active_work.cpp create mode 100644 vespalib/src/vespa/vespalib/coro/active_work.h diff --git a/vespalib/CMakeLists.txt b/vespalib/CMakeLists.txt index dc99146fd3f..6d19988b96b 100644 --- a/vespalib/CMakeLists.txt +++ b/vespalib/CMakeLists.txt @@ -46,6 +46,7 @@ vespa_define_module( src/tests/component src/tests/compress src/tests/compression + src/tests/coro/active_work src/tests/coro/async_io src/tests/coro/detached src/tests/coro/generator diff --git a/vespalib/src/tests/coro/active_work/CMakeLists.txt b/vespalib/src/tests/coro/active_work/CMakeLists.txt new file mode 100644 index 00000000000..b230e10dbc7 --- /dev/null +++ b/vespalib/src/tests/coro/active_work/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(vespalib_active_work_test_app TEST + SOURCES + active_work_test.cpp + DEPENDS + vespalib + GTest::GTest +) +vespa_add_test(NAME vespalib_active_work_test_app COMMAND vespalib_active_work_test_app) diff --git a/vespalib/src/tests/coro/active_work/active_work_test.cpp b/vespalib/src/tests/coro/active_work/active_work_test.cpp new file mode 100644 index 00000000000..26c0c3dd71a --- /dev/null +++ b/vespalib/src/tests/coro/active_work/active_work_test.cpp @@ -0,0 +1,66 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include +#include +#include +#include +#include +#include +#include + +using namespace vespalib; +using namespace vespalib::coro; + +Lazy make_expensive_task(Executor &executor, int value) { + co_await schedule(executor); + auto cpu_cost = 20ms; + std::this_thread::sleep_for(cpu_cost); + co_return value; +} + +Lazy make_cheap_task(Executor &, int value) { + co_return value; +} + +Lazy concurrent_sum(Executor &executor, std::vector values, + std::function(Executor &,int)> make_task) +{ + std::vector> work; + for (int v: values) { + work.push_back(make_task(executor, v)); + } + ActiveWork active; + for (auto &task: work) { + active.start(task); + } + co_await active.join(); + int res = 0; + for (auto &task: work) { + res += co_await task; // await_ready == true + } + co_return res; +} + +TEST(ActiveWorkTest, run_expensive_subtasks_concurrently) { + vespalib::ThreadStackExecutor executor(8); + auto t0 = steady_clock::now(); + auto result = sync_wait(concurrent_sum(executor, {1, 2, 3, 4, 5, 6, 7, 8, + 9,10,11,12,13,14,15,16}, + make_expensive_task)); + auto td = steady_clock::now() - t0; + EXPECT_EQ(result, 136); + fprintf(stderr, "time spent: %zu ms\n", count_ms(td)); +} + +TEST(ActiveWorkTest, run_cheap_subtasks_concurrently) { + vespalib::ThreadStackExecutor executor(1); + auto t0 = steady_clock::now(); + auto result = sync_wait(concurrent_sum(executor, {1, 2, 3, 4, 5, 6, 7, 8, + 9,10,11,12,13,14,15,16}, + make_cheap_task)); + auto td = steady_clock::now() - t0; + EXPECT_EQ(result, 136); + fprintf(stderr, "time spent: %zu ms\n", count_ms(td)); +} + +GTEST_MAIN_RUN_ALL_TESTS() diff --git a/vespalib/src/vespa/vespalib/coro/CMakeLists.txt b/vespalib/src/vespa/vespalib/coro/CMakeLists.txt index 8a7a0ade049..ed30f224eab 100644 --- a/vespalib/src/vespa/vespalib/coro/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/coro/CMakeLists.txt @@ -1,6 +1,7 @@ # Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(vespalib_vespalib_coro OBJECT SOURCES + active_work.cpp async_crypto_socket.cpp async_io.cpp DEPENDS diff --git a/vespalib/src/vespa/vespalib/coro/active_work.cpp b/vespalib/src/vespa/vespalib/coro/active_work.cpp new file mode 100644 index 00000000000..403da5173fe --- /dev/null +++ b/vespalib/src/vespa/vespalib/coro/active_work.cpp @@ -0,0 +1,21 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "active_work.h" +#include + +namespace vespalib::coro { + +bool +ActiveWork::join_awaiter::await_suspend(std::coroutine_handle<> handle) noexcept +{ + self._waiting = handle; + return (self._pending.fetch_sub(1, std::memory_order_acq_rel) > 1); +} + +ActiveWork::~ActiveWork() +{ + // NB: join must be called, even if there is no other work + assert(_pending.load(std::memory_order_relaxed) == 0); +} + +} diff --git a/vespalib/src/vespa/vespalib/coro/active_work.h b/vespalib/src/vespa/vespalib/coro/active_work.h new file mode 100644 index 00000000000..9f4079615c7 --- /dev/null +++ b/vespalib/src/vespa/vespalib/coro/active_work.h @@ -0,0 +1,42 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "lazy.h" +#include "detached.h" +#include +#include + +namespace vespalib::coro { + +// Tracks work that is being performed concurrently +class ActiveWork { +private: + std::atomic _pending; + std::coroutine_handle<> _waiting; + template + Detached signal_when_done(Lazy &lazy) { + co_await lazy.done(); + if (_pending.fetch_sub(1, std::memory_order_acq_rel) == 1) { + _waiting.resume(); + } + } + struct join_awaiter { + ActiveWork &self; + join_awaiter(ActiveWork &self_in) noexcept : self(self_in) {} + constexpr bool await_ready() const noexcept { return false; } + constexpr void await_resume() const noexcept {} + bool await_suspend(std::coroutine_handle<> handle) noexcept __attribute__((noinline)); + }; +public: + ActiveWork() : _pending(1), _waiting(std::noop_coroutine()) {} + ~ActiveWork(); + template + void start(Lazy &lazy) { + _pending.fetch_add(1, std::memory_order_relaxed); + signal_when_done(lazy); + } + auto join() noexcept { return join_awaiter(*this); } +}; + +} diff --git a/vespalib/src/vespa/vespalib/coro/lazy.h b/vespalib/src/vespa/vespalib/coro/lazy.h index 17077dccc9f..87abb2e4f99 100644 --- a/vespalib/src/vespa/vespalib/coro/lazy.h +++ b/vespalib/src/vespa/vespalib/coro/lazy.h @@ -75,6 +75,9 @@ private: struct Result { static Received&& get(auto &&promise) { return std::move(promise.result); } }; + struct Nothing { + static void get(auto &&) {} + }; public: Lazy(const Lazy &) = delete; @@ -84,6 +87,7 @@ public: auto operator co_await() & noexcept { return WaitFor(_handle); } auto operator co_await() && noexcept { return WaitFor(_handle); } auto forward() noexcept { return WaitFor(_handle); } + auto done() noexcept { return WaitFor(_handle); } ~Lazy() { if (_handle) { _handle.destroy(); -- cgit v1.2.3