diff options
Diffstat (limited to 'storage/src/tests/distributor/bucketdbupdatertest.cpp')
-rw-r--r-- | storage/src/tests/distributor/bucketdbupdatertest.cpp | 102 |
1 files changed, 86 insertions, 16 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index 1cfc1692edb..cdaa6e9aaa3 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -514,7 +514,7 @@ public: OutdatedNodesMap outdatedNodesMap; state = PendingClusterState::createForClusterStateChange( clock, clusterInfo, sender, - owner.getBucketSpaceRepo(), owner.getReadOnlyBucketSpaceRepo(), + owner.getBucketSpaceRepo(), cmd, outdatedNodesMap, api::Timestamp(1)); } @@ -526,8 +526,7 @@ public: owner.createClusterInfo(oldClusterState)); state = PendingClusterState::createForDistributionChange( - clock, clusterInfo, sender, owner.getBucketSpaceRepo(), - owner.getReadOnlyBucketSpaceRepo(), api::Timestamp(1)); + clock, clusterInfo, sender, owner.getBucketSpaceRepo(), api::Timestamp(1)); } }; @@ -543,6 +542,8 @@ public: { return std::make_unique<PendingClusterStateFixture>(*this, oldClusterState); } + + uint32_t populate_bucket_db_via_request_bucket_info_for_benchmarking(); }; BucketDBUpdaterTest::BucketDBUpdaterTest() @@ -863,14 +864,14 @@ TEST_F(BucketDBUpdaterTest, testBitChange) { const auto &req = dynamic_cast<const RequestBucketInfoCommand &>(*_sender.commands[bsi]); auto sreply = std::make_shared<RequestBucketInfoReply>(req); sreply->setAddress(storageAddress(0)); - api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo(); + auto& vec = sreply->getBucketInfo(); if (req.getBucketSpace() == FixedBucketSpaces::default_space()) { int cnt=0; for (int i=0; cnt < 2; i++) { lib::Distribution distribution = defaultDistributorBucketSpace().getDistribution(); std::vector<uint16_t> distributors; if (distribution.getIdealDistributorNode( - lib::ClusterState("redundancy:1 bits:14 storage:1 distributor:2"), + lib::ClusterState("bits:14 storage:1 distributor:2"), document::BucketId(16, i)) == 0) { @@ -1373,8 +1374,7 @@ BucketDBUpdaterTest::getSentNodesDistributionChanged( ClusterInformation::CSP clusterInfo(createClusterInfo(oldClusterState)); std::unique_ptr<PendingClusterState> state( PendingClusterState::createForDistributionChange( - clock, clusterInfo, sender, getBucketSpaceRepo(), - getReadOnlyBucketSpaceRepo(), api::Timestamp(1))); + clock, clusterInfo, sender, getBucketSpaceRepo(), api::Timestamp(1))); sortSentMessagesByIndex(sender); @@ -1508,7 +1508,7 @@ TEST_F(BucketDBUpdaterTest, testPendingClusterStateReceive) { OutdatedNodesMap outdatedNodesMap; std::unique_ptr<PendingClusterState> state( PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, getBucketSpaceRepo(), getReadOnlyBucketSpaceRepo(), + clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap, api::Timestamp(1))); ASSERT_EQ(messageCount(3), sender.commands.size()); @@ -1617,7 +1617,7 @@ struct BucketDumper : public BucketDatabase::EntryProcessor { } - bool process(const BucketDatabase::Entry& e) override { + bool process(const BucketDatabase::ConstEntryRef& e) override { document::BucketId bucketId(e.getBucketId()); ost << (uint32_t)bucketId.getRawId() << ":"; @@ -1661,7 +1661,7 @@ BucketDBUpdaterTest::mergeBucketLists( ClusterInformation::CSP clusterInfo(createClusterInfo("cluster:d")); std::unique_ptr<PendingClusterState> state( PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, getBucketSpaceRepo(), getReadOnlyBucketSpaceRepo(), + clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap, beforeTime)); parseInputData(existingData, beforeTime, *state, includeBucketInfo); @@ -1680,7 +1680,7 @@ BucketDBUpdaterTest::mergeBucketLists( ClusterInformation::CSP clusterInfo(createClusterInfo(oldState.toString())); std::unique_ptr<PendingClusterState> state( PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, getBucketSpaceRepo(), getReadOnlyBucketSpaceRepo(), + clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap, afterTime)); parseInputData(newData, afterTime, *state, includeBucketInfo); @@ -1931,7 +1931,7 @@ struct FunctorProcessor : BucketDatabase::EntryProcessor { template <typename F> explicit FunctorProcessor(F&& f) : _f(std::forward<F>(f)) {} - bool process(const BucketDatabase::Entry& e) override { + bool process(const BucketDatabase::ConstEntryRef& e) override { _f(e); return true; } @@ -2580,19 +2580,17 @@ TEST_F(BucketDBUpdaterTest, activate_cluster_state_request_without_pending_trans TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_bulk_loading_into_empty_db) { // Need to trigger an initial edge to complete first bucket scan - ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("storage:1 distributor:2"), + ASSERT_NO_FATAL_FAILURE(setAndEnableClusterState(lib::ClusterState("distributor:2 storage:1"), messageCount(1), 0)); _sender.clear(); - lib::ClusterState state("storage:1 distributor:1"); + lib::ClusterState state("distributor:1 storage:1"); setSystemState(state); constexpr uint32_t superbuckets = 1u << 16u; constexpr uint32_t sub_buckets = 14; constexpr uint32_t n_buckets = superbuckets * sub_buckets; - vespalib::BenchmarkTimer timer(1.0); - ASSERT_EQ(_bucketSpaces.size(), _sender.commands.size()); for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) { ASSERT_EQ(_sender.commands[bsi]->getType(), MessageType::REQUESTBUCKETINFO); @@ -2610,6 +2608,7 @@ TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_bulk_loading_into_empty_db) { } } + vespalib::BenchmarkTimer timer(1.0); // Global space has no buckets but will serve as a trigger for merging // buckets into the DB. This lets us measure the overhead of just this part. if (req.getBucketSpace() == FixedBucketSpaces::global_space()) { @@ -2626,6 +2625,77 @@ TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_bulk_loading_into_empty_db) { EXPECT_EQ(size_t(0), mutable_global_db().size()); } +uint32_t BucketDBUpdaterTest::populate_bucket_db_via_request_bucket_info_for_benchmarking() { + // Need to trigger an initial edge to complete first bucket scan + setAndEnableClusterState(lib::ClusterState("distributor:2 storage:1"), messageCount(1), 0); + _sender.clear(); + + lib::ClusterState state("distributor:1 storage:1"); + setSystemState(state); + + constexpr uint32_t superbuckets = 1u << 16u; + constexpr uint32_t sub_buckets = 14; + constexpr uint32_t n_buckets = superbuckets * sub_buckets; + + assert(_bucketSpaces.size() == _sender.commands.size()); + for (uint32_t bsi = 0; bsi < _bucketSpaces.size(); ++bsi) { + assert(_sender.commands[bsi]->getType() == MessageType::REQUESTBUCKETINFO); + const auto& req = dynamic_cast<const RequestBucketInfoCommand&>(*_sender.commands[bsi]); + + auto sreply = std::make_shared<RequestBucketInfoReply>(req); + sreply->setAddress(storageAddress(0)); + auto& vec = sreply->getBucketInfo(); + if (req.getBucketSpace() == FixedBucketSpaces::default_space()) { + for (uint32_t sb = 0; sb < superbuckets; ++sb) { + for (uint64_t i = 0; i < sub_buckets; ++i) { + document::BucketId bucket(48, (i << 32ULL) | sb); + vec.push_back(api::RequestBucketInfoReply::Entry(bucket, api::BucketInfo(10, 1, 1))); + } + } + } + getBucketDBUpdater().onRequestBucketInfoReply(sreply); + } + + assert(mutable_default_db().size() == n_buckets); + assert(mutable_global_db().size() == 0); + return n_buckets; +} + +TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_removing_buckets_for_unavailable_storage_nodes) { + const uint32_t n_buckets = populate_bucket_db_via_request_bucket_info_for_benchmarking(); + + lib::ClusterState no_op_state("distributor:1 storage:1 .0.s:m"); // Removing all buckets via ownership + vespalib::BenchmarkTimer timer(1.0); + timer.before(); + setSystemState(no_op_state); + timer.after(); + fprintf(stderr, "Took %g seconds to scan and remove %u buckets\n", timer.min_time(), n_buckets); +} + +TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_no_buckets_removed_during_node_remover_db_pass) { + const uint32_t n_buckets = populate_bucket_db_via_request_bucket_info_for_benchmarking(); + + // TODO this benchmark is void if we further restrict the pruning elision logic to allow + // elision when storage nodes come online. + lib::ClusterState no_op_state("distributor:1 storage:2"); // Not removing any buckets + vespalib::BenchmarkTimer timer(1.0); + timer.before(); + setSystemState(no_op_state); + timer.after(); + fprintf(stderr, "Took %g seconds to scan %u buckets with no-op action\n", timer.min_time(), n_buckets); +} + +TEST_F(BucketDBUpdaterTest, DISABLED_benchmark_all_buckets_removed_during_node_remover_db_pass) { + const uint32_t n_buckets = populate_bucket_db_via_request_bucket_info_for_benchmarking(); + + lib::ClusterState no_op_state("distributor:1 storage:1 .0.s:m"); // Removing all buckets via all replicas gone + vespalib::BenchmarkTimer timer(1.0); + timer.before(); + setSystemState(no_op_state); + timer.after(); + fprintf(stderr, "Took %g seconds to scan and remove %u buckets\n", timer.min_time(), n_buckets); +} + TEST_F(BucketDBUpdaterTest, pending_cluster_state_getter_is_non_null_only_when_state_is_pending) { auto initial_baseline = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:d"); auto initial_default = std::make_shared<lib::ClusterState>("distributor:1 storage:2 .0.s:m"); |