summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorGeir Storli <geirst@verizonmedia.com>2021-04-29 22:53:31 +0200
committerGitHub <noreply@github.com>2021-04-29 22:53:31 +0200
commitf41d092b40930d35f52956f0e535be3fb641a722 (patch)
treef7f6f7817fc949b3f4c36e4e9c4e5af17ac09019 /storage
parente3ba7773401be957ea99f65d5108608e61a34763 (diff)
parent3d34041283d66938c3164ae0761754a3790a455f (diff)
Merge pull request #17661 from vespa-engine/vekterli/distributor-stripe-pool-and-thread-coordination
Add DistributorStripe thread pool with thread park/unpark support
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/CMakeLists.txt1
-rw-r--r--storage/src/tests/distributor/distributor_stripe_pool_test.cpp92
-rw-r--r--storage/src/vespa/storage/distributor/CMakeLists.txt3
-rw-r--r--storage/src/vespa/storage/distributor/distributor.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.cpp6
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe.h8
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp109
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_pool.h80
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp106
-rw-r--r--storage/src/vespa/storage/distributor/distributor_stripe_thread.h81
-rw-r--r--storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp113
-rw-r--r--storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h79
-rw-r--r--storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h2
-rw-r--r--storage/src/vespa/storage/distributor/tickable_stripe.h24
14 files changed, 704 insertions, 2 deletions
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt
index 810ffb550bf..d808acc0d3a 100644
--- a/storage/src/tests/distributor/CMakeLists.txt
+++ b/storage/src/tests/distributor/CMakeLists.txt
@@ -13,6 +13,7 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST
distributor_bucket_space_test.cpp
distributor_host_info_reporter_test.cpp
distributor_message_sender_stub.cpp
+ distributor_stripe_pool_test.cpp
distributortest.cpp
distributortestutil.cpp
externaloperationhandlertest.cpp
diff --git a/storage/src/tests/distributor/distributor_stripe_pool_test.cpp b/storage/src/tests/distributor/distributor_stripe_pool_test.cpp
new file mode 100644
index 00000000000..408027eb894
--- /dev/null
+++ b/storage/src/tests/distributor/distributor_stripe_pool_test.cpp
@@ -0,0 +1,92 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/storage/distributor/distributor_stripe_pool.h>
+#include <vespa/storage/distributor/tickable_stripe.h>
+#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/vespalib/util/time.h>
+#include <atomic>
+#include <thread>
+
+using namespace ::testing;
+
+namespace storage::distributor {
+
+struct DistributorStripePoolThreadingTest : Test {
+ static constexpr vespalib::duration min_test_duration = 50ms;
+
+ DistributorStripePool _pool;
+ vespalib::steady_time _start_time;
+ std::atomic<bool> _is_parked;
+
+ DistributorStripePoolThreadingTest()
+ : _pool(),
+ _start_time(std::chrono::steady_clock::now()),
+ _is_parked(false)
+ {
+ // Set an absurdly high tick wait duration to catch any regressions where
+ // thread wakeups aren't triggering as expected.
+ _pool.set_tick_wait_duration(600s);
+ // Ensure we always trigger a wait if tick() returns false.
+ _pool.set_ticks_before_wait(0);
+ }
+
+ bool min_test_time_reached() const noexcept {
+ return ((std::chrono::steady_clock::now() - _start_time) > min_test_duration);
+ }
+
+ void loop_park_unpark_cycle_until_test_time_expired() {
+ constexpr size_t min_cycles = 100;
+ size_t cycle = 0;
+ // TODO enforce minimum number of actual calls to tick() per thread?
+ while ((cycle < min_cycles) || !min_test_time_reached()) {
+ _pool.park_all_threads();
+ _is_parked = true;
+ std::this_thread::sleep_for(50us);
+ _is_parked = false;
+ _pool.unpark_all_threads();
+ ++cycle;
+ }
+ }
+};
+
+// Optimistic invariant checker that cannot prove correctness, but will hopefully
+// make tests scream if something is obviously incorrect.
+struct ParkingInvariantCheckingMockStripe : TickableStripe {
+ std::atomic<bool>& _is_parked;
+ bool _to_return;
+
+ explicit ParkingInvariantCheckingMockStripe(std::atomic<bool>& is_parked)
+ : _is_parked(is_parked),
+ _to_return(true)
+ {}
+
+ bool tick() override {
+ std::this_thread::sleep_for(50us);
+ assert(!_is_parked.load());
+ // Alternate between returning whether or not work was done to trigger
+ // both waiting and non-waiting edges. Note that this depends on the
+ // ticks_before_wait value being 0.
+ _to_return = !_to_return;
+ return _to_return;
+ }
+};
+
+TEST_F(DistributorStripePoolThreadingTest, can_park_and_unpark_single_stripe) {
+ ParkingInvariantCheckingMockStripe stripe(_is_parked);
+
+ _pool.start({{&stripe}});
+ loop_park_unpark_cycle_until_test_time_expired();
+ _pool.stop_and_join();
+}
+
+TEST_F(DistributorStripePoolThreadingTest, can_park_and_unpark_multiple_stripes) {
+ ParkingInvariantCheckingMockStripe s1(_is_parked);
+ ParkingInvariantCheckingMockStripe s2(_is_parked);
+ ParkingInvariantCheckingMockStripe s3(_is_parked);
+ ParkingInvariantCheckingMockStripe s4(_is_parked);
+
+ _pool.start({{&s1, &s2, &s3, &s4}});
+ loop_park_unpark_cycle_until_test_time_expired();
+ _pool.stop_and_join();
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt
index 4cf65ddb63a..c5c67d3203b 100644
--- a/storage/src/vespa/storage/distributor/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/CMakeLists.txt
@@ -19,6 +19,8 @@ vespa_add_library(storage_distributor
distributor_status.cpp
distributor_stripe.cpp
distributor_stripe_component.cpp
+ distributor_stripe_pool.cpp
+ distributor_stripe_thread.cpp
distributormessagesender.cpp
distributormetricsset.cpp
externaloperationhandler.cpp
@@ -27,6 +29,7 @@ vespa_add_library(storage_distributor
idealstatemetricsset.cpp
legacy_single_stripe_accessor.cpp
messagetracker.cpp
+ multi_threaded_stripe_access_guard.cpp
nodeinfo.cpp
operation_routing_snapshot.cpp
operation_sequencer.cpp
diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp
index 6f093f538ab..bdf3f30fdc9 100644
--- a/storage/src/vespa/storage/distributor/distributor.cpp
+++ b/storage/src/vespa/storage/distributor/distributor.cpp
@@ -413,7 +413,7 @@ Distributor::doNonCriticalTick(framework::ThreadIndex idx)
}
void
-Distributor::enableNextConfig()
+Distributor::enableNextConfig() // TODO STRIPE rename to enable_next_config_if_changed()?
{
// Only lazily trigger a config propagation and internal update if something has _actually changed_.
if (_component.internal_config_generation() != _current_internal_config_generation) {
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
index 87e938efd71..03f3a181a48 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp
@@ -780,6 +780,12 @@ DistributorStripe::doNonCriticalTick(framework::ThreadIndex)
return _tickResult;
}
+bool DistributorStripe::tick() {
+ assert(!_use_legacy_mode);
+ auto wait_info = doNonCriticalTick(framework::ThreadIndex(0));
+ return !wait_info.waitWanted(); // If we don't want to wait, we presumably did some useful stuff.
+}
+
bool DistributorStripe::should_inhibit_current_maintenance_scan_tick() const noexcept {
return (workWasDone() && (_inhibited_maintenance_tick_count
< getConfig().max_consecutively_inhibited_maintenance_ticks()));
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h
index bc058305c09..1885fa79341 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe.h
@@ -12,6 +12,7 @@
#include "statusreporterdelegate.h"
#include "stripe_access_guard.h"
#include "stripe_bucket_db_updater.h"
+#include "tickable_stripe.h"
#include <vespa/config/config.h>
#include <vespa/storage/common/doneinitializehandler.h>
#include <vespa/storage/common/messagesender.h>
@@ -50,7 +51,8 @@ class DistributorStripe final
public framework::StatusReporter,
public MinReplicaProvider,
public BucketSpacesStatsProvider,
- public NonTrackingMessageSender
+ public NonTrackingMessageSender,
+ public TickableStripe
{
public:
DistributorStripe(DistributorComponentRegister&,
@@ -190,13 +192,17 @@ public:
return _db_memory_sample_interval;
}
+ bool tick() override;
+
private:
+ // TODO reduce number of friends. DistributorStripe too popular for its own good.
friend struct DistributorTest;
friend class BucketDBUpdaterTest;
friend class DistributorTestUtil;
friend class MetricUpdateHook;
friend class Distributor;
friend class LegacySingleStripeAccessGuard;
+ friend class MultiThreadedStripeAccessGuard;
bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg);
bool isMaintenanceReply(const api::StorageReply& reply) const;
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp
new file mode 100644
index 00000000000..715d95e70fb
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp
@@ -0,0 +1,109 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "distributor_stripe_pool.h"
+#include "distributor_stripe_thread.h"
+#include <vespa/vespalib/util/size_literals.h>
+#include <cassert>
+
+namespace storage::distributor {
+
+DistributorStripePool::DistributorStripePool()
+ : _thread_pool(512_Ki),
+ _stripes(),
+ _threads(),
+ _mutex(),
+ _parker_cond(),
+ _parked_threads(0),
+ _bootstrap_tick_wait_duration(1ms),
+ _bootstrap_ticks_before_wait(10),
+ _stopped(false)
+{}
+
+DistributorStripePool::~DistributorStripePool() {
+ if (!_stopped) {
+ stop_and_join();
+ }
+}
+
+void DistributorStripePool::park_all_threads() noexcept {
+ assert(!_stripes.empty());
+ // Thread pool is not dynamic and signal_wants_park() is thread safe.
+ for (auto& s : _stripes) {
+ s->signal_wants_park();
+ }
+ std::unique_lock lock(_mutex);
+ _parker_cond.wait(lock, [this]{ return (_parked_threads == _threads.size()); });
+}
+
+void DistributorStripePool::unpark_all_threads() noexcept {
+ // Thread pool is not dynamic and unpark_thread() is thread safe.
+ for (auto& s : _stripes) {
+ s->unpark_thread();
+ }
+ // We have a full unpark barrier here as a pragmatic way to avoid potential ABA issues
+ // where back-to-back park->unpark->park calls interleave the up-counts and down-counts
+ // of _parked_threads for thread parking/unparking.
+ // It's fully possible to avoid this, but requires a somewhat more finicky solution for
+ // cross-thread coordination.
+ std::unique_lock lock(_mutex);
+ _parker_cond.wait(lock, [this]{ return (_parked_threads == 0); });
+}
+
+void DistributorStripePool::park_thread_until_released(DistributorStripeThread& thread) noexcept {
+ std::unique_lock lock(_mutex);
+ assert(_parked_threads < _threads.size());
+ ++_parked_threads;
+ if (_parked_threads == _threads.size()) {
+ _parker_cond.notify_all(); // Last thread to park; wakes the waiter in park_all_threads().
+ }
+ lock.unlock(); // Don't hold the pool mutex while blocked on the thread's own park condition.
+ thread.wait_until_unparked();
+ lock.lock();
+ --_parked_threads;
+ if (_parked_threads == 0) {
+ _parker_cond.notify_all(); // Last thread to unpark; wakes the waiter in unpark_all_threads().
+ }
+}
+
+void DistributorStripePool::start(const std::vector<TickableStripe*>& stripes) {
+ assert(!stripes.empty());
+ assert(_stripes.empty() && _threads.empty());
+ _stripes.reserve(stripes.size());
+ _threads.reserve(stripes.size());
+
+ for (auto* s : stripes) {
+ auto new_stripe = std::make_unique<DistributorStripeThread>(*s, *this);
+ new_stripe->set_tick_wait_duration(_bootstrap_tick_wait_duration);
+ new_stripe->set_ticks_before_wait(_bootstrap_ticks_before_wait);
+ _stripes.emplace_back(std::move(new_stripe));
+ }
+ for (auto& s : _stripes) {
+ _threads.emplace_back(_thread_pool.NewThread(s.get()));
+ }
+}
+
+void DistributorStripePool::stop_and_join() {
+ for (auto& s : _stripes) {
+ s->signal_should_stop();
+ }
+ for (auto* t : _threads) {
+ t->Join();
+ }
+ _stopped = true;
+}
+
+void DistributorStripePool::set_tick_wait_duration(vespalib::duration new_tick_wait_duration) noexcept {
+ _bootstrap_tick_wait_duration = new_tick_wait_duration;
+ // Stripe set may be empty if start() hasn't been called yet.
+ for (auto& s : _stripes) {
+ s->set_tick_wait_duration(new_tick_wait_duration);
+ }
+}
+void DistributorStripePool::set_ticks_before_wait(uint32_t new_ticks_before_wait) noexcept {
+ _bootstrap_ticks_before_wait = new_ticks_before_wait;
+ // Stripe set may be empty if start() hasn't been called yet.
+ for (auto& s : _stripes) {
+ s->set_ticks_before_wait(new_ticks_before_wait);
+ }
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
new file mode 100644
index 00000000000..9149482cd5d
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
@@ -0,0 +1,80 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/fastos/thread.h>
+#include <vespa/vespalib/util/time.h>
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+#include <vector>
+
+namespace storage::distributor {
+
+class DistributorStripeThread;
+class TickableStripe;
+
+/**
+ * Management and coordination of a pool of distributor stripe threads.
+ *
+ * Aside from handling the threads themselves, the pool crucially offers a well-defined
+ * thread synchronization/coordination API meant for ensuring all stripe threads are in
+ * a well defined state before accessing them:
+ *
+ * - park_all_threads() returns once ALL threads are in a "parked" state where they
+ * may not race with any operations performed on them by the caller. In essence, this
+ * acts as if a (very large) mutex is held by the caller that prevents the stripe
+ * from doing anything of its own volition. Must be followed by:
+ * - unpark_all_threads() returns once ALL threads have been confirmed released from
+ * a previously parked state. Must be called after park_all_threads().
+ *
+ * Neither park_all_threads() nor unpark_all_threads() may be called prior to calling start().
+ *
+ * It's possible to set stripe thread tick-specific options (wait duration, ticks before
+ * wait) both before and after start() is called. The options will be propagated to any
+ * running stripe threads in a thread-safe, lock-free manner.
+ */
+class DistributorStripePool {
+ using StripeVector = std::vector<std::unique_ptr<DistributorStripeThread>>;
+ using NativeThreadVector = std::vector<FastOS_ThreadInterface*>;
+
+ FastOS_ThreadPool _thread_pool;
+ StripeVector _stripes;
+ NativeThreadVector _threads;
+ std::mutex _mutex;
+ std::condition_variable _parker_cond;
+ size_t _parked_threads; // Must be protected by _mutex
+ vespalib::duration _bootstrap_tick_wait_duration;
+ uint32_t _bootstrap_ticks_before_wait;
+ bool _stopped;
+
+ friend class DistributorStripeThread;
+public:
+ DistributorStripePool();
+ ~DistributorStripePool();
+
+ // Set up the stripe pool with a 1-1 relationship between the provided
+ // stripes and running threads. Can only be called once per pool.
+ //
+ // Precondition: stripes.size() > 0
+ void start(const std::vector<TickableStripe*>& stripes);
+ void stop_and_join();
+
+ void park_all_threads() noexcept;
+ void unpark_all_threads() noexcept;
+
+ [[nodiscard]] const DistributorStripeThread& stripe(size_t idx) const noexcept {
+ return *_stripes[idx];
+ }
+ [[nodiscard]] DistributorStripeThread& stripe(size_t idx) noexcept {
+ return *_stripes[idx];
+ }
+ [[nodiscard]] size_t stripe_count() const noexcept { return _stripes.size(); }
+
+ // Applies to all threads. May be called both before and after start(). Thread safe.
+ void set_tick_wait_duration(vespalib::duration new_tick_wait_duration) noexcept;
+ void set_ticks_before_wait(uint32_t new_ticks_before_wait) noexcept;
+private:
+ void park_thread_until_released(DistributorStripeThread& thread) noexcept;
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp
new file mode 100644
index 00000000000..7359fbe5cf8
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp
@@ -0,0 +1,106 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "distributor_stripe_thread.h"
+#include "distributor_stripe.h"
+#include "distributor_stripe_pool.h"
+#include "tickable_stripe.h"
+#include <cassert>
+
+namespace storage::distributor {
+
+DistributorStripeThread::DistributorStripeThread(TickableStripe& stripe,
+ DistributorStripePool& stripe_pool)
+ : _stripe(stripe),
+ _stripe_pool(stripe_pool),
+ _tick_wait_duration(1ms),
+ _mutex(),
+ _event_cond(),
+ _park_cond(),
+ _ticks_before_wait(10),
+ _should_park(false),
+ _should_stop(false),
+ _waiting_for_event(false)
+{}
+
+DistributorStripeThread::~DistributorStripeThread() = default;
+
+void DistributorStripeThread::Run(FastOS_ThreadInterface*, void*) {
+ uint32_t tick_waits_inhibited = 0;
+ while (!should_stop_thread_relaxed()) {
+ while (should_park_relaxed()) {
+ _stripe_pool.park_thread_until_released(*this);
+ }
+ // TODO consider enum to only trigger "ticks before wait"-behavior when maintenance was done
+ const bool did_work = _stripe.tick();
+ if (did_work) {
+ tick_waits_inhibited = 0;
+ } else if (tick_waits_inhibited >= ticks_before_wait_relaxed()) {
+ wait_until_event_notified_or_timed_out();
+ tick_waits_inhibited = 0;
+ } else {
+ ++tick_waits_inhibited;
+ }
+ }
+}
+
+void DistributorStripeThread::signal_wants_park() noexcept {
+ std::lock_guard lock(_mutex);
+ assert(!should_park_relaxed());
+ _should_park.store(true, std::memory_order_relaxed);
+ if (_waiting_for_event) {
+ _event_cond.notify_one(); // TODO after unlock?
+ }
+}
+
+void DistributorStripeThread::unpark_thread() noexcept {
+ std::lock_guard lock(_mutex);
+ assert(should_park_relaxed());
+ _should_park.store(false, std::memory_order_relaxed);
+ _park_cond.notify_one(); // TODO after unlock?
+}
+
+void DistributorStripeThread::wait_until_event_notified_or_timed_out() noexcept {
+ std::unique_lock lock(_mutex);
+ if (should_stop_thread_relaxed() || should_park_relaxed()) {
+ return;
+ }
+ _waiting_for_event = true;
+ _event_cond.wait_for(lock, tick_wait_duration_relaxed());
+ _waiting_for_event = false;
+}
+
+void DistributorStripeThread::wait_until_unparked() noexcept {
+ std::unique_lock lock(_mutex);
+ assert(should_park_relaxed());
+ // _should_park is always written within _mutex, relaxed load is safe.
+ _park_cond.wait(lock, [this]{ return !should_park_relaxed(); });
+}
+
+void DistributorStripeThread::notify_event_has_triggered() noexcept {
+ // TODO mutex protect and add flag for "should tick immediately next time"
+ // TODO only notify if _waiting_for_event == true
+ _event_cond.notify_one();
+}
+
+void DistributorStripeThread::signal_should_stop() noexcept {
+ std::unique_lock lock(_mutex);
+ assert(!should_park_relaxed());
+ _should_stop.store(true, std::memory_order_relaxed);
+ if (_waiting_for_event) {
+ _event_cond.notify_one();
+ }
+ // TODO if we ever need it, handle pending thread park. For now we assume that
+ // the caller never attempts to concurrently park and stop threads.
+}
+
+void DistributorStripeThread::set_tick_wait_duration(vespalib::duration new_tick_wait_duration) noexcept {
+ static_assert(AtomicDuration::is_always_lock_free);
+ // No memory ordering required for a "lazy" single value setting such as the tick duration
+ _tick_wait_duration.store(new_tick_wait_duration, std::memory_order_relaxed);
+}
+
+void DistributorStripeThread::set_ticks_before_wait(uint32_t new_ticks_before_wait) noexcept {
+ _ticks_before_wait.store(new_ticks_before_wait, std::memory_order_relaxed);
+}
+
+
+}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_thread.h b/storage/src/vespa/storage/distributor/distributor_stripe_thread.h
new file mode 100644
index 00000000000..b02d733895e
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_thread.h
@@ -0,0 +1,81 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/fastos/thread.h>
+#include <vespa/vespalib/util/time.h>
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
+#include <vector>
+
+namespace storage::distributor {
+
+class DistributorStripe;
+class DistributorStripePool;
+class TickableStripe;
+
+/**
+ * A DistributorStripeThread provides threading resources for a single distributor stripe
+ * and the means of synchronizing access towards it through a DistributorStripePool.
+ *
+ * A DistributorStripeThread instance is bidirectionally bound to a particular pool and
+ * should therefore always be created by the pool itself (never standalone).
+ */
+class DistributorStripeThread : private FastOS_Runnable {
+ using AtomicDuration = std::atomic<vespalib::duration>;
+
+ TickableStripe& _stripe;
+ DistributorStripePool& _stripe_pool;
+ AtomicDuration _tick_wait_duration;
+ std::mutex _mutex;
+ std::condition_variable _event_cond;
+ std::condition_variable _park_cond;
+ std::atomic<uint32_t> _ticks_before_wait;
+ std::atomic<bool> _should_park;
+ std::atomic<bool> _should_stop;
+ bool _waiting_for_event;
+
+ friend class DistributorStripePool;
+public:
+ DistributorStripeThread(TickableStripe& stripe,
+ DistributorStripePool& stripe_pool);
+ ~DistributorStripeThread();
+
+ void Run(FastOS_ThreadInterface*, void*) override;
+
+ // Wakes up stripe thread if it's currently waiting for an external event to be triggered,
+ // such as the arrival of a new RPC message. If thread is parked this call will have no
+ // effect.
+ void notify_event_has_triggered() noexcept;
+
+ void set_tick_wait_duration(vespalib::duration new_tick_wait_duration) noexcept;
+ void set_ticks_before_wait(uint32_t new_ticks_before_wait) noexcept;
+
+ TickableStripe* operator->() noexcept { return &_stripe; }
+ const TickableStripe* operator->() const noexcept { return &_stripe; }
+private:
+ [[nodiscard]] bool should_stop_thread_relaxed() const noexcept {
+ return _should_stop.load(std::memory_order_relaxed);
+ }
+
+ [[nodiscard]] bool should_park_relaxed() const noexcept {
+ return _should_park.load(std::memory_order_relaxed);
+ }
+
+ [[nodiscard]] vespalib::duration tick_wait_duration_relaxed() const noexcept {
+ return _tick_wait_duration.load(std::memory_order_relaxed);
+ }
+
+ [[nodiscard]] uint32_t ticks_before_wait_relaxed() const noexcept {
+ return _ticks_before_wait.load(std::memory_order_relaxed);
+ }
+
+ void signal_wants_park() noexcept;
+ void unpark_thread() noexcept;
+ void wait_until_event_notified_or_timed_out() noexcept;
+ void wait_until_unparked() noexcept;
+
+ void signal_should_stop() noexcept;
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
new file mode 100644
index 00000000000..6bc9c03158a
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
@@ -0,0 +1,113 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "multi_threaded_stripe_access_guard.h"
+#include "distributor_stripe.h"
+#include "distributor_stripe_pool.h"
+#include "distributor_stripe_thread.h"
+
+namespace storage::distributor {
+
+MultiThreadedStripeAccessGuard::MultiThreadedStripeAccessGuard(
+ MultiThreadedStripeAccessor& accessor,
+ DistributorStripePool& stripe_pool)
+ : _accessor(accessor),
+ _stripe_pool(stripe_pool)
+{
+ assert(_stripe_pool.stripe_count() == 1); // TODO STRIPE many more yes yes
+ _stripe_pool.park_all_threads();
+}
+
+MultiThreadedStripeAccessGuard::~MultiThreadedStripeAccessGuard() {
+ _stripe_pool.unpark_all_threads();
+ _accessor.mark_guard_released();
+}
+
+void MultiThreadedStripeAccessGuard::update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) {
+ // TODO STRIPE multiple stripes
+ first_stripe().update_total_distributor_config(std::move(config));
+}
+
+void MultiThreadedStripeAccessGuard::update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) {
+ // TODO STRIPE multiple stripes
+ first_stripe().update_distribution_config(new_configs);
+}
+
+void MultiThreadedStripeAccessGuard::set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) {
+ // TODO STRIPE multiple stripes
+ first_stripe().getBucketSpaceRepo().set_pending_cluster_state_bundle(pending_state);
+}
+
+void MultiThreadedStripeAccessGuard::clear_pending_cluster_state_bundle() {
+ // TODO STRIPE multiple stripes
+ first_stripe().getBucketSpaceRepo().clear_pending_cluster_state_bundle();
+}
+
+void MultiThreadedStripeAccessGuard::enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) {
+ // TODO STRIPE multiple stripes
+ first_stripe().enableClusterStateBundle(new_state);
+}
+
+void MultiThreadedStripeAccessGuard::notify_distribution_change_enabled() {
+ // TODO STRIPE multiple stripes
+ first_stripe().notifyDistributionChangeEnabled();
+}
+
+PotentialDataLossReport
+MultiThreadedStripeAccessGuard::remove_superfluous_buckets(document::BucketSpace bucket_space,
+ const lib::ClusterState& new_state,
+ bool is_distribution_change)
+{
+ // TODO STRIPE multiple stripes
+ return first_stripe().bucket_db_updater().remove_superfluous_buckets(bucket_space, new_state, is_distribution_change);
+}
+
+void
+MultiThreadedStripeAccessGuard::merge_entries_into_db(document::BucketSpace bucket_space,
+ api::Timestamp gathered_at_timestamp,
+ const lib::Distribution& distribution,
+ const lib::ClusterState& new_state,
+ const char* storage_up_states,
+ const std::unordered_set<uint16_t>& outdated_nodes,
+ const std::vector<dbtransition::Entry>& entries)
+{
+ // TODO STRIPE multiple stripes
+ first_stripe().bucket_db_updater().merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution,
+ new_state, storage_up_states, outdated_nodes, entries);
+}
+
+void MultiThreadedStripeAccessGuard::update_read_snapshot_before_db_pruning() {
+ // TODO STRIPE multiple stripes
+ first_stripe().bucket_db_updater().update_read_snapshot_before_db_pruning();
+}
+
+void MultiThreadedStripeAccessGuard::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) {
+ // TODO STRIPE multiple stripes
+ first_stripe().bucket_db_updater().update_read_snapshot_after_db_pruning(new_state);
+}
+
+void MultiThreadedStripeAccessGuard::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) {
+ // TODO STRIPE multiple stripes
+ first_stripe().bucket_db_updater().update_read_snapshot_after_activation(activated_state);
+}
+
+void MultiThreadedStripeAccessGuard::clear_read_only_bucket_repo_databases() {
+ // TODO STRIPE multiple stripes
+ first_stripe().bucket_db_updater().clearReadOnlyBucketRepoDatabases();
+}
+
+DistributorStripe& MultiThreadedStripeAccessGuard::first_stripe() noexcept {
+ return dynamic_cast<DistributorStripe&>(*_stripe_pool.stripe(0).operator->()); // Cast the thread's bound TickableStripe; the DistributorStripeThread wrapper itself is never a DistributorStripe.
+}
+
+std::unique_ptr<StripeAccessGuard> MultiThreadedStripeAccessor::rendezvous_and_hold_all() {
+ // For sanity checking of invariant of only one guard being allowed at any given time.
+ assert(!_guard_held);
+ _guard_held = true;
+ return std::make_unique<MultiThreadedStripeAccessGuard>(*this, _stripe_pool);
+}
+
+void MultiThreadedStripeAccessor::mark_guard_released() {
+ assert(_guard_held);
+ _guard_held = false;
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
new file mode 100644
index 00000000000..376eccd1c4a
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h
@@ -0,0 +1,79 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "stripe_access_guard.h"
+
+namespace storage::distributor {
+
+class MultiThreadedStripeAccessor;
+class DistributorStripePool;
+class DistributorStripe;
+
+/**
+ * StripeAccessGuard implementation which provides exclusive access to a set of stripes
+ * by ensuring that all stripe threads are safely parked upon guard construction. This
+ * means that as long as a guard exists, access to stripes is guaranteed to not cause
+ * data races.
+ *
+ * Threads are automatically un-parked upon guard destruction.
+ *
+ * At most one guard instance may exist at any given time.
+ */
+class MultiThreadedStripeAccessGuard : public StripeAccessGuard {
+ MultiThreadedStripeAccessor& _accessor;
+ DistributorStripePool& _stripe_pool;
+public:
+ MultiThreadedStripeAccessGuard(MultiThreadedStripeAccessor& accessor,
+ DistributorStripePool& stripe_pool);
+ ~MultiThreadedStripeAccessGuard() override;
+
+ void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) override;
+
+ void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) override;
+ void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) override;
+ void clear_pending_cluster_state_bundle() override;
+ void enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) override;
+ void notify_distribution_change_enabled() override;
+
+ PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace bucket_space,
+ const lib::ClusterState& new_state,
+ bool is_distribution_change) override;
+ void merge_entries_into_db(document::BucketSpace bucket_space,
+ api::Timestamp gathered_at_timestamp,
+ const lib::Distribution& distribution,
+ const lib::ClusterState& new_state,
+ const char* storage_up_states,
+ const std::unordered_set<uint16_t>& outdated_nodes,
+ const std::vector<dbtransition::Entry>& entries) override;
+
+ void update_read_snapshot_before_db_pruning() override;
+ void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) override;
+ void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) override;
+ void clear_read_only_bucket_repo_databases() override;
+private:
+ // TODO STRIPE remove once multi threaded stripe support is implemented
+ DistributorStripe& first_stripe() noexcept;
+};
+
+/**
+ * Impl of StripeAccessor which creates MultiThreadedStripeAccessGuards that cover all threads
+ * in the provided stripe pool.
+ */
+class MultiThreadedStripeAccessor : public StripeAccessor {
+ DistributorStripePool& _stripe_pool;
+ bool _guard_held;
+
+ friend class MultiThreadedStripeAccessGuard;
+public:
+ MultiThreadedStripeAccessor(DistributorStripePool& stripe_pool)
+ : _stripe_pool(stripe_pool),
+ _guard_held(false)
+ {}
+ ~MultiThreadedStripeAccessor() override = default;
+
+ std::unique_ptr<StripeAccessGuard> rendezvous_and_hold_all() override;
+private:
+ void mark_guard_released();
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
index 67b31343f6f..42fe0d5c29a 100644
--- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
+++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h
@@ -141,6 +141,8 @@ private:
friend class DistributorTestUtil;
// TODO refactor and rewire to avoid needing this direct meddling
friend class LegacySingleStripeAccessGuard;
+ friend class MultiThreadedStripeAccessGuard;
+
// Only to be used by tests that want to ensure both the BucketDBUpdater _and_ the Distributor
// components agree on the currently active cluster state bundle.
// Transitively invokes Distributor::enableClusterStateBundle
diff --git a/storage/src/vespa/storage/distributor/tickable_stripe.h b/storage/src/vespa/storage/distributor/tickable_stripe.h
new file mode 100644
index 00000000000..323db627d31
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/tickable_stripe.h
@@ -0,0 +1,24 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+namespace storage::distributor {
+
+/**
+ * A tickable stripe is the minimal binding glue between the stripe's worker thread and
+ * the actual implementation. Primarily allows for easier testing without having to
+ * fake an entire actual DistributorStripe.
+ */
+class TickableStripe {
+public:
+ virtual ~TickableStripe() = default;
+
+ // Perform a single operation tick of the stripe logic.
+ // If function returns true, the caller should not perform any waiting before calling
+ // tick() again. This generally means that the stripe is processing client operations
+ // and wants to continue doing so as quickly as possible.
+ // Only used for multi-threaded striped setups.
+ // TODO return an enum indicating type of last processed event? E.g. external, maintenance, none, ...
+ virtual bool tick() = 0;
+};
+
+}