aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Geir Storli <geirst@verizonmedia.com> 2021-05-28 13:56:18 +0200
committer GitHub <noreply@github.com> 2021-05-28 13:56:18 +0200
commit cbae0892dad70845ac0a444a1d417212c36ed948 (patch)
tree f6421b3a0a98dfa795f14823c5675ecb021f3de0
parent a1d1be5ec41fb72ec2cba4552f40a5bbb4834aac (diff)
parent 6b43ecc479b38810c7d060a96c0765e5d872346a (diff)
Merge pull request #18020 from vespa-engine/geirst/multi-stripe-merge-entries-into-db
Multi-stripe merging of entries into db
-rw-r--r-- storage/src/tests/common/CMakeLists.txt | 1
-rw-r--r-- storage/src/tests/common/bucket_stripe_utils_test.cpp | 37
-rw-r--r-- storage/src/tests/distributor/distributortest.cpp | 4
-rw-r--r-- storage/src/tests/distributor/multi_thread_stripe_access_guard_test.cpp | 90
-rw-r--r-- storage/src/vespa/storage/bucketdb/striped_btree_lockable_map.h | 1
-rw-r--r-- storage/src/vespa/storage/bucketdb/striped_btree_lockable_map.hpp | 13
-rw-r--r-- storage/src/vespa/storage/common/CMakeLists.txt | 1
-rw-r--r-- storage/src/vespa/storage/common/bucket_stripe_utils.cpp | 43
-rw-r--r-- storage/src/vespa/storage/common/bucket_stripe_utils.h | 28
-rw-r--r-- storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp | 12
-rw-r--r-- storage/src/vespa/storage/distributor/distributor_stripe_pool.h | 6
-rw-r--r-- storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp | 23
12 files changed, 235 insertions, 24 deletions
diff --git a/storage/src/tests/common/CMakeLists.txt b/storage/src/tests/common/CMakeLists.txt
index 400255964d6..4e7cecb9d9f 100644
--- a/storage/src/tests/common/CMakeLists.txt
+++ b/storage/src/tests/common/CMakeLists.txt
@@ -12,6 +12,7 @@ vespa_add_library(storage_testcommon TEST
vespa_add_executable(storage_common_gtest_runner_app TEST
SOURCES
+ bucket_stripe_utils_test.cpp
bucket_utils_test.cpp
global_bucket_space_distribution_converter_test.cpp
gtest_runner.cpp
diff --git a/storage/src/tests/common/bucket_stripe_utils_test.cpp b/storage/src/tests/common/bucket_stripe_utils_test.cpp
new file mode 100644
index 00000000000..ae9f656e4d7
--- /dev/null
+++ b/storage/src/tests/common/bucket_stripe_utils_test.cpp
@@ -0,0 +1,37 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <vespa/document/bucket/bucketid.h>
+#include <vespa/storage/common/bucket_stripe_utils.h>
+#include <vespa/vespalib/gtest/gtest.h>
+
+using document::BucketId;
+using storage::calc_num_stripe_bits;
+using storage::stripe_of_bucket_key;
+constexpr uint8_t MUB = storage::spi::BucketLimits::MinUsedBits;
+
+TEST(BucketStripeUtilsTest, stripe_of_bucket_key)
+{
+ BucketId id(MUB, std::numeric_limits<uint64_t>::max());
+ uint64_t key = id.stripUnused().toKey();
+ EXPECT_EQ(0, stripe_of_bucket_key(key, 0));
+ EXPECT_EQ(1, stripe_of_bucket_key(key, 1));
+ EXPECT_EQ(3, stripe_of_bucket_key(key, 2));
+ EXPECT_EQ(127, stripe_of_bucket_key(key, 7));
+ EXPECT_EQ(255, stripe_of_bucket_key(key, 8));
+}
+
+TEST(BucketStripeUtilsTest, calc_num_stripe_bits)
+{
+ EXPECT_EQ(0, calc_num_stripe_bits(1));
+ EXPECT_EQ(1, calc_num_stripe_bits(2));
+ EXPECT_EQ(2, calc_num_stripe_bits(4));
+ EXPECT_EQ(7, calc_num_stripe_bits(128));
+ EXPECT_EQ(8, calc_num_stripe_bits(256));
+}
+
+TEST(BucketStripeUtilsTest, max_stripe_values)
+{
+ EXPECT_EQ(8, storage::MaxStripeBits);
+ EXPECT_EQ(256, storage::MaxStripes);
+}
+
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 5a5ebb8c823..660d58a9a28 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -1283,11 +1283,11 @@ TEST_F(DistributorTest, host_info_sent_immediately_once_all_stripes_first_report
// TODO STRIPE make delay configurable instead of hardcoded
TEST_F(DistributorTest, non_bootstrap_host_info_send_request_delays_sending) {
- set_num_distributor_stripes(3);
+ set_num_distributor_stripes(4);
createLinks();
getClock().setAbsoluteTimeInSeconds(1000);
- for (uint16_t i = 0; i < 3; ++i) {
+ for (uint16_t i = 0; i < 4; ++i) {
getDistributor().notify_stripe_wants_to_send_host_info(i);
}
tickDistributorNTimes(1);
diff --git a/storage/src/tests/distributor/multi_thread_stripe_access_guard_test.cpp b/storage/src/tests/distributor/multi_thread_stripe_access_guard_test.cpp
index 8513186d1e1..ba28396886f 100644
--- a/storage/src/tests/distributor/multi_thread_stripe_access_guard_test.cpp
+++ b/storage/src/tests/distributor/multi_thread_stripe_access_guard_test.cpp
@@ -1,22 +1,43 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "mock_tickable_stripe.h"
#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/persistence/spi/bucket_limits.h>
#include <vespa/storage/distributor/distributor_stripe_pool.h>
#include <vespa/storage/distributor/multi_threaded_stripe_access_guard.h>
+#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/gtest/gtest.h>
using namespace ::testing;
+using RawIdVector = std::vector<uint64_t>;
+
+constexpr uint8_t MUB = storage::spi::BucketLimits::MinUsedBits;
namespace storage::distributor {
struct AggregationTestingMockTickableStripe : MockTickableStripe {
PotentialDataLossReport report;
+ std::vector<dbtransition::Entry> entries;
PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace, const lib::ClusterState&, bool) override {
return report;
}
+ void merge_entries_into_db(document::BucketSpace, api::Timestamp, const lib::Distribution&,
+ const lib::ClusterState&, const char*, const std::unordered_set<uint16_t>&,
+ const std::vector<dbtransition::Entry>& entries_in) override {
+ entries = entries_in;
+ }
+
+ RawIdVector entries_as_raw_ids() const {
+ std::vector<uint64_t> result;
+ for (const auto& entry : entries) {
+ result.push_back(entry.bucket_id().withoutCountBits());
+ }
+ std::sort(result.begin(), result.end());
+ return result;
+ }
+
bool tick() override {
return false;
}
@@ -25,6 +46,7 @@ struct AggregationTestingMockTickableStripe : MockTickableStripe {
struct MultiThreadedStripeAccessGuardTest : Test {
DistributorStripePool _pool;
MultiThreadedStripeAccessor _accessor;
+ AggregationTestingMockTickableStripe _stripe0;
AggregationTestingMockTickableStripe _stripe1;
AggregationTestingMockTickableStripe _stripe2;
AggregationTestingMockTickableStripe _stripe3;
@@ -39,21 +61,77 @@ struct MultiThreadedStripeAccessGuardTest : Test {
}
void start_pool_with_stripes() {
- _pool.start({{&_stripe1, &_stripe2, &_stripe3}});
+ _pool.start({{&_stripe0, &_stripe1, &_stripe2, &_stripe3}});
}
+
+ void start_pool_with_one_stripe() {
+ _pool.start({&_stripe0});
+ }
+
+ void merge_entries_into_db(const RawIdVector& raw_ids) {
+ std::vector<dbtransition::Entry> entries;
+ for (auto raw_id : raw_ids) {
+ entries.emplace_back(document::BucketId(MUB, raw_id), BucketCopy());
+ }
+ std::sort(entries.begin(), entries.end());
+ auto guard = _accessor.rendezvous_and_hold_all();
+ guard->merge_entries_into_db(document::FixedBucketSpaces::default_space(), api::Timestamp(),
+ lib::Distribution(), lib::ClusterState(), "", {},
+ entries);
+ }
+
};
TEST_F(MultiThreadedStripeAccessGuardTest, remove_superfluous_buckets_aggregates_reports_across_stripes) {
- _stripe1.report = PotentialDataLossReport(20, 100);
- _stripe2.report = PotentialDataLossReport(5, 200);
- _stripe3.report = PotentialDataLossReport(7, 350);
+ _stripe0.report = PotentialDataLossReport(20, 100);
+ _stripe1.report = PotentialDataLossReport(5, 200);
+ _stripe2.report = PotentialDataLossReport(7, 350);
+ _stripe3.report = PotentialDataLossReport(3, 30);
start_pool_with_stripes();
auto guard = _accessor.rendezvous_and_hold_all();
auto report = guard->remove_superfluous_buckets(document::FixedBucketSpaces::default_space(),
lib::ClusterState(), false);
- EXPECT_EQ(report.buckets, 32);
- EXPECT_EQ(report.documents, 650);
+ EXPECT_EQ(report.buckets, 35);
+ EXPECT_EQ(report.documents, 680);
+}
+
+TEST_F(MultiThreadedStripeAccessGuardTest, merge_entries_into_db_operates_across_all_stripes) {
+ start_pool_with_stripes();
+ // Note: The bucket key is calculated by reversing the bits of the raw bucket id.
+ // We have 4 stripes and use 2 stripe bits. The 2 MSBs of the bucket key are used to map to a stripe.
+ // This gives the following mapping from raw bucket id to bucket key to stripe:
+ // raw id | key (8 MSB) | stripe
+ // 0x..0 | 00000000 | 0
+ // 0x..1 | 10000000 | 2
+ // 0x..2 | 01000000 | 1
+ // 0x..3 | 11000000 | 3
+ merge_entries_into_db({0x10,0x20,0x30,0x40,0x11,0x21,0x31,0x12,0x22,0x13});
+ EXPECT_EQ(RawIdVector({0x10,0x20,0x30,0x40}), _stripe0.entries_as_raw_ids());
+ EXPECT_EQ(RawIdVector({0x12,0x22}), _stripe1.entries_as_raw_ids());
+ EXPECT_EQ(RawIdVector({0x11,0x21,0x31}), _stripe2.entries_as_raw_ids());
+ EXPECT_EQ(RawIdVector({0x13}), _stripe3.entries_as_raw_ids());
+}
+
+TEST_F(MultiThreadedStripeAccessGuardTest, merge_entries_into_db_operates_across_subset_of_stripes) {
+ start_pool_with_stripes();
+ merge_entries_into_db({0x12,0x22,0x13});
+ EXPECT_EQ(RawIdVector(), _stripe0.entries_as_raw_ids());
+ EXPECT_EQ(RawIdVector({0x12,0x22}), _stripe1.entries_as_raw_ids());
+ EXPECT_EQ(RawIdVector(), _stripe2.entries_as_raw_ids());
+ EXPECT_EQ(RawIdVector({0x13}), _stripe3.entries_as_raw_ids());
+}
+
+TEST_F(MultiThreadedStripeAccessGuardTest, merge_entries_into_db_operates_across_one_stripe) {
+ start_pool_with_one_stripe();
+ merge_entries_into_db({0x10,0x11});
+ EXPECT_EQ(RawIdVector({0x10,0x11}), _stripe0.entries_as_raw_ids());
+}
+
+TEST_F(MultiThreadedStripeAccessGuardTest, merge_entries_into_db_handles_empty_entries_vector) {
+ start_pool_with_one_stripe();
+ merge_entries_into_db({});
+ EXPECT_EQ(RawIdVector(), _stripe0.entries_as_raw_ids());
}
}
diff --git a/storage/src/vespa/storage/bucketdb/striped_btree_lockable_map.h b/storage/src/vespa/storage/bucketdb/striped_btree_lockable_map.h
index b01f06c2b3f..73b5c37ab8b 100644
--- a/storage/src/vespa/storage/bucketdb/striped_btree_lockable_map.h
+++ b/storage/src/vespa/storage/bucketdb/striped_btree_lockable_map.h
@@ -27,7 +27,6 @@ public:
using Decision = typename ParentType::Decision;
using BucketId = document::BucketId;
- constexpr static uint8_t MaxStripeBits = 8;
private:
using StripedDBType = BTreeLockableMap<T>;
uint8_t _n_stripe_bits;
diff --git a/storage/src/vespa/storage/bucketdb/striped_btree_lockable_map.hpp b/storage/src/vespa/storage/bucketdb/striped_btree_lockable_map.hpp
index e4f6efa30a3..310a9d5154b 100644
--- a/storage/src/vespa/storage/bucketdb/striped_btree_lockable_map.hpp
+++ b/storage/src/vespa/storage/bucketdb/striped_btree_lockable_map.hpp
@@ -3,20 +3,13 @@
#include "striped_btree_lockable_map.h"
#include "btree_lockable_map.hpp"
+#include <vespa/storage/common/bucket_stripe_utils.h>
#include <algorithm>
#include <cassert>
#include <queue>
namespace storage::bucketdb {
-namespace {
-
-constexpr uint8_t used_bits_of(uint64_t key) noexcept {
- return static_cast<uint8_t>(key & 0b11'1111ULL);
-}
-
-}
-
template <typename T>
StripedBTreeLockableMap<T>::StripedBTreeLockableMap(uint8_t n_stripe_bits)
: _n_stripe_bits(n_stripe_bits),
@@ -37,9 +30,7 @@ StripedBTreeLockableMap<T>::~StripedBTreeLockableMap() = default;
template <typename T>
size_t StripedBTreeLockableMap<T>::stripe_of(key_type key) const noexcept {
- assert(used_bits_of(key) >= _n_stripe_bits);
- // Since bucket keys have count-bits at the LSB positions, we want to look at the MSBs instead.
- return (key >> (64 - _n_stripe_bits));
+ return stripe_of_bucket_key(key, _n_stripe_bits);
}
template <typename T>
diff --git a/storage/src/vespa/storage/common/CMakeLists.txt b/storage/src/vespa/storage/common/CMakeLists.txt
index 741d97f78ef..ae428929946 100644
--- a/storage/src/vespa/storage/common/CMakeLists.txt
+++ b/storage/src/vespa/storage/common/CMakeLists.txt
@@ -1,6 +1,7 @@
# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_add_library(storage_common OBJECT
SOURCES
+ bucket_stripe_utils.cpp
bucketmessages.cpp
content_bucket_space.cpp
content_bucket_space_repo.cpp
diff --git a/storage/src/vespa/storage/common/bucket_stripe_utils.cpp b/storage/src/vespa/storage/common/bucket_stripe_utils.cpp
new file mode 100644
index 00000000000..f1454403153
--- /dev/null
+++ b/storage/src/vespa/storage/common/bucket_stripe_utils.cpp
@@ -0,0 +1,43 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "bucket_stripe_utils.h"
+#include <vespa/vespalib/util/alloc.h>
+#include <cassert>
+
+namespace storage {
+
+namespace {
+
+constexpr uint8_t used_bits_of(uint64_t key) noexcept {
+ return static_cast<uint8_t>(key & 0b11'1111ULL);
+}
+
+}
+
+size_t
+stripe_of_bucket_key(uint64_t key, uint8_t n_stripe_bits) noexcept
+{
+ if (n_stripe_bits == 0) {
+ return 0;
+ }
+ assert(used_bits_of(key) >= n_stripe_bits);
+ // Since bucket keys have count-bits at the LSB positions, we want to look at the MSBs instead.
+ return (key >> (64 - n_stripe_bits));
+}
+
+uint8_t
+calc_num_stripe_bits(size_t n_stripes) noexcept
+{
+ assert(n_stripes > 0);
+ if (n_stripes == 1) {
+ return 0;
+ }
+ assert(n_stripes <= MaxStripes);
+ assert(n_stripes == vespalib::roundUp2inN(n_stripes));
+
+ auto result = vespalib::Optimized::msbIdx(n_stripes);
+ assert(result <= MaxStripeBits);
+ return result;
+}
+
+}
diff --git a/storage/src/vespa/storage/common/bucket_stripe_utils.h b/storage/src/vespa/storage/common/bucket_stripe_utils.h
new file mode 100644
index 00000000000..b4052a893a2
--- /dev/null
+++ b/storage/src/vespa/storage/common/bucket_stripe_utils.h
@@ -0,0 +1,28 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/persistence/spi/bucket_limits.h>
+#include <cstdint>
+#include <cstdlib>
+
+namespace storage {
+
+constexpr static uint8_t MaxStripeBits = spi::BucketLimits::MinUsedBits;
+constexpr static size_t MaxStripes = (1ULL << MaxStripeBits);
+
+/**
+ * Returns the stripe in which the given bucket key belongs,
+ * when using the given number of stripe bits.
+ */
+size_t stripe_of_bucket_key(uint64_t key, uint8_t n_stripe_bits) noexcept;
+
+/**
+ * Returns the number of stripe bits used to represent the given number of stripes.
+ *
+ * This also asserts that the number of stripes is valid (power of 2 and within MaxStripes boundary).
+ */
+uint8_t calc_num_stripe_bits(size_t n_stripes) noexcept;
+
+}
+
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp
index 715d95e70fb..b7183f4e157 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp
@@ -1,6 +1,7 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "distributor_stripe_pool.h"
#include "distributor_stripe_thread.h"
+#include <vespa/storage/common/bucket_stripe_utils.h>
#include <vespa/vespalib/util/size_literals.h>
#include <cassert>
@@ -8,6 +9,7 @@ namespace storage::distributor {
DistributorStripePool::DistributorStripePool()
: _thread_pool(512_Ki),
+ _n_stripe_bits(0),
_stripes(),
_threads(),
_mutex(),
@@ -48,6 +50,14 @@ void DistributorStripePool::unpark_all_threads() noexcept {
_parker_cond.wait(lock, [this]{ return (_parked_threads == 0); });
}
+const TickableStripe& DistributorStripePool::stripe_of_key(uint64_t key) const noexcept {
+ return stripe_thread(stripe_of_bucket_key(key, _n_stripe_bits)).stripe();
+}
+
+TickableStripe& DistributorStripePool::stripe_of_key(uint64_t key) noexcept {
+ return stripe_thread(stripe_of_bucket_key(key, _n_stripe_bits)).stripe();
+}
+
void DistributorStripePool::park_thread_until_released(DistributorStripeThread& thread) noexcept {
std::unique_lock lock(_mutex);
assert(_parked_threads < _threads.size());
@@ -67,6 +77,8 @@ void DistributorStripePool::park_thread_until_released(DistributorStripeThread&
void DistributorStripePool::start(const std::vector<TickableStripe*>& stripes) {
assert(!stripes.empty());
assert(_stripes.empty() && _threads.empty());
+ // Note: This also asserts that the number of stripes is valid (power of 2 and within MaxStripes boundary).
+ _n_stripe_bits = calc_num_stripe_bits(stripes.size());
_stripes.reserve(stripes.size());
_threads.reserve(stripes.size());
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
index 5e72cb47fc4..75328479296 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
@@ -38,6 +38,7 @@ class DistributorStripePool {
using NativeThreadVector = std::vector<FastOS_ThreadInterface*>;
FastOS_ThreadPool _thread_pool;
+ uint8_t _n_stripe_bits;
StripeVector _stripes;
NativeThreadVector _threads;
std::mutex _mutex;
@@ -57,7 +58,8 @@ public:
// Set up the stripe pool with a 1-1 relationship between the provided
// stripes and running threads. Can only be called once per pool.
//
- // Precondition: stripes.size() > 0
+ // Precondition: stripes.size() > 0, and when stripes.size() > 1 it must be
+ // a power of 2 that does not exceed storage::MaxStripes.
void start(const std::vector<TickableStripe*>& stripes);
void stop_and_join();
@@ -76,6 +78,8 @@ public:
[[nodiscard]] DistributorStripeThread& stripe_thread(size_t idx) noexcept {
return *_stripes[idx];
}
+ [[nodiscard]] const TickableStripe& stripe_of_key(uint64_t key) const noexcept;
+ [[nodiscard]] TickableStripe& stripe_of_key(uint64_t key) noexcept;
[[nodiscard]] size_t stripe_count() const noexcept { return _stripes.size(); }
[[nodiscard]] bool is_stopped() const noexcept { return _stopped; }
diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
index a5adf732824..80af41e57ad 100644
--- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
+++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
@@ -84,9 +84,26 @@ MultiThreadedStripeAccessGuard::merge_entries_into_db(document::BucketSpace buck
const std::unordered_set<uint16_t>& outdated_nodes,
const std::vector<dbtransition::Entry>& entries)
{
- // TODO STRIPE multiple stripes
- first_stripe().merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution,
- new_state, storage_up_states, outdated_nodes, entries);
+ if (entries.empty()) {
+ return;
+ }
+ std::vector<dbtransition::Entry> stripe_entries;
+ stripe_entries.reserve(entries.size() / _stripe_pool.stripe_count());
+ auto* curr_stripe = &_stripe_pool.stripe_of_key(entries[0].bucket_key);
+ stripe_entries.push_back(entries[0]);
+ for (size_t i = 1; i < entries.size(); ++i) {
+ const auto& entry = entries[i];
+ auto* next_stripe = &_stripe_pool.stripe_of_key(entry.bucket_key);
+ if (curr_stripe != next_stripe) {
+ curr_stripe->merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution,
+ new_state, storage_up_states, outdated_nodes, stripe_entries);
+ stripe_entries.clear();
+ }
+ curr_stripe = next_stripe;
+ stripe_entries.push_back(entry);
+ }
+ curr_stripe->merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution,
+ new_state, storage_up_states, outdated_nodes, stripe_entries);
}
void MultiThreadedStripeAccessGuard::update_read_snapshot_before_db_pruning() {