aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHåvard Pettersen <3535158+havardpe@users.noreply.github.com>2023-06-19 15:19:09 +0200
committerGitHub <noreply@github.com>2023-06-19 15:19:09 +0200
commit49aade8d0e1373863368af1ecdabcb9f0ed53889 (patch)
tree6bc79b7935d18a15d7e8ef31da951a42d0ff30b2
parentba83ad3508f3a591bb433c4160d60d3e45c0955b (diff)
parent7de3194caeb6227f6d2dbaff4b456fb625129e3a (diff)
Merge pull request #27462 from vespa-engine/havardpe/rw-spin-lock-2
rw spin lock
-rw-r--r--vespalib/CMakeLists.txt2
-rw-r--r--vespalib/src/tests/nexus/CMakeLists.txt9
-rw-r--r--vespalib/src/tests/nexus/nexus_test.cpp92
-rw-r--r--vespalib/src/tests/rw_spin_lock/CMakeLists.txt9
-rw-r--r--vespalib/src/tests/rw_spin_lock/rw_spin_lock_test.cpp323
-rw-r--r--vespalib/src/tests/shared_string_repo/shared_string_repo_test.cpp41
-rw-r--r--vespalib/src/tests/spin_lock/spin_lock_test.cpp2
-rw-r--r--vespalib/src/vespa/vespalib/test/CMakeLists.txt1
-rw-r--r--vespalib/src/vespa/vespalib/test/nexus.cpp15
-rw-r--r--vespalib/src/vespa/vespalib/test/nexus.h84
-rw-r--r--vespalib/src/vespa/vespalib/test/thread_meets.cpp31
-rw-r--r--vespalib/src/vespa/vespalib/test/thread_meets.h63
-rw-r--r--vespalib/src/vespa/vespalib/util/rendezvous.h13
-rw-r--r--vespalib/src/vespa/vespalib/util/rw_spin_lock.h189
14 files changed, 825 insertions, 49 deletions
diff --git a/vespalib/CMakeLists.txt b/vespalib/CMakeLists.txt
index d2892ee4429..c1d7e17b457 100644
--- a/vespalib/CMakeLists.txt
+++ b/vespalib/CMakeLists.txt
@@ -126,6 +126,7 @@ vespa_define_module(
src/tests/net/tls/policy_checking_certificate_verifier
src/tests/net/tls/protocol_snooping
src/tests/net/tls/transport_options
+ src/tests/nexus
src/tests/nice
src/tests/objects/identifiable
src/tests/objects/nbostream
@@ -149,6 +150,7 @@ vespa_define_module(
src/tests/require
src/tests/runnable_pair
src/tests/rusage
+ src/tests/rw_spin_lock
src/tests/sequencedtaskexecutor
src/tests/sha1
src/tests/shared_operation_throttler
diff --git a/vespalib/src/tests/nexus/CMakeLists.txt b/vespalib/src/tests/nexus/CMakeLists.txt
new file mode 100644
index 00000000000..4b1b4bc9c25
--- /dev/null
+++ b/vespalib/src/tests/nexus/CMakeLists.txt
@@ -0,0 +1,9 @@
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(vespalib_nexus_test_app TEST
+ SOURCES
+ nexus_test.cpp
+ DEPENDS
+ vespalib
+ GTest::GTest
+)
+vespa_add_test(NAME vespalib_nexus_test_app COMMAND vespalib_nexus_test_app)
diff --git a/vespalib/src/tests/nexus/nexus_test.cpp b/vespalib/src/tests/nexus/nexus_test.cpp
new file mode 100644
index 00000000000..09f913dccd1
--- /dev/null
+++ b/vespalib/src/tests/nexus/nexus_test.cpp
@@ -0,0 +1,92 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/test/nexus.h>
+#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/vespalib/util/require.h>
+
+using namespace vespalib::test;
+
+TEST(NexusTest, run_void_tasks) {
+ std::atomic<size_t> value = 0;
+ auto task = [&value](Nexus &) {
+ value.fetch_add(1, std::memory_order_relaxed);
+ };
+ Nexus ctx(10);
+ ctx.run(task);
+ EXPECT_EQ(value, 10);
+ ctx.run(task);
+ EXPECT_EQ(value, 20);
+}
+
+TEST(NexusTest, run_value_tasks_select_thread_0) {
+ std::atomic<size_t> value = 0;
+ auto task = [&value](Nexus &ctx) {
+ value.fetch_add(1, std::memory_order_relaxed);
+ return ctx.thread_id() + 5;
+ };
+ Nexus ctx(10);
+ EXPECT_EQ(ctx.run(task), 5);
+ EXPECT_EQ(value, 10);
+}
+
+TEST(NexusTest, run_value_tasks_merge_results) {
+ std::atomic<size_t> value = 0;
+ auto task = [&value](Nexus &) {
+ return value.fetch_add(1, std::memory_order_relaxed) + 1;
+ };
+ Nexus ctx(10);
+ EXPECT_EQ(ctx.run(task, Nexus::merge_sum()), 55);
+ EXPECT_EQ(value, 10);
+}
+
+TEST(NexusTest, run_inline_voted_loop) {
+ // Each thread wants to run a loop <thread_id> times, but the loop
+ // condition is a vote between all threads. After 3 iterations,
+ // threads 0,1,2,3 vote to exit while threads 4,5,6,7,8 vote to
+ // continue. After 4 iterations, threads 0,1,2,3,4 vote to exit
+ // while threads 5,6,7,8 vote to continue. The result is that all
+ // threads end up doing the loop exactly 4 times.
+ auto res = Nexus(9).run([](Nexus &ctx) {
+ size_t times = 0;
+ for (size_t i = 0; ctx.vote(i < ctx.thread_id()); ++i) {
+ ++times;
+ }
+ return times;
+ }, [](auto a, auto b){ EXPECT_EQ(a, b); return a; });
+ EXPECT_EQ(res, 4);
+}
+
+TEST(NexusTest, run_return_type_decay) {
+ int value = 3;
+ auto task = [&](Nexus &)->int&{ return value; };
+ Nexus ctx(3);
+ auto res = ctx.run(task);
+ EXPECT_EQ(res, 3);
+ EXPECT_EQ(std::addressof(value), std::addressof(task(ctx)));
+ using task_res_t = decltype(task(ctx));
+ using run_res_t = decltype(ctx.run(task));
+ static_assert(std::same_as<task_res_t, int&>);
+ static_assert(std::same_as<run_res_t, int>);
+}
+
+TEST(NexusTest, example_multi_threaded_unit_test) {
+ int a = 0;
+ int b = 0;
+ auto work = [&](Nexus &ctx) {
+ EXPECT_EQ(ctx.num_threads(), 2);
+ if (ctx.thread_id() == 0) {
+ a = 5;
+ ctx.barrier();
+ EXPECT_EQ(b, 7);
+ } else {
+ b = 7;
+ ctx.barrier();
+ EXPECT_EQ(a, 5);
+ }
+ };
+ Nexus(2).run(work);
+ EXPECT_EQ(a, 5);
+ EXPECT_EQ(b, 7);
+}
+
+GTEST_MAIN_RUN_ALL_TESTS()
diff --git a/vespalib/src/tests/rw_spin_lock/CMakeLists.txt b/vespalib/src/tests/rw_spin_lock/CMakeLists.txt
new file mode 100644
index 00000000000..76bcb918ce9
--- /dev/null
+++ b/vespalib/src/tests/rw_spin_lock/CMakeLists.txt
@@ -0,0 +1,9 @@
+# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(vespalib_rw_spin_lock_test_app TEST
+ SOURCES
+ rw_spin_lock_test.cpp
+ DEPENDS
+ vespalib
+ GTest::GTest
+)
+vespa_add_test(NAME vespalib_rw_spin_lock_test_app NO_VALGRIND COMMAND vespalib_rw_spin_lock_test_app)
diff --git a/vespalib/src/tests/rw_spin_lock/rw_spin_lock_test.cpp b/vespalib/src/tests/rw_spin_lock/rw_spin_lock_test.cpp
new file mode 100644
index 00000000000..50621338d8c
--- /dev/null
+++ b/vespalib/src/tests/rw_spin_lock/rw_spin_lock_test.cpp
@@ -0,0 +1,323 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/vespalib/util/spin_lock.h>
+#include <vespa/vespalib/util/rw_spin_lock.h>
+#include <vespa/vespalib/util/time.h>
+#include <vespa/vespalib/util/classname.h>
+#include <vespa/vespalib/test/thread_meets.h>
+#include <vespa/vespalib/test/nexus.h>
+#include <vespa/vespalib/gtest/gtest.h>
+#include <type_traits>
+#include <ranges>
+#include <random>
+#include <array>
+
+using namespace vespalib;
+using namespace vespalib::test;
+
+bool bench = false;
+duration budget = 250ms;
+constexpr size_t LOOP_CNT = 4096;
+size_t thread_safety_work = 1'000'000;
+size_t state_loop = 1;
+
+//-----------------------------------------------------------------------------
+
+struct DummyLock {
+ constexpr DummyLock() noexcept {}
+ // BasicLockable
+ constexpr void lock() noexcept {}
+ constexpr void unlock() noexcept {}
+ // SharedLockable
+ constexpr void lock_shared() noexcept {}
+ [[nodiscard]] constexpr bool try_lock_shared() noexcept { return true; }
+ constexpr void unlock_shared() noexcept {}
+ // rw_upgrade_downgrade_lock
+ [[nodiscard]] constexpr bool try_convert_read_to_write() noexcept { return true; }
+ constexpr void convert_write_to_read() noexcept {}
+};
+
+//-----------------------------------------------------------------------------
+
+struct MyState {
+ static constexpr size_t SZ = 5;
+ std::array<std::atomic<size_t>,SZ> state = {0,0,0,0,0};
+ std::atomic<size_t> inconsistent_reads = 0;
+ std::atomic<size_t> expected_writes = 0;
+ [[nodiscard]] size_t update() {
+ std::array<size_t,SZ> tmp;
+ for (size_t i = 0; i < SZ; ++i) {
+ tmp[i] = state[i].load(std::memory_order_relaxed);
+ }
+ for (size_t n = 0; n < state_loop; ++n) {
+ for (size_t i = 0; i < SZ; ++i) {
+ state[i].store(tmp[i] + 1, std::memory_order_relaxed);
+ }
+ }
+ return 1;
+ }
+ [[nodiscard]] size_t peek() {
+ size_t my_inconsistent_reads = 0;
+ std::array<size_t,SZ> tmp;
+ for (size_t i = 0; i < SZ; ++i) {
+ tmp[i] = state[i].load(std::memory_order_relaxed);
+ }
+ for (size_t n = 0; n < state_loop; ++n) {
+ for (size_t i = 0; i < SZ; ++i) {
+ if (state[i].load(std::memory_order_relaxed) != tmp[i]) [[unlikely]] {
+ ++my_inconsistent_reads;
+ }
+ }
+ }
+ return my_inconsistent_reads;
+ }
+ void commit_inconsistent_reads(size_t n) {
+ inconsistent_reads.fetch_add(n, std::memory_order_relaxed);
+ }
+ void commit_expected_writes(size_t n) {
+ expected_writes.fetch_add(n, std::memory_order_relaxed);
+ }
+ [[nodiscard]] bool check() const {
+ if (inconsistent_reads > 0) {
+ return false;
+ }
+ for (const auto& value: state) {
+ if (value != expected_writes) {
+ return false;
+ }
+ }
+ return true;
+ }
+ void report(const char *name) const {
+ if (check()) {
+ fprintf(stderr, "%s is thread safe\n", name);
+ } else {
+ fprintf(stderr, "%s is not thread safe\n", name);
+ fprintf(stderr, " inconsistent reads: %zu\n", inconsistent_reads.load());
+ fprintf(stderr, " expected %zu, got [%zu,%zu,%zu,%zu,%zu]\n",
+ expected_writes.load(), state[0].load(), state[1].load(), state[2].load(), state[3].load(), state[4].load());
+ }
+ }
+};
+
+// random generator used to make per-thread decisions
+class Rnd {
+private:
+ std::mt19937 _engine;
+ std::uniform_int_distribution<int> _dist;
+public:
+ Rnd(uint32_t seed) : _engine(seed), _dist(0,9999) {}
+ bool operator()(int bp) { return _dist(_engine) < bp; }
+};
+
+//-----------------------------------------------------------------------------
+
+template<typename T>
+concept basic_lockable = requires(T a) {
+ { a.lock() } -> std::same_as<void>;
+ { a.unlock() } -> std::same_as<void>;
+};
+
+template<typename T>
+concept lockable = requires(T a) {
+ { a.try_lock() } -> std::same_as<bool>;
+ { a.lock() } -> std::same_as<void>;
+ { a.unlock() } -> std::same_as<void>;
+};
+
+template<typename T>
+concept shared_lockable = requires(T a) {
+ { a.try_lock_shared() } -> std::same_as<bool>;
+ { a.lock_shared() } -> std::same_as<void>;
+ { a.unlock_shared() } -> std::same_as<void>;
+};
+
+template<typename T>
+concept can_upgrade = requires(std::shared_lock<T> a, std::unique_lock<T> b) {
+ { try_upgrade(std::move(a)) } -> std::same_as<std::unique_lock<T>>;
+ { downgrade(std::move(b)) } -> std::same_as<std::shared_lock<T>>;
+};
+
+//-----------------------------------------------------------------------------
+
+template <size_t N>
+auto run_loop(auto &f) {
+ static_assert(N % 4 == 0);
+ for (size_t i = 0; i < N / 4; ++i) {
+ f(); f(); f(); f();
+ }
+}
+
+double measure_ns(auto &work) __attribute__((noinline));
+double measure_ns(auto &work) {
+ constexpr double factor = LOOP_CNT;
+ auto t0 = steady_clock::now();
+ run_loop<LOOP_CNT>(work);
+ return count_ns(steady_clock::now() - t0) / factor;
+}
+
+struct BenchmarkResult {
+ double cost_ns;
+ double range_ns;
+ BenchmarkResult()
+ : cost_ns(std::numeric_limits<double>::max()), range_ns(0.0) {}
+ BenchmarkResult(double cost_ns_in, double range_ns_in)
+ : cost_ns(cost_ns_in), range_ns(range_ns_in) {}
+};
+
+struct Meets {
+ vespalib::test::ThreadMeets::Avg avg;
+ vespalib::test::ThreadMeets::Range<double> range;
+ Meets(size_t num_threads) : avg(num_threads), range(num_threads) {}
+};
+
+BenchmarkResult benchmark_ns(auto &&work, size_t num_threads = 1) {
+ Meets meets(num_threads);
+ auto entry = [&](Nexus &ctx) {
+ Timer timer;
+ BenchmarkResult result;
+ for (bool once_more = true; ctx.vote(once_more); once_more = (timer.elapsed() < budget)) {
+ auto my_ns = measure_ns(work);
+ auto cost_ns = meets.avg(my_ns);
+ auto range_ns = meets.range(my_ns);
+ if (cost_ns < result.cost_ns) {
+ result.cost_ns = cost_ns;
+ result.range_ns = range_ns;
+ }
+ }
+ return result;
+ };
+ return Nexus(num_threads).run(entry);
+}
+
+//-----------------------------------------------------------------------------
+
+template <typename T>
+void estimate_cost() {
+ T lock;
+ auto name = getClassName(lock);
+ static_assert(basic_lockable<T>);
+ fprintf(stderr, "%s exclusive lock/unlock: %g ns\n", name.c_str(),
+ benchmark_ns([&lock]{ lock.lock(); lock.unlock(); }).cost_ns);
+ if constexpr (shared_lockable<T>) {
+ fprintf(stderr, "%s shared lock/unlock: %g ns\n", name.c_str(),
+ benchmark_ns([&lock]{ lock.lock_shared(); lock.unlock_shared(); }).cost_ns);
+ }
+ if constexpr (can_upgrade<T>) {
+ auto guard = std::shared_lock(lock);
+ fprintf(stderr, "%s upgrade/downgrade: %g ns\n", name.c_str(),
+ benchmark_ns([&lock]{
+ assert(lock.try_convert_read_to_write());
+ lock.convert_write_to_read();
+ }).cost_ns);
+ }
+}
+
+//-----------------------------------------------------------------------------
+
+template <typename T>
+void thread_safety_loop(Nexus &ctx, T &lock, MyState &state, Meets &meets, int read_bp) {
+ Rnd rnd(ctx.thread_id());
+ size_t write_cnt = 0;
+ size_t bad_reads = 0;
+ size_t loop_cnt = thread_safety_work / ctx.num_threads();
+ ctx.barrier();
+ auto t0 = steady_clock::now();
+ for (size_t i = 0; i < loop_cnt; ++i) {
+ if (rnd(read_bp)) {
+ if constexpr (shared_lockable<T>) {
+ std::shared_lock guard(lock);
+ bad_reads += state.peek();
+ } else {
+ std::lock_guard guard(lock);
+ bad_reads += state.peek();
+ }
+ } else {
+ {
+ std::lock_guard guard(lock);
+ write_cnt += state.update();
+ }
+ }
+ }
+ auto t1 = steady_clock::now();
+ ctx.barrier();
+ auto t2 = steady_clock::now();
+ auto my_ms = count_ns(t1 - t0) / 1'000'000.0;
+ auto total_ms = count_ns(t2 - t0) / 1'000'000.0;
+ auto cost_ms = meets.avg(my_ms);
+ auto range_ms = meets.range(my_ms);
+ if (ctx.thread_id() == 0) {
+ fprintf(stderr, "---> %s with %2zu threads (%5d bp r): avg: %10.2f ms, range: %10.2f ms, max: %10.2f ms\n",
+ getClassName(lock).c_str(), ctx.num_threads(), read_bp, cost_ms, range_ms, total_ms);
+ }
+ state.commit_inconsistent_reads(bad_reads);
+ state.commit_expected_writes(write_cnt);
+}
+
+//-----------------------------------------------------------------------------
+
+TEST(RWSpinLockTest, different_guards_work_with_rw_spin_lock) {
+ static_assert(basic_lockable<RWSpinLock>);
+ static_assert(lockable<RWSpinLock>);
+ static_assert(shared_lockable<RWSpinLock>);
+ static_assert(can_upgrade<RWSpinLock>);
+ RWSpinLock lock;
+ { auto guard = std::lock_guard(lock); }
+ { auto guard = std::unique_lock(lock); }
+ { auto guard = std::shared_lock(lock); }
+}
+
+TEST(RWSpinLockTest, estimate_basic_costs) {
+ Rnd rnd(123);
+ MyState state;
+ fprintf(stderr, " rnd cost: %8.2f ns\n", benchmark_ns([&]{ rnd(50); }).cost_ns);
+ fprintf(stderr, " peek cost: %8.2f ns\n", benchmark_ns([&]{ (void) state.peek(); }).cost_ns);
+ fprintf(stderr, "update cost: %8.2f ns\n", benchmark_ns([&]{ (void) state.update(); }).cost_ns);
+}
+
+template <typename T>
+void benchmark_lock() {
+ auto lock = std::make_unique<T>();
+ auto state = std::make_unique<MyState>();
+ for (size_t bp: {10000, 9999, 5000, 0}) {
+ for (size_t num_threads: {8, 4, 2, 1}) {
+ if (bench || (bp == 9999 && num_threads == 8)) {
+ Meets meets(num_threads);
+ Nexus(num_threads).run([&](Nexus &ctx) {
+ thread_safety_loop(ctx, *lock, *state, meets, bp);
+ });
+ }
+ }
+ }
+ state->report(getClassName(*lock).c_str());
+ if (!std::same_as<T,DummyLock>) {
+ EXPECT_TRUE(state->check());
+ }
+}
+
+TEST(RWSpinLockTest, benchmark_dummy_lock) { benchmark_lock<DummyLock>(); }
+TEST(RWSpinLockTest, benchmark_rw_spin_lock) { benchmark_lock<RWSpinLock>(); }
+TEST(RWSpinLockTest, benchmark_shared_mutex) { benchmark_lock<std::shared_mutex>(); }
+TEST(RWSpinLockTest, benchmark_mutex) { benchmark_lock<std::mutex>(); }
+TEST(RWSpinLockTest, benchmark_spin_lock) { benchmark_lock<SpinLock>(); }
+
+TEST(RWSpinLockTest, estimate_single_threaded_costs) {
+ estimate_cost<DummyLock>();
+ estimate_cost<SpinLock>();
+ estimate_cost<std::mutex>();
+ estimate_cost<RWSpinLock>();
+ estimate_cost<std::shared_mutex>();
+}
+
+int main(int argc, char **argv) {
+ if (argc > 1 && (argv[1] == std::string("bench"))) {
+ bench = true;
+ budget = 5s;
+ state_loop = 1024;
+ fprintf(stderr, "running in benchmarking mode\n");
+ ++argv;
+ --argc;
+ }
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/vespalib/src/tests/shared_string_repo/shared_string_repo_test.cpp b/vespalib/src/tests/shared_string_repo/shared_string_repo_test.cpp
index dfcba14ba63..910c2d017ba 100644
--- a/vespalib/src/tests/shared_string_repo/shared_string_repo_test.cpp
+++ b/vespalib/src/tests/shared_string_repo/shared_string_repo_test.cpp
@@ -1,7 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/vespalib/util/shared_string_repo.h>
-#include <vespa/vespalib/util/rendezvous.h>
+#include <vespa/vespalib/test/thread_meets.h>
#include <vespa/vespalib/util/time.h>
#include <vespa/vespalib/util/size_literals.h>
#include <vespa/vespalib/util/stringfmt.h>
@@ -115,41 +115,8 @@ std::unique_ptr<StringIdVector> make_weak_handles(const Handles &handles) {
//-----------------------------------------------------------------------------
-struct Avg : Rendezvous<double, double> {
- explicit Avg(size_t n) : Rendezvous<double, double>(n) {}
- void mingle() override {
- double sum = 0;
- for (size_t i = 0; i < size(); ++i) {
- sum += in(i);
- }
- double result = sum / size();
- for (size_t i = 0; i < size(); ++i) {
- out(i) = result;
- }
- }
- double operator()(double value) { return rendezvous(value); }
-};
-
-struct Vote : Rendezvous<bool, bool> {
- explicit Vote(size_t n) : Rendezvous<bool, bool>(n) {}
- void mingle() override {
- size_t true_cnt = 0;
- size_t false_cnt = 0;
- for (size_t i = 0; i < size(); ++i) {
- if (in(i)) {
- ++true_cnt;
- } else {
- ++false_cnt;
- }
- }
- bool result = (true_cnt > false_cnt);
- for (size_t i = 0; i < size(); ++i) {
- out(i) = result;
- }
- }
- [[nodiscard]] size_t num_threads() const { return size(); }
- bool operator()(bool flag) { return rendezvous(flag); }
-};
+using Avg = vespalib::test::ThreadMeets::Avg;
+using Vote = vespalib::test::ThreadMeets::Vote;
//-----------------------------------------------------------------------------
@@ -174,7 +141,7 @@ struct Fixture {
: avg(num_threads), vote(num_threads), work(make_strings(work_size)), direct_work(make_direct_strings(work_size)), start_time(steady_clock::now()) {}
~Fixture() {
if (verbose) {
- fprintf(stderr, "benchmark results for %zu threads:\n", vote.num_threads());
+ fprintf(stderr, "benchmark results for %zu threads:\n", vote.size());
for (const auto &[tag, ms_cost]: time_ms) {
fprintf(stderr, " %s: %g ms\n", tag.c_str(), ms_cost);
}
diff --git a/vespalib/src/tests/spin_lock/spin_lock_test.cpp b/vespalib/src/tests/spin_lock/spin_lock_test.cpp
index 78e35a3e8d1..84044bfabcf 100644
--- a/vespalib/src/tests/spin_lock/spin_lock_test.cpp
+++ b/vespalib/src/tests/spin_lock/spin_lock_test.cpp
@@ -77,8 +77,8 @@ template <typename T> size_t thread_safety_loop(T &lock, MyState &state, size_t
state.update();
}
}
- auto t1 = steady_clock::now();
TEST_BARRIER();
+ auto t1 = steady_clock::now();
if (thread_id == 0) {
auto t2 = steady_clock::now();
size_t total_ms = count_ms(t2 - t0);
diff --git a/vespalib/src/vespa/vespalib/test/CMakeLists.txt b/vespalib/src/vespa/vespalib/test/CMakeLists.txt
index a60eb15a4d4..02ce1ba3416 100644
--- a/vespalib/src/vespa/vespalib/test/CMakeLists.txt
+++ b/vespalib/src/vespa/vespalib/test/CMakeLists.txt
@@ -3,6 +3,7 @@ vespa_add_library(vespalib_vespalib_test OBJECT
SOURCES
make_tls_options_for_testing.cpp
memory_allocator_observer.cpp
+ nexus.cpp
peer_policy_utils.cpp
thread_meets.cpp
time_tracer.cpp
diff --git a/vespalib/src/vespa/vespalib/test/nexus.cpp b/vespalib/src/vespa/vespalib/test/nexus.cpp
new file mode 100644
index 00000000000..b5d7b194576
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/test/nexus.cpp
@@ -0,0 +1,15 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "nexus.h"
+
+namespace vespalib::test {
+
+size_t &
+Nexus::my_thread_id() {
+ thread_local size_t thread_id = invalid_thread_id;
+ return thread_id;
+}
+
+Nexus::~Nexus() = default;
+
+}
diff --git a/vespalib/src/vespa/vespalib/test/nexus.h b/vespalib/src/vespa/vespalib/test/nexus.h
new file mode 100644
index 00000000000..aeb9337b975
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/test/nexus.h
@@ -0,0 +1,84 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "thread_meets.h"
+#include <vespa/vespalib/util/thread.h>
+#include <vespa/vespalib/util/require.h>
+#include <optional>
+#include <variant>
+
+namespace vespalib::test {
+
+class Nexus;
+template <typename T>
+concept nexus_thread_entry = requires(Nexus &ctx, T &&entry) {
+ entry(ctx);
+};
+
+/**
+ * Utility intended to make it easier to write multi-threaded code for
+ * testing and benchmarking.
+ **/
+class Nexus
+{
+private:
+ vespalib::test::ThreadMeets::Vote _vote;
+ static size_t &my_thread_id();
+public:
+ constexpr static size_t invalid_thread_id = -1;
+ Nexus(size_t num_threads) noexcept : _vote(num_threads) {}
+ size_t num_threads() const noexcept { return _vote.size(); }
+ size_t thread_id() const noexcept { return my_thread_id(); }
+ bool vote(bool my_vote) { return _vote(my_vote); }
+ void barrier() { REQUIRE_EQ(_vote(true), true); }
+ struct select_thread_0 {};
+ constexpr static auto merge_sum() { return [](auto a, auto b){ return a + b; }; }
+ auto run(auto &&entry, auto &&merge) requires nexus_thread_entry<decltype(entry)> {
+ ThreadPool pool;
+ using result_t = std::decay_t<decltype(entry(std::declval<Nexus&>()))>;
+ constexpr bool is_void = std::same_as<result_t, void>;
+ using stored_t = std::conditional<is_void, std::monostate, result_t>::type;
+ std::mutex lock;
+ std::optional<stored_t> result;
+ auto handle_result = [&](stored_t thread_result) noexcept {
+ if constexpr (std::same_as<std::decay_t<decltype(merge)>,select_thread_0>) {
+ if (thread_id() == 0) {
+ result = std::move(thread_result);
+ }
+ } else {
+ std::lock_guard guard(lock);
+ if (result.has_value()) {
+ result = merge(std::move(result).value(),
+ std::move(thread_result));
+ } else {
+ result = std::move(thread_result);
+ }
+ }
+ };
+ auto thread_main = [&](size_t thread_id) noexcept {
+ size_t old_thread_id = my_thread_id();
+ my_thread_id() = thread_id;
+ if constexpr (is_void) {
+ entry(*this);
+ } else {
+ handle_result(entry(*this));
+ }
+ my_thread_id() = old_thread_id;
+ };
+ for (size_t i = 1; i < num_threads(); ++i) {
+ pool.start([i,&thread_main]() noexcept { thread_main(i); });
+ }
+ thread_main(0);
+ pool.join();
+ if constexpr (!is_void) {
+ return std::move(result).value();
+ }
+ }
+ auto run(auto &&entry) requires nexus_thread_entry<decltype(entry)> {
+ return run(std::forward<decltype(entry)>(entry), select_thread_0{});
+ }
+ ~Nexus();
+};
+
+}
diff --git a/vespalib/src/vespa/vespalib/test/thread_meets.cpp b/vespalib/src/vespa/vespalib/test/thread_meets.cpp
index 9d23e0eab28..607179c53f9 100644
--- a/vespalib/src/vespa/vespalib/test/thread_meets.cpp
+++ b/vespalib/src/vespa/vespalib/test/thread_meets.cpp
@@ -9,4 +9,35 @@ ThreadMeets::Nop::mingle()
{
}
+void
+ThreadMeets::Avg::mingle()
+{
+ double sum = 0;
+ for (size_t i = 0; i < size(); ++i) {
+ sum += in(i);
+ }
+ double result = sum / size();
+ for (size_t i = 0; i < size(); ++i) {
+ out(i) = result;
+ }
+}
+
+void
+ThreadMeets::Vote::mingle()
+{
+ size_t true_cnt = 0;
+ size_t false_cnt = 0;
+ for (size_t i = 0; i < size(); ++i) {
+ if (in(i)) {
+ ++true_cnt;
+ } else {
+ ++false_cnt;
+ }
+ }
+ bool result = (true_cnt > false_cnt);
+ for (size_t i = 0; i < size(); ++i) {
+ out(i) = result;
+ }
+}
+
}
diff --git a/vespalib/src/vespa/vespalib/test/thread_meets.h b/vespalib/src/vespa/vespalib/test/thread_meets.h
index 62ca7779935..7ef4dcb9921 100644
--- a/vespalib/src/vespa/vespalib/test/thread_meets.h
+++ b/vespalib/src/vespa/vespalib/test/thread_meets.h
@@ -12,10 +12,67 @@ namespace vespalib::test {
struct ThreadMeets {
// can be used as a simple thread barrier
struct Nop : vespalib::Rendezvous<bool,bool> {
- Nop(size_t N) : vespalib::Rendezvous<bool,bool>(N) {}
+ explicit Nop(size_t N) : vespalib::Rendezvous<bool,bool>(N) {}
void operator()() { rendezvous(false); }
void mingle() override;
};
+ // calculate the average value across threads
+ struct Avg : Rendezvous<double, double> {
+ explicit Avg(size_t n) : Rendezvous<double, double>(n) {}
+ double operator()(double value) { return rendezvous(value); }
+ void mingle() override;
+ };
+ // threads vote for true/false, majority wins (false on tie)
+ struct Vote : Rendezvous<bool, bool> {
+ explicit Vote(size_t n) : Rendezvous<bool, bool>(n) {}
+ bool operator()(bool flag) { return rendezvous(flag); }
+ void mingle() override;
+ };
+ // sum of values across all threads
+ template <typename T>
+ struct Sum : vespalib::Rendezvous<T,T> {
+ using vespalib::Rendezvous<T,T>::in;
+ using vespalib::Rendezvous<T,T>::out;
+ using vespalib::Rendezvous<T,T>::size;
+ using vespalib::Rendezvous<T,T>::rendezvous;
+ explicit Sum(size_t N) : vespalib::Rendezvous<T,T>(N) {}
+ T operator()(T value) { return rendezvous(value); }
+ void mingle() override {
+ T acc{};
+ for (size_t i = 0; i < size(); ++i) {
+ acc += in(i);
+ }
+ for (size_t i = 0; i < size(); ++i) {
+ out(i) = acc;
+ }
+ }
+ };
+ // range of values across all threads
+ template <typename T>
+ struct Range : vespalib::Rendezvous<T,T> {
+ using vespalib::Rendezvous<T,T>::in;
+ using vespalib::Rendezvous<T,T>::out;
+ using vespalib::Rendezvous<T,T>::size;
+ using vespalib::Rendezvous<T,T>::rendezvous;
+ explicit Range(size_t N) : vespalib::Rendezvous<T,T>(N) {}
+ T operator()(T value) { return rendezvous(value); }
+ void mingle() override {
+ T min = in(0);
+ T max = in(0);
+ for (size_t i = 1; i < size(); ++i) {
+ if (in(i) < min) {
+ min = in(i);
+ }
+ if (in(i) > max) {
+ max = in(i);
+ }
+ }
+ T result = (max - min);
+ for (size_t i = 0; i < size(); ++i) {
+ out(i) = result;
+ }
+ }
+ };
// swap values between 2 threads
template <typename T>
struct Swap : vespalib::Rendezvous<T,T> {
@@ -25,8 +82,8 @@ struct ThreadMeets {
Swap() : vespalib::Rendezvous<T,T>(2) {}
T operator()(T input) { return rendezvous(input); }
void mingle() override {
- out(1) = in(0);
- out(0) = in(1);
+ out(1) = std::move(in(0));
+ out(0) = std::move(in(1));
}
};
};
diff --git a/vespalib/src/vespa/vespalib/util/rendezvous.h b/vespalib/src/vespa/vespalib/util/rendezvous.h
index 2880f325d96..17a8729c54c 100644
--- a/vespalib/src/vespa/vespalib/util/rendezvous.h
+++ b/vespalib/src/vespa/vespalib/util/rendezvous.h
@@ -50,14 +50,6 @@ private:
protected:
/**
- * Obtain the number of input and output values to be handled by
- * mingle. This function is called by mingle.
- *
- * @return number of input and output values
- **/
- size_t size() const { return _size; }
-
- /**
* Obtain an input parameter. This function is called by mingle.
*
* @return reference to the appropriate input
@@ -87,6 +79,11 @@ public:
virtual ~Rendezvous();
/**
+ * @return number of participants
+ **/
+ size_t size() const { return _size; }
+
+ /**
* Called by individual threads to synchronize execution and share
* state with the mingle function.
*
diff --git a/vespalib/src/vespa/vespalib/util/rw_spin_lock.h b/vespalib/src/vespa/vespalib/util/rw_spin_lock.h
new file mode 100644
index 00000000000..f2c15dcc0eb
--- /dev/null
+++ b/vespalib/src/vespa/vespalib/util/rw_spin_lock.h
@@ -0,0 +1,189 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <mutex>
+#include <shared_mutex>
+#include <atomic>
+#include <thread>
+#include <cassert>
+#include <utility>
+
+namespace vespalib {
+
+/**
+ * A reader-writer spin lock implementation.
+ *
+ * reader: shared access for any number of readers
+ * writer: exclusive access for a single writer
+ *
+ * valid lock combinations:
+ * {}
+ * {N readers}
+ * {1 writer}
+ *
+ * Trying to obtain a write lock will lead to not granting new read
+ * locks.
+ *
+ * This lock is intended for use-cases that involve mostly reading,
+ * with a little bit of writing.
+ *
+ * This class implements the Lockable and SharedLockable named
+ * requirements from the standard library, making it directly usable
+ * with std::shared_lock (reader) and std::unique_lock (writer)
+ *
+ * There is also some special glue added for lock upgrading and
+ * downgrading.
+ *
+ * NOTE: this implementation is experimental, mostly intended for
+ * benchmarking and trying to identify use-cases that work with
+ * rw locks. Upgrade locks that do not block readers might be
+ *       implemented in the future.
+ **/
+class RWSpinLock {
+private:
+ // [31: num readers][1: pending writer]
+ // a reader gets the lock by:
+ // increasing the number of readers while the pending writer bit is not set.
+ // a writer gets the lock by:
+ // changing the pending writer bit from 0 to 1 and then
+ // waiting for the number of readers to become 0
+ // an upgrade is successful when:
+ // a reader is able to obtain the pending writer bit
+ std::atomic<uint32_t> _state;
+
+ // Convenience function used to check if the pending writer bit is
+ // set in the given value.
+ bool has_pending_writer(uint32_t value) noexcept {
+ return (value & 1);
+ }
+
+ // Wait for all readers to release their locks.
+ void wait_for_zero_readers(uint32_t &value) {
+ while (value != 1) {
+ std::this_thread::yield();
+ value = _state.load(std::memory_order_acquire);
+ }
+ }
+
+public:
+ RWSpinLock() noexcept : _state(0) {
+ static_assert(std::atomic<uint32_t>::is_always_lock_free);
+ }
+
+ // implementation of Lockable named requirement - vvv
+
+ void lock() noexcept {
+ uint32_t expected = 0;
+ uint32_t desired = 1;
+ while (!_state.compare_exchange_weak(expected, desired,
+ std::memory_order_acquire,
+ std::memory_order_relaxed))
+ {
+ while (has_pending_writer(expected)) {
+ std::this_thread::yield();
+ expected = _state.load(std::memory_order_relaxed);
+ }
+ desired = expected + 1;
+ }
+ wait_for_zero_readers(desired);
+ }
+
+ [[nodiscard]] bool try_lock() noexcept {
+ uint32_t expected = 0;
+ return _state.compare_exchange_strong(expected, 1,
+ std::memory_order_acquire,
+ std::memory_order_relaxed);
+ }
+
+ void unlock() noexcept {
+ _state.store(0, std::memory_order_release);
+ }
+
+ // implementation of Lockable named requirement - ^^^
+
+ // implementation of SharedLockable named requirement - vvv
+
+ void lock_shared() noexcept {
+ uint32_t expected = 0;
+ uint32_t desired = 2;
+ while (!_state.compare_exchange_weak(expected, desired,
+ std::memory_order_acquire,
+ std::memory_order_relaxed))
+ {
+ while (has_pending_writer(expected)) {
+ std::this_thread::yield();
+ expected = _state.load(std::memory_order_relaxed);
+ }
+ desired = expected + 2;
+ }
+ }
+
+ [[nodiscard]] bool try_lock_shared() noexcept {
+ uint32_t expected = 0;
+ uint32_t desired = 2;
+ while (!_state.compare_exchange_weak(expected, desired,
+ std::memory_order_acquire,
+ std::memory_order_relaxed))
+ {
+ if (has_pending_writer(expected)) {
+ return false;
+ }
+ desired = expected + 2;
+ }
+ return true;
+ }
+
+ void unlock_shared() noexcept {
+ _state.fetch_sub(2, std::memory_order_release);
+ }
+
+ // implementation of SharedLockable named requirement - ^^^
+
+ // try to upgrade a read (shared) lock to a write (unique) lock
+ bool try_convert_read_to_write() noexcept {
+ uint32_t expected = 2;
+ uint32_t desired = 1;
+ while (!_state.compare_exchange_weak(expected, desired,
+ std::memory_order_acquire,
+ std::memory_order_relaxed))
+ {
+ if (has_pending_writer(expected)) {
+ return false;
+ }
+ desired = expected - 1;
+ }
+ wait_for_zero_readers(desired);
+ return true;
+ }
+
+ // convert a write (unique) lock to a read (shared) lock
+ void convert_write_to_read() noexcept {
+ _state.store(2, std::memory_order_release);
+ }
+};
+
+template<typename T>
+concept rw_upgrade_downgrade_lock = requires(T a, T b) {
+ { a.try_convert_read_to_write() } -> std::same_as<bool>;
+ { b.convert_write_to_read() } -> std::same_as<void>;
+};
+
+template <rw_upgrade_downgrade_lock T>
+[[nodiscard]] std::unique_lock<T> try_upgrade(std::shared_lock<T> &&guard) noexcept {
+ assert(guard.owns_lock());
+ if (guard.mutex()->try_convert_read_to_write()) {
+ return {*guard.release(), std::adopt_lock};
+ } else {
+ return {};
+ }
+}
+
+template <rw_upgrade_downgrade_lock T>
+[[nodiscard]] std::shared_lock<T> downgrade(std::unique_lock<T> &&guard) noexcept {
+ assert(guard.owns_lock());
+ guard.mutex()->convert_write_to_read();
+ return {*guard.release(), std::adopt_lock};
+}
+
+}