diff options
author | Geir Storli <geirst@verizonmedia.com> | 2021-05-28 13:56:18 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-28 13:56:18 +0200 |
commit | cbae0892dad70845ac0a444a1d417212c36ed948 (patch) | |
tree | f6421b3a0a98dfa795f14823c5675ecb021f3de0 | |
parent | a1d1be5ec41fb72ec2cba4552f40a5bbb4834aac (diff) | |
parent | 6b43ecc479b38810c7d060a96c0765e5d872346a (diff) |
Merge pull request #18020 from vespa-engine/geirst/multi-stripe-merge-entries-into-db
Multi-stripe merging of entries into db
12 files changed, 235 insertions, 24 deletions
diff --git a/storage/src/tests/common/CMakeLists.txt b/storage/src/tests/common/CMakeLists.txt index 400255964d6..4e7cecb9d9f 100644 --- a/storage/src/tests/common/CMakeLists.txt +++ b/storage/src/tests/common/CMakeLists.txt @@ -12,6 +12,7 @@ vespa_add_library(storage_testcommon TEST vespa_add_executable(storage_common_gtest_runner_app TEST SOURCES + bucket_stripe_utils_test.cpp bucket_utils_test.cpp global_bucket_space_distribution_converter_test.cpp gtest_runner.cpp diff --git a/storage/src/tests/common/bucket_stripe_utils_test.cpp b/storage/src/tests/common/bucket_stripe_utils_test.cpp new file mode 100644 index 00000000000..ae9f656e4d7 --- /dev/null +++ b/storage/src/tests/common/bucket_stripe_utils_test.cpp @@ -0,0 +1,37 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <vespa/document/bucket/bucketid.h> +#include <vespa/storage/common/bucket_stripe_utils.h> +#include <vespa/vespalib/gtest/gtest.h> + +using document::BucketId; +using storage::calc_num_stripe_bits; +using storage::stripe_of_bucket_key; +constexpr uint8_t MUB = storage::spi::BucketLimits::MinUsedBits; + +TEST(BucketStripeUtilsTest, stripe_of_bucket_key) +{ + BucketId id(MUB, std::numeric_limits<uint64_t>::max()); + uint64_t key = id.stripUnused().toKey(); + EXPECT_EQ(0, stripe_of_bucket_key(key, 0)); + EXPECT_EQ(1, stripe_of_bucket_key(key, 1)); + EXPECT_EQ(3, stripe_of_bucket_key(key, 2)); + EXPECT_EQ(127, stripe_of_bucket_key(key, 7)); + EXPECT_EQ(255, stripe_of_bucket_key(key, 8)); +} + +TEST(BucketStripeUtilsTest, calc_num_stripe_bits) +{ + EXPECT_EQ(0, calc_num_stripe_bits(1)); + EXPECT_EQ(1, calc_num_stripe_bits(2)); + EXPECT_EQ(2, calc_num_stripe_bits(4)); + EXPECT_EQ(7, calc_num_stripe_bits(128)); + EXPECT_EQ(8, calc_num_stripe_bits(256)); +} + +TEST(BucketStripeUtilsTest, max_stripe_values) +{ + EXPECT_EQ(8, storage::MaxStripeBits); + EXPECT_EQ(256, storage::MaxStripes); +} + diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp index 5a5ebb8c823..660d58a9a28 100644 --- a/storage/src/tests/distributor/distributortest.cpp +++ b/storage/src/tests/distributor/distributortest.cpp @@ -1283,11 +1283,11 @@ TEST_F(DistributorTest, host_info_sent_immediately_once_all_stripes_first_report // TODO STRIPE make delay configurable instead of hardcoded TEST_F(DistributorTest, non_bootstrap_host_info_send_request_delays_sending) { - set_num_distributor_stripes(3); + set_num_distributor_stripes(4); createLinks(); getClock().setAbsoluteTimeInSeconds(1000); - for (uint16_t i = 0; i < 3; ++i) { + for (uint16_t i = 0; i < 4; ++i) { getDistributor().notify_stripe_wants_to_send_host_info(i); } tickDistributorNTimes(1); diff --git a/storage/src/tests/distributor/multi_thread_stripe_access_guard_test.cpp b/storage/src/tests/distributor/multi_thread_stripe_access_guard_test.cpp index 8513186d1e1..ba28396886f 100644 --- a/storage/src/tests/distributor/multi_thread_stripe_access_guard_test.cpp +++ b/storage/src/tests/distributor/multi_thread_stripe_access_guard_test.cpp @@ -1,22 +1,43 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "mock_tickable_stripe.h" #include <vespa/document/bucket/fixed_bucket_spaces.h> +#include <vespa/persistence/spi/bucket_limits.h> #include <vespa/storage/distributor/distributor_stripe_pool.h> #include <vespa/storage/distributor/multi_threaded_stripe_access_guard.h> +#include <vespa/vdslib/distribution/distribution.h> #include <vespa/vdslib/state/clusterstate.h> #include <vespa/vespalib/gtest/gtest.h> using namespace ::testing; +using RawIdVector = std::vector<uint64_t>; + +constexpr uint8_t MUB = storage::spi::BucketLimits::MinUsedBits; namespace storage::distributor { struct AggregationTestingMockTickableStripe : MockTickableStripe { PotentialDataLossReport report; + std::vector<dbtransition::Entry> entries; PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace, const lib::ClusterState&, bool) override { return report; } + void merge_entries_into_db(document::BucketSpace, api::Timestamp, const lib::Distribution&, + const lib::ClusterState&, const char*, const std::unordered_set<uint16_t>&, + const std::vector<dbtransition::Entry>& entries_in) override { + entries = entries_in; + } + + RawIdVector entries_as_raw_ids() const { + std::vector<uint64_t> result; + for (const auto& entry : entries) { + result.push_back(entry.bucket_id().withoutCountBits()); + } + std::sort(result.begin(), result.end()); + return result; + } + bool tick() override { return false; } @@ -25,6 +46,7 @@ struct AggregationTestingMockTickableStripe : MockTickableStripe { struct MultiThreadedStripeAccessGuardTest : Test { DistributorStripePool _pool; MultiThreadedStripeAccessor _accessor; + AggregationTestingMockTickableStripe _stripe0; AggregationTestingMockTickableStripe _stripe1; AggregationTestingMockTickableStripe _stripe2; AggregationTestingMockTickableStripe _stripe3; @@ -39,21 +61,77 @@ struct MultiThreadedStripeAccessGuardTest : Test { } void start_pool_with_stripes() { - _pool.start({{&_stripe1, &_stripe2, &_stripe3}}); + _pool.start({{&_stripe0, &_stripe1, &_stripe2, &_stripe3}}); } + + void start_pool_with_one_stripe() { + _pool.start({&_stripe0}); + } + + void merge_entries_into_db(const RawIdVector& raw_ids) { + std::vector<dbtransition::Entry> entries; + for (auto raw_id : raw_ids) { + entries.emplace_back(document::BucketId(MUB, raw_id), BucketCopy()); + } + std::sort(entries.begin(), entries.end()); + auto guard = _accessor.rendezvous_and_hold_all(); + guard->merge_entries_into_db(document::FixedBucketSpaces::default_space(), api::Timestamp(), + lib::Distribution(), lib::ClusterState(), "", {}, + entries); + } + }; TEST_F(MultiThreadedStripeAccessGuardTest, remove_superfluous_buckets_aggregates_reports_across_stripes) { - _stripe1.report = PotentialDataLossReport(20, 100); - _stripe2.report = PotentialDataLossReport(5, 200); - _stripe3.report = PotentialDataLossReport(7, 350); + _stripe0.report = PotentialDataLossReport(20, 100); + _stripe1.report = PotentialDataLossReport(5, 200); + _stripe2.report = PotentialDataLossReport(7, 350); + _stripe3.report = PotentialDataLossReport(3, 30); start_pool_with_stripes(); auto guard = _accessor.rendezvous_and_hold_all(); auto report = guard->remove_superfluous_buckets(document::FixedBucketSpaces::default_space(), lib::ClusterState(), false); - EXPECT_EQ(report.buckets, 32); - EXPECT_EQ(report.documents, 650); + EXPECT_EQ(report.buckets, 35); + EXPECT_EQ(report.documents, 680); +} + +TEST_F(MultiThreadedStripeAccessGuardTest, merge_entries_into_db_operates_across_all_stripes) { + start_pool_with_stripes(); + // Note: The bucket key is calculated by reversing the bits of the raw bucket id. + // We have 4 stripes and use 2 stripe bits. The 2 MSB of the bucket key is used to map to stripe. + // This gives the following mapping from raw bucket id to bucket key to stripe: + // raw id | key (8 MSB) | stripe + // 0x..0 | 00000000 | 0 + // 0x..1 | 10000000 | 2 + // 0x..2 | 01000000 | 1 + // 0x..3 | 11000000 | 3 + merge_entries_into_db({0x10,0x20,0x30,0x40,0x11,0x21,0x31,0x12,0x22,0x13}); + EXPECT_EQ(RawIdVector({0x10,0x20,0x30,0x40}), _stripe0.entries_as_raw_ids()); + EXPECT_EQ(RawIdVector({0x12,0x22}), _stripe1.entries_as_raw_ids()); + EXPECT_EQ(RawIdVector({0x11,0x21,0x31}), _stripe2.entries_as_raw_ids()); + EXPECT_EQ(RawIdVector({0x13}), _stripe3.entries_as_raw_ids()); +} + +TEST_F(MultiThreadedStripeAccessGuardTest, merge_entries_into_db_operates_across_subset_of_stripes) { + start_pool_with_stripes(); + merge_entries_into_db({0x12,0x22,0x13}); + EXPECT_EQ(RawIdVector(), _stripe0.entries_as_raw_ids()); + EXPECT_EQ(RawIdVector({0x12,0x22}), _stripe1.entries_as_raw_ids()); + EXPECT_EQ(RawIdVector(), _stripe2.entries_as_raw_ids()); + EXPECT_EQ(RawIdVector({0x13}), _stripe3.entries_as_raw_ids()); +} + +TEST_F(MultiThreadedStripeAccessGuardTest, merge_entries_into_db_operates_across_one_stripe) { + start_pool_with_one_stripe(); + merge_entries_into_db({0x10,0x11}); + EXPECT_EQ(RawIdVector({0x10,0x11}), _stripe0.entries_as_raw_ids()); +} + +TEST_F(MultiThreadedStripeAccessGuardTest, merge_entries_into_db_handles_empty_entries_vector) { + start_pool_with_one_stripe(); + merge_entries_into_db({}); + EXPECT_EQ(RawIdVector(), _stripe0.entries_as_raw_ids()); } } diff --git a/storage/src/vespa/storage/bucketdb/striped_btree_lockable_map.h b/storage/src/vespa/storage/bucketdb/striped_btree_lockable_map.h index b01f06c2b3f..73b5c37ab8b 100644 --- a/storage/src/vespa/storage/bucketdb/striped_btree_lockable_map.h +++ b/storage/src/vespa/storage/bucketdb/striped_btree_lockable_map.h @@ -27,7 +27,6 @@ public: using Decision = typename ParentType::Decision; using BucketId = document::BucketId; - constexpr static uint8_t MaxStripeBits = 8; private: using StripedDBType = BTreeLockableMap<T>; uint8_t _n_stripe_bits; diff --git a/storage/src/vespa/storage/bucketdb/striped_btree_lockable_map.hpp b/storage/src/vespa/storage/bucketdb/striped_btree_lockable_map.hpp index e4f6efa30a3..310a9d5154b 100644 --- a/storage/src/vespa/storage/bucketdb/striped_btree_lockable_map.hpp +++ b/storage/src/vespa/storage/bucketdb/striped_btree_lockable_map.hpp @@ -3,20 +3,13 @@ #include "striped_btree_lockable_map.h" #include "btree_lockable_map.hpp" +#include <vespa/storage/common/bucket_stripe_utils.h> #include <algorithm> #include <cassert> #include <queue> namespace storage::bucketdb { -namespace { - -constexpr uint8_t used_bits_of(uint64_t key) noexcept { - return static_cast<uint8_t>(key & 0b11'1111ULL); -} - -} - template <typename T> StripedBTreeLockableMap<T>::StripedBTreeLockableMap(uint8_t n_stripe_bits) : _n_stripe_bits(n_stripe_bits), @@ -37,9 +30,7 @@ StripedBTreeLockableMap<T>::~StripedBTreeLockableMap() = default; template <typename T> size_t StripedBTreeLockableMap<T>::stripe_of(key_type key) const noexcept { - assert(used_bits_of(key) >= _n_stripe_bits); - // Since bucket keys have count-bits at the LSB positions, we want to look at the MSBs instead. - return (key >> (64 - _n_stripe_bits)); + return stripe_of_bucket_key(key, _n_stripe_bits); } template <typename T> diff --git a/storage/src/vespa/storage/common/CMakeLists.txt b/storage/src/vespa/storage/common/CMakeLists.txt index 741d97f78ef..ae428929946 100644 --- a/storage/src/vespa/storage/common/CMakeLists.txt +++ b/storage/src/vespa/storage/common/CMakeLists.txt @@ -1,6 +1,7 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(storage_common OBJECT SOURCES + bucket_stripe_utils.cpp bucketmessages.cpp content_bucket_space.cpp content_bucket_space_repo.cpp diff --git a/storage/src/vespa/storage/common/bucket_stripe_utils.cpp b/storage/src/vespa/storage/common/bucket_stripe_utils.cpp new file mode 100644 index 00000000000..f1454403153 --- /dev/null +++ b/storage/src/vespa/storage/common/bucket_stripe_utils.cpp @@ -0,0 +1,43 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "bucket_stripe_utils.h" +#include <vespa/vespalib/util/alloc.h> +#include <cassert> + +namespace storage { + +namespace { + +constexpr uint8_t used_bits_of(uint64_t key) noexcept { + return static_cast<uint8_t>(key & 0b11'1111ULL); +} + +} + +size_t +stripe_of_bucket_key(uint64_t key, uint8_t n_stripe_bits) noexcept +{ + if (n_stripe_bits == 0) { + return 0; + } + assert(used_bits_of(key) >= n_stripe_bits); + // Since bucket keys have count-bits at the LSB positions, we want to look at the MSBs instead. + return (key >> (64 - n_stripe_bits)); +} + +uint8_t +calc_num_stripe_bits(size_t n_stripes) noexcept +{ + assert(n_stripes > 0); + if (n_stripes == 1) { + return 0; + } + assert(n_stripes <= MaxStripes); + assert(n_stripes == vespalib::roundUp2inN(n_stripes)); + + auto result = vespalib::Optimized::msbIdx(n_stripes); + assert(result <= MaxStripeBits); + return result; +} + +} diff --git a/storage/src/vespa/storage/common/bucket_stripe_utils.h b/storage/src/vespa/storage/common/bucket_stripe_utils.h new file mode 100644 index 00000000000..b4052a893a2 --- /dev/null +++ b/storage/src/vespa/storage/common/bucket_stripe_utils.h @@ -0,0 +1,28 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/persistence/spi/bucket_limits.h> +#include <cstdint> +#include <cstdlib> + +namespace storage { + +constexpr static uint8_t MaxStripeBits = spi::BucketLimits::MinUsedBits; +constexpr static size_t MaxStripes = (1ULL << MaxStripeBits); + +/** + * Returns the stripe in which the given bucket key belongs, + * when using the given number of stripe bits. + */ +size_t stripe_of_bucket_key(uint64_t key, uint8_t n_stripe_bits) noexcept; + +/** + * Returns the number of stripe bits used to represent the given number of stripes. + * + * This also asserts that the number of stripes is valid (power of 2 and within MaxStripes boundary). + */ +uint8_t calc_num_stripe_bits(size_t n_stripes) noexcept; + +} + diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp index 715d95e70fb..b7183f4e157 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp @@ -1,6 +1,7 @@ // Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "distributor_stripe_pool.h" #include "distributor_stripe_thread.h" +#include <vespa/storage/common/bucket_stripe_utils.h> #include <vespa/vespalib/util/size_literals.h> #include <cassert> @@ -8,6 +9,7 @@ namespace storage::distributor { DistributorStripePool::DistributorStripePool() : _thread_pool(512_Ki), + _n_stripe_bits(0), _stripes(), _threads(), _mutex(), @@ -48,6 +50,14 @@ void DistributorStripePool::unpark_all_threads() noexcept { _parker_cond.wait(lock, [this]{ return (_parked_threads == 0); }); } +const TickableStripe& DistributorStripePool::stripe_of_key(uint64_t key) const noexcept { + return stripe_thread(stripe_of_bucket_key(key, _n_stripe_bits)).stripe(); +} + +TickableStripe& DistributorStripePool::stripe_of_key(uint64_t key) noexcept { + return stripe_thread(stripe_of_bucket_key(key, _n_stripe_bits)).stripe(); +} + void DistributorStripePool::park_thread_until_released(DistributorStripeThread& thread) noexcept { std::unique_lock lock(_mutex); assert(_parked_threads < _threads.size()); @@ -67,6 +77,8 @@ void DistributorStripePool::park_thread_until_released(DistributorStripeThread& void DistributorStripePool::start(const std::vector<TickableStripe*>& stripes) { assert(!stripes.empty()); assert(_stripes.empty() && _threads.empty()); + // Note: This also asserts that the number of stripes is valid (power of 2 and within MaxStripes boundary). + _n_stripe_bits = calc_num_stripe_bits(stripes.size()); _stripes.reserve(stripes.size()); _threads.reserve(stripes.size()); diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h index 5e72cb47fc4..75328479296 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h +++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h @@ -38,6 +38,7 @@ class DistributorStripePool { using NativeThreadVector = std::vector<FastOS_ThreadInterface*>; FastOS_ThreadPool _thread_pool; + uint8_t _n_stripe_bits; StripeVector _stripes; NativeThreadVector _threads; std::mutex _mutex; @@ -57,7 +58,8 @@ public: // Set up the stripe pool with a 1-1 relationship between the provided // stripes and running threads. Can only be called once per pool. // - // Precondition: stripes.size() > 0 + // Precondition: stripes.size() > 0 && + // when stripes.size() > 1: is a power of 2 and within storage::MaxStripes boundary void start(const std::vector<TickableStripe*>& stripes); void stop_and_join(); @@ -76,6 +78,8 @@ public: [[nodiscard]] DistributorStripeThread& stripe_thread(size_t idx) noexcept { return *_stripes[idx]; } + [[nodiscard]] const TickableStripe& stripe_of_key(uint64_t key) const noexcept; + [[nodiscard]] TickableStripe& stripe_of_key(uint64_t key) noexcept; [[nodiscard]] size_t stripe_count() const noexcept { return _stripes.size(); } [[nodiscard]] bool is_stopped() const noexcept { return _stopped; } diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp index a5adf732824..80af41e57ad 100644 --- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp +++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp @@ -84,9 +84,26 @@ MultiThreadedStripeAccessGuard::merge_entries_into_db(document::BucketSpace buck const std::unordered_set<uint16_t>& outdated_nodes, const std::vector<dbtransition::Entry>& entries) { - // TODO STRIPE multiple stripes - first_stripe().merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution, - new_state, storage_up_states, outdated_nodes, entries); + if (entries.empty()) { + return; + } + std::vector<dbtransition::Entry> stripe_entries; + stripe_entries.reserve(entries.size() / _stripe_pool.stripe_count()); + auto* curr_stripe = &_stripe_pool.stripe_of_key(entries[0].bucket_key); + stripe_entries.push_back(entries[0]); + for (size_t i = 1; i < entries.size(); ++i) { + const auto& entry = entries[i]; + auto* next_stripe = &_stripe_pool.stripe_of_key(entry.bucket_key); + if (curr_stripe != next_stripe) { + curr_stripe->merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution, + new_state, storage_up_states, outdated_nodes, stripe_entries); + stripe_entries.clear(); + } + curr_stripe = next_stripe; + stripe_entries.push_back(entry); + } + curr_stripe->merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution, + new_state, storage_up_states, outdated_nodes, stripe_entries); } void MultiThreadedStripeAccessGuard::update_read_snapshot_before_db_pruning() { |