From 6b43ecc479b38810c7d060a96c0765e5d872346a Mon Sep 17 00:00:00 2001 From: Geir Storli Date: Fri, 28 May 2021 09:12:26 +0000 Subject: Make merge_entries_into_db() work across multiple stripes by handling each stripe in sequence. --- storage/src/tests/distributor/distributortest.cpp | 4 +- .../multi_thread_stripe_access_guard_test.cpp | 90 ++++++++++++++++++++-- .../distributor/distributor_stripe_pool.cpp | 12 +++ .../storage/distributor/distributor_stripe_pool.h | 6 +- .../multi_threaded_stripe_access_guard.cpp | 23 +++++- 5 files changed, 123 insertions(+), 12 deletions(-) (limited to 'storage/src') diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index 5a5ebb8c823..660d58a9a28 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -1283,11 +1283,11 @@ TEST_F(DistributorTest, host_info_sent_immediately_once_all_stripes_first_report // TODO STRIPE make delay configurable instead of hardcoded TEST_F(DistributorTest, non_bootstrap_host_info_send_request_delays_sending) { - set_num_distributor_stripes(3); + set_num_distributor_stripes(4); createLinks(); getClock().setAbsoluteTimeInSeconds(1000); - for (uint16_t i = 0; i < 3; ++i) { + for (uint16_t i = 0; i < 4; ++i) { getDistributor().notify_stripe_wants_to_send_host_info(i); } tickDistributorNTimes(1); diff --git a/storage/src/tests/distributor/multi_thread_stripe_access_guard_test.cpp b/storage/src/tests/distributor/multi_thread_stripe_access_guard_test.cpp index 8513186d1e1..ba28396886f 100644 --- a/storage/src/tests/distributor/multi_thread_stripe_access_guard_test.cpp +++ b/storage/src/tests/distributor/multi_thread_stripe_access_guard_test.cpp @@ -1,22 +1,43 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "mock_tickable_stripe.h" #include +#include #include #include +#include #include #include using namespace ::testing; +using RawIdVector = std::vector; + +constexpr uint8_t MUB = storage::spi::BucketLimits::MinUsedBits; namespace storage::distributor { struct AggregationTestingMockTickableStripe : MockTickableStripe { PotentialDataLossReport report; + std::vector entries; PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace, const lib::ClusterState&, bool) override { return report; } + void merge_entries_into_db(document::BucketSpace, api::Timestamp, const lib::Distribution&, + const lib::ClusterState&, const char*, const std::unordered_set&, + const std::vector& entries_in) override { + entries = entries_in; + } + + RawIdVector entries_as_raw_ids() const { + std::vector result; + for (const auto& entry : entries) { + result.push_back(entry.bucket_id().withoutCountBits()); + } + std::sort(result.begin(), result.end()); + return result; + } + bool tick() override { return false; } @@ -25,6 +46,7 @@ struct AggregationTestingMockTickableStripe : MockTickableStripe { struct MultiThreadedStripeAccessGuardTest : Test { DistributorStripePool _pool; MultiThreadedStripeAccessor _accessor; + AggregationTestingMockTickableStripe _stripe0; AggregationTestingMockTickableStripe _stripe1; AggregationTestingMockTickableStripe _stripe2; AggregationTestingMockTickableStripe _stripe3; @@ -39,21 +61,77 @@ struct MultiThreadedStripeAccessGuardTest : Test { } void start_pool_with_stripes() { - _pool.start({{&_stripe1, &_stripe2, &_stripe3}}); + _pool.start({{&_stripe0, &_stripe1, &_stripe2, &_stripe3}}); } + + void start_pool_with_one_stripe() { + _pool.start({&_stripe0}); + } + + void merge_entries_into_db(const RawIdVector& raw_ids) { + std::vector entries; + for (auto raw_id : raw_ids) { + entries.emplace_back(document::BucketId(MUB, raw_id), BucketCopy()); + } + std::sort(entries.begin(), entries.end()); + auto guard = _accessor.rendezvous_and_hold_all(); + guard->merge_entries_into_db(document::FixedBucketSpaces::default_space(), api::Timestamp(), + lib::Distribution(), lib::ClusterState(), "", {}, + entries); + } + }; TEST_F(MultiThreadedStripeAccessGuardTest, remove_superfluous_buckets_aggregates_reports_across_stripes) { - _stripe1.report = PotentialDataLossReport(20, 100); - _stripe2.report = PotentialDataLossReport(5, 200); - _stripe3.report = PotentialDataLossReport(7, 350); + _stripe0.report = PotentialDataLossReport(20, 100); + _stripe1.report = PotentialDataLossReport(5, 200); + _stripe2.report = PotentialDataLossReport(7, 350); + _stripe3.report = PotentialDataLossReport(3, 30); start_pool_with_stripes(); auto guard = _accessor.rendezvous_and_hold_all(); auto report = guard->remove_superfluous_buckets(document::FixedBucketSpaces::default_space(), lib::ClusterState(), false); - EXPECT_EQ(report.buckets, 32); - EXPECT_EQ(report.documents, 650); + EXPECT_EQ(report.buckets, 35); + EXPECT_EQ(report.documents, 680); +} + +TEST_F(MultiThreadedStripeAccessGuardTest, merge_entries_into_db_operates_across_all_stripes) { + start_pool_with_stripes(); + // Note: The bucket key is calculated by reversing the bits of the raw bucket id. + // We have 4 stripes and use 2 stripe bits. The 2 MSB of the bucket key is used to map to stripe. + // This gives the following mapping from raw bucket id to bucket key to stripe: + // raw id | key (8 MSB) | stripe + // 0x..0 | 00000000 | 0 + // 0x..1 | 10000000 | 2 + // 0x..2 | 01000000 | 1 + // 0x..3 | 11000000 | 3 + merge_entries_into_db({0x10,0x20,0x30,0x40,0x11,0x21,0x31,0x12,0x22,0x13}); + EXPECT_EQ(RawIdVector({0x10,0x20,0x30,0x40}), _stripe0.entries_as_raw_ids()); + EXPECT_EQ(RawIdVector({0x12,0x22}), _stripe1.entries_as_raw_ids()); + EXPECT_EQ(RawIdVector({0x11,0x21,0x31}), _stripe2.entries_as_raw_ids()); + EXPECT_EQ(RawIdVector({0x13}), _stripe3.entries_as_raw_ids()); +} + +TEST_F(MultiThreadedStripeAccessGuardTest, merge_entries_into_db_operates_across_subset_of_stripes) { + start_pool_with_stripes(); + merge_entries_into_db({0x12,0x22,0x13}); + EXPECT_EQ(RawIdVector(), _stripe0.entries_as_raw_ids()); + EXPECT_EQ(RawIdVector({0x12,0x22}), _stripe1.entries_as_raw_ids()); + EXPECT_EQ(RawIdVector(), _stripe2.entries_as_raw_ids()); + EXPECT_EQ(RawIdVector({0x13}), _stripe3.entries_as_raw_ids()); +} + +TEST_F(MultiThreadedStripeAccessGuardTest, merge_entries_into_db_operates_across_one_stripe) { + start_pool_with_one_stripe(); + merge_entries_into_db({0x10,0x11}); + EXPECT_EQ(RawIdVector({0x10,0x11}), _stripe0.entries_as_raw_ids()); +} + +TEST_F(MultiThreadedStripeAccessGuardTest, merge_entries_into_db_handles_empty_entries_vector) { + start_pool_with_one_stripe(); + merge_entries_into_db({}); + EXPECT_EQ(RawIdVector(), _stripe0.entries_as_raw_ids()); } } diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp index 715d95e70fb..b7183f4e157 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp @@ -1,6 +1,7 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "distributor_stripe_pool.h" #include "distributor_stripe_thread.h" +#include #include #include @@ -8,6 +9,7 @@ namespace storage::distributor { DistributorStripePool::DistributorStripePool() : _thread_pool(512_Ki), + _n_stripe_bits(0), _stripes(), _threads(), _mutex(), @@ -48,6 +50,14 @@ void DistributorStripePool::unpark_all_threads() noexcept { _parker_cond.wait(lock, [this]{ return (_parked_threads == 0); }); } +const TickableStripe& DistributorStripePool::stripe_of_key(uint64_t key) const noexcept { + return stripe_thread(stripe_of_bucket_key(key, _n_stripe_bits)).stripe(); +} + +TickableStripe& DistributorStripePool::stripe_of_key(uint64_t key) noexcept { + return stripe_thread(stripe_of_bucket_key(key, _n_stripe_bits)).stripe(); +} + void DistributorStripePool::park_thread_until_released(DistributorStripeThread& thread) noexcept { std::unique_lock lock(_mutex); assert(_parked_threads < _threads.size()); @@ -67,6 +77,8 @@ void DistributorStripePool::park_thread_until_released(DistributorStripeThread& void DistributorStripePool::start(const std::vector& stripes) { assert(!stripes.empty()); assert(_stripes.empty() && _threads.empty()); + // Note: This also asserts that the number of stripes is valid (power of 2 and within MaxStripes boundary). + _n_stripe_bits = calc_num_stripe_bits(stripes.size()); _stripes.reserve(stripes.size()); _threads.reserve(stripes.size()); diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h index 5e72cb47fc4..75328479296 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h @@ -38,6 +38,7 @@ class DistributorStripePool { using NativeThreadVector = std::vector; FastOS_ThreadPool _thread_pool; + uint8_t _n_stripe_bits; StripeVector _stripes; NativeThreadVector _threads; std::mutex _mutex; @@ -57,7 +58,8 @@ public: // Set up the stripe pool with a 1-1 relationship between the provided // stripes and running threads. Can only be called once per pool. // - // Precondition: stripes.size() > 0 + // Precondition: stripes.size() > 0 && + // when stripes.size() > 1: is a power of 2 and within storage::MaxStripes boundary void start(const std::vector& stripes); void stop_and_join(); @@ -76,6 +78,8 @@ public: [[nodiscard]] DistributorStripeThread& stripe_thread(size_t idx) noexcept { return *_stripes[idx]; } + [[nodiscard]] const TickableStripe& stripe_of_key(uint64_t key) const noexcept; + [[nodiscard]] TickableStripe& stripe_of_key(uint64_t key) noexcept; [[nodiscard]] size_t stripe_count() const noexcept { return _stripes.size(); } [[nodiscard]] bool is_stopped() const noexcept { return _stopped; } diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp index a5adf732824..80af41e57ad 100644 --- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp +++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp @@ -84,9 +84,26 @@ MultiThreadedStripeAccessGuard::merge_entries_into_db(document::BucketSpace buck const std::unordered_set& outdated_nodes, const std::vector& entries) { - // TODO STRIPE multiple stripes - first_stripe().merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution, - new_state, storage_up_states, outdated_nodes, entries); + if (entries.empty()) { + return; + } + std::vector stripe_entries; + stripe_entries.reserve(entries.size() / _stripe_pool.stripe_count()); + auto* curr_stripe = &_stripe_pool.stripe_of_key(entries[0].bucket_key); + stripe_entries.push_back(entries[0]); + for (size_t i = 1; i < entries.size(); ++i) { + const auto& entry = entries[i]; + auto* next_stripe = &_stripe_pool.stripe_of_key(entry.bucket_key); + if (curr_stripe != next_stripe) { + curr_stripe->merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution, + new_state, storage_up_states, outdated_nodes, stripe_entries); + stripe_entries.clear(); + } + curr_stripe = next_stripe; + stripe_entries.push_back(entry); + } + curr_stripe->merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution, + new_state, storage_up_states, outdated_nodes, stripe_entries); } void MultiThreadedStripeAccessGuard::update_read_snapshot_before_db_pruning() { -- cgit v1.2.3