aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHåvard Pettersen <havardpe@yahooinc.com>2023-03-14 14:23:52 +0000
committerHåvard Pettersen <havardpe@yahooinc.com>2023-03-14 15:18:13 +0000
commit2556fff42957d5e0072fbcd13b329aab7e3231a0 (patch)
tree651ce4b9c7f9db9f8bed160589fa197b735f73dc
parent5f4bf637a8ede90fa66ecc18ed2864c0c826f80b (diff)
enable running sub-co-routines concurrently
-rw-r--r--vespalib/CMakeLists.txt1
-rw-r--r--vespalib/src/tests/coro/active_work/CMakeLists.txt9
-rw-r--r--vespalib/src/tests/coro/active_work/active_work_test.cpp66
-rw-r--r--vespalib/src/vespa/vespalib/coro/CMakeLists.txt1
-rw-r--r--vespalib/src/vespa/vespalib/coro/active_work.cpp21
-rw-r--r--vespalib/src/vespa/vespalib/coro/active_work.h42
-rw-r--r--vespalib/src/vespa/vespalib/coro/lazy.h4
7 files changed, 144 insertions, 0 deletions
diff --git a/vespalib/CMakeLists.txt b/vespalib/CMakeLists.txt
index dc99146fd3f..6d19988b96b 100644
--- a/vespalib/CMakeLists.txt
+++ b/vespalib/CMakeLists.txt
@@ -46,6 +46,7 @@ vespa_define_module(
src/tests/component
src/tests/compress
src/tests/compression
+ src/tests/coro/active_work
src/tests/coro/async_io
src/tests/coro/detached
src/tests/coro/generator
diff --git a/vespalib/src/tests/coro/active_work/CMakeLists.txt b/vespalib/src/tests/coro/active_work/CMakeLists.txt
new file mode 100644
index 00000000000..b230e10dbc7
--- /dev/null
+++ b/vespalib/src/tests/coro/active_work/CMakeLists.txt
@@ -0,0 +1,9 @@
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(vespalib_active_work_test_app TEST
+ SOURCES
+ active_work_test.cpp
+ DEPENDS
+ vespalib
+ GTest::GTest
+)
+vespa_add_test(NAME vespalib_active_work_test_app COMMAND vespalib_active_work_test_app)
diff --git a/vespalib/src/tests/coro/active_work/active_work_test.cpp b/vespalib/src/tests/coro/active_work/active_work_test.cpp
new file mode 100644
index 00000000000..26c0c3dd71a
--- /dev/null
+++ b/vespalib/src/tests/coro/active_work/active_work_test.cpp
@@ -0,0 +1,66 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/coro/lazy.h>
+#include <vespa/vespalib/coro/schedule.h>
+#include <vespa/vespalib/coro/completion.h>
+#include <vespa/vespalib/coro/active_work.h>
+#include <vespa/vespalib/util/time.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <vespa/vespalib/gtest/gtest.h>
+
+using namespace vespalib;
+using namespace vespalib::coro;
+
+Lazy<int> make_expensive_task(Executor &executor, int value) {
+ co_await schedule(executor);
+ auto cpu_cost = 20ms;
+ std::this_thread::sleep_for(cpu_cost);
+ co_return value;
+}
+
+Lazy<int> make_cheap_task(Executor &, int value) {
+ co_return value;
+}
+
+Lazy<int> concurrent_sum(Executor &executor, std::vector<int> values,
+ std::function<Lazy<int>(Executor &,int)> make_task)
+{
+ std::vector<Lazy<int>> work;
+ for (int v: values) {
+ work.push_back(make_task(executor, v));
+ }
+ ActiveWork active;
+ for (auto &task: work) {
+ active.start(task);
+ }
+ co_await active.join();
+ int res = 0;
+ for (auto &task: work) {
+ res += co_await task; // await_ready == true
+ }
+ co_return res;
+}
+
+TEST(ActiveWorkTest, run_expensive_subtasks_concurrently) {
+ vespalib::ThreadStackExecutor executor(8);
+ auto t0 = steady_clock::now();
+ auto result = sync_wait(concurrent_sum(executor, {1, 2, 3, 4, 5, 6, 7, 8,
+ 9,10,11,12,13,14,15,16},
+ make_expensive_task));
+ auto td = steady_clock::now() - t0;
+ EXPECT_EQ(result, 136);
+ fprintf(stderr, "time spent: %zu ms\n", count_ms(td));
+}
+
+TEST(ActiveWorkTest, run_cheap_subtasks_concurrently) {
+ vespalib::ThreadStackExecutor executor(1);
+ auto t0 = steady_clock::now();
+ auto result = sync_wait(concurrent_sum(executor, {1, 2, 3, 4, 5, 6, 7, 8,
+ 9,10,11,12,13,14,15,16},
+ make_cheap_task));
+ auto td = steady_clock::now() - t0;
+ EXPECT_EQ(result, 136);
+ fprintf(stderr, "time spent: %zu ms\n", count_ms(td));
+}
+
+GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/vespalib/src/vespa/vespalib/coro/CMakeLists.txt b/vespalib/src/vespa/vespalib/coro/CMakeLists.txt
index 8a7a0ade049..ed30f224eab 100644
--- a/vespalib/src/vespa/vespalib/coro/CMakeLists.txt
+++ b/vespalib/src/vespa/vespalib/coro/CMakeLists.txt
@@ -1,6 +1,7 @@
# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_add_library(vespalib_vespalib_coro OBJECT
SOURCES
+ active_work.cpp
async_crypto_socket.cpp
async_io.cpp
DEPENDS
diff --git a/vespalib/src/vespa/vespalib/coro/active_work.cpp b/vespalib/src/vespa/vespalib/coro/active_work.cpp
new file mode 100644
index 00000000000..403da5173fe
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/coro/active_work.cpp
@@ -0,0 +1,21 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "active_work.h"
+#include <cassert>
+
+namespace vespalib::coro {
+
+bool
+ActiveWork::join_awaiter::await_suspend(std::coroutine_handle<> handle) noexcept
+{
+ self._waiting = handle;
+ return (self._pending.fetch_sub(1, std::memory_order_acq_rel) > 1);
+}
+
+ActiveWork::~ActiveWork()
+{
+ // NB: join must be called, even if there is no other work
+ assert(_pending.load(std::memory_order_relaxed) == 0);
+}
+
+}
diff --git a/vespalib/src/vespa/vespalib/coro/active_work.h b/vespalib/src/vespa/vespalib/coro/active_work.h
new file mode 100644
index 00000000000..9f4079615c7
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/coro/active_work.h
@@ -0,0 +1,42 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "lazy.h"
+#include "detached.h"
+#include <coroutine>
+#include <atomic>
+
+namespace vespalib::coro {
+
+// Tracks work that is being performed concurrently
+class ActiveWork {
+private:
+ std::atomic<uint32_t> _pending;
+ std::coroutine_handle<> _waiting;
+ template <typename T>
+ Detached signal_when_done(Lazy<T> &lazy) {
+ co_await lazy.done();
+ if (_pending.fetch_sub(1, std::memory_order_acq_rel) == 1) {
+ _waiting.resume();
+ }
+ }
+ struct join_awaiter {
+ ActiveWork &self;
+ join_awaiter(ActiveWork &self_in) noexcept : self(self_in) {}
+ constexpr bool await_ready() const noexcept { return false; }
+ constexpr void await_resume() const noexcept {}
+ bool await_suspend(std::coroutine_handle<> handle) noexcept __attribute__((noinline));
+ };
+public:
+ ActiveWork() : _pending(1), _waiting(std::noop_coroutine()) {}
+ ~ActiveWork();
+ template <typename T>
+ void start(Lazy<T> &lazy) {
+ _pending.fetch_add(1, std::memory_order_relaxed);
+ signal_when_done(lazy);
+ }
+ auto join() noexcept { return join_awaiter(*this); }
+};
+
+}
diff --git a/vespalib/src/vespa/vespalib/coro/lazy.h b/vespalib/src/vespa/vespalib/coro/lazy.h
index 17077dccc9f..87abb2e4f99 100644
--- a/vespalib/src/vespa/vespalib/coro/lazy.h
+++ b/vespalib/src/vespa/vespalib/coro/lazy.h
@@ -75,6 +75,9 @@ private:
struct Result {
static Received<T>&& get(auto &&promise) { return std::move(promise.result); }
};
+ struct Nothing {
+ static void get(auto &&) {}
+ };
public:
Lazy(const Lazy &) = delete;
@@ -84,6 +87,7 @@ public:
auto operator co_await() & noexcept { return WaitFor<LValue>(_handle); }
auto operator co_await() && noexcept { return WaitFor<RValue>(_handle); }
auto forward() noexcept { return WaitFor<Result>(_handle); }
+ auto done() noexcept { return WaitFor<Nothing>(_handle); }
~Lazy() {
if (_handle) {
_handle.destroy();