diff options
author | Geir Storli <geirst@verizonmedia.com> | 2021-04-29 22:53:31 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-04-29 22:53:31 +0200 |
commit | f41d092b40930d35f52956f0e535be3fb641a722 (patch) | |
tree | f7f6f7817fc949b3f4c36e4e9c4e5af17ac09019 /storage | |
parent | e3ba7773401be957ea99f65d5108608e61a34763 (diff) | |
parent | 3d34041283d66938c3164ae0761754a3790a455f (diff) |
Merge pull request #17661 from vespa-engine/vekterli/distributor-stripe-pool-and-thread-coordination
Add DistributorStripe thread pool with thread park/unpark support
Diffstat (limited to 'storage')
14 files changed, 704 insertions, 2 deletions
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt index 810ffb550bf..d808acc0d3a 100644 --- a/storage/src/tests/distributor/CMakeLists.txt +++ b/storage/src/tests/distributor/CMakeLists.txt @@ -13,6 +13,7 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST distributor_bucket_space_test.cpp distributor_host_info_reporter_test.cpp distributor_message_sender_stub.cpp + distributor_stripe_pool_test.cpp distributortest.cpp distributortestutil.cpp externaloperationhandlertest.cpp diff --git a/storage/src/tests/distributor/distributor_stripe_pool_test.cpp b/storage/src/tests/distributor/distributor_stripe_pool_test.cpp new file mode 100644 index 00000000000..408027eb894 --- /dev/null +++ b/storage/src/tests/distributor/distributor_stripe_pool_test.cpp @@ -0,0 +1,92 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/storage/distributor/distributor_stripe_pool.h> +#include <vespa/storage/distributor/tickable_stripe.h> +#include <vespa/vespalib/gtest/gtest.h> +#include <vespa/vespalib/util/time.h> +#include <atomic> +#include <thread> + +using namespace ::testing; + +namespace storage::distributor { + +struct DistributorStripePoolThreadingTest : Test { + static constexpr vespalib::duration min_test_duration = 50ms; + + DistributorStripePool _pool; + vespalib::steady_time _start_time; + std::atomic<bool> _is_parked; + + DistributorStripePoolThreadingTest() + : _pool(), + _start_time(std::chrono::steady_clock::now()), + _is_parked(false) + { + // Set an absurdly high tick wait duration to catch any regressions where + // thread wakeups aren't triggering as expected. + _pool.set_tick_wait_duration(600s); + // Ensure we always trigger a wait if tick() returns false. + _pool.set_ticks_before_wait(0); + } + + bool min_test_time_reached() const noexcept { + return ((std::chrono::steady_clock::now() - _start_time) > min_test_duration); + } + + void loop_park_unpark_cycle_until_test_time_expired() { + constexpr size_t min_cycles = 100; + size_t cycle = 0; + // TODO enforce minimum number of actual calls to tick() per thread? + while ((cycle < min_cycles) || !min_test_time_reached()) { + _pool.park_all_threads(); + _is_parked = true; + std::this_thread::sleep_for(50us); + _is_parked = false; + _pool.unpark_all_threads(); + ++cycle; + } + } +}; + +// Optimistic invariant checker that cannot prove correctness, but will hopefully +// make tests scream if something is obviously incorrect. +struct ParkingInvariantCheckingMockStripe : TickableStripe { + std::atomic<bool>& _is_parked; + bool _to_return; + + explicit ParkingInvariantCheckingMockStripe(std::atomic<bool>& is_parked) + : _is_parked(is_parked), + _to_return(true) + {} + + bool tick() override { + std::this_thread::sleep_for(50us); + assert(!_is_parked.load()); + // Alternate between returning whether or not work was done to trigger + // both waiting and non-waiting edges. Note that this depends on the + // ticks_before_wait value being 0. + _to_return = !_to_return; + return _to_return; + } +}; + +TEST_F(DistributorStripePoolThreadingTest, can_park_and_unpark_single_stripe) { + ParkingInvariantCheckingMockStripe stripe(_is_parked); + + _pool.start({{&stripe}}); + loop_park_unpark_cycle_until_test_time_expired(); + _pool.stop_and_join(); +} + +TEST_F(DistributorStripePoolThreadingTest, can_park_and_unpark_multiple_stripes) { + ParkingInvariantCheckingMockStripe s1(_is_parked); + ParkingInvariantCheckingMockStripe s2(_is_parked); + ParkingInvariantCheckingMockStripe s3(_is_parked); + ParkingInvariantCheckingMockStripe s4(_is_parked); + + _pool.start({{&s1, &s2, &s3, &s4}}); + loop_park_unpark_cycle_until_test_time_expired(); + _pool.stop_and_join(); +} + +} diff --git a/storage/src/vespa/storage/distributor/CMakeLists.txt b/storage/src/vespa/storage/distributor/CMakeLists.txt index 4cf65ddb63a..c5c67d3203b 100644 --- a/storage/src/vespa/storage/distributor/CMakeLists.txt +++ b/storage/src/vespa/storage/distributor/CMakeLists.txt @@ -19,6 +19,8 @@ vespa_add_library(storage_distributor distributor_status.cpp distributor_stripe.cpp distributor_stripe_component.cpp + distributor_stripe_pool.cpp + distributor_stripe_thread.cpp distributormessagesender.cpp distributormetricsset.cpp externaloperationhandler.cpp @@ -27,6 +29,7 @@ vespa_add_library(storage_distributor idealstatemetricsset.cpp legacy_single_stripe_accessor.cpp messagetracker.cpp + multi_threaded_stripe_access_guard.cpp nodeinfo.cpp operation_routing_snapshot.cpp operation_sequencer.cpp diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 6f093f538ab..bdf3f30fdc9 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -413,7 +413,7 @@ Distributor::doNonCriticalTick(framework::ThreadIndex idx) } void -Distributor::enableNextConfig() +Distributor::enableNextConfig() // TODO STRIPE rename to enable_next_config_if_changed()? { // Only lazily trigger a config propagation and internal update if something has _actually changed_. if (_component.internal_config_generation() != _current_internal_config_generation) { diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index 87e938efd71..03f3a181a48 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -780,6 +780,12 @@ DistributorStripe::doNonCriticalTick(framework::ThreadIndex) return _tickResult; } +bool DistributorStripe::tick() { + assert(!_use_legacy_mode); + auto wait_info = doNonCriticalTick(framework::ThreadIndex(0)); + return !wait_info.waitWanted(); // If we don't want to wait, we presumably did some useful stuff. +} + bool DistributorStripe::should_inhibit_current_maintenance_scan_tick() const noexcept { return (workWasDone() && (_inhibited_maintenance_tick_count < getConfig().max_consecutively_inhibited_maintenance_ticks())); diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.h b/storage/src/vespa/storage/distributor/distributor_stripe.h index bc058305c09..1885fa79341 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe.h @@ -12,6 +12,7 @@ #include "statusreporterdelegate.h" #include "stripe_access_guard.h" #include "stripe_bucket_db_updater.h" +#include "tickable_stripe.h" #include <vespa/config/config.h> #include <vespa/storage/common/doneinitializehandler.h> #include <vespa/storage/common/messagesender.h> @@ -50,7 +51,8 @@ class DistributorStripe final public framework::StatusReporter, public MinReplicaProvider, public BucketSpacesStatsProvider, - public NonTrackingMessageSender + public NonTrackingMessageSender, + public TickableStripe { public: DistributorStripe(DistributorComponentRegister&, @@ -190,13 +192,17 @@ public: return _db_memory_sample_interval; } + bool tick() override; + private: + // TODO reduce number of friends. DistributorStripe too popular for its own good. friend struct DistributorTest; friend class BucketDBUpdaterTest; friend class DistributorTestUtil; friend class MetricUpdateHook; friend class Distributor; friend class LegacySingleStripeAccessGuard; + friend class MultiThreadedStripeAccessGuard; bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg); bool isMaintenanceReply(const api::StorageReply& reply) const; diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp new file mode 100644 index 00000000000..715d95e70fb --- /dev/null +++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp @@ -0,0 +1,109 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "distributor_stripe_pool.h" +#include "distributor_stripe_thread.h" +#include <vespa/vespalib/util/size_literals.h> +#include <cassert> + +namespace storage::distributor { + +DistributorStripePool::DistributorStripePool() + : _thread_pool(512_Ki), + _stripes(), + _threads(), + _mutex(), + _parker_cond(), + _parked_threads(0), + _bootstrap_tick_wait_duration(1ms), + _bootstrap_ticks_before_wait(10), + _stopped(false) +{} + +DistributorStripePool::~DistributorStripePool() { + if (!_stopped) { + stop_and_join(); + } +} + +void DistributorStripePool::park_all_threads() noexcept { + assert(!_stripes.empty()); + // Thread pool is not dynamic and signal_wants_park() is thread safe. + for (auto& s : _stripes) { + s->signal_wants_park(); + } + std::unique_lock lock(_mutex); + _parker_cond.wait(lock, [this]{ return (_parked_threads == _threads.size()); }); +} + +void DistributorStripePool::unpark_all_threads() noexcept { + // Thread pool is not dynamic and unpark_thread() is thread safe. + for (auto& s : _stripes) { + s->unpark_thread(); + } + // We have a full unpark barrier here as a pragmatic way to avoid potential ABA issues + // caused by back-to-back park->unpark->park calls causing issues with interleaving + // up-counts and down-counts for thread parking/unparking. + // It's fully possibly to avoid this, but requires a somewhat more finicky solution for + // cross-thread coordination. + std::unique_lock lock(_mutex); + _parker_cond.wait(lock, [this]{ return (_parked_threads == 0); }); +} + +void DistributorStripePool::park_thread_until_released(DistributorStripeThread& thread) noexcept { + std::unique_lock lock(_mutex); + assert(_parked_threads < _threads.size()); + ++_parked_threads; + if (_parked_threads == _threads.size()) { + _parker_cond.notify_all(); + } + lock.unlock(); + thread.wait_until_unparked(); + lock.lock(); + --_parked_threads; + if (_parked_threads == 0) { + _parker_cond.notify_all(); + } +}; + +void DistributorStripePool::start(const std::vector<TickableStripe*>& stripes) { + assert(!stripes.empty()); + assert(_stripes.empty() && _threads.empty()); + _stripes.reserve(stripes.size()); + _threads.reserve(stripes.size()); + + for (auto* s : stripes) { + auto new_stripe = std::make_unique<DistributorStripeThread>(*s, *this); + new_stripe->set_tick_wait_duration(_bootstrap_tick_wait_duration); + new_stripe->set_ticks_before_wait(_bootstrap_ticks_before_wait); + _stripes.emplace_back(std::move(new_stripe)); + } + for (auto& s : _stripes) { + _threads.emplace_back(_thread_pool.NewThread(s.get())); + } +} + +void DistributorStripePool::stop_and_join() { + for (auto& s : _stripes) { + s->signal_should_stop(); + } + for (auto* t : _threads) { + t->Join(); + } + _stopped = true; +} + +void DistributorStripePool::set_tick_wait_duration(vespalib::duration new_tick_wait_duration) noexcept { + _bootstrap_tick_wait_duration = new_tick_wait_duration; + // Stripe set may be empty if start() hasn't been called yet. + for (auto& s : _stripes) { + s->set_tick_wait_duration(new_tick_wait_duration); + } +} +void DistributorStripePool::set_ticks_before_wait(uint32_t new_ticks_before_wait) noexcept { + _bootstrap_ticks_before_wait = new_ticks_before_wait; + // Stripe set may be empty if start() hasn't been called yet. + for (auto& s : _stripes) { + s->set_ticks_before_wait(new_ticks_before_wait); + } +} + +} diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h new file mode 100644 index 00000000000..9149482cd5d --- /dev/null +++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h @@ -0,0 +1,80 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/fastos/thread.h> +#include <vespa/vespalib/util/time.h> +#include <atomic> +#include <condition_variable> +#include <mutex> +#include <vector> + +namespace storage::distributor { + +class DistributorStripeThread; +class TickableStripe; + +/** + * Management and coordination of a pool of distributor stripe threads. + * + * Aside from handling the threads themselves, the pool crucially offers a well-defined + * thread synchronization/coordination API meant for ensuring all stripe threads are in + * a well defined state before accessing them: + * + * - park_all_threads() returns once ALL threads are in a "parked" state where they + * may not race with any operations performed on them by the caller. In essence, this + * acts as if a (very large) mutex is held by the caller that prevents the stripe + * from doing anything of its own volition. Must be followed by: + * - unpark_all_threads() returns once ALL threads have been confirmed released from + * a previously parked state. Must be called after park_all_threads(). + * + * Neither park_all_threads() or unpark_all_threads() may be called prior to calling start(). + * + * It's possible to set stripe thread tick-specific options (wait duration, ticks before + * wait) both before and after start() is called. The options will be propagated to any + * running stripe threads in a thread-safe, lock-free manner. + */ +class DistributorStripePool { + using StripeVector = std::vector<std::unique_ptr<DistributorStripeThread>>; + using NativeThreadVector = std::vector<FastOS_ThreadInterface*>; + + FastOS_ThreadPool _thread_pool; + StripeVector _stripes; + NativeThreadVector _threads; + std::mutex _mutex; + std::condition_variable _parker_cond; + size_t _parked_threads; // Must be protected by _park_mutex + vespalib::duration _bootstrap_tick_wait_duration; + uint32_t _bootstrap_ticks_before_wait; + bool _stopped; + + friend class DistributorStripeThread; +public: + DistributorStripePool(); + ~DistributorStripePool(); + + // Set up the stripe pool with a 1-1 relationship between the provided + // stripes and running threads. Can only be called once per pool. + // + // Precondition: stripes.size() > 0 + void start(const std::vector<TickableStripe*>& stripes); + void stop_and_join(); + + void park_all_threads() noexcept; + void unpark_all_threads() noexcept; + + [[nodiscard]] const DistributorStripeThread& stripe(size_t idx) const noexcept { + return *_stripes[idx]; + } + [[nodiscard]] DistributorStripeThread& stripe(size_t idx) noexcept { + return *_stripes[idx]; + } + [[nodiscard]] size_t stripe_count() const noexcept { return _stripes.size(); } + + // Applies to all threads. May be called both before and after start(). Thread safe. + void set_tick_wait_duration(vespalib::duration new_tick_wait_duration) noexcept; + void set_ticks_before_wait(uint32_t new_ticks_before_wait) noexcept; +private: + void park_thread_until_released(DistributorStripeThread& thread) noexcept; +}; + +} diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp new file mode 100644 index 00000000000..7359fbe5cf8 --- /dev/null +++ b/storage/src/vespa/storage/distributor/distributor_stripe_thread.cpp @@ -0,0 +1,106 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "distributor_stripe_thread.h" +#include "distributor_stripe.h" +#include "distributor_stripe_pool.h" +#include "tickable_stripe.h" +#include <cassert> + +namespace storage::distributor { + +DistributorStripeThread::DistributorStripeThread(TickableStripe& stripe, + DistributorStripePool& stripe_pool) + : _stripe(stripe), + _stripe_pool(stripe_pool), + _tick_wait_duration(1ms), + _mutex(), + _event_cond(), + _park_cond(), + _ticks_before_wait(10), + _should_park(false), + _should_stop(false), + _waiting_for_event(false) +{} + +DistributorStripeThread::~DistributorStripeThread() = default; + +void DistributorStripeThread::Run(FastOS_ThreadInterface*, void*) { + uint32_t tick_waits_inhibited = 0; + while (!should_stop_thread_relaxed()) { + while (should_park_relaxed()) { + _stripe_pool.park_thread_until_released(*this); + } + // TODO consider enum to only trigger "ticks before wait"-behavior when maintenance was done + const bool did_work = _stripe.tick(); + if (did_work) { + tick_waits_inhibited = 0; + } else if (tick_waits_inhibited >= ticks_before_wait_relaxed()) { + wait_until_event_notified_or_timed_out(); + tick_waits_inhibited = 0; + } else { + ++tick_waits_inhibited; + } + } +} + +void DistributorStripeThread::signal_wants_park() noexcept { + std::lock_guard lock(_mutex); + assert(!should_park_relaxed()); + _should_park.store(true, std::memory_order_relaxed); + if (_waiting_for_event) { + _event_cond.notify_one(); // TODO after unlock? + } +} + +void DistributorStripeThread::unpark_thread() noexcept { + std::lock_guard lock(_mutex); + assert(should_park_relaxed()); + _should_park.store(false, std::memory_order_relaxed); + _park_cond.notify_one(); // TODO after unlock? +} + +void DistributorStripeThread::wait_until_event_notified_or_timed_out() noexcept { + std::unique_lock lock(_mutex); + if (should_stop_thread_relaxed() || should_park_relaxed()) { + return; + } + _waiting_for_event = true; + _event_cond.wait_for(lock, tick_wait_duration_relaxed()); + _waiting_for_event = false; +} + +void DistributorStripeThread::wait_until_unparked() noexcept { + std::unique_lock lock(_mutex); + assert(should_park_relaxed()); + // _should_park is always written within _mutex, relaxed load is safe. + _park_cond.wait(lock, [this]{ return !should_park_relaxed(); }); +} + +void DistributorStripeThread::notify_event_has_triggered() noexcept { + // TODO mutex protect and add flag for "should tick immediately next time" + // TODO only notify if _waiting_for_event == true + _event_cond.notify_one(); +} + +void DistributorStripeThread::signal_should_stop() noexcept { + std::unique_lock lock(_mutex); + assert(!should_park_relaxed()); + _should_stop.store(true, std::memory_order_relaxed); + if (_waiting_for_event) { + _event_cond.notify_one(); + } + // TODO if we ever need it, handle pending thread park. For now we assume that + // the caller never attempts to concurrently park and stop threads. +} + +void DistributorStripeThread::set_tick_wait_duration(vespalib::duration new_tick_wait_duration) noexcept { + static_assert(AtomicDuration::is_always_lock_free); + // No memory ordering required for a "lazy" single value setting such as the tick duration + _tick_wait_duration.store(new_tick_wait_duration, std::memory_order_relaxed); +} + +void DistributorStripeThread::set_ticks_before_wait(uint32_t new_ticks_before_wait) noexcept { + _ticks_before_wait.store(new_ticks_before_wait, std::memory_order_relaxed); +} + + +} diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_thread.h b/storage/src/vespa/storage/distributor/distributor_stripe_thread.h new file mode 100644 index 00000000000..b02d733895e --- /dev/null +++ b/storage/src/vespa/storage/distributor/distributor_stripe_thread.h @@ -0,0 +1,81 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/fastos/thread.h> +#include <vespa/vespalib/util/time.h> +#include <atomic> +#include <condition_variable> +#include <mutex> +#include <vector> + +namespace storage::distributor { + +class DistributorStripe; +class DistributorStripePool; +class TickableStripe; + +/** + * A DistributorStripeThread provides threading resources for a single distributor stripe + * and the means of synchronizing access towards it through a DistributorStripePool. + * + * A DistributorStripeThread instance is bidirectionally bound to a particular pool and + * should therefore always be created by the pool itself (never standalone). + */ +class DistributorStripeThread : private FastOS_Runnable { + using AtomicDuration = std::atomic<vespalib::duration>; + + TickableStripe& _stripe; + DistributorStripePool& _stripe_pool; + AtomicDuration _tick_wait_duration; + std::mutex _mutex; + std::condition_variable _event_cond; + std::condition_variable _park_cond; + std::atomic<uint32_t> _ticks_before_wait; + std::atomic<bool> _should_park; + std::atomic<bool> _should_stop; + bool _waiting_for_event; + + friend class DistributorStripePool; +public: + DistributorStripeThread(TickableStripe& stripe, + DistributorStripePool& stripe_pool); + ~DistributorStripeThread(); + + void Run(FastOS_ThreadInterface*, void*) override; + + // Wakes up stripe thread if it's currently waiting for an external event to be triggered, + // such as the arrival of a new RPC message. If thread is parked this call will have no + // effect. + void notify_event_has_triggered() noexcept; + + void set_tick_wait_duration(vespalib::duration new_tick_wait_duration) noexcept; + void set_ticks_before_wait(uint32_t new_ticks_before_wait) noexcept; + + TickableStripe* operator->() noexcept { return &_stripe; } + const TickableStripe* operator->() const noexcept { return &_stripe; } +private: + [[nodiscard]] bool should_stop_thread_relaxed() const noexcept { + return _should_stop.load(std::memory_order_relaxed); + } + + [[nodiscard]] bool should_park_relaxed() const noexcept { + return _should_park.load(std::memory_order_relaxed); + } + + [[nodiscard]] vespalib::duration tick_wait_duration_relaxed() const noexcept { + return _tick_wait_duration.load(std::memory_order_relaxed); + } + + [[nodiscard]] uint32_t ticks_before_wait_relaxed() const noexcept { + return _ticks_before_wait.load(std::memory_order_relaxed); + } + + void signal_wants_park() noexcept; + void unpark_thread() noexcept; + void wait_until_event_notified_or_timed_out() noexcept; + void wait_until_unparked() noexcept; + + void signal_should_stop() noexcept; +}; + +} diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp new file mode 100644 index 00000000000..6bc9c03158a --- /dev/null +++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp @@ -0,0 +1,113 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "multi_threaded_stripe_access_guard.h" +#include "distributor_stripe.h" +#include "distributor_stripe_pool.h" +#include "distributor_stripe_thread.h" + +namespace storage::distributor { + +MultiThreadedStripeAccessGuard::MultiThreadedStripeAccessGuard( + MultiThreadedStripeAccessor& accessor, + DistributorStripePool& stripe_pool) + : _accessor(accessor), + _stripe_pool(stripe_pool) +{ + assert(_stripe_pool.stripe_count() == 1); // TODO STRIPE many more yes yes + _stripe_pool.park_all_threads(); +} + +MultiThreadedStripeAccessGuard::~MultiThreadedStripeAccessGuard() { + _stripe_pool.unpark_all_threads(); + _accessor.mark_guard_released(); +} + +void MultiThreadedStripeAccessGuard::update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) { + // TODO STRIPE multiple stripes + first_stripe().update_total_distributor_config(std::move(config)); +} + +void MultiThreadedStripeAccessGuard::update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) { + // TODO STRIPE multiple stripes + first_stripe().update_distribution_config(new_configs); +} + +void MultiThreadedStripeAccessGuard::set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) { + // TODO STRIPE multiple stripes + first_stripe().getBucketSpaceRepo().set_pending_cluster_state_bundle(pending_state); +} + +void MultiThreadedStripeAccessGuard::clear_pending_cluster_state_bundle() { + // TODO STRIPE multiple stripes + first_stripe().getBucketSpaceRepo().clear_pending_cluster_state_bundle(); +} + +void MultiThreadedStripeAccessGuard::enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) { + // TODO STRIPE multiple stripes + first_stripe().enableClusterStateBundle(new_state); +} + +void MultiThreadedStripeAccessGuard::notify_distribution_change_enabled() { + // TODO STRIPE multiple stripes + first_stripe().notifyDistributionChangeEnabled(); +} + +PotentialDataLossReport +MultiThreadedStripeAccessGuard::remove_superfluous_buckets(document::BucketSpace bucket_space, + const lib::ClusterState& new_state, + bool is_distribution_change) +{ + // TODO STRIPE multiple stripes + return first_stripe().bucket_db_updater().remove_superfluous_buckets(bucket_space, new_state, is_distribution_change); +} + +void +MultiThreadedStripeAccessGuard::merge_entries_into_db(document::BucketSpace bucket_space, + api::Timestamp gathered_at_timestamp, + const lib::Distribution& distribution, + const lib::ClusterState& new_state, + const char* storage_up_states, + const std::unordered_set<uint16_t>& outdated_nodes, + const std::vector<dbtransition::Entry>& entries) +{ + // TODO STRIPE multiple stripes + first_stripe().bucket_db_updater().merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution, + new_state, storage_up_states, outdated_nodes, entries); +} + +void MultiThreadedStripeAccessGuard::update_read_snapshot_before_db_pruning() { + // TODO STRIPE multiple stripes + first_stripe().bucket_db_updater().update_read_snapshot_before_db_pruning(); +} + +void MultiThreadedStripeAccessGuard::update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) { + // TODO STRIPE multiple stripes + first_stripe().bucket_db_updater().update_read_snapshot_after_db_pruning(new_state); +} + +void MultiThreadedStripeAccessGuard::update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) { + // TODO STRIPE multiple stripes + first_stripe().bucket_db_updater().update_read_snapshot_after_activation(activated_state); +} + +void MultiThreadedStripeAccessGuard::clear_read_only_bucket_repo_databases() { + // TODO STRIPE multiple stripes + first_stripe().bucket_db_updater().clearReadOnlyBucketRepoDatabases(); +} + +DistributorStripe& MultiThreadedStripeAccessGuard::first_stripe() noexcept { + return dynamic_cast<DistributorStripe&>(_stripe_pool.stripe(0)); +} + +std::unique_ptr<StripeAccessGuard> MultiThreadedStripeAccessor::rendezvous_and_hold_all() { + // For sanity checking of invariant of only one guard being allowed at any given time. + assert(!_guard_held); + _guard_held = true; + return std::make_unique<MultiThreadedStripeAccessGuard>(*this, _stripe_pool); +} + +void MultiThreadedStripeAccessor::mark_guard_released() { + assert(_guard_held); + _guard_held = false; +} + +} diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h new file mode 100644 index 00000000000..376eccd1c4a --- /dev/null +++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.h @@ -0,0 +1,79 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "stripe_access_guard.h" + +namespace storage::distributor { + +class MultiThreadedStripeAccessor; +class DistributorStripePool; +class DistributorStripe; + +/** + * StripeAccessGuard implementation which provides exclusive access to a set of stripes + * by ensuring that all stripe threads are safely parked upon guard construction. This + * means that as long as a guard exists, access to stripes is guaranteed to not cause + * data races. + * + * Threads are automatically un-parked upon guard destruction. + * + * At most one guard instance may exist at any given time. + */ +class MultiThreadedStripeAccessGuard : public StripeAccessGuard { + MultiThreadedStripeAccessor& _accessor; + DistributorStripePool& _stripe_pool; +public: + MultiThreadedStripeAccessGuard(MultiThreadedStripeAccessor& accessor, + DistributorStripePool& stripe_pool); + ~MultiThreadedStripeAccessGuard() override; + + void update_total_distributor_config(std::shared_ptr<const DistributorConfiguration> config) override; + + void update_distribution_config(const BucketSpaceDistributionConfigs& new_configs) override; + void set_pending_cluster_state_bundle(const lib::ClusterStateBundle& pending_state) override; + void clear_pending_cluster_state_bundle() override; + void enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state) override; + void notify_distribution_change_enabled() override; + + PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace bucket_space, + const lib::ClusterState& new_state, + bool is_distribution_change) override; + void merge_entries_into_db(document::BucketSpace bucket_space, + api::Timestamp gathered_at_timestamp, + const lib::Distribution& distribution, + const lib::ClusterState& new_state, + const char* storage_up_states, + const std::unordered_set<uint16_t>& outdated_nodes, + const std::vector<dbtransition::Entry>& entries) override; + + void update_read_snapshot_before_db_pruning() override; + void update_read_snapshot_after_db_pruning(const lib::ClusterStateBundle& new_state) override; + void update_read_snapshot_after_activation(const lib::ClusterStateBundle& activated_state) override; + void clear_read_only_bucket_repo_databases() override; +private: + // TODO STRIPE remove once multi threaded stripe support is implemented + DistributorStripe& first_stripe() noexcept; +}; + +/** + * Impl of StripeAccessor which creates MultiThreadedStripeAccessGuards that cover all threads + * in the provided stripe pool. + */ +class MultiThreadedStripeAccessor : public StripeAccessor { + DistributorStripePool& _stripe_pool; + bool _guard_held; + + friend class MultiThreadedStripeAccessGuard; +public: + MultiThreadedStripeAccessor(DistributorStripePool& stripe_pool) + : _stripe_pool(stripe_pool), + _guard_held(false) + {} + ~MultiThreadedStripeAccessor() override = default; + + std::unique_ptr<StripeAccessGuard> rendezvous_and_hold_all() override; +private: + void mark_guard_released(); +}; + +} diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h index 67b31343f6f..42fe0d5c29a 100644 --- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h +++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h @@ -141,6 +141,8 @@ private: friend class DistributorTestUtil; // TODO refactor and rewire to avoid needing this direct meddling friend class LegacySingleStripeAccessGuard; + friend class MultiThreadedStripeAccessGuard; + // Only to be used by tests that want to ensure both the BucketDBUpdater _and_ the Distributor // components agree on the currently active cluster state bundle. // Transitively invokes Distributor::enableClusterStateBundle diff --git a/storage/src/vespa/storage/distributor/tickable_stripe.h b/storage/src/vespa/storage/distributor/tickable_stripe.h new file mode 100644 index 00000000000..323db627d31 --- /dev/null +++ b/storage/src/vespa/storage/distributor/tickable_stripe.h @@ -0,0 +1,24 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +namespace storage::distributor { + +/** + * A tickable stripe is the minimal binding glue between the stripe's worker thread and + * the actual implementation. Primarily allows for easier testing without having to + * fake an entire actual DistributorStripe. + */ +class TickableStripe { +public: + virtual ~TickableStripe() = default; + + // Perform a single operation tick of the stripe logic. + // If function returns true, the caller should not perform any waiting before calling + // tick() again. This generally means that the stripe is processing client operations + // and wants to continue doing so as quickly as possible. + // Only used for multi-threaded striped setups. + // TODO return an enum indicating type of last processed event? E.g. external, maintenance, none, ... + virtual bool tick() = 0; +}; + +} |