summary | refs | log | tree | commit | diff | stats
path: root/storage
diff options
context:
space:
mode:
author: Geir Storli <geirst@verizonmedia.com>, 2021-05-28 09:12:26 +0000
committer: Geir Storli <geirst@verizonmedia.com>, 2021-05-28 11:52:46 +0000
commit: 6b43ecc479b38810c7d060a96c0765e5d872346a (patch)
tree: 70250f842559be3c370e67fbbd735f99ae9e6169 /storage
parent: 91c8ae2272d653027f4618f9d367a312aac6fb98 (diff)
Make merge_entries_into_db() work across multiple stripes by handling each stripe in sequence.
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/tests/distributor/distributortest.cpp | 4
-rw-r--r-- storage/src/tests/distributor/multi_thread_stripe_access_guard_test.cpp | 90
-rw-r--r-- storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp | 12
-rw-r--r-- storage/src/vespa/storage/distributor/distributor_stripe_pool.h | 6
-rw-r--r-- storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp | 23
5 files changed, 123 insertions, 12 deletions
diff --git a/storage/src/tests/distributor/distributortest.cpp b/storage/src/tests/distributor/distributortest.cpp
index 5a5ebb8c823..660d58a9a28 100644
--- a/storage/src/tests/distributor/distributortest.cpp
+++ b/storage/src/tests/distributor/distributortest.cpp
@@ -1283,11 +1283,11 @@ TEST_F(DistributorTest, host_info_sent_immediately_once_all_stripes_first_report
// TODO STRIPE make delay configurable instead of hardcoded
TEST_F(DistributorTest, non_bootstrap_host_info_send_request_delays_sending) {
- set_num_distributor_stripes(3);
+ set_num_distributor_stripes(4);
createLinks();
getClock().setAbsoluteTimeInSeconds(1000);
- for (uint16_t i = 0; i < 3; ++i) {
+ for (uint16_t i = 0; i < 4; ++i) {
getDistributor().notify_stripe_wants_to_send_host_info(i);
}
tickDistributorNTimes(1);
diff --git a/storage/src/tests/distributor/multi_thread_stripe_access_guard_test.cpp b/storage/src/tests/distributor/multi_thread_stripe_access_guard_test.cpp
index 8513186d1e1..ba28396886f 100644
--- a/storage/src/tests/distributor/multi_thread_stripe_access_guard_test.cpp
+++ b/storage/src/tests/distributor/multi_thread_stripe_access_guard_test.cpp
@@ -1,22 +1,43 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "mock_tickable_stripe.h"
#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/persistence/spi/bucket_limits.h>
#include <vespa/storage/distributor/distributor_stripe_pool.h>
#include <vespa/storage/distributor/multi_threaded_stripe_access_guard.h>
+#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/vespalib/gtest/gtest.h>
using namespace ::testing;
+using RawIdVector = std::vector<uint64_t>;
+
+constexpr uint8_t MUB = storage::spi::BucketLimits::MinUsedBits;
namespace storage::distributor {
struct AggregationTestingMockTickableStripe : MockTickableStripe {
PotentialDataLossReport report;
+ std::vector<dbtransition::Entry> entries;
PotentialDataLossReport remove_superfluous_buckets(document::BucketSpace, const lib::ClusterState&, bool) override {
return report;
}
// Test hook: stores a copy of the entries handed to this stripe in `entries`, so a test can
// inspect afterwards which entries were routed here. All other arguments are ignored.
+ void merge_entries_into_db(document::BucketSpace, api::Timestamp, const lib::Distribution&,
+ const lib::ClusterState&, const char*, const std::unordered_set<uint16_t>&,
+ const std::vector<dbtransition::Entry>& entries_in) override {
+ entries = entries_in;
+ }
+
// Returns bucket_id().withoutCountBits() for every recorded entry, sorted ascending with std::sort,
// so that expectations can be written as plain vectors of raw ids independent of insertion order.
+ RawIdVector entries_as_raw_ids() const {
+ std::vector<uint64_t> result;
+ for (const auto& entry : entries) {
+ result.push_back(entry.bucket_id().withoutCountBits());
+ }
+ std::sort(result.begin(), result.end());
+ return result;
+ }
+
bool tick() override {
return false;
}
@@ -25,6 +46,7 @@ struct AggregationTestingMockTickableStripe : MockTickableStripe {
struct MultiThreadedStripeAccessGuardTest : Test {
DistributorStripePool _pool;
MultiThreadedStripeAccessor _accessor;
+ AggregationTestingMockTickableStripe _stripe0;
AggregationTestingMockTickableStripe _stripe1;
AggregationTestingMockTickableStripe _stripe2;
AggregationTestingMockTickableStripe _stripe3;
@@ -39,21 +61,77 @@ struct MultiThreadedStripeAccessGuardTest : Test {
}
// Starts the pool with the fixture's mock stripes; with this change that is all four (_stripe0.._stripe3).
void start_pool_with_stripes() {
- _pool.start({{&_stripe1, &_stripe2, &_stripe3}});
+ _pool.start({{&_stripe0, &_stripe1, &_stripe2, &_stripe3}});
}
+
// Starts the pool with _stripe0 as its only stripe.
+ void start_pool_with_one_stripe() {
+ _pool.start({&_stripe0});
+ }
+
// Test helper: builds one dbtransition::Entry per raw id (a BucketId with MUB used bits and a
// default-constructed BucketCopy), sorts the entries, and merges them into the default bucket
// space through a guard obtained from rendezvous_and_hold_all(). The remaining arguments are
// default-constructed / empty placeholders.
+ void merge_entries_into_db(const RawIdVector& raw_ids) {
+ std::vector<dbtransition::Entry> entries;
+ for (auto raw_id : raw_ids) {
+ entries.emplace_back(document::BucketId(MUB, raw_id), BucketCopy());
+ }
// NOTE(review): presumably orders entries by bucket key, which the guard relies on when it
// batches consecutive entries per stripe — confirm against dbtransition::Entry::operator<.
+ std::sort(entries.begin(), entries.end());
+ auto guard = _accessor.rendezvous_and_hold_all();
+ guard->merge_entries_into_db(document::FixedBucketSpaces::default_space(), api::Timestamp(),
+ lib::Distribution(), lib::ClusterState(), "", {},
+ entries);
+ }
+
};
// Each mock stripe returns its own PotentialDataLossReport; the guard's result is expected to be
// the sum over all four stripes: buckets 20+5+7+3 = 35, documents 100+200+350+30 = 680.
TEST_F(MultiThreadedStripeAccessGuardTest, remove_superfluous_buckets_aggregates_reports_across_stripes) {
- _stripe1.report = PotentialDataLossReport(20, 100);
- _stripe2.report = PotentialDataLossReport(5, 200);
- _stripe3.report = PotentialDataLossReport(7, 350);
+ _stripe0.report = PotentialDataLossReport(20, 100);
+ _stripe1.report = PotentialDataLossReport(5, 200);
+ _stripe2.report = PotentialDataLossReport(7, 350);
+ _stripe3.report = PotentialDataLossReport(3, 30);
start_pool_with_stripes();
auto guard = _accessor.rendezvous_and_hold_all();
auto report = guard->remove_superfluous_buckets(document::FixedBucketSpaces::default_space(),
lib::ClusterState(), false);
- EXPECT_EQ(report.buckets, 32);
- EXPECT_EQ(report.documents, 650);
+ EXPECT_EQ(report.buckets, 35);
+ EXPECT_EQ(report.documents, 680);
+}
+
// Merges entries that map to every one of the four stripes and checks that each mock stripe
// recorded exactly the raw ids belonging to it.
+TEST_F(MultiThreadedStripeAccessGuardTest, merge_entries_into_db_operates_across_all_stripes) {
+ start_pool_with_stripes();
+ // Note: The bucket key is calculated by reversing the bits of the raw bucket id.
+ // We have 4 stripes and use 2 stripe bits. The 2 MSB of the bucket key are used to map to a stripe.
+ // This gives the following mapping from raw bucket id to bucket key to stripe:
+ // raw id | key (8 MSB) | stripe
+ // 0x..0 | 00000000 | 0
+ // 0x..1 | 10000000 | 2
+ // 0x..2 | 01000000 | 1
+ // 0x..3 | 11000000 | 3
+ merge_entries_into_db({0x10,0x20,0x30,0x40,0x11,0x21,0x31,0x12,0x22,0x13});
+ EXPECT_EQ(RawIdVector({0x10,0x20,0x30,0x40}), _stripe0.entries_as_raw_ids());
+ EXPECT_EQ(RawIdVector({0x12,0x22}), _stripe1.entries_as_raw_ids());
+ EXPECT_EQ(RawIdVector({0x11,0x21,0x31}), _stripe2.entries_as_raw_ids());
+ EXPECT_EQ(RawIdVector({0x13}), _stripe3.entries_as_raw_ids());
+}
+
// Only raw ids whose last hex digit is 2 or 3 are merged, so _stripe1 and _stripe3 are expected
// to receive entries while _stripe0 and _stripe2 are expected to stay empty.
+TEST_F(MultiThreadedStripeAccessGuardTest, merge_entries_into_db_operates_across_subset_of_stripes) {
+ start_pool_with_stripes();
+ merge_entries_into_db({0x12,0x22,0x13});
+ EXPECT_EQ(RawIdVector(), _stripe0.entries_as_raw_ids());
+ EXPECT_EQ(RawIdVector({0x12,0x22}), _stripe1.entries_as_raw_ids());
+ EXPECT_EQ(RawIdVector(), _stripe2.entries_as_raw_ids());
+ EXPECT_EQ(RawIdVector({0x13}), _stripe3.entries_as_raw_ids());
+}
+
// With a single stripe in the pool, every merged entry is expected to end up on that stripe.
+TEST_F(MultiThreadedStripeAccessGuardTest, merge_entries_into_db_operates_across_one_stripe) {
+ start_pool_with_one_stripe();
+ merge_entries_into_db({0x10,0x11});
+ EXPECT_EQ(RawIdVector({0x10,0x11}), _stripe0.entries_as_raw_ids());
+}
+
// Merging an empty entries vector must complete without error and leave the stripe's recorded
// entries empty.
+TEST_F(MultiThreadedStripeAccessGuardTest, merge_entries_into_db_handles_empty_entries_vector) {
+ start_pool_with_one_stripe();
+ merge_entries_into_db({});
+ EXPECT_EQ(RawIdVector(), _stripe0.entries_as_raw_ids());
}
}
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp b/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp
index 715d95e70fb..b7183f4e157 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.cpp
@@ -1,6 +1,7 @@
// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "distributor_stripe_pool.h"
#include "distributor_stripe_thread.h"
+#include <vespa/storage/common/bucket_stripe_utils.h>
#include <vespa/vespalib/util/size_literals.h>
#include <cassert>
@@ -8,6 +9,7 @@ namespace storage::distributor {
DistributorStripePool::DistributorStripePool()
: _thread_pool(512_Ki),
+ _n_stripe_bits(0),
_stripes(),
_threads(),
_mutex(),
@@ -48,6 +50,14 @@ void DistributorStripePool::unpark_all_threads() noexcept {
_parker_cond.wait(lock, [this]{ return (_parked_threads == 0); });
}
// Const overload: returns the stripe selected for `key`. The stripe index comes from
// stripe_of_bucket_key(key, _n_stripe_bits) and is used to look up the stripe thread.
// NOTE(review): _n_stripe_bits is assigned in start(); presumably this must only be called
// after start() — confirm.
+const TickableStripe& DistributorStripePool::stripe_of_key(uint64_t key) const noexcept {
+ return stripe_thread(stripe_of_bucket_key(key, _n_stripe_bits)).stripe();
+}
+
// Non-const overload: returns the stripe selected for `key`. The stripe index comes from
// stripe_of_bucket_key(key, _n_stripe_bits) and is used to look up the stripe thread.
// NOTE(review): _n_stripe_bits is assigned in start(); presumably this must only be called
// after start() — confirm.
+TickableStripe& DistributorStripePool::stripe_of_key(uint64_t key) noexcept {
+ return stripe_thread(stripe_of_bucket_key(key, _n_stripe_bits)).stripe();
+}
+
void DistributorStripePool::park_thread_until_released(DistributorStripeThread& thread) noexcept {
std::unique_lock lock(_mutex);
assert(_parked_threads < _threads.size());
@@ -67,6 +77,8 @@ void DistributorStripePool::park_thread_until_released(DistributorStripeThread&
void DistributorStripePool::start(const std::vector<TickableStripe*>& stripes) {
assert(!stripes.empty());
assert(_stripes.empty() && _threads.empty());
+ // Note: This also asserts that the number of stripes is valid (power of 2 and within MaxStripes boundary).
+ _n_stripe_bits = calc_num_stripe_bits(stripes.size());
_stripes.reserve(stripes.size());
_threads.reserve(stripes.size());
diff --git a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
index 5e72cb47fc4..75328479296 100644
--- a/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
+++ b/storage/src/vespa/storage/distributor/distributor_stripe_pool.h
@@ -38,6 +38,7 @@ class DistributorStripePool {
using NativeThreadVector = std::vector<FastOS_ThreadInterface*>;
FastOS_ThreadPool _thread_pool;
+ uint8_t _n_stripe_bits;
StripeVector _stripes;
NativeThreadVector _threads;
std::mutex _mutex;
@@ -57,7 +58,8 @@ public:
// Set up the stripe pool with a 1-1 relationship between the provided
// stripes and running threads. Can only be called once per pool.
//
- // Precondition: stripes.size() > 0
+ // Precondition: stripes.size() > 0 &&
+ // when stripes.size() > 1: is a power of 2 and within storage::MaxStripes boundary
void start(const std::vector<TickableStripe*>& stripes);
void stop_and_join();
@@ -76,6 +78,8 @@ public:
[[nodiscard]] DistributorStripeThread& stripe_thread(size_t idx) noexcept {
return *_stripes[idx];
}
+ [[nodiscard]] const TickableStripe& stripe_of_key(uint64_t key) const noexcept;
+ [[nodiscard]] TickableStripe& stripe_of_key(uint64_t key) noexcept;
[[nodiscard]] size_t stripe_count() const noexcept { return _stripes.size(); }
[[nodiscard]] bool is_stopped() const noexcept { return _stopped; }
diff --git a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
index a5adf732824..80af41e57ad 100644
--- a/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
+++ b/storage/src/vespa/storage/distributor/multi_threaded_stripe_access_guard.cpp
@@ -84,9 +84,26 @@ MultiThreadedStripeAccessGuard::merge_entries_into_db(document::BucketSpace buck
const std::unordered_set<uint16_t>& outdated_nodes,
const std::vector<dbtransition::Entry>& entries)
{
- // TODO STRIPE multiple stripes
- first_stripe().merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution,
- new_state, storage_up_states, outdated_nodes, entries);
+ if (entries.empty()) {
+ return;
+ }
+ std::vector<dbtransition::Entry> stripe_entries;
+ stripe_entries.reserve(entries.size() / _stripe_pool.stripe_count());
+ auto* curr_stripe = &_stripe_pool.stripe_of_key(entries[0].bucket_key);
+ stripe_entries.push_back(entries[0]);
+ for (size_t i = 1; i < entries.size(); ++i) {
+ const auto& entry = entries[i];
+ auto* next_stripe = &_stripe_pool.stripe_of_key(entry.bucket_key);
+ if (curr_stripe != next_stripe) {
+ curr_stripe->merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution,
+ new_state, storage_up_states, outdated_nodes, stripe_entries);
+ stripe_entries.clear();
+ }
+ curr_stripe = next_stripe;
+ stripe_entries.push_back(entry);
+ }
+ curr_stripe->merge_entries_into_db(bucket_space, gathered_at_timestamp, distribution,
+ new_state, storage_up_states, outdated_nodes, stripe_entries);
}
void MultiThreadedStripeAccessGuard::update_read_snapshot_before_db_pruning() {