diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-18 16:45:37 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-20 16:56:58 +0100 |
commit | c3650cea23c1297e0db87b1bef9005dedda518d2 (patch) | |
tree | fc358517e7fc0c18a04d36229ecb924a1b8613c0 /storage | |
parent | 2d64b4b12971a307e1fa3af47bdca2bed6371dea (diff) |
Test more BucketDBUpdater two-phase transition edge cases
Diffstat (limited to 'storage')
3 files changed, 98 insertions, 58 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index 0a82fadbf68..7188d060b41 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -27,6 +27,7 @@ using document::test::makeBucketSpace; using document::BucketSpace; using document::FixedBucketSpaces; using document::BucketId; +using document::Bucket; namespace storage::distributor { @@ -118,6 +119,9 @@ class BucketDBUpdaterTest : public CppUnit::TestFixture, CPPUNIT_TEST(non_owned_buckets_purged_when_read_only_support_is_config_disabled); CPPUNIT_TEST(deferred_activated_state_does_not_enable_state_until_activation_received); CPPUNIT_TEST(read_only_db_cleared_once_pending_state_is_activated); + CPPUNIT_TEST(read_only_db_is_populated_even_when_self_is_marked_down); + CPPUNIT_TEST(activate_cluster_state_request_with_mismatching_version_returns_actual_version); + CPPUNIT_TEST(activate_cluster_state_request_without_pending_transition_passes_message_through); CPPUNIT_TEST_SUITE_END(); public: @@ -185,6 +189,9 @@ protected: void non_owned_buckets_purged_when_read_only_support_is_config_disabled(); void deferred_activated_state_does_not_enable_state_until_activation_received(); void read_only_db_cleared_once_pending_state_is_activated(); + void read_only_db_is_populated_even_when_self_is_marked_down(); + void activate_cluster_state_request_with_mismatching_version_returns_actual_version(); + void activate_cluster_state_request_without_pending_transition_passes_message_through(); auto &defaultDistributorBucketSpace() { return getBucketSpaceRepo().get(makeBucketSpace()); } @@ -413,17 +420,25 @@ public: sortSentMessagesByIndex(_sender, sizeBeforeState); } - void setClusterStateBundle(const lib::ClusterStateBundle& state) { + void set_cluster_state_bundle(const lib::ClusterStateBundle& state) { const size_t sizeBeforeState = _sender.commands.size(); getBucketDBUpdater().onSetSystemState( std::make_shared<api::SetSystemStateCommand>(state)); sortSentMessagesByIndex(_sender, sizeBeforeState); } - void activateClusterStateVersion(uint32_t version) { - getBucketDBUpdater().onActivateClusterStateVersion( + bool activate_cluster_state_version(uint32_t version) { + return getBucketDBUpdater().onActivateClusterStateVersion( std::make_shared<api::ActivateClusterStateVersionCommand>(version)); } + + void assert_has_activate_cluster_state_reply_with_actual_version(uint32_t version) { + CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); + auto* response = dynamic_cast<api::ActivateClusterStateVersionReply*>(_sender.replies.back().get()); + CPPUNIT_ASSERT(response != nullptr); + CPPUNIT_ASSERT_EQUAL(version, response->actualVersion()); + _sender.clear(); + } void completeBucketInfoGathering(const lib::ClusterState& state, size_t expectedMsgs, @@ -2647,15 +2662,23 @@ void BucketDBUpdaterTest::global_distribution_hash_falls_back_to_legacy_format_u namespace { template <typename Func> -void for_each_bucket(const BucketDatabase& db, Func f) { +void for_each_bucket(const BucketDatabase& db, const document::BucketSpace& space, Func&& f) { BucketId last(0); auto e = db.getNext(last); while (e.valid()) { - f(e); + f(space, e); e = db.getNext(e.getBucketId()); } } +template <typename Func> +void for_each_bucket(const DistributorBucketSpaceRepo& repo, Func&& f) { + for (const auto& space : repo) { + // TODO needs to propagate space + for_each_bucket(space.second->getBucketDatabase(), space.first, f); + } +} + } using ConfigBuilder = vespa::config::content::core::StorDistributormanagerConfigBuilder; @@ -2664,82 +2687,75 @@ void BucketDBUpdaterTest::non_owned_buckets_moved_to_read_only_db_on_ownership_c getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition - setClusterStateBundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity + set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size()); constexpr uint32_t n_buckets = 10; completeBucketInfoGathering(initial_state, messageCount(4), n_buckets); _sender.clear(); - auto& default_db = mutable_repo().get(makeBucketSpace()).getBucketDatabase(); - auto& read_only_db = read_only_repo().get(makeBucketSpace()).getBucketDatabase(); - - CPPUNIT_ASSERT_EQUAL(size_t(n_buckets), default_db.size()); - CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_db.size()); + CPPUNIT_ASSERT_EQUAL(size_t(n_buckets), mutable_default_db().size()); + CPPUNIT_ASSERT_EQUAL(size_t(n_buckets), mutable_global_db().size()); + CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size()); + CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_global_db().size()); lib::ClusterState pending_state("distributor:2 storage:4"); - // TODO iterate over buckets with spaces instead - std::unordered_set<BucketId, BucketId::hash> buckets_not_owned_in_pending_state; - for_each_bucket(default_db, [&](const auto& entry) { + std::unordered_set<Bucket, Bucket::hash> buckets_not_owned_in_pending_state; + for_each_bucket(mutable_repo(), [&](const auto& space, const auto& entry) { if (!getBucketDBUpdater().getDistributorComponent() .ownsBucketInState(pending_state, makeDocumentBucket(entry.getBucketId()))) { - buckets_not_owned_in_pending_state.insert(entry.getBucketId()); + buckets_not_owned_in_pending_state.insert(Bucket(space, entry.getBucketId())); } }); CPPUNIT_ASSERT(!buckets_not_owned_in_pending_state.empty()); - setClusterStateBundle(lib::ClusterStateBundle(pending_state, {}, true)); // Now requires activation + set_cluster_state_bundle(lib::ClusterStateBundle(pending_state, {}, true)); // Now requires activation - CPPUNIT_ASSERT_EQUAL(n_buckets - buckets_not_owned_in_pending_state.size(), default_db.size()); - CPPUNIT_ASSERT_EQUAL(buckets_not_owned_in_pending_state.size(), read_only_db.size()); + const auto buckets_not_owned_per_space = (buckets_not_owned_in_pending_state.size() / 2); // 2 spaces + const auto expected_mutable_buckets = n_buckets - buckets_not_owned_per_space; + CPPUNIT_ASSERT_EQUAL(expected_mutable_buckets, mutable_default_db().size()); + CPPUNIT_ASSERT_EQUAL(expected_mutable_buckets, mutable_global_db().size()); + CPPUNIT_ASSERT_EQUAL(buckets_not_owned_per_space, read_only_default_db().size()); + CPPUNIT_ASSERT_EQUAL(buckets_not_owned_per_space, read_only_global_db().size()); - // TODO replace with gmock unordered set equality matcher - for_each_bucket(read_only_db, [&](const auto& entry) { - CPPUNIT_ASSERT(buckets_not_owned_in_pending_state.find(entry.getBucketId()) + for_each_bucket(read_only_repo(), [&](const auto& space, const auto& entry) { + CPPUNIT_ASSERT(buckets_not_owned_in_pending_state.find(Bucket(space, entry.getBucketId())) != buckets_not_owned_in_pending_state.end()); }); - - // TODO check global space too! } -// TODO dedupe setup stuff void BucketDBUpdaterTest::buckets_no_longer_available_are_not_moved_to_read_only_database() { - getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); - lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition - setClusterStateBundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity - - CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size()); constexpr uint32_t n_buckets = 10; - completeBucketInfoGathering(initial_state, messageCount(4), n_buckets); - _sender.clear(); - // No ownership change, just node down. Test redundancy is 2, so removing 2 nodes will // cause some buckets to be entirely unavailable. - lib::ClusterState pending_state("distributor:1 storage:4 .0.s:d .1.s:m"); - setClusterStateBundle(lib::ClusterStateBundle(pending_state, {}, true)); + trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:1 storage:4 .0.s:d .1.s:m", n_buckets, 0); CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size()); + CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_global_db().size()); } -// TODO must ensure this works correctly with cluster controller! void BucketDBUpdaterTest::non_owned_buckets_purged_when_read_only_support_is_config_disabled() { getConfig().setAllowStaleReadsDuringClusterStateTransitions(false); lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition - setSystemState(initial_state); + set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size()); constexpr uint32_t n_buckets = 10; completeBucketInfoGathering(initial_state, messageCount(4), n_buckets); _sender.clear(); + // Nothing in read-only DB after first bulk load of buckets. CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size()); + CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_global_db().size()); lib::ClusterState pending_state("distributor:2 storage:4"); setSystemState(pending_state); - // No buckets should be moved into read only db + // No buckets should be moved into read only db after ownership changes. CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size()); + CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_global_db().size()); } void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition( @@ -2753,13 +2769,12 @@ void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition( getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); lib::ClusterState initial_state(initial_state_str); setSystemState(initial_state); - // TODO decouple expected message count CPPUNIT_ASSERT_EQUAL(messageCount(initial_expected_msgs), _sender.commands.size()); completeBucketInfoGathering(initial_state, messageCount(initial_expected_msgs), initial_buckets); _sender.clear(); lib::ClusterState pending_state(pending_state_str); // Ownership change - setClusterStateBundle(lib::ClusterStateBundle(pending_state, {}, true)); + set_cluster_state_bundle(lib::ClusterStateBundle(pending_state, {}, true)); CPPUNIT_ASSERT_EQUAL(messageCount(pending_expected_msgs), _sender.commands.size()); completeBucketInfoGathering(pending_state, messageCount(pending_expected_msgs), pending_buckets); _sender.clear(); @@ -2776,7 +2791,7 @@ void BucketDBUpdaterTest::deferred_activated_state_does_not_enable_state_until_a CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_default_db().size()); CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_global_db().size()); - activateClusterStateVersion(2); + CPPUNIT_ASSERT(!activate_cluster_state_version(2)); CPPUNIT_ASSERT_EQUAL(uint32_t(2), getDistributor().getClusterStateBundle().getVersion()); CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), mutable_default_db().size()); @@ -2787,26 +2802,50 @@ void BucketDBUpdaterTest::read_only_db_cleared_once_pending_state_is_activated() constexpr uint32_t n_buckets = 10; trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, "version:2 distributor:2 storage:4", n_buckets, 0); - activateClusterStateVersion(2); + CPPUNIT_ASSERT(!activate_cluster_state_version(2)); CPPUNIT_ASSERT_EQUAL(uint64_t(0), read_only_default_db().size()); CPPUNIT_ASSERT_EQUAL(uint64_t(0), read_only_global_db().size()); } -/* - * TODO tests - * - [X] buckets moved to read only db on ownership change - * - even when self is down in pending state - * - [X] buckets NOT moved to read only db on content node down/maintenance - * - [X] read only db cleared when cluster state activated - * - explicit cluster state activation path - * - legacy implicit cluster state activation support - * - return activation ACK if already activated - * - buckets merged on explicit activation - * - version check for activation msg - * - deferred bundle with non-deferred config? - */ +void BucketDBUpdaterTest::read_only_db_is_populated_even_when_self_is_marked_down() { + constexpr uint32_t n_buckets = 10; + trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:1 .0.s:d storage:4", n_buckets, 0); + + // State not yet activated, so read-only DBs have got all the buckets we used to have. + CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_default_db().size()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_global_db().size()); + CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), read_only_default_db().size()); + CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), read_only_global_db().size()); +} + +void BucketDBUpdaterTest::activate_cluster_state_request_with_mismatching_version_returns_actual_version() { + constexpr uint32_t n_buckets = 10; + trigger_completed_but_not_yet_activated_transition("version:4 distributor:1 storage:4", n_buckets, 4, + "version:5 distributor:2 storage:4", n_buckets, 0); + + CPPUNIT_ASSERT(activate_cluster_state_version(4)); // Too old version + assert_has_activate_cluster_state_reply_with_actual_version(5); + + CPPUNIT_ASSERT(activate_cluster_state_version(6)); // More recent version than what has been observed + assert_has_activate_cluster_state_reply_with_actual_version(5); +} + +void BucketDBUpdaterTest::activate_cluster_state_request_without_pending_transition_passes_message_through() { + constexpr uint32_t n_buckets = 10; + trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, + "version:2 distributor:1 storage:4", n_buckets, 4); + // Activate version 2; no pending cluster state after this. + CPPUNIT_ASSERT(!activate_cluster_state_version(2)); + + // No pending cluster state for version 3, just passed through to be implicitly bounced by state manager. + // Note: state manager is not modelled in this test, so we just check that the message handler returns + // false (meaning "didn't take message ownership") and there's no auto-generated reply. + CPPUNIT_ASSERT(!activate_cluster_state_version(3)); + CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.replies.size()); +} -// TODO reconcile default deferred activation state of bundle between Java and C++! +// TODO rename distributor config to imply two phase functionlity explicitly? } diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index 202455098db..7516e10082c 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -259,7 +259,6 @@ BucketDBUpdater::onSetSystemState( bool BucketDBUpdater::onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd) { - // TODO test edges! if (hasPendingClusterState() && _pendingClusterState->isVersionedTransition()) { const auto pending_version = _pendingClusterState->clusterStateVersion(); if (pending_version == cmd->version()) { @@ -281,7 +280,7 @@ BucketDBUpdater::onActivateClusterStateVersion(const std::shared_ptr<api::Activa } else { // Likely just a resend, but log warn for now to get a feel of how common it is. LOG(warning, "Received cluster state activation command for version %u, which " - "has no corresponding pending state. Resent operation?", cmd->version()); + "has no corresponding pending state. Likely resent operation.", cmd->version()); } // Fall through to next link in call chain that cares about this message. return false; diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/fnetlistener.cpp index 4e9fdc99a8a..fb2967965c0 100644 --- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp +++ b/storage/src/vespa/storage/storageserver/fnetlistener.cpp @@ -208,7 +208,7 @@ void FNetListener::RPC_setDistributionStates(FRT_RPCRequest* req) { req->SetError(RPCRequestWrapper::ERR_BAD_REQUEST, e.what()); return; } - LOG(info, "Got state bundle %s", state_bundle->toString().c_str()); // TODO + LOG(debug, "Got state bundle %s", state_bundle->toString().c_str()); // TODO add constructor taking in shared_ptr directly instead? auto cmd = std::make_shared<api::SetSystemStateCommand>(*state_bundle); @@ -228,6 +228,8 @@ void FNetListener::RPC_activateClusterStateVersion(FRT_RPCRequest* req) { auto cmd = std::make_shared<api::ActivateClusterStateVersionCommand>(activate_version); cmd->setPriority(api::StorageMessage::VERYHIGH); + LOG(debug, "Got state activation request for version %u", activate_version); + detach_and_forward_to_enqueuer(std::move(cmd), req); } |