Diffstat (limited to 'storage/src/tests/distributor/bucketdbupdatertest.cpp')
-rw-r--r--  storage/src/tests/distributor/bucketdbupdatertest.cpp  | 102
1 file changed, 86 insertions, 16 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp
index 1cfc1692edb..cdaa6e9aaa3 100644
--- a/storage/src/tests/distributor/bucketdbupdatertest.cpp
+++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp
@@ -514,7 +514,7 @@ public:
OutdatedNodesMap outdatedNodesMap;
state = PendingClusterState::createForClusterStateChange(
clock, clusterInfo, sender,
- owner.getBucketSpaceRepo(), owner.getReadOnlyBucketSpaceRepo(),
+ owner.getBucketSpaceRepo(),
cmd, outdatedNodesMap, api::Timestamp(1));
}
@@ -526,8 +526,7 @@ public:
owner.createClusterInfo(oldClusterState));
state = PendingClusterState::createForDistributionChange(
- clock, clusterInfo, sender, owner.getBucketSpaceRepo(),
- owner.getReadOnlyBucketSpaceRepo(), api::Timestamp(1));
+ clock, clusterInfo, sender, owner.getBucketSpaceRepo(), api::Timestamp(1));
}
};
@@ -543,6 +542,8 @@ public:
{
return std::make_unique<PendingClusterStateFixture>(*this, oldClusterState);
}
+
+ uint32_t populate_bucket_db_via_request_bucket_info_for_benchmarking();
};
BucketDBUpdaterTest::BucketDBUpdaterTest()
@@ -863,14 +864,14 @@ TEST_F(BucketDBUpdaterTest, testBitChange) {
const auto &req = dynamic_cast<const RequestBucketInfoCommand &>(*_sender.commands[bsi]);
auto sreply = std::make_shared<RequestBucketInfoReply>(req);
sreply->setAddress(storageAddress(0));
- api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo();
+ auto& vec = sreply->getBucketInfo();
if (req.getBucketSpace() == FixedBucketSpaces::default_space()) {
int cnt=0;
for (int i=0; cnt < 2; i++) {
lib::Distribution distribution = defaultDistributorBucketSpace().getDistribution();
std::vector<uint16_t> distributors;
if (distribution.getIdealDistributorNode(
- lib::ClusterState("redundancy:1 bits:14 storage:1 distributor:2"),
+ lib::ClusterState("bits:14 storage:1 distributor:2"),
document::BucketId(16, i))
== 0)
{
@@ -1373,8 +1374,7 @@ BucketDBUpdaterTest::getSentNodesDistributionChanged(
ClusterInformation::CSP clusterInfo(createClusterInfo(oldClusterState));
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForDistributionChange(
- clock, clusterInfo, sender, getBucketSpaceRepo(),
- getReadOnlyBucketSpaceRepo(), api::Timestamp(1)));
+ clock, clusterInfo, sender, getBucketSpaceRepo(), api::Timestamp(1)));
sortSentMessagesByIndex(sender);
@@ -1508,7 +1508,7 @@ TEST_F(BucketDBUpdaterTest, testPendingClusterStateReceive) {
OutdatedNodesMap outdatedNodesMap;
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForClusterStateChange(
- clock, clusterInfo, sender, getBucketSpaceRepo(), getReadOnlyBucketSpaceRepo(),
+ clock, clusterInfo, sender, getBucketSpaceRepo(),
cmd, outdatedNodesMap, api::Timestamp(1)));
ASSERT_EQ(messageCount(3), sender.commands.size());
@@ -1617,7 +1617,7 @@ struct BucketDumper : public BucketDatabase::EntryProcessor
{
}
- bool process(const BucketDatabase::Entry& e) override {
+ bool process(const BucketDatabase::ConstEntryRef& e) override {
document::BucketId bucketId(e.getBucketId());
ost << (uint32_t)bucketId.getRawId() << ":";
@@ -1661,7 +1661,7 @@ BucketDBUpdaterTest::mergeBucketLists(
ClusterInformation::CSP clusterInfo(createClusterInfo("cluster:d"));
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForClusterStateChange(
- clock, clusterInfo, sender, getBucketSpaceRepo(), getReadOnlyBucketSpaceRepo(),
+ clock, clusterInfo, sender, getBucketSpaceRepo(),
cmd, outdatedNodesMap, beforeTime));
parseInputData(existingData, beforeTime, *state, includeBucketInfo);
@@ -1680,7 +1680,7 @@ BucketDBUpdaterTest::mergeBucketLists(
ClusterInformation::CSP clusterInfo(createClusterInfo(oldState.toString()));
std::unique_ptr<PendingClusterState> state(
PendingClusterState::createForClusterStateChange(
- clock, clusterInfo, sender, getBucketSpaceRepo(), getReadOnlyBucketSpaceRepo(),
+ clock, clusterInfo, sender, getBucketSpaceRepo(),
cmd, outdatedNodesMap, afterTime));
parseInputData(newData, afterTime, *state, includeBucketInfo);
@@ -1931,7 +1931,7 @@ struct FunctorProcessor : BucketDatabase::EntryProcessor {
template <typename F>
explicit FunctorProcessor(F&& f) : _f(std::forward<F>(f)) {}
- bool process(const BucketDatabase::Entry& e) override {
+ bool process(const BucketDatabase::ConstEntryRef& e) override {
_f(e);
return true;
}
@@ -2580,19 +2580,17 @@ TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_without_pending_trans
TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_bulk_loading_into_empty_db) {
// Need to trigger an initial edge to complete first bucket scan
- ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("storage:1 distributor:2"),
+ ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:2 storage:1"),
messageCount(1), 0));
_sender.clear();
- lib::ClusterState state("storage:1 distributor:1");
+ lib::ClusterState state("distributor:1 storage:1");
setSystemState(state);
constexpr uint32_t superbuckets = 1u << 16u;
constexpr uint32_t sub_buckets = 14;
constexpr uint32_t n_buckets = superbuckets * sub_buckets;
- vespalib::BenchmarkTimer timer(1.0);
-
ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size());
for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) {
ASSERT_EQ(_sender.commands[bsi]->getType(), MessageType::REQUESTBUCKETINFO);
@@ -2610,6 +2608,7 @@ TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_bulk_loading_into_empty_db) {
}
}
+ vespalib::BenchmarkTimer timer(1.0);
// Global space has no buckets but will serve as a trigger for merging
// buckets into the DB. This lets us measure the overhead of just this part.
if (req.getBucketSpace() == FixedBucketSpaces::global_space()) {
@@ -2626,6 +2625,77 @@ TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_bulk_loading_into_empty_db) {
EXPECT_EQ(size_t(0), mutable_global_db().size());
}
+uint32_t BucketDBUpdaterTest::populate_bucket_db_via_request_bucket_info_for_benchmarking() {
+ // Need to trigger an initial edge to complete first bucket scan
+ setAndEnableClusterState(lib::ClusterState("distributor:2 storage:1"), messageCount(1), 0);
+ _sender.clear();
+
+ lib::ClusterState state("distributor:1 storage:1");
+ setSystemState(state);
+
+ constexpr uint32_t superbuckets = 1u << 16u;
+ constexpr uint32_t sub_buckets = 14;
+ constexpr uint32_t n_buckets = superbuckets * sub_buckets;
+
+ assert(_bucketSpaces.size() == _sender.commands.size());
+ for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) {
+ assert(_sender.commands[bsi]->getType() == MessageType::REQUESTBUCKETINFO);
+ const auto& req = dynamic_cast<const RequestBucketInfoCommand&>(*_sender.commands[bsi]);
+
+ auto sreply = std::make_shared<RequestBucketInfoReply>(req);
+ sreply->setAddress(storageAddress(0));
+ auto& vec = sreply->getBucketInfo();
+ if (req.getBucketSpace() == FixedBucketSpaces::default_space()) {
+ for (uint32_t sb = 0; sb < superbuckets; ++sb) {
+ for (uint64_t i = 0; i < sub_buckets; ++i) {
+ document::BucketId bucket(48, (i << 32ULL) | sb);
+ vec.push_back(api::RequestBucketInfoReply::Entry(bucket, api::BucketInfo(10, 1, 1)));
+ }
+ }
+ }
+ getBucketDBUpdater().onRequestBucketInfoReply(sreply);
+ }
+
+ assert(mutable_default_db().size() == n_buckets);
+ assert(mutable_global_db().size() == 0);
+ return n_buckets;
+}
+
+TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_removing_buckets_for_unavailable_storage_nodes) {
+ const uint32_t n_buckets = populate_bucket_db_via_request_bucket_info_for_benchmarking();
+
+ lib::ClusterState no_op_state("distributor:1 storage:1 .0.s:m"); // Removing all buckets via ownership
+ vespalib::BenchmarkTimer timer(1.0);
+ timer.before();
+ setSystemState(no_op_state);
+ timer.after();
+ fprintf(stderr, "Took %g seconds to scan and remove %u buckets\n", timer.min_time(), n_buckets);
+}
+
+TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_no_buckets_removed_during_node_remover_db_pass) {
+ const uint32_t n_buckets = populate_bucket_db_via_request_bucket_info_for_benchmarking();
+
+ // TODO this benchmark is void if we further restrict the pruning elision logic to allow
+ // elision when storage nodes come online.
+ lib::ClusterState no_op_state("distributor:1 storage:2"); // Not removing any buckets
+ vespalib::BenchmarkTimer timer(1.0);
+ timer.before();
+ setSystemState(no_op_state);
+ timer.after();
+ fprintf(stderr, "Took %g seconds to scan %u buckets with no-op action\n", timer.min_time(), n_buckets);
+}
+
+TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_all_buckets_removed_during_node_remover_db_pass) {
+ const uint32_t n_buckets = populate_bucket_db_via_request_bucket_info_for_benchmarking();
+
+ lib::ClusterState no_op_state("distributor:1 storage:1 .0.s:m"); // Removing all buckets via all replicas gone
+ vespalib::BenchmarkTimer timer(1.0);
+ timer.before();
+ setSystemState(no_op_state);
+ timer.after();
+ fprintf(stderr, "Took %g seconds to scan and remove %u buckets\n", timer.min_time(), n_buckets);
+}
+
TEST_F(BucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_when_state_is_pending) {
auto initial_baseline = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d");
auto initial_default = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:m");
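
The DISABLED_ benchmark tests added in this diff all share one timing pattern: populate the bucket DB, wrap a single setSystemState() call in a vespalib::BenchmarkTimer, and report min_time(). A minimal standalone sketch of that pattern follows; it is an illustration only, not part of the commit. The time_operation helper and the header path are assumptions; the timer calls (constructor budget, before(), after(), min_time()) are the ones used in the tests above.

#include <vespa/vespalib/util/benchmark_timer.h> // header location assumed
#include <cstdio>

// Times a single invocation of fn() and prints the observed wall-clock time,
// mirroring the before()/after()/min_time() usage in the benchmark tests above.
template <typename Fn>
void time_operation(const char* label, Fn&& fn) {
    vespalib::BenchmarkTimer timer(1.0); // same 1.0 second budget as the tests
    timer.before();
    fn(); // e.g. setSystemState(no_op_state)
    timer.after();
    fprintf(stderr, "%s took %g seconds\n", label, timer.min_time());
}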