summaryrefslogtreecommitdiffstats
path: root/storage/src/tests/distributor/distributor_stripe_pool_test.cpp
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2021-04-27 13:28:47 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2021-04-29 12:22:22 +0000
commit42c2917d2093673a8ca4658d6d800d9b2d8ee4ee (patch)
tree7598f346c492d8f1b2382c89c1c9c45e440ec1f5 /storage/src/tests/distributor/distributor_stripe_pool_test.cpp
parent2b9209357f649d18e91bd3d9cf382fcc20591201 (diff)
Add DistributorStripe thread pool with thread park/unpark support
To enable safe and well-defined access to underlying stripe data structures from the main distributor thread, the pool has functionality for "parking" and "unparking" all stripe threads: * Parking makes all threads go into a blocked holding pattern where it is guaranteed that they may not race with any other threads. * Unparking releases all threads from their holding pattern, allowing them to continue their event processing loop. Also adds a custom run loop for distributor threads that largely emulates the waiting semantics found in the current framework ticking thread pool run loop. But unlike the framework pool, there is no global mutex that must be acquired by all threads in the pool. All stripe event handling uses per-thread mutexes and condition variables. Global state is only accessed when thread parking is requested, which happens very rarely.
Diffstat (limited to 'storage/src/tests/distributor/distributor_stripe_pool_test.cpp')
-rw-r--r--storage/src/tests/distributor/distributor_stripe_pool_test.cpp91
1 files changed, 91 insertions, 0 deletions
diff --git a/storage/src/tests/distributor/distributor_stripe_pool_test.cpp b/storage/src/tests/distributor/distributor_stripe_pool_test.cpp
new file mode 100644
index 00000000000..089400bc18c
--- /dev/null
+++ b/storage/src/tests/distributor/distributor_stripe_pool_test.cpp
@@ -0,0 +1,91 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/storage/distributor/distributor_stripe_pool.h>
+#include <vespa/storage/distributor/tickable_stripe.h>
+#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/vespalib/util/time.h>
+#include <atomic>
+#include <thread>
+
+using namespace ::testing;
+
+namespace storage::distributor {
+
+struct DistributorStripePoolThreadingTest : Test {
+ static constexpr vespalib::duration min_test_duration = 50ms;
+
+ DistributorStripePool _pool;
+ vespalib::steady_time _start_time;
+ std::atomic<bool> _is_parked;
+
+ DistributorStripePoolThreadingTest()
+ : _pool(),
+ _start_time(std::chrono::steady_clock::now()),
+ _is_parked(false)
+ {
+ // Set an absurdly high tick wait duration to catch any regressions where
+ // thread wakeups aren't triggering as expected.
+ _pool.set_tick_wait_duration(600s);
+ // Ensure we always trigger a wait if tick() returns false.
+ _pool.set_ticks_before_wait(0);
+ }
+
+ bool min_test_time_reached() const noexcept {
+ return ((std::chrono::steady_clock::now() - _start_time) > min_test_duration);
+ }
+
+ void loop_park_unpark_cycle_until_test_time_expired() {
+ constexpr size_t min_cycles = 100;
+ size_t cycle = 0;
+ while ((cycle < min_cycles) || !min_test_time_reached()) {
+ _pool.park_all_threads();
+ _is_parked = true;
+ std::this_thread::sleep_for(50us);
+ _is_parked = false;
+ _pool.unpark_all_threads();
+ ++cycle;
+ }
+ }
+};
+
+// Optimistic invariant checker that cannot prove correctness, but will hopefully
+// make tests scream if something is obviously incorrect.
+struct ParkingInvariantCheckingMockStripe : TickableStripe {
+ std::atomic<bool>& _is_parked;
+ bool _to_return;
+
+ explicit ParkingInvariantCheckingMockStripe(std::atomic<bool>& is_parked)
+ : _is_parked(is_parked),
+ _to_return(true)
+ {}
+
+ bool tick() override {
+ std::this_thread::sleep_for(50us);
+ assert(!_is_parked.load());
+ // Alternate between returning whether or not work was done to trigger
+ // both waiting and non-waiting edges. Note that this depends on the
+ // ticks_before_wait value being 0.
+ _to_return = !_to_return;
+ return _to_return;
+ }
+};
+
+TEST_F(DistributorStripePoolThreadingTest, can_park_and_unpark_single_stripe) {
+ ParkingInvariantCheckingMockStripe stripe(_is_parked);
+
+ _pool.start({{&stripe}});
+ loop_park_unpark_cycle_until_test_time_expired();
+ _pool.stop_and_join();
+}
+
+TEST_F(DistributorStripePoolThreadingTest, can_park_and_unpark_multiple_stripes) {
+ ParkingInvariantCheckingMockStripe s1(_is_parked);
+ ParkingInvariantCheckingMockStripe s2(_is_parked);
+ ParkingInvariantCheckingMockStripe s3(_is_parked);
+ ParkingInvariantCheckingMockStripe s4(_is_parked);
+
+ _pool.start({{&s1, &s2, &s3, &s4}});
+ loop_park_unpark_cycle_until_test_time_expired();
+ _pool.stop_and_join();
+}
+
+}