diff options
author | Håvard Pettersen <havardpe@yahooinc.com> | 2023-05-11 10:03:18 +0000 |
---|---|---|
committer | Håvard Pettersen <havardpe@yahooinc.com> | 2023-06-16 13:44:09 +0000 |
commit | bc79f96ee8f867bdd1ddf4be756c6c24c96f9a14 (patch) | |
tree | b1ba251507fc4a4f07c858541cc925c545fdeb3a | |
parent | c483f99e61db99228262b72734a0417058dea208 (diff) |
rw spin lock
still only experimental; both the lock itself and its benchmarking
spin-off: Nexus utility for multi-threaded testing and benchmarking
-rw-r--r-- | vespalib/CMakeLists.txt | 2 | ||||
-rw-r--r-- | vespalib/src/tests/nexus/CMakeLists.txt | 9 | ||||
-rw-r--r-- | vespalib/src/tests/nexus/nexus_test.cpp | 86 | ||||
-rw-r--r-- | vespalib/src/tests/rw_spin_lock/CMakeLists.txt | 9 | ||||
-rw-r--r-- | vespalib/src/tests/rw_spin_lock/rw_spin_lock_test.cpp | 323 | ||||
-rw-r--r-- | vespalib/src/tests/shared_string_repo/shared_string_repo_test.cpp | 41 | ||||
-rw-r--r-- | vespalib/src/tests/spin_lock/spin_lock_test.cpp | 2 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/test/CMakeLists.txt | 1 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/test/nexus.cpp | 15 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/test/nexus.h | 84 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/test/thread_meets.cpp | 31 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/test/thread_meets.h | 63 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/rendezvous.h | 13 | ||||
-rw-r--r-- | vespalib/src/vespa/vespalib/util/rw_spin_lock.h | 189 |
14 files changed, 819 insertions, 49 deletions
diff --git a/vespalib/CMakeLists.txt b/vespalib/CMakeLists.txt index d2892ee4429..c1d7e17b457 100644 --- a/vespalib/CMakeLists.txt +++ b/vespalib/CMakeLists.txt @@ -126,6 +126,7 @@ vespa_define_module( src/tests/net/tls/policy_checking_certificate_verifier src/tests/net/tls/protocol_snooping src/tests/net/tls/transport_options + src/tests/nexus src/tests/nice src/tests/objects/identifiable src/tests/objects/nbostream @@ -149,6 +150,7 @@ vespa_define_module( src/tests/require src/tests/runnable_pair src/tests/rusage + src/tests/rw_spin_lock src/tests/sequencedtaskexecutor src/tests/sha1 src/tests/shared_operation_throttler diff --git a/vespalib/src/tests/nexus/CMakeLists.txt b/vespalib/src/tests/nexus/CMakeLists.txt new file mode 100644 index 00000000000..4b1b4bc9c25 --- /dev/null +++ b/vespalib/src/tests/nexus/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(vespalib_nexus_test_app TEST + SOURCES + nexus_test.cpp + DEPENDS + vespalib + GTest::GTest +) +vespa_add_test(NAME vespalib_nexus_test_app COMMAND vespalib_nexus_test_app) diff --git a/vespalib/src/tests/nexus/nexus_test.cpp b/vespalib/src/tests/nexus/nexus_test.cpp new file mode 100644 index 00000000000..8ed9a8f58cc --- /dev/null +++ b/vespalib/src/tests/nexus/nexus_test.cpp @@ -0,0 +1,86 @@ +// Copyright Yahoo. Licensd under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/vespalib/test/nexus.h> +#include <vespa/vespalib/gtest/gtest.h> +#include <vespa/vespalib/util/require.h> + +using namespace vespalib::test; + +TEST(NexusTest, run_void_tasks) { + std::atomic<size_t> value; + auto task = [&value](Nexus &) { + value.fetch_add(1, std::memory_order_relaxed); + }; + Nexus ctx(10); + ctx.run(task); + EXPECT_EQ(value, 10); + ctx.run(task); + EXPECT_EQ(value, 20); +} + +TEST(NexusTest, run_value_tasks_select_thread_0) { + std::atomic<size_t> value; + auto task = [&value](Nexus &ctx) { + value.fetch_add(1, std::memory_order_relaxed); + return ctx.thread_id() + 5; + }; + Nexus ctx(10); + EXPECT_EQ(ctx.run(task), 5); + EXPECT_EQ(value, 10); +} + +TEST(NexusTest, run_value_tasks_merge_results) { + std::atomic<size_t> value; + auto task = [&value](Nexus &) { + return value.fetch_add(1, std::memory_order_relaxed) + 1; + }; + Nexus ctx(10); + EXPECT_EQ(ctx.run(task, Nexus::merge_sum()), 55); + EXPECT_EQ(value, 10); +} + +TEST(NexusTest, run_inline_voted_loop) { + auto res = Nexus(9).run([](Nexus &ctx) { + size_t times = 0; + for (size_t i = 0; ctx.vote(i < ctx.thread_id()); ++i) { + ++times; + } + return times; + }, [](auto a, auto b){ EXPECT_EQ(a, b); return a; }); + EXPECT_EQ(res, 4); +} + +TEST(NexusTest, run_return_type_decay) { + int value = 3; + auto task = [&](Nexus &)->int&{ return value; }; + Nexus ctx(3); + auto res = ctx.run(task); + EXPECT_EQ(res, 3); + EXPECT_EQ(std::addressof(value), std::addressof(task(ctx))); + using task_res_t = decltype(task(ctx)); + using run_res_t = decltype(ctx.run(task)); + static_assert(std::same_as<task_res_t, int&>); + static_assert(std::same_as<run_res_t, int>); +} + +TEST(NexusTest, example_multi_threaded_unit_test) { + int a = 0; + int b = 0; + auto work = [&](Nexus &ctx) { + EXPECT_EQ(ctx.num_threads(2)); + if (ctx.thread_id() == 0) { + a = 5; + ctx.barrier(); + EXPECT_EQ(b, 7); + } else { + b = 7; + ctx.barrier(); + EXPECT_EQ(a, 5); + } + }; + Nexus(2).run(work); + EXPECT_EQ(a, 5); + EXPECT_EQ(b, 7); +} + +GTEST_MAIN_RUN_ALL_TESTS() diff --git a/vespalib/src/tests/rw_spin_lock/CMakeLists.txt b/vespalib/src/tests/rw_spin_lock/CMakeLists.txt new file mode 100644 index 00000000000..76bcb918ce9 --- /dev/null +++ b/vespalib/src/tests/rw_spin_lock/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(vespalib_rw_spin_lock_test_app TEST + SOURCES + rw_spin_lock_test.cpp + DEPENDS + vespalib + GTest::GTest +) +vespa_add_test(NAME vespalib_rw_spin_lock_test_app NO_VALGRIND COMMAND vespalib_rw_spin_lock_test_app) diff --git a/vespalib/src/tests/rw_spin_lock/rw_spin_lock_test.cpp b/vespalib/src/tests/rw_spin_lock/rw_spin_lock_test.cpp new file mode 100644 index 00000000000..50621338d8c --- /dev/null +++ b/vespalib/src/tests/rw_spin_lock/rw_spin_lock_test.cpp @@ -0,0 +1,323 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/vespalib/util/spin_lock.h> +#include <vespa/vespalib/util/rw_spin_lock.h> +#include <vespa/vespalib/util/time.h> +#include <vespa/vespalib/util/classname.h> +#include <vespa/vespalib/test/thread_meets.h> +#include <vespa/vespalib/test/nexus.h> +#include <vespa/vespalib/gtest/gtest.h> +#include <type_traits> +#include <ranges> +#include <random> +#include <array> + +using namespace vespalib; +using namespace vespalib::test; + +bool bench = false; +duration budget = 250ms; +constexpr size_t LOOP_CNT = 4096; +size_t thread_safety_work = 1'000'000; +size_t state_loop = 1; + +//----------------------------------------------------------------------------- + +struct DummyLock { + constexpr DummyLock() noexcept {} + // BasicLockable + constexpr void lock() noexcept {} + constexpr void unlock() noexcept {} + // SharedLockable + constexpr void lock_shared() noexcept {} + [[nodiscard]] constexpr bool try_lock_shared() noexcept { return true; } + constexpr void unlock_shared() noexcept {} + // rw_upgrade_downgrade_lock + [[nodiscard]] constexpr bool try_convert_read_to_write() noexcept { return true; } + constexpr void convert_write_to_read() noexcept {} +}; + +//----------------------------------------------------------------------------- + +struct MyState { + static constexpr size_t SZ = 5; + std::array<std::atomic<size_t>,SZ> state = {0,0,0,0,0}; + std::atomic<size_t> inconsistent_reads = 0; + std::atomic<size_t> expected_writes = 0; + [[nodiscard]] size_t update() { + std::array<size_t,SZ> tmp; + for (size_t i = 0; i < SZ; ++i) { + tmp[i] = state[i].load(std::memory_order_relaxed); + } + for (size_t n = 0; n < state_loop; ++n) { + for (size_t i = 0; i < SZ; ++i) { + state[i].store(tmp[i] + 1, std::memory_order_relaxed); + } + } + return 1; + } + [[nodiscard]] size_t peek() { + size_t my_inconsistent_reads = 0; + std::array<size_t,SZ> tmp; + for (size_t i = 0; i < SZ; ++i) { + tmp[i] = state[i].load(std::memory_order_relaxed); + } + for (size_t n = 0; n < state_loop; ++n) { + for (size_t i = 0; i < SZ; ++i) { + if (state[i].load(std::memory_order_relaxed) != tmp[i]) [[unlikely]] { + ++my_inconsistent_reads; + } + } + } + return my_inconsistent_reads; + } + void commit_inconsistent_reads(size_t n) { + inconsistent_reads.fetch_add(n, std::memory_order_relaxed); + } + void commit_expected_writes(size_t n) { + expected_writes.fetch_add(n, std::memory_order_relaxed); + } + [[nodiscard]] bool check() const { + if (inconsistent_reads > 0) { + return false; + } + for (const auto& value: state) { + if (value != expected_writes) { + return false; + } + } + return true; + } + void report(const char *name) const { + if (check()) { + fprintf(stderr, "%s is thread safe\n", name); + } else { + fprintf(stderr, "%s is not thread safe\n", name); + fprintf(stderr, " inconsistent reads: %zu\n", inconsistent_reads.load()); + fprintf(stderr, " expected %zu, got [%zu,%zu,%zu,%zu,%zu]\n", + expected_writes.load(), state[0].load(), state[1].load(), state[2].load(), state[3].load(), state[4].load()); + } + } +}; + +// random generator used to make per-thread decisions +class Rnd { +private: + std::mt19937 _engine; + std::uniform_int_distribution<int> _dist; +public: + Rnd(uint32_t seed) : _engine(seed), _dist(0,9999) {} + bool operator()(int bp) { return _dist(_engine) < bp; } +}; + +//----------------------------------------------------------------------------- + +template<typename T> +concept basic_lockable = requires(T a) { + { a.lock() } -> std::same_as<void>; + { a.unlock() } -> std::same_as<void>; +}; + +template<typename T> +concept lockable = requires(T a) { + { a.try_lock() } -> std::same_as<bool>; + { a.lock() } -> std::same_as<void>; + { a.unlock() } -> std::same_as<void>; +}; + +template<typename T> +concept shared_lockable = requires(T a) { + { a.try_lock_shared() } -> std::same_as<bool>; + { a.lock_shared() } -> std::same_as<void>; + { a.unlock_shared() } -> std::same_as<void>; +}; + +template<typename T> +concept can_upgrade = requires(std::shared_lock<T> a, std::unique_lock<T> b) { + { try_upgrade(std::move(a)) } -> std::same_as<std::unique_lock<T>>; + { downgrade(std::move(b)) } -> std::same_as<std::shared_lock<T>>; +}; + +//----------------------------------------------------------------------------- + +template <size_t N> +auto run_loop(auto &f) { + static_assert(N % 4 == 0); + for (size_t i = 0; i < N / 4; ++i) { + f(); f(); f(); f(); + } +} + +double measure_ns(auto &work) __attribute__((noinline)); +double measure_ns(auto &work) { + constexpr double factor = LOOP_CNT; + auto t0 = steady_clock::now(); + run_loop<LOOP_CNT>(work); + return count_ns(steady_clock::now() - t0) / factor; +} + +struct BenchmarkResult { + double cost_ns; + double range_ns; + BenchmarkResult() + : cost_ns(std::numeric_limits<double>::max()), range_ns(0.0) {} + BenchmarkResult(double cost_ns_in, double range_ns_in) + : cost_ns(cost_ns_in), range_ns(range_ns_in) {} +}; + +struct Meets { + vespalib::test::ThreadMeets::Avg avg; + vespalib::test::ThreadMeets::Range<double> range; + Meets(size_t num_threads) : avg(num_threads), range(num_threads) {} +}; + +BenchmarkResult benchmark_ns(auto &&work, size_t num_threads = 1) { + Meets meets(num_threads); + auto entry = [&](Nexus &ctx) { + Timer timer; + BenchmarkResult result; + for (bool once_more = true; ctx.vote(once_more); once_more = (timer.elapsed() < budget)) { + auto my_ns = measure_ns(work); + auto cost_ns = meets.avg(my_ns); + auto range_ns = meets.range(my_ns); + if (cost_ns < result.cost_ns) { + result.cost_ns = cost_ns; + result.range_ns = range_ns; + } + } + return result; + }; + return Nexus(num_threads).run(entry); +} + +//----------------------------------------------------------------------------- + +template <typename T> +void estimate_cost() { + T lock; + auto name = getClassName(lock); + static_assert(basic_lockable<T>); + fprintf(stderr, "%s exclusive lock/unlock: %g ns\n", name.c_str(), + benchmark_ns([&lock]{ lock.lock(); lock.unlock(); }).cost_ns); + if constexpr (shared_lockable<T>) { + fprintf(stderr, "%s shared lock/unlock: %g ns\n", name.c_str(), + benchmark_ns([&lock]{ lock.lock_shared(); lock.unlock_shared(); }).cost_ns); + } + if constexpr (can_upgrade<T>) { + auto guard = std::shared_lock(lock); + fprintf(stderr, "%s upgrade/downgrade: %g ns\n", name.c_str(), + benchmark_ns([&lock]{ + assert(lock.try_convert_read_to_write()); + lock.convert_write_to_read(); + }).cost_ns); + } +} + +//----------------------------------------------------------------------------- + +template <typename T> +void thread_safety_loop(Nexus &ctx, T &lock, MyState &state, Meets &meets, int read_bp) { + Rnd rnd(ctx.thread_id()); + size_t write_cnt = 0; + size_t bad_reads = 0; + size_t loop_cnt = thread_safety_work / ctx.num_threads(); + ctx.barrier(); + auto t0 = steady_clock::now(); + for (size_t i = 0; i < loop_cnt; ++i) { + if (rnd(read_bp)) { + if constexpr (shared_lockable<T>) { + std::shared_lock guard(lock); + bad_reads += state.peek(); + } else { + std::lock_guard guard(lock); + bad_reads += state.peek(); + } + } else { + { + std::lock_guard guard(lock); + write_cnt += state.update(); + } + } + } + auto t1 = steady_clock::now(); + ctx.barrier(); + auto t2 = steady_clock::now(); + auto my_ms = count_ns(t1 - t0) / 1'000'000.0; + auto total_ms = count_ns(t2 - t0) / 1'000'000.0; + auto cost_ms = meets.avg(my_ms); + auto range_ms = meets.range(my_ms); + if (ctx.thread_id() == 0) { + fprintf(stderr, "---> %s with %2zu threads (%5d bp r): avg: %10.2f ms, range: %10.2f ms, max: %10.2f ms\n", + getClassName(lock).c_str(), ctx.num_threads(), read_bp, cost_ms, range_ms, total_ms); + } + state.commit_inconsistent_reads(bad_reads); + state.commit_expected_writes(write_cnt); +} + +//----------------------------------------------------------------------------- + +TEST(RWSpinLockTest, different_guards_work_with_rw_spin_lock) { + static_assert(basic_lockable<RWSpinLock>); + static_assert(lockable<RWSpinLock>); + static_assert(shared_lockable<RWSpinLock>); + static_assert(can_upgrade<RWSpinLock>); + RWSpinLock lock; + { auto guard = std::lock_guard(lock); } + { auto guard = std::unique_lock(lock); } + { auto guard = std::shared_lock(lock); } +} + +TEST(RWSpinLockTest, estimate_basic_costs) { + Rnd rnd(123); + MyState state; + fprintf(stderr, " rnd cost: %8.2f ns\n", benchmark_ns([&]{ rnd(50); }).cost_ns); + fprintf(stderr, " peek cost: %8.2f ns\n", benchmark_ns([&]{ (void) state.peek(); }).cost_ns); + fprintf(stderr, "update cost: %8.2f ns\n", benchmark_ns([&]{ (void) state.update(); }).cost_ns); +} + +template <typename T> +void benchmark_lock() { + auto lock = std::make_unique<T>(); + auto state = std::make_unique<MyState>(); + for (size_t bp: {10000, 9999, 5000, 0}) { + for (size_t num_threads: {8, 4, 2, 1}) { + if (bench || (bp == 9999 && num_threads == 8)) { + Meets meets(num_threads); + Nexus(num_threads).run([&](Nexus &ctx) { + thread_safety_loop(ctx, *lock, *state, meets, bp); + }); + } + } + } + state->report(getClassName(*lock).c_str()); + if (!std::same_as<T,DummyLock>) { + EXPECT_TRUE(state->check()); + } +} + +TEST(RWSpinLockTest, benchmark_dummy_lock) { benchmark_lock<DummyLock>(); } +TEST(RWSpinLockTest, benchmark_rw_spin_lock) { benchmark_lock<RWSpinLock>(); } +TEST(RWSpinLockTest, benchmark_shared_mutex) { benchmark_lock<std::shared_mutex>(); } +TEST(RWSpinLockTest, benchmark_mutex) { benchmark_lock<std::mutex>(); } +TEST(RWSpinLockTest, benchmark_spin_lock) { benchmark_lock<SpinLock>(); } + +TEST(RWSpinLockTest, estimate_single_threaded_costs) { + estimate_cost<DummyLock>(); + estimate_cost<SpinLock>(); + estimate_cost<std::mutex>(); + estimate_cost<RWSpinLock>(); + estimate_cost<std::shared_mutex>(); +} + +int main(int argc, char **argv) { + if (argc > 1 && (argv[1] == std::string("bench"))) { + bench = true; + budget = 5s; + state_loop = 1024; + fprintf(stderr, "running in benchmarking mode\n"); + ++argv; + --argc; + } + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/vespalib/src/tests/shared_string_repo/shared_string_repo_test.cpp b/vespalib/src/tests/shared_string_repo/shared_string_repo_test.cpp index dfcba14ba63..910c2d017ba 100644 --- a/vespalib/src/tests/shared_string_repo/shared_string_repo_test.cpp +++ b/vespalib/src/tests/shared_string_repo/shared_string_repo_test.cpp @@ -1,7 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/vespalib/util/shared_string_repo.h> -#include <vespa/vespalib/util/rendezvous.h> +#include <vespa/vespalib/test/thread_meets.h> #include <vespa/vespalib/util/time.h> #include <vespa/vespalib/util/size_literals.h> #include <vespa/vespalib/util/stringfmt.h> @@ -115,41 +115,8 @@ std::unique_ptr<StringIdVector> make_weak_handles(const Handles &handles) { //----------------------------------------------------------------------------- -struct Avg : Rendezvous<double, double> { - explicit Avg(size_t n) : Rendezvous<double, double>(n) {} - void mingle() override { - double sum = 0; - for (size_t i = 0; i < size(); ++i) { - sum += in(i); - } - double result = sum / size(); - for (size_t i = 0; i < size(); ++i) { - out(i) = result; - } - } - double operator()(double value) { return rendezvous(value); } -}; - -struct Vote : Rendezvous<bool, bool> { - explicit Vote(size_t n) : Rendezvous<bool, bool>(n) {} - void mingle() override { - size_t true_cnt = 0; - size_t false_cnt = 0; - for (size_t i = 0; i < size(); ++i) { - if (in(i)) { - ++true_cnt; - } else { - ++false_cnt; - } - } - bool result = (true_cnt > false_cnt); - for (size_t i = 0; i < size(); ++i) { - out(i) = result; - } - } - [[nodiscard]] size_t num_threads() const { return size(); } - bool operator()(bool flag) { return rendezvous(flag); } -}; +using Avg = vespalib::test::ThreadMeets::Avg; +using Vote = vespalib::test::ThreadMeets::Vote; //----------------------------------------------------------------------------- @@ -174,7 +141,7 @@ struct Fixture { : avg(num_threads), vote(num_threads), work(make_strings(work_size)), direct_work(make_direct_strings(work_size)), start_time(steady_clock::now()) {} ~Fixture() { if (verbose) { - fprintf(stderr, "benchmark results for %zu threads:\n", vote.num_threads()); + fprintf(stderr, "benchmark results for %zu threads:\n", vote.size()); for (const auto &[tag, ms_cost]: time_ms) { fprintf(stderr, " %s: %g ms\n", tag.c_str(), ms_cost); } diff --git a/vespalib/src/tests/spin_lock/spin_lock_test.cpp b/vespalib/src/tests/spin_lock/spin_lock_test.cpp index 78e35a3e8d1..84044bfabcf 100644 --- a/vespalib/src/tests/spin_lock/spin_lock_test.cpp +++ b/vespalib/src/tests/spin_lock/spin_lock_test.cpp @@ -77,8 +77,8 @@ template <typename T> size_t thread_safety_loop(T &lock, MyState &state, size_t state.update(); } } - auto t1 = steady_clock::now(); TEST_BARRIER(); + auto t1 = steady_clock::now(); if (thread_id == 0) { auto t2 = steady_clock::now(); size_t total_ms = count_ms(t2 - t0); diff --git a/vespalib/src/vespa/vespalib/test/CMakeLists.txt b/vespalib/src/vespa/vespalib/test/CMakeLists.txt index a60eb15a4d4..02ce1ba3416 100644 --- a/vespalib/src/vespa/vespalib/test/CMakeLists.txt +++ b/vespalib/src/vespa/vespalib/test/CMakeLists.txt @@ -3,6 +3,7 @@ vespa_add_library(vespalib_vespalib_test OBJECT SOURCES make_tls_options_for_testing.cpp memory_allocator_observer.cpp + nexus.cpp peer_policy_utils.cpp thread_meets.cpp time_tracer.cpp diff --git a/vespalib/src/vespa/vespalib/test/nexus.cpp b/vespalib/src/vespa/vespalib/test/nexus.cpp new file mode 100644 index 00000000000..b5d7b194576 --- /dev/null +++ b/vespalib/src/vespa/vespalib/test/nexus.cpp @@ -0,0 +1,15 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "nexus.h" + +namespace vespalib::test { + +size_t & +Nexus::my_thread_id() { + thread_local size_t thread_id = invalid_thread_id; + return thread_id; +} + +Nexus::~Nexus() = default; + +} diff --git a/vespalib/src/vespa/vespalib/test/nexus.h b/vespalib/src/vespa/vespalib/test/nexus.h new file mode 100644 index 00000000000..aeb9337b975 --- /dev/null +++ b/vespalib/src/vespa/vespalib/test/nexus.h @@ -0,0 +1,84 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "thread_meets.h" +#include <vespa/vespalib/util/thread.h> +#include <vespa/vespalib/util/require.h> +#include <optional> +#include <variant> + +namespace vespalib::test { + +class Nexus; +template <typename T> +concept nexus_thread_entry = requires(Nexus &ctx, T &&entry) { + entry(ctx); +}; + +/** + * Utility intended to make it easier to write multi-threaded code for + * testing and benchmarking. + **/ +class Nexus +{ +private: + vespalib::test::ThreadMeets::Vote _vote; + static size_t &my_thread_id(); +public: + constexpr static size_t invalid_thread_id = -1; + Nexus(size_t num_threads) noexcept : _vote(num_threads) {} + size_t num_threads() const noexcept { return _vote.size(); } + size_t thread_id() const noexcept { return my_thread_id(); } + bool vote(bool my_vote) { return _vote(my_vote); } + void barrier() { REQUIRE_EQ(_vote(true), true); } + struct select_thread_0 {}; + constexpr static auto merge_sum() { return [](auto a, auto b){ return a + b; }; } + auto run(auto &&entry, auto &&merge) requires nexus_thread_entry<decltype(entry)> { + ThreadPool pool; + using result_t = std::decay_t<decltype(entry(std::declval<Nexus&>()))>; + constexpr bool is_void = std::same_as<result_t, void>; + using stored_t = std::conditional<is_void, std::monostate, result_t>::type; + std::mutex lock; + std::optional<stored_t> result; + auto handle_result = [&](stored_t thread_result) noexcept { + if constexpr (std::same_as<std::decay_t<decltype(merge)>,select_thread_0>) { + if (thread_id() == 0) { + result = std::move(thread_result); + } + } else { + std::lock_guard guard(lock); + if (result.has_value()) { + result = merge(std::move(result).value(), + std::move(thread_result)); + } else { + result = std::move(thread_result); + } + } + }; + auto thread_main = [&](size_t thread_id) noexcept { + size_t old_thread_id = my_thread_id(); + my_thread_id() = thread_id; + if constexpr (is_void) { + entry(*this); + } else { + handle_result(entry(*this)); + } + my_thread_id() = old_thread_id; + }; + for (size_t i = 1; i < num_threads(); ++i) { + pool.start([i,&thread_main]() noexcept { thread_main(i); }); + } + thread_main(0); + pool.join(); + if constexpr (!is_void) { + return std::move(result).value(); + } + } + auto run(auto &&entry) requires nexus_thread_entry<decltype(entry)> { + return run(std::forward<decltype(entry)>(entry), select_thread_0{}); + } + ~Nexus(); +}; + +} diff --git a/vespalib/src/vespa/vespalib/test/thread_meets.cpp b/vespalib/src/vespa/vespalib/test/thread_meets.cpp index 9d23e0eab28..607179c53f9 100644 --- a/vespalib/src/vespa/vespalib/test/thread_meets.cpp +++ b/vespalib/src/vespa/vespalib/test/thread_meets.cpp @@ -9,4 +9,35 @@ ThreadMeets::Nop::mingle() { } +void +ThreadMeets::Avg::mingle() +{ + double sum = 0; + for (size_t i = 0; i < size(); ++i) { + sum += in(i); + } + double result = sum / size(); + for (size_t i = 0; i < size(); ++i) { + out(i) = result; + } +} + +void +ThreadMeets::Vote::mingle() +{ + size_t true_cnt = 0; + size_t false_cnt = 0; + for (size_t i = 0; i < size(); ++i) { + if (in(i)) { + ++true_cnt; + } else { + ++false_cnt; + } + } + bool result = (true_cnt > false_cnt); + for (size_t i = 0; i < size(); ++i) { + out(i) = result; + } +} + } diff --git a/vespalib/src/vespa/vespalib/test/thread_meets.h b/vespalib/src/vespa/vespalib/test/thread_meets.h index 62ca7779935..7ef4dcb9921 100644 --- a/vespalib/src/vespa/vespalib/test/thread_meets.h +++ b/vespalib/src/vespa/vespalib/test/thread_meets.h @@ -12,10 +12,67 @@ namespace vespalib::test { struct ThreadMeets { // can be used as a simple thread barrier struct Nop : vespalib::Rendezvous<bool,bool> { - Nop(size_t N) : vespalib::Rendezvous<bool,bool>(N) {} + explicit Nop(size_t N) : vespalib::Rendezvous<bool,bool>(N) {} void operator()() { rendezvous(false); } void mingle() override; }; + // calculate the average value across threads + struct Avg : Rendezvous<double, double> { + explicit Avg(size_t n) : Rendezvous<double, double>(n) {} + double operator()(double value) { return rendezvous(value); } + void mingle() override; + }; + // threads vote for true/false, majority wins (false on tie) + struct Vote : Rendezvous<bool, bool> { + explicit Vote(size_t n) : Rendezvous<bool, bool>(n) {} + bool operator()(bool flag) { return rendezvous(flag); } + void mingle() override; + }; + // sum of values across all threads + template <typename T> + struct Sum : vespalib::Rendezvous<T,T> { + using vespalib::Rendezvous<T,T>::in; + using vespalib::Rendezvous<T,T>::out; + using vespalib::Rendezvous<T,T>::size; + using vespalib::Rendezvous<T,T>::rendezvous; + explicit Sum(size_t N) : vespalib::Rendezvous<T,T>(N) {} + T operator()(T value) { return rendezvous(value); } + void mingle() override { + T acc{}; + for (size_t i = 0; i < size(); ++i) { + acc += in(i); + } + for (size_t i = 0; i < size(); ++i) { + out(i) = acc; + } + } + }; + // range of values across all threads + template <typename T> + struct Range : vespalib::Rendezvous<T,T> { + using vespalib::Rendezvous<T,T>::in; + using vespalib::Rendezvous<T,T>::out; + using vespalib::Rendezvous<T,T>::size; + using vespalib::Rendezvous<T,T>::rendezvous; + explicit Range(size_t N) : vespalib::Rendezvous<T,T>(N) {} + T operator()(T value) { return rendezvous(value); } + void mingle() override { + T min = in(0); + T max = in(0); + for (size_t i = 1; i < size(); ++i) { + if (in(i) < min) { + min = in(i); + } + if (in(i) > max) { + max = in(i); + } + } + T result = (max - min); + for (size_t i = 0; i < size(); ++i) { + out(i) = result; + } + } + }; // swap values between 2 threads template <typename T> struct Swap : vespalib::Rendezvous<T,T> { @@ -25,8 +82,8 @@ struct ThreadMeets { Swap() : vespalib::Rendezvous<T,T>(2) {} T operator()(T input) { return rendezvous(input); } void mingle() override { - out(1) = in(0); - out(0) = in(1); + out(1) = std::move(in(0)); + out(0) = std::move(in(1)); } }; }; diff --git a/vespalib/src/vespa/vespalib/util/rendezvous.h b/vespalib/src/vespa/vespalib/util/rendezvous.h index 2880f325d96..17a8729c54c 100644 --- a/vespalib/src/vespa/vespalib/util/rendezvous.h +++ b/vespalib/src/vespa/vespalib/util/rendezvous.h @@ -50,14 +50,6 @@ private: protected: /** - * Obtain the number of input and output values to be handled by - * mingle. This function is called by mingle. - * - * @return number of input and output values - **/ - size_t size() const { return _size; } - - /** * Obtain an input parameter. This function is called by mingle. * * @return reference to the appropriate input @@ -87,6 +79,11 @@ public: virtual ~Rendezvous(); /** + * @return number of participants + **/ + size_t size() const { return _size; } + + /** * Called by individual threads to synchronize execution and share * state with the mingle function. * diff --git a/vespalib/src/vespa/vespalib/util/rw_spin_lock.h b/vespalib/src/vespa/vespalib/util/rw_spin_lock.h new file mode 100644 index 00000000000..f2c15dcc0eb --- /dev/null +++ b/vespalib/src/vespa/vespalib/util/rw_spin_lock.h @@ -0,0 +1,189 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <mutex> +#include <shared_mutex> +#include <atomic> +#include <thread> +#include <cassert> +#include <utility> + +namespace vespalib { + +/** + * A reader-writer spin lock implementation. + * + * reader: shared access for any number of readers + * writer: exclusive access for a single writer + * + * valid lock combinations: + * {} + * {N readers} + * {1 writer} + * + * Trying to obtain a write lock will lead to not granting new read + * locks. + * + * This lock is intended for use-cases that involves mostly reading, + * with a little bit of writing. + * + * This class implements the Lockable and SharedLockable named + * requirements from the standard library, making it directly usable + * with std::shared_lock (reader) and std::unique_lock (writer) + * + * There is also some special glue added for lock upgrading and + * downgrading. + * + * NOTE: this implementation is experimental, mostly intended for + * benchmarking and trying to identify use-cases that work with + * rw locks. Upgrade locks that do not block readers might be + * implementet in the future. + **/ +class RWSpinLock { +private: + // [31: num readers][1: pending writer] + // a reader gets the lock by: + // increasing the number of readers while the pending writer bit is not set. + // a writer gets the lock by: + // changing the pending writer bit from 0 to 1 and then + // waiting for the number of readers to become 0 + // an upgrade is successful when: + // a reader is able to obtain the pending writer bit + std::atomic<uint32_t> _state; + + // Convenience function used to check if the pending writer bit is + // set in the given value. + bool has_pending_writer(uint32_t value) noexcept { + return (value & 1); + } + + // Wait for all readers to release their locks. + void wait_for_zero_readers(uint32_t &value) { + while (value != 1) { + std::this_thread::yield(); + value = _state.load(std::memory_order_acquire); + } + } + +public: + RWSpinLock() noexcept : _state(0) { + static_assert(std::atomic<uint32_t>::is_always_lock_free); + } + + // implementation of Lockable named requirement - vvv + + void lock() noexcept { + uint32_t expected = 0; + uint32_t desired = 1; + while (!_state.compare_exchange_weak(expected, desired, + std::memory_order_acquire, + std::memory_order_relaxed)) + { + while (has_pending_writer(expected)) { + std::this_thread::yield(); + expected = _state.load(std::memory_order_relaxed); + } + desired = expected + 1; + } + wait_for_zero_readers(desired); + } + + [[nodiscard]] bool try_lock() noexcept { + uint32_t expected = 0; + return _state.compare_exchange_strong(expected, 1, + std::memory_order_acquire, + std::memory_order_relaxed); + } + + void unlock() noexcept { + _state.store(0, std::memory_order_release); + } + + // implementation of Lockable named requirement - ^^^ + + // implementation of SharedLockable named requirement - vvv + + void lock_shared() noexcept { + uint32_t expected = 0; + uint32_t desired = 2; + while (!_state.compare_exchange_weak(expected, desired, + std::memory_order_acquire, + std::memory_order_relaxed)) + { + while (has_pending_writer(expected)) { + std::this_thread::yield(); + expected = _state.load(std::memory_order_relaxed); + } + desired = expected + 2; + } + } + + [[nodiscard]] bool try_lock_shared() noexcept { + uint32_t expected = 0; + uint32_t desired = 2; + while (!_state.compare_exchange_weak(expected, desired, + std::memory_order_acquire, + std::memory_order_relaxed)) + { + if (has_pending_writer(expected)) { + return false; + } + desired = expected + 2; + } + return true; + } + + void unlock_shared() noexcept { + _state.fetch_sub(2, std::memory_order_release); + } + + // implementation of SharedLockable named requirement - ^^^ + + // try to upgrade a read (shared) lock to a write (unique) lock + bool try_convert_read_to_write() noexcept { + uint32_t expected = 2; + uint32_t desired = 1; + while (!_state.compare_exchange_weak(expected, desired, + std::memory_order_acquire, + std::memory_order_relaxed)) + { + if (has_pending_writer(expected)) { + return false; + } + desired = expected - 1; + } + wait_for_zero_readers(desired); + return true; + } + + // convert a write (unique) lock to a read (shared) lock + void convert_write_to_read() noexcept { + _state.store(2, std::memory_order_release); + } +}; + +template<typename T> +concept rw_upgrade_downgrade_lock = requires(T a, T b) { + { a.try_convert_read_to_write() } -> std::same_as<bool>; + { b.convert_write_to_read() } -> std::same_as<void>; +}; + +template <rw_upgrade_downgrade_lock T> +[[nodiscard]] std::unique_lock<T> try_upgrade(std::shared_lock<T> &&guard) noexcept { + assert(guard.owns_lock()); + if (guard.mutex()->try_convert_read_to_write()) { + return {*guard.release(), std::adopt_lock}; + } else { + return {}; + } +} + +template <rw_upgrade_downgrade_lock T> +[[nodiscard]] std::shared_lock<T> downgrade(std::unique_lock<T> &&guard) noexcept { + assert(guard.owns_lock()); + guard.mutex()->convert_write_to_read(); + return {*guard.release(), std::adopt_lock}; +} + +} |