summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2021-04-27 13:28:47 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2021-04-29 12:22:22 +0000
commit42c2917d2093673a8ca4658d6d800d9b2d8ee4ee (patch)
tree7598f346c492d8f1b2382c89c1c9c45e440ec1f5 /storage
parent2b9209357f649d18e91bd3d9cf382fcc20591201 (diff)
Add DistributorStripe thread pool with thread park/unpark support
To enable safe and well-defined access to underlying stripe data structures from the main distributor thread, the pool has functionality for "parking" and "unparking" all stripe threads: * Parking makes all threads go into a blocked holding pattern where it is guaranteed that they may not race with any other threads. * Unparking releases all threads from their holding pattern, allowing them to continue their event processing loop. Also adds a custom run loop for distributor threads that largely emulates the waiting semantics found in the current framework ticking thread pool run loop. But unlike the framework pool, there is no global mutex that must be acquired by all threads in the pool. All stripe event handling uses per-thread mutexes and condition variables. Global state is only accessed when thread parking is requested, which happens very rarely.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/CMakeLists.txt1
-rw-r--r--storage/src/tests/distributor/distributor_stripe_pool_test.cpp91
-rw-r--r--storage/src/vespa/storage/distributor/CMakeLists.txt3
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.h8
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp109
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_pool.h80
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp104
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_thread.h81
-rw-r--r--storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp113
-rw-r--r--storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h79
-rw-r--r--storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h2
-rw-r--r--storage/src/vespa/storage/distributor/tickable_stripe.h24
14 files changed, 701 insertions, 2 deletions
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt
index 810ffb550bf..d808acc0d3a 100644
--- a/storage/src/tests/distributor/CMakeLists.txt
+++ b/storage/src/tests/distributor/CMakeLists.txt
@@ -13,6 +13,7 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST
distributor_bucket_space_test.cpp
distributor_host_info_reporter_test.cpp
distributor_message_sender_stub.cpp
+ distributor_stripe_pool_test.cpp
distributortest.cpp
distributortestutil.cpp
externaloperationhandlertest.cpp
diff --git a/storage/src/tests/distributor/distributor_stripe_pool_test.cpp b/storage/src/tests/distributor/distributor_stripe_pool_test.cpp
new file mode 100644
index 00000000000..089400bc18c
--- /dev/null
+++ b/storage/src/tests/distributor/distributor_stripe_pool_test.cpp
@@ -0,0 +1,91 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/storage/distributor/distributor_stripe_pool.h>
+#include <vespa/storage/distributor/tickable_stripe.h>
+#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/vespalib/util/time.h>
+#include <atomic>
+#include <thread>
+
+using namespace ::testing;
+
+namespace storage::distributor {
+
+struct DistributorStripePoolThreadingTest : Test {
+ static constexpr vespalib::duration min_test_duration = 50ms;
+
+ DistributorStripePool _pool;
+ vespalib::steady_time _start_time;
+ std::atomic<bool> _is_parked;
+
+ DistributorStripePoolThreadingTest()
+ : _pool(),
+ _start_time(std::chrono::steady_clock::now()),
+ _is_parked(false)
+ {
+ // Set an absurdly high tick wait duration to catch any regressions where
+ // thread wakeups aren't triggering as expected.
+ _pool.set_tick_wait_duration(600s);
+ // Ensure we always trigger a wait if tick() returns false.
+ _pool.set_ticks_before_wait(0);
+ }
+
+ bool min_test_time_reached() const noexcept {
+ return ((std::chrono::steady_clock::now() - _start_time) > min_test_duration);
+ }
+
+ void loop_park_unpark_cycle_until_test_time_expired() {
+ constexpr size_t min_cycles = 100;
+ size_t cycle = 0;
+ while ((cycle < min_cycles) || !min_test_time_reached()) {
+ _pool.park_all_threads();
+ _is_parked = true;
+ std::this_thread::sleep_for(50us);
+ _is_parked = false;
+ _pool.unpark_all_threads();
+ ++cycle;
+ }
+ }
+};
+
+// Optimistic invariant checker that cannot prove correctness, but will hopefully
+// make tests scream if something is obviously incorrect.
+struct ParkingInvariantCheckingMockStripe : TickableStripe {
+ std::atomic<bool>& _is_parked;
+ bool _to_return;
+
+ explicit ParkingInvariantCheckingMockStripe(std::atomic<bool>& is_parked)
+ : _is_parked(is_parked),
+ _to_return(true)
+ {}
+
+ bool tick() override {
+ std::this_thread::sleep_for(50us);
+ assert(!_is_parked.load());
+ // Alternate between returning whether or not work was done to trigger
+ // both waiting and non-waiting edges. Note that this depends on the
+ // ticks_before_wait value being 0.
+ _to_return = !_to_return;
+ return _to_return;
+ }
+};
+
+TEST_F(DistributorStripePoolThreadingTest, can_park_and_unpark_single_stripe) {
+ ParkingInvariantCheckingMockStripe stripe(_is_parked);
+
+ _pool.start({{&stripe}});
+ loop_park_unpark_cycle_until_test_time_expired();
+ _pool.stop_and_join();
+}
+
+TEST_F(DistributorStripePoolThreadingTest, can_park_and_unpark_multiple_stripes) {
+ ParkingInvariantCheckingMockStripe s1(_is_parked);
+ ParkingInvariantCheckingMockStripe s2(_is_parked);
+ ParkingInvariantCheckingMockStripe s3(_is_parked);
+ ParkingInvariantCheckingMockStripe s4(_is_parked);
+
+ _pool.start({{&s1, &s2, &s3, &s4}});
+ loop_park_unpark_cycle_until_test_time_expired();
+ _pool.stop_and_join();
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt
index 57d6a23c79f..cb64c5c1681 100644
--- a/storage/src/vespa/storage/distributor/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/CMakeLists.txt
@@ -18,6 +18,8 @@ vespa_add_library(storage_distributor
distributor_status.cpp
distributor_stripe.cpp
distributor_stripe_component.cpp
+ distributor_stripe_pool.cpp
+ distributor_stripe_thread.cpp
distributormessagesender.cpp
distributormetricsset.cpp
externaloperationhandler.cpp
@@ -26,6 +28,7 @@ vespa_add_library(storage_distributor
idealstatemetricsset.cpp
legacy_single_stripe_accessor.cpp
messagetracker.cpp
+ multi_threaded_stripe_access_guard.cpp
nodeinfo.cpp
operation_routing_snapshot.cpp
operation_sequencer.cpp
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 8dd414e8def..503ed05f071 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -391,7 +391,7 @@ Distributor::doNonCriticalTick(framework::ThreadIndex idx)
}
void
-Distributor::enableNextConfig()
+Distributor::enableNextConfig() // TODO STRIPE rename to enable_next_config_if_changed()?
{
// Only lazily trigger a config propagation and internal update if something has _actually changed_.
if (_component.internal_config_generation() != _current_internal_config_generation) {
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 87e938efd71..03f3a181a48 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -780,6 +780,12 @@ DistributorStripe::doNonCriticalTick(framework::ThreadIndex)
return _tickResult;
}
+bool DistributorStripe::tick() {
+ assert(!_use_legacy_mode);
+ auto wait_info = doNonCriticalTick(framework::ThreadIndex(0));
+ return !wait_info.waitWanted(); // If we don't want to wait, we presumably did some useful stuff.
+}
+
bool DistributorStripe::should_inhibit_current_maintenance_scan_tick() const noexcept {
return (workWasDone() && (_inhibited_maintenance_tick_count
< getConfig().max_consecutively_inhibited_maintenance_ticks()));
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index bc058305c09..1885fa79341 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -12,6 +12,7 @@
#include "statusreporterdelegate.h"
#include "stripe_access_guard.h"
#include "stripe_bucket_db_updater.h"
+#include "tickable_stripe.h"
#include <vespa/config/config.h>
#include <vespa/storage/common/doneinitializehandler.h>
#include <vespa/storage/common/messagesender.h>
@@ -50,7 +51,8 @@ class DistributorStripe final
public framework::StatusReporter,
public MinReplicaProvider,
public BucketSpacesStatsProvider,
- public NonTrackingMessageSender
+ public NonTrackingMessageSender,
+ public TickableStripe
{
public:
DistributorStripe(DistributorComponentRegister&,
@@ -190,13 +192,17 @@ public:
return _db_memory_sample_interval;
}
+ bool tick() override;
+
private:
+ // TODO reduce number of friends. DistributorStripe too popular for its own good.
friend struct DistributorTest;
friend class BucketDBUpdaterTest;
friend class DistributorTestUtil;
friend class MetricUpdateHook;
friend class Distributor;
friend class LegacySingleStripeAccessGuard;
+ friend class MultiThreadedStripeAccessGuard;
bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg);
bool isMaintenanceReply(const api::StorageReply& reply) const;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp
new file mode 100644
index 00000000000..6c819aab821
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp
@@ -0,0 +1,109 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "distributor_stripe_pool.h"
+#include "distributor_stripe_thread.h"
+#include <vespa/vespalib/util/size_literals.h>
+#include <cassert>
+
+namespace storage::distributor {
+
+DistributorStripePool::DistributorStripePool()
+ : _thread_pool(512_Ki),
+ _stripes(),
+ _threads(),
+ _mutex(),
+ _parker_cond(),
+ _parked_threads(0),
+ _bootstrap_tick_wait_duration(1ms),
+ _bootstrap_ticks_before_wait(10),
+ _stopped(false)
+{}
+
+DistributorStripePool::~DistributorStripePool() {
+ if (!_stopped) {
+ stop_and_join();
+ }
+}
+
+void DistributorStripePool::park_all_threads() noexcept {
+ assert(!_stripes.empty());
+ // Thread pool is not dynamic and signal_wants_park() is thread safe.
+ for (auto& s : _stripes) {
+ s->signal_wants_park();
+ }
+ std::unique_lock lock(_mutex);
+ _parker_cond.wait(lock, [this]{ return (_parked_threads == _threads.size()); });
+}
+
+void DistributorStripePool::unpark_all_threads() noexcept {
+ // Thread pool is not dynamic and unpark_thread() is thread safe.
+ for (auto& s : _stripes) {
+ s->unpark_thread();
+ }
+ // We have a full unpark barrier here as a pragmatic way to avoid potential ABA issues
+ // caused by back-to-back park->unpark->park calls causing issues with interleaving
+ // up-counts and down-counts for thread parking/unparking.
+ // It's fully possible to avoid this, but requires a somewhat more finicky solution for
+ // cross-thread coordination.
+ std::unique_lock lock(_mutex);
+ _parker_cond.wait(lock, [this]{ return (_parked_threads == 0); });
+}
+
+void DistributorStripePool::park_thread_until_released(DistributorStripeThread& thread) noexcept {
+ std::unique_lock lock(_mutex); // Guards _parked_threads.
+ assert(_parked_threads < _threads.size());
+ ++_parked_threads;
+ if (_parked_threads == _threads.size()) {
+ _parker_cond.notify_all(); // Last thread in: wake the waiter in park_all_threads().
+ }
+ lock.unlock(); // Don't hold the pool mutex while blocked on the thread's own mutex/condition variable.
+ thread.wait_until_unparked();
+ lock.lock();
+ --_parked_threads;
+ if (_parked_threads == 0) {
+ _parker_cond.notify_all(); // Last thread out: wake the waiter in unpark_all_threads().
+ }
+}
+
+void DistributorStripePool::start(const std::vector<TickableStripe*>& stripes) {
+ assert(!stripes.empty());
+ assert(_stripes.empty() && _threads.empty());
+ _stripes.reserve(stripes.size());
+ _threads.reserve(stripes.size());
+
+ for (auto* s : stripes) {
+ auto new_stripe = std::make_unique<DistributorStripeThread>(*s, *this);
+ new_stripe->set_tick_wait_duration(_bootstrap_tick_wait_duration);
+ new_stripe->set_ticks_before_wait(_bootstrap_ticks_before_wait);
+ _stripes.emplace_back(std::move(new_stripe));
+ }
+ for (auto& s : _stripes) {
+ _threads.emplace_back(_thread_pool.NewThread(s.get()));
+ }
+}
+
+void DistributorStripePool::stop_and_join() {
+ for (auto& s : _stripes) {
+ s->signal_should_stop();
+ }
+ for (auto* t : _threads) {
+ t->Join();
+ }
+ _stopped = true;
+}
+
+void DistributorStripePool::set_tick_wait_duration(vespalib::duration new_tick_wait_duration) noexcept {
+ _bootstrap_tick_wait_duration = new_tick_wait_duration;
+ // Stripe set may be empty if start() hasn't been called yet.
+ for (auto& s : _stripes) {
+ s->set_tick_wait_duration(new_tick_wait_duration);
+ }
+}
+void DistributorStripePool::set_ticks_before_wait(uint32_t new_ticks_before_wait) noexcept {
+ _bootstrap_ticks_before_wait = new_ticks_before_wait;
+ // Stripe set may be empty if start() hasn't been called yet.
+ for (auto& s : _stripes) {
+ s->set_ticks_before_wait(new_ticks_before_wait);
+ }
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
new file mode 100644
index 00000000000..9149482cd5d
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
@@ -0,0 +1,80 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/fastos/thread.h>
+#include <vespa/vespalib/util/time.h>
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+#include <vector>
+
+namespace storage::distributor {
+
+class DistributorStripeThread;
+class TickableStripe;
+
+/**
+ * Management and coordination of a pool of distributor stripe threads.
+ *
+ * Aside from handling the threads themselves, the pool crucially offers a well-defined
+ * thread synchronization/coordination API meant for ensuring all stripe threads are in
+ * a well defined state before accessing them:
+ *
+ * - park_all_threads() returns once ALL threads are in a "parked" state where they
+ * may not race with any operations performed on them by the caller. In essence, this
+ * acts as if a (very large) mutex is held by the caller that prevents the stripe
+ * from doing anything of its own volition. Must be followed by:
+ * - unpark_all_threads() returns once ALL threads have been confirmed released from
+ * a previously parked state. Must be called after park_all_threads().
+ *
+ * Neither park_all_threads() nor unpark_all_threads() may be called prior to calling start().
+ *
+ * It's possible to set stripe thread tick-specific options (wait duration, ticks before
+ * wait) both before and after start() is called. The options will be propagated to any
+ * running stripe threads in a thread-safe, lock-free manner.
+ */
+class DistributorStripePool {
+ using StripeVector = std::vector<std::unique_ptr<DistributorStripeThread>>;
+ using NativeThreadVector = std::vector<FastOS_ThreadInterface*>;
+
+ FastOS_ThreadPool _thread_pool;
+ StripeVector _stripes;
+ NativeThreadVector _threads;
+ std::mutex _mutex;
+ std::condition_variable _parker_cond;
+ size_t _parked_threads; // Must be protected by _mutex
+ vespalib::duration _bootstrap_tick_wait_duration;
+ uint32_t _bootstrap_ticks_before_wait;
+ bool _stopped;
+
+ friend class DistributorStripeThread;
+public:
+ DistributorStripePool();
+ ~DistributorStripePool();
+
+ // Set up the stripe pool with a 1-1 relationship between the provided
+ // stripes and running threads. Can only be called once per pool.
+ //
+ // Precondition: stripes.size() > 0
+ void start(const std::vector<TickableStripe*>& stripes);
+ void stop_and_join();
+
+ void park_all_threads() noexcept;
+ void unpark_all_threads() noexcept;
+
+ [[nodiscard]] const DistributorStripeThread& stripe(size_t idx) const noexcept {
+ return *_stripes[idx];
+ }
+ [[nodiscard]] DistributorStripeThread& stripe(size_t idx) noexcept {
+ return *_stripes[idx];
+ }
+ [[nodiscard]] size_t stripe_count() const noexcept { return _stripes.size(); }
+
+ // Applies to all threads. May be called both before and after start(). Thread safe.
+ void set_tick_wait_duration(vespalib::duration new_tick_wait_duration) noexcept;
+ void set_ticks_before_wait(uint32_t new_ticks_before_wait) noexcept;
+private:
+ void park_thread_until_released(DistributorStripeThread& thread) noexcept;
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp
new file mode 100644
index 00000000000..5edbc45b8d6
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp
@@ -0,0 +1,104 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "distributor_stripe_thread.h"
+#include "distributor_stripe.h"
+#include "distributor_stripe_pool.h"
+#include "tickable_stripe.h"
+#include <cassert>
+
+namespace storage::distributor {
+
+DistributorStripeThread::DistributorStripeThread(TickableStripe& stripe,
+ DistributorStripePool& stripe_pool)
+ : _stripe(stripe),
+ _stripe_pool(stripe_pool),
+ _tick_wait_duration(1ms),
+ _mutex(),
+ _event_cond(),
+ _park_cond(),
+ _ticks_before_wait(10),
+ _should_park(false),
+ _should_stop(false),
+ _waiting_for_event(false)
+{}
+
+DistributorStripeThread::~DistributorStripeThread() = default;
+
+void DistributorStripeThread::Run(FastOS_ThreadInterface*, void*) {
+ uint32_t tick_waits_inhibited = 0;
+ while (!should_stop_thread_relaxed()) {
+ while (should_park_relaxed()) {
+ _stripe_pool.park_thread_until_released(*this);
+ }
+ // TODO consider enum to only trigger "ticks before wait"-behavior when maintenance was done
+ const bool did_work = _stripe.tick();
+ if (did_work) {
+ tick_waits_inhibited = 0;
+ } else if (tick_waits_inhibited >= ticks_before_wait_relaxed()) {
+ wait_until_event_notified_or_timed_out();
+ tick_waits_inhibited = 0;
+ } else {
+ ++tick_waits_inhibited;
+ }
+ }
+}
+
+void DistributorStripeThread::signal_wants_park() noexcept {
+ std::lock_guard lock(_mutex);
+ assert(!should_park_relaxed());
+ _should_park.store(true, std::memory_order_relaxed);
+ if (_waiting_for_event) {
+ _event_cond.notify_one(); // TODO after unlock?
+ }
+}
+
+void DistributorStripeThread::unpark_thread() noexcept {
+ std::lock_guard lock(_mutex);
+ assert(should_park_relaxed());
+ _should_park.store(false, std::memory_order_relaxed);
+ _park_cond.notify_one(); // TODO after unlock?
+}
+
+void DistributorStripeThread::wait_until_event_notified_or_timed_out() noexcept {
+ std::unique_lock lock(_mutex);
+ if (should_stop_thread_relaxed() || should_park_relaxed()) {
+ return;
+ }
+ _waiting_for_event = true;
+ _event_cond.wait_for(lock, tick_wait_duration_relaxed());
+ _waiting_for_event = false;
+}
+
+void DistributorStripeThread::wait_until_unparked() noexcept {
+ std::unique_lock lock(_mutex);
+ assert(should_park_relaxed());
+ // _should_park is always written within _mutex, relaxed load is safe.
+ _park_cond.wait(lock, [this]{ return !should_park_relaxed(); });
+}
+
+void DistributorStripeThread::notify_event_has_triggered() noexcept {
+ _event_cond.notify_one();
+}
+
+void DistributorStripeThread::signal_should_stop() noexcept {
+ std::unique_lock lock(_mutex);
+ assert(!should_park_relaxed());
+ _should_stop.store(true, std::memory_order_relaxed);
+ if (_waiting_for_event) {
+ _event_cond.notify_one();
+ }
+ // TODO if we ever need it, handle pending thread park. For now we assume that
+ // the caller never attempts to concurrently park and stop threads.
+}
+
+void DistributorStripeThread::set_tick_wait_duration(vespalib::duration new_tick_wait_duration) noexcept {
+ static_assert(AtomicDuration::is_always_lock_free);
+ // No memory ordering required for a "lazy" single value setting such as the tick duration
+ _tick_wait_duration.store(new_tick_wait_duration, std::memory_order_relaxed);
+}
+
+void DistributorStripeThread::set_ticks_before_wait(uint32_t new_ticks_before_wait) noexcept {
+ _ticks_before_wait.store(new_ticks_before_wait, std::memory_order_relaxed);
+}
+
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_thread.h b/storage/src/vespa/storage/distributor/distributor_stripe_thread.h
new file mode 100644
index 00000000000..b02d733895e
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_thread.h
@@ -0,0 +1,81 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/fastos/thread.h>
+#include <vespa/vespalib/util/time.h>
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+#include <vector>
+
+namespace storage::distributor {
+
+class DistributorStripe;
+class DistributorStripePool;
+class TickableStripe;
+
+/**
+ * A DistributorStripeThread provides threading resources for a single distributor stripe
+ * and the means of synchronizing access towards it through a DistributorStripePool.
+ *
+ * A DistributorStripeThread instance is bidirectionally bound to a particular pool and
+ * should therefore always be created by the pool itself (never standalone).
+ */
+class DistributorStripeThread : private FastOS_Runnable {
+ using AtomicDuration = std::atomic<vespalib::duration>;
+
+ TickableStripe& _stripe;
+ DistributorStripePool& _stripe_pool;
+ AtomicDuration _tick_wait_duration;
+ std::mutex _mutex;
+ std::condition_variable _event_cond;
+ std::condition_variable _park_cond;
+ std::atomic<uint32_t> _ticks_before_wait;
+ std::atomic<bool> _should_park;
+ std::atomic<bool> _should_stop;
+ bool _waiting_for_event;
+
+ friend class DistributorStripePool;
+public:
+ DistributorStripeThread(TickableStripe& stripe,
+ DistributorStripePool& stripe_pool);
+ ~DistributorStripeThread();
+
+ void Run(FastOS_ThreadInterface*, void*) override;
+
+ // Wakes up stripe thread if it's currently waiting for an external event to be triggered,
+ // such as the arrival of a new RPC message. If thread is parked this call will have no
+ // effect.
+ void notify_event_has_triggered() noexcept;
+
+ void set_tick_wait_duration(vespalib::duration new_tick_wait_duration) noexcept;
+ void set_ticks_before_wait(uint32_t new_ticks_before_wait) noexcept;
+
+ TickableStripe* operator->() noexcept { return &_stripe; }
+ const TickableStripe* operator->() const noexcept { return &_stripe; }
+private:
+ [[nodiscard]] bool should_stop_thread_relaxed() const noexcept {
+ return _should_stop.load(std::memory_order_relaxed);
+ }
+
+ [[nodiscard]] bool should_park_relaxed() const noexcept {
+ return _should_park.load(std::memory_order_relaxed);
+ }
+
+ [[nodiscard]] vespalib::duration tick_wait_duration_relaxed() const noexcept {
+ return _tick_wait_duration.load(std::memory_order_relaxed);
+ }
+
+ [[nodiscard]] uint32_t ticks_before_wait_relaxed() const noexcept {
+ return _ticks_before_wait.load(std::memory_order_relaxed);
+ }
+
+ void signal_wants_park() noexcept;
+ void unpark_thread() noexcept;
+ void wait_until_event_notified_or_timed_out() noexcept;
+ void wait_until_unparked() noexcept;
+
+ void signal_should_stop() noexcept;
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
new file mode 100644
index 00000000000..6bc9c03158a
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
@@ -0,0 +1,113 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "multi_threaded_stripe_access_guard.h"
+#include "distributor_stripe.h"
+#include "distributor_stripe_pool.h"
+#include "distributor_stripe_thread.h"
+
+namespace storage::distributor {
+
+MultiThreadedStripeAccessGuard::MultiThreadedStripeAccessGuard(
+ MultiThreadedStripeAccessor& accessor,
+ DistributorStripePool& stripe_pool)
+ : _accessor(accessor),
+ _stripe_pool(stripe_pool)
+{
+ assert(_stripe_pool.stripe_count() == 1); // TODO STRIPE many more yes yes
+ _stripe_pool.park_all_threads();
+}
+
+MultiThreadedStripeAccessGuard::~MultiThreadedStripeAccessGuard() {
+ _stripe_pool.unpark_all_threads();
+ _accessor.mark_guard_released();
+}
+
+void MultiThreadedStripeAccessGuard::update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) {
+ // TODO STRIPE multiple stripes
+ first_stripe().update_total_distributor_config(std::move(config));
+}
+
+void MultiThreadedStripeAccessGuard::update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) {
+ // TODO STRIPE multiple stripes
+ first_stripe().update_distribution_config(new_configs);
+}
+
+void MultiThreadedStripeAccessGuard::set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) {
+ // TODO STRIPE multiple stripes
+ first_stripe().getBucketSpaceRepo().set_pending_cluster_state_bundle(pending_state);
+}
+
+void MultiThreadedStripeAccessGuard::clear_pending_cluster_state_bundle() {
+ // TODO STRIPE multiple stripes
+ first_stripe().getBucketSpaceRepo().clear_pending_cluster_state_bundle();
+}
+
+void MultiThreadedStripeAccessGuard::enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) {
+ // TODO STRIPE multiple stripes
+ first_stripe().enableClusterStateBundle(new_state);
+}
+
+void MultiThreadedStripeAccessGuard::notify_distribution_change_enabled() {
+ // TODO STRIPE multiple stripes
+ first_stripe().notifyDistributionChangeEnabled();
+}
+
+PotentialDataLossReport
+MultiThreadedStripeAccessGuard::remove_superfluous_buckets(document::BucketSpace bucket_space,
+ const lib::ClusterState& new_state,
+ bool is_distribution_change)
+{
+ // TODO STRIPE multiple stripes
+ return first_stripe().bucket_db_updater().remove_superfluous_buckets(bucket_space, new_state, is_distribution_change);
+}
+
+void
+MultiThreadedStripeAccessGuard::merge_entries_into_db(document::BucketSpace bucket_space,
+ api::Timestamp gathered_at_timestamp,
+ const lib::Distribution& distribution,
+ const lib::ClusterState& new_state,
+ const char* storage_up_states,
+ const std::unordered_set<uint16_t>& outdated_nodes,
+ const std::vector<dbtransition::Entry>& entries)
+{
+ // TODO STRIPE multiple stripes
+ first_stripe().bucket_db_updater().merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution,
+ new_state, storage_up_states, outdated_nodes, entries);
+}
+
+void MultiThreadedStripeAccessGuard::update_read_snapshot_before_db_pruning() {
+ // TODO STRIPE multiple stripes
+ first_stripe().bucket_db_updater().update_read_snapshot_before_db_pruning();
+}
+
+void MultiThreadedStripeAccessGuard::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) {
+ // TODO STRIPE multiple stripes
+ first_stripe().bucket_db_updater().update_read_snapshot_after_db_pruning(new_state);
+}
+
+void MultiThreadedStripeAccessGuard::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) {
+ // TODO STRIPE multiple stripes
+ first_stripe().bucket_db_updater().update_read_snapshot_after_activation(activated_state);
+}
+
+void MultiThreadedStripeAccessGuard::clear_read_only_bucket_repo_databases() {
+ // TODO STRIPE multiple stripes
+ first_stripe().bucket_db_updater().clearReadOnlyBucketRepoDatabases();
+}
+
+DistributorStripe& MultiThreadedStripeAccessGuard::first_stripe() noexcept {
+ return dynamic_cast<DistributorStripe&>(*_stripe_pool.stripe(0).operator->()); // Cast the wrapped TickableStripe, not the thread object.
+}
+
+std::unique_ptr<StripeAccessGuard> MultiThreadedStripeAccessor::rendezvous_and_hold_all() {
+ // For sanity checking of invariant of only one guard being allowed at any given time.
+ assert(!_guard_held);
+ _guard_held = true;
+ return std::make_unique<MultiThreadedStripeAccessGuard>(*this, _stripe_pool);
+}
+
+void MultiThreadedStripeAccessor::mark_guard_released() {
+ assert(_guard_held);
+ _guard_held = false;
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
new file mode 100644
index 00000000000..376eccd1c4a
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
@@ -0,0 +1,79 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "stripe_access_guard.h"
+
+namespace storage::distributor {
+
+class MultiThreadedStripeAccessor;
+class DistributorStripePool;
+class DistributorStripe;
+
+/**
+ * StripeAccessGuard implementation which provides exclusive access to a set of stripes
+ * by ensuring that all stripe threads are safely parked upon guard construction. This
+ * means that as long as a guard exists, access to stripes is guaranteed to not cause
+ * data races.
+ *
+ * Threads are automatically un-parked upon guard destruction.
+ *
+ * At most one guard instance may exist at any given time.
+ */
+class MultiThreadedStripeAccessGuard : public StripeAccessGuard {
+ MultiThreadedStripeAccessor& _accessor;
+ DistributorStripePool& _stripe_pool;
+public:
+ MultiThreadedStripeAccessGuard(MultiThreadedStripeAccessor& accessor,
+ DistributorStripePool& stripe_pool);
+ ~MultiThreadedStripeAccessGuard() override;
+
+ void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) override;
+
+ void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) override;
+ void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) override;
+ void clear_pending_cluster_state_bundle() override;
+ void enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) override;
+ void notify_distribution_change_enabled() override;
+
+ PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace bucket_space,
+ const lib::ClusterState& new_state,
+ bool is_distribution_change) override;
+ void merge_entries_into_db(document::BucketSpace bucket_space,
+ api::Timestamp gathered_at_timestamp,
+ const lib::Distribution& distribution,
+ const lib::ClusterState& new_state,
+ const char* storage_up_states,
+ const std::unordered_set<uint16_t>& outdated_nodes,
+ const std::vector<dbtransition::Entry>& entries) override;
+
+ void update_read_snapshot_before_db_pruning() override;
+ void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) override;
+ void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) override;
+ void clear_read_only_bucket_repo_databases() override;
+private:
+ // TODO STRIPE remove once multi threaded stripe support is implemented
+ DistributorStripe& first_stripe() noexcept;
+};
+
+/**
+ * Impl of StripeAccessor which creates MultiThreadedStripeAccessGuards that cover all threads
+ * in the provided stripe pool.
+ */
+class MultiThreadedStripeAccessor : public StripeAccessor {
+ DistributorStripePool& _stripe_pool;
+ bool _guard_held;
+
+ friend class MultiThreadedStripeAccessGuard;
+public:
+ MultiThreadedStripeAccessor(DistributorStripePool& stripe_pool)
+ : _stripe_pool(stripe_pool),
+ _guard_held(false)
+ {}
+ ~MultiThreadedStripeAccessor() override = default;
+
+ std::unique_ptr<StripeAccessGuard> rendezvous_and_hold_all() override;
+private:
+ void mark_guard_released();
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
index 67b31343f6f..42fe0d5c29a 100644
--- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
+++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
@@ -141,6 +141,8 @@ private:
friend class DistributorTestUtil;
// TODO refactor and rewire to avoid needing this direct meddling
friend class LegacySingleStripeAccessGuard;
+ friend class MultiThreadedStripeAccessGuard;
+
// Only to be used by tests that want to ensure both the BucketDBUpdater _and_ the Distributor
// components agree on the currently active cluster state bundle.
// Transitively invokes Distributor::enableClusterStateBundle
diff --git a/storage/src/vespa/storage/distributor/tickable_stripe.h b/storage/src/vespa/storage/distributor/tickable_stripe.h
new file mode 100644
index 00000000000..323db627d31
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/tickable_stripe.h
@@ -0,0 +1,24 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+namespace storage::distributor {
+
+/**
+ * A tickable stripe is the minimal binding glue between the stripe's worker thread and
+ * the actual implementation. Primarily allows for easier testing without having to
+ * fake an entire actual DistributorStripe.
+ */
+class TickableStripe {
+public:
+ virtual ~TickableStripe() = default;
+
+ // Perform a single operation tick of the stripe logic.
+ // If function returns true, the caller should not perform any waiting before calling
+ // tick() again. This generally means that the stripe is processing client operations
+ // and wants to continue doing so as quickly as possible.
+ // Only used for multi-threaded striped setups.
+ // TODO return an enum indicating type of last processed event? E.g. external, maintenance, none, ...
+ virtual bool tick() = 0;
+};
+
+}