diff options
author | Tor Brede Vekterli <vekterli@yahooinc.com> | 2021-09-08 15:53:17 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2021-09-08 15:53:17 +0000 |
commit | c99b6ca8bdb1e99d123e47f25ed25da28ae1e2ce (patch) | |
tree | bf1aadea261e9302d982fdbd5443c5a0ea7f147d /storage/src | |
parent | 490a718a6036b05dca9935ea177c1af514ed10dc (diff) |
Port additional DB updater tests and fix delayed sending regression
Adds a piece of functionality that was missing from the new code path:
bucket rechecks queued while a cluster state was pending were not sent
as expected once the pending state had been completed and activated.
Diffstat (limited to 'storage/src')
5 files changed, 173 insertions, 0 deletions
diff --git a/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp b/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp index cb61c1bb009..6cca6df9f80 100644 --- a/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp +++ b/storage/src/tests/distributor/legacy_bucket_db_updater_test.cpp @@ -1053,6 +1053,7 @@ TEST_F(LegacyBucketDBUpdaterTest, recheck_node) { EXPECT_EQ(api::BucketInfo(20,10,12, 50, 60, true, true), copy->getBucketInfo()); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, notify_bucket_change) { enableDistributorClusterState("distributor:1 storage:1"); @@ -1116,6 +1117,7 @@ TEST_F(LegacyBucketDBUpdaterTest, notify_bucket_change) { dumpBucket(document::BucketId(16, 2))); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest TEST_F(LegacyBucketDBUpdaterTest, notify_bucket_change_from_node_down) { enableDistributorClusterState("distributor:1 storage:2"); @@ -1162,6 +1164,7 @@ TEST_F(LegacyBucketDBUpdaterTest, notify_bucket_change_from_node_down) { dumpBucket(document::BucketId(16, 1))); } +// TODO STRIPE migrated to TopLevelBucketDBUpdaterTest /** * Test that NotifyBucketChange received while there's a pending cluster state * waits until the cluster state has been enabled as current before it sends off diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp index de0b6b22358..70e5afaed43 100644 --- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp +++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp @@ -284,6 +284,12 @@ public: ASSERT_NO_FATAL_FAILURE(assert_correct_buckets(num_buckets, state)); } + void set_and_enable_cluster_state(const lib::ClusterState& state, uint32_t expected_msgs, uint32_t n_buckets) { + _sender.clear(); + set_cluster_state(state); + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(state, expected_msgs, n_buckets)); + 
} + }; TopLevelBucketDBUpdaterTest::TopLevelBucketDBUpdaterTest() @@ -786,4 +792,159 @@ TEST_F(TopLevelBucketDBUpdaterTest, recheck_node) { EXPECT_EQ(api::BucketInfo(20,10,12, 50, 60, true, true), copy->getBucketInfo()); } +TEST_F(TopLevelBucketDBUpdaterTest, notify_bucket_change) { + enable_distributor_cluster_state("distributor:1 storage:1"); + + add_nodes_to_stripe_bucket_db(document::BucketId(16, 1), "0=1234"); + _sender.replies().clear(); + + { + api::BucketInfo info(1, 2, 3, 4, 5, true, true); + auto cmd = std::make_shared<api::NotifyBucketChangeCommand>( + makeDocumentBucket(document::BucketId(16, 1)), info); + cmd->setSourceIndex(0); + stripe_of_bucket(document::BucketId(16, 1)).bucket_db_updater().onNotifyBucketChange(cmd); + } + + { + api::BucketInfo info(10, 11, 12, 13, 14, false, false); + auto cmd = std::make_shared<api::NotifyBucketChangeCommand>( + makeDocumentBucket(document::BucketId(16, 2)), info); + cmd->setSourceIndex(0); + stripe_of_bucket(document::BucketId(16, 2)).bucket_db_updater().onNotifyBucketChange(cmd); + } + + // Must receive reply + ASSERT_EQ(size_t(2), _sender.replies().size()); + + for (int i = 0; i < 2; ++i) { + ASSERT_EQ(MessageType::NOTIFYBUCKETCHANGE_REPLY, _sender.reply(i)->getType()); + } + + // No database update until request bucket info replies have been received. 
+ EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0x4d2,docs=1234/1234,bytes=1234/1234," + "trusted=false,active=false,ready=false)"), + dump_bucket(document::BucketId(16, 1))); + EXPECT_EQ(std::string("NONEXISTING"), dump_bucket(document::BucketId(16, 2))); + + ASSERT_EQ(size_t(2), _sender.commands().size()); + + std::vector<api::BucketInfo> infos; + infos.push_back(api::BucketInfo(4567, 200, 2000, 400, 4000, true, true)); + infos.push_back(api::BucketInfo(8999, 300, 3000, 500, 5000, false, false)); + + for (int i = 0; i < 2; ++i) { + auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.command(i)); + ASSERT_EQ(size_t(1), rbi.getBuckets().size()); + document::BucketId bucket_id(16, i + 1); + EXPECT_EQ(bucket_id, rbi.getBuckets()[0]); + + auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi); + reply->getBucketInfo().push_back(api::RequestBucketInfoReply::Entry(bucket_id, infos[i])); + stripe_of_bucket(bucket_id).bucket_db_updater().onRequestBucketInfoReply(reply); + } + + EXPECT_EQ(std::string("BucketId(0x4000000000000001) : " + "node(idx=0,crc=0x11d7,docs=200/400,bytes=2000/4000,trusted=true,active=true,ready=true)"), + dump_bucket(document::BucketId(16, 1))); + EXPECT_EQ(std::string("BucketId(0x4000000000000002) : " + "node(idx=0,crc=0x2327,docs=300/500,bytes=3000/5000,trusted=true,active=false,ready=false)"), + dump_bucket(document::BucketId(16, 2))); +} + +TEST_F(TopLevelBucketDBUpdaterTest, notify_bucket_change_from_node_down) { + enable_distributor_cluster_state("distributor:1 storage:2"); + + document::BucketId bucket_id(16, 1); + add_nodes_to_stripe_bucket_db(bucket_id, "1=1234"); + + _sender.replies().clear(); + + { + api::BucketInfo info(8999, 300, 3000, 500, 5000, false, false); + auto cmd = std::make_shared<api::NotifyBucketChangeCommand>(makeDocumentBucket(bucket_id), info); + cmd->setSourceIndex(0); + stripe_of_bucket(bucket_id).bucket_db_updater().onNotifyBucketChange(cmd); + } + // Enable here to avoid 
having request bucket info be silently swallowed + // (send_request_bucket_info drops message if node is down). + enable_distributor_cluster_state("distributor:1 storage:2 .0.s:d"); + + ASSERT_EQ("BucketId(0x4000000000000001) : " + "node(idx=1,crc=0x4d2,docs=1234/1234,bytes=1234/1234,trusted=false,active=false,ready=false)", + dump_bucket(bucket_id)); + + ASSERT_EQ(size_t(1), _sender.replies().size()); + ASSERT_EQ(MessageType::NOTIFYBUCKETCHANGE_REPLY, _sender.reply(0)->getType()); + + // Currently, this pending operation will be auto-flushed when the cluster state + // changes so the behavior is still correct. Keep this test around to prevent + // regressions here. + ASSERT_EQ(size_t(1), _sender.commands().size()); + auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.command(0)); + ASSERT_EQ(size_t(1), rbi.getBuckets().size()); + EXPECT_EQ(bucket_id, rbi.getBuckets()[0]); + + auto reply = std::make_shared<api::RequestBucketInfoReply>(rbi); + reply->getBucketInfo().push_back( + api::RequestBucketInfoReply::Entry( + bucket_id, + api::BucketInfo(8999, 300, 3000, 500, 5000, false, false))); + stripe_of_bucket(bucket_id).bucket_db_updater().onRequestBucketInfoReply(reply); + + // No change + EXPECT_EQ("BucketId(0x4000000000000001) : " + "node(idx=1,crc=0x4d2,docs=1234/1234,bytes=1234/1234,trusted=false,active=false,ready=false)", + dump_bucket(bucket_id)); +} + +/** + * Test that NotifyBucketChange received while there's a pending cluster state + * waits until the cluster state has been enabled as current before it sends off + * the single bucket info requests. This is to prevent a race condition where + * the replies to bucket info requests for buckets that would be owned by the + * distributor in the pending state but not by the current state would be + * discarded when attempted inserted into the bucket database. 
+ */ +TEST_F(TopLevelBucketDBUpdaterTest, notify_change_with_pending_state_queues_bucket_info_requests) { + set_cluster_state("distributor:1 storage:1"); + ASSERT_EQ(_bucket_spaces.size(), _sender.commands().size()); + + document::BucketId bucket_id(16, 1); + { + api::BucketInfo info(8999, 300, 3000, 500, 5000, false, false); + auto cmd(std::make_shared<api::NotifyBucketChangeCommand>( + makeDocumentBucket(bucket_id), info)); + cmd->setSourceIndex(0); + stripe_of_bucket(bucket_id).bucket_db_updater().onNotifyBucketChange(cmd); + } + + ASSERT_EQ(_bucket_spaces.size(), _sender.commands().size()); + + ASSERT_NO_FATAL_FAILURE(complete_bucket_info_gathering(lib::ClusterState("distributor:1 storage:1"), + _bucket_spaces.size(), 10)); + + ASSERT_EQ(_bucket_spaces.size() + 1, _sender.commands().size()); + + { + auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.command(_bucket_spaces.size())); + ASSERT_EQ(size_t(1), rbi.getBuckets().size()); + EXPECT_EQ(bucket_id, rbi.getBuckets()[0]); + } + _sender.clear(); + + // Queue must be cleared once pending state is enabled. 
+ { + lib::ClusterState state("distributor:1 storage:2"); + uint32_t expected_msgs = _bucket_spaces.size(), dummy_buckets_to_return = 1; + ASSERT_NO_FATAL_FAILURE(set_and_enable_cluster_state(state, expected_msgs, dummy_buckets_to_return)); + } + ASSERT_EQ(_bucket_spaces.size(), _sender.commands().size()); + { + auto& rbi = dynamic_cast<RequestBucketInfoCommand&>(*_sender.command(0)); + EXPECT_EQ(size_t(0), rbi.getBuckets().size()); + } +} + } diff --git a/storage/src/vespa/storage/distributor/distributor_stripe.cpp b/storage/src/vespa/storage/distributor/distributor_stripe.cpp index 8cb260455ed..1a9cb9f303c 100644 --- a/storage/src/vespa/storage/distributor/distributor_stripe.cpp +++ b/storage/src/vespa/storage/distributor/distributor_stripe.cpp @@ -985,6 +985,7 @@ void DistributorStripe::enable_cluster_state_bundle(const lib::ClusterStateBundle& new_state, bool has_bucket_ownership_change) { + assert(!_use_legacy_mode); // TODO STRIPE replace legacy func enableClusterStateBundle(new_state); if (has_bucket_ownership_change) { @@ -995,6 +996,7 @@ DistributorStripe::enable_cluster_state_bundle(const lib::ClusterStateBundle& ne const auto now = TimePoint(std::chrono::milliseconds(_component.getClock().getTimeInMillis().getTime())); _externalOperationHandler.rejectFeedBeforeTimeReached(_ownershipSafeTimeCalc->safeTimePoint(now)); } + _bucketDBUpdater.handle_activated_cluster_state_bundle(); // Triggers resending of queued requests } void diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp index 06a9672ba50..c48434484d2 100644 --- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp +++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.cpp @@ -170,6 +170,12 @@ StripeBucketDBUpdater::recheckBucketInfo(uint32_t nodeIdx, sendRequestBucketInfo(nodeIdx, bucket, std::shared_ptr<MergeReplyGuard>()); } +void 
+StripeBucketDBUpdater::handle_activated_cluster_state_bundle() +{ + sendAllQueuedBucketRechecks(); +} + namespace { class ReadOnlyDbMergingInserter : public BucketDatabase::MergingProcessor { diff --git a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h index 1456308c3d0..9bc91ca78e7 100644 --- a/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h +++ b/storage/src/vespa/storage/distributor/stripe_bucket_db_updater.h @@ -46,6 +46,7 @@ public: void flush(); const lib::ClusterState* pendingClusterStateOrNull(const document::BucketSpace&) const; void recheckBucketInfo(uint32_t nodeIdx, const document::Bucket& bucket); + void handle_activated_cluster_state_bundle(); bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd) override; bool onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd) override; |