diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-26 14:39:50 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-03-26 14:39:50 +0100 |
commit | b9ab7c6ae6e1931326c0f8b4e5cee09a2b1152ac (patch) | |
tree | a12edd9028888c048606df280e2a48ae1bd2f13f /storage | |
parent | 313411c31cc8b43f353565d714bc76bc1ca43e5c (diff) | |
parent | a599019904e9f3673b8d834efd28350604d8b7fd (diff) |
Merge pull request #8882 from vespa-engine/vekterli/add-read-only-support-during-cluster-state-transitions
Add read-only support during cluster state transitions
Diffstat (limited to 'storage')
32 files changed, 962 insertions, 151 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index b2d554c1e42..9795f5db5dc 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -26,6 +26,8 @@ using document::test::makeDocumentBucket; using document::test::makeBucketSpace; using document::BucketSpace; using document::FixedBucketSpaces; +using document::BucketId; +using document::Bucket; namespace storage::distributor { @@ -112,6 +114,14 @@ class BucketDBUpdaterTest : public CppUnit::TestFixture, CPPUNIT_TEST(adding_diverging_replica_to_existing_trusted_does_not_remove_trusted); CPPUNIT_TEST(batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted); CPPUNIT_TEST(global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection); + CPPUNIT_TEST(non_owned_buckets_moved_to_read_only_db_on_ownership_change); + CPPUNIT_TEST(buckets_no_longer_available_are_not_moved_to_read_only_database); + CPPUNIT_TEST(non_owned_buckets_purged_when_read_only_support_is_config_disabled); + CPPUNIT_TEST(deferred_activated_state_does_not_enable_state_until_activation_received); + CPPUNIT_TEST(read_only_db_cleared_once_pending_state_is_activated); + CPPUNIT_TEST(read_only_db_is_populated_even_when_self_is_marked_down); + CPPUNIT_TEST(activate_cluster_state_request_with_mismatching_version_returns_actual_version); + CPPUNIT_TEST(activate_cluster_state_request_without_pending_transition_passes_message_through); CPPUNIT_TEST_SUITE_END(); public: @@ -123,10 +133,7 @@ protected: void testDistributorChangeWithGrouping(); void testNormalUsageInitializing(); void testFailedRequestBucketInfo(); - void testNoResponses(); void testBitChange(); - void testInconsistentChecksum(); - void testAddEmptyNode(); void testNodeDown(); void testStorageNodeInMaintenanceClearsBucketsForNode(); void testNodeDownCopiesGetInSync(); @@ -177,6 +184,14 @@ protected: void adding_diverging_replica_to_existing_trusted_does_not_remove_trusted(); void batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted(); void global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection(); + void non_owned_buckets_moved_to_read_only_db_on_ownership_change(); + void buckets_no_longer_available_are_not_moved_to_read_only_database(); + void non_owned_buckets_purged_when_read_only_support_is_config_disabled(); + void deferred_activated_state_does_not_enable_state_until_activation_received(); + void read_only_db_cleared_once_pending_state_is_activated(); + void read_only_db_is_populated_even_when_self_is_marked_down(); + void activate_cluster_state_request_with_mismatching_version_returns_actual_version(); + void activate_cluster_state_request_without_pending_transition_passes_message_through(); auto &defaultDistributorBucketSpace() { return getBucketSpaceRepo().get(makeBucketSpace()); } @@ -190,13 +205,32 @@ protected: getBucketDBUpdater().getDistributorComponent().getIndex(), clusterStateBundle, "ui")); - auto &repo = getBucketSpaceRepo(); - for (auto &elem : repo) { - elem.second->setClusterState(clusterStateBundle.getDerivedClusterState(elem.first)); + for (auto* repo : {&mutable_repo(), &read_only_repo()}) { + for (auto& space : *repo) { + space.second->setClusterState(clusterStateBundle.getDerivedClusterState(space.first)); + } } return clusterInfo; } + DistributorBucketSpaceRepo& mutable_repo() noexcept { return getBucketSpaceRepo(); } + // Note: not calling this "immutable_repo" since it may actually be modified by the pending + // cluster state component (just not by operations), so it would not have the expected semantics. + DistributorBucketSpaceRepo& read_only_repo() noexcept { return getReadOnlyBucketSpaceRepo(); } + + BucketDatabase& mutable_default_db() noexcept { + return mutable_repo().get(FixedBucketSpaces::default_space()).getBucketDatabase(); + } + BucketDatabase& mutable_global_db() noexcept { + return mutable_repo().get(FixedBucketSpaces::global_space()).getBucketDatabase(); + } + BucketDatabase& read_only_default_db() noexcept { + return read_only_repo().get(FixedBucketSpaces::default_space()).getBucketDatabase(); + } + BucketDatabase& read_only_global_db() noexcept { + return read_only_repo().get(FixedBucketSpaces::global_space()).getBucketDatabase(); + } + static std::string getNodeList(std::vector<uint16_t> nodes, size_t count); std::string getNodeList(std::vector<uint16_t> nodes); @@ -210,11 +244,17 @@ protected: return messagesPerBucketSpace * _bucketSpaces.size(); } + void trigger_completed_but_not_yet_activated_transition( + vespalib::stringref initial_state, uint32_t initial_buckets, uint32_t initial_expected_msgs, + vespalib::stringref pending_state, uint32_t pending_buckets, uint32_t pending_expected_msgs); + public: using OutdatedNodesMap = dbtransition::OutdatedNodesMap; void setUp() override { createLinks(); _bucketSpaces = getBucketSpaces(); + // Disable deferred activation by default (at least for now) to avoid breaking the entire world. + getConfig().setAllowStaleReadsDuringClusterStateTransitions(false); }; void tearDown() override { @@ -228,7 +268,7 @@ public: uint32_t bucketCount, uint32_t invalidBucketCount = 0) { - RequestBucketInfoReply* sreply = new RequestBucketInfoReply(cmd); + auto sreply = std::make_shared<RequestBucketInfoReply>(cmd); sreply->setAddress(storageAddress(storageIndex)); api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo(); @@ -261,7 +301,7 @@ public: } } - return std::shared_ptr<api::RequestBucketInfoReply>(sreply); + return sreply; } void fakeBucketReply(const lib::ClusterState &state, @@ -371,8 +411,7 @@ public: void setSystemState(const lib::ClusterState& state) { const size_t sizeBeforeState = _sender.commands.size(); getBucketDBUpdater().onSetSystemState( - std::shared_ptr<api::SetSystemStateCommand>( - new api::SetSystemStateCommand(state))); + std::make_shared<api::SetSystemStateCommand>(state)); // A lot of test logic has the assumption that all messages sent as a // result of cluster state changes will be in increasing index order // (for simplicity, not because this is required for correctness). @@ -381,6 +420,26 @@ public: sortSentMessagesByIndex(_sender, sizeBeforeState); } + void set_cluster_state_bundle(const lib::ClusterStateBundle& state) { + const size_t sizeBeforeState = _sender.commands.size(); + getBucketDBUpdater().onSetSystemState( + std::make_shared<api::SetSystemStateCommand>(state)); + sortSentMessagesByIndex(_sender, sizeBeforeState); + } + + bool activate_cluster_state_version(uint32_t version) { + return getBucketDBUpdater().onActivateClusterStateVersion( + std::make_shared<api::ActivateClusterStateVersionCommand>(version)); + } + + void assert_has_activate_cluster_state_reply_with_actual_version(uint32_t version) { + CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); + auto* response = dynamic_cast<api::ActivateClusterStateVersionReply*>(_sender.replies.back().get()); + CPPUNIT_ASSERT(response != nullptr); + CPPUNIT_ASSERT_EQUAL(version, response->actualVersion()); + _sender.clear(); + } + void completeBucketInfoGathering(const lib::ClusterState& state, size_t expectedMsgs, uint32_t bucketCount = 1, @@ -586,8 +645,9 @@ public: OutdatedNodesMap outdatedNodesMap; state = PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, owner.getBucketSpaceRepo(), cmd, outdatedNodesMap, - api::Timestamp(1)); + clock, clusterInfo, sender, + owner.getBucketSpaceRepo(), owner.getReadOnlyBucketSpaceRepo(), + cmd, outdatedNodesMap, api::Timestamp(1)); } PendingClusterStateFixture( @@ -598,23 +658,22 @@ public: owner.createClusterInfo(oldClusterState)); state = PendingClusterState::createForDistributionChange( - clock, clusterInfo, sender, owner.getBucketSpaceRepo(), api::Timestamp(1)); + clock, clusterInfo, sender, owner.getBucketSpaceRepo(), + owner.getReadOnlyBucketSpaceRepo(), api::Timestamp(1)); } }; - auto createPendingStateFixtureForStateChange( + std::unique_ptr<PendingClusterStateFixture> createPendingStateFixtureForStateChange( const std::string& oldClusterState, const std::string& newClusterState) { - return std::make_unique<PendingClusterStateFixture>( - *this, oldClusterState, newClusterState); + return std::make_unique<PendingClusterStateFixture>(*this, oldClusterState, newClusterState); } - auto createPendingStateFixtureForDistributionChange( + std::unique_ptr<PendingClusterStateFixture> createPendingStateFixtureForDistributionChange( const std::string& oldClusterState) { - return std::make_unique<PendingClusterStateFixture>( - *this, oldClusterState); + return std::make_unique<PendingClusterStateFixture>(*this, oldClusterState); } }; @@ -622,8 +681,8 @@ CPPUNIT_TEST_SUITE_REGISTRATION(BucketDBUpdaterTest); BucketDBUpdaterTest::BucketDBUpdaterTest() : CppUnit::TestFixture(), - DistributorTestUtil(), - _bucketSpaces() + DistributorTestUtil(), + _bucketSpaces() { } @@ -1533,7 +1592,8 @@ BucketDBUpdaterTest::getSentNodesDistributionChanged( ClusterInformation::CSP clusterInfo(createClusterInfo(oldClusterState)); std::unique_ptr<PendingClusterState> state( PendingClusterState::createForDistributionChange( - clock, clusterInfo, sender, getBucketSpaceRepo(), api::Timestamp(1))); + clock, clusterInfo, sender, getBucketSpaceRepo(), + getReadOnlyBucketSpaceRepo(), api::Timestamp(1))); sortSentMessagesByIndex(sender); @@ -1698,8 +1758,8 @@ BucketDBUpdaterTest::testPendingClusterStateReceive() OutdatedNodesMap outdatedNodesMap; std::unique_ptr<PendingClusterState> state( PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap, - api::Timestamp(1))); + clock, clusterInfo, sender, getBucketSpaceRepo(), getReadOnlyBucketSpaceRepo(), + cmd, outdatedNodesMap, api::Timestamp(1))); CPPUNIT_ASSERT_EQUAL(messageCount(3), sender.commands.size()); @@ -1863,8 +1923,8 @@ BucketDBUpdaterTest::mergeBucketLists( ClusterInformation::CSP clusterInfo(createClusterInfo("cluster:d")); std::unique_ptr<PendingClusterState> state( PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap, - beforeTime)); + clock, clusterInfo, sender, getBucketSpaceRepo(), getReadOnlyBucketSpaceRepo(), + cmd, outdatedNodesMap, beforeTime)); parseInputData(existingData, beforeTime, *state, includeBucketInfo); state->mergeIntoBucketDatabases(); @@ -1882,8 +1942,8 @@ BucketDBUpdaterTest::mergeBucketLists( ClusterInformation::CSP clusterInfo(createClusterInfo(oldState.toString())); std::unique_ptr<PendingClusterState> state( PendingClusterState::createForClusterStateChange( - clock, clusterInfo, sender, getBucketSpaceRepo(), cmd, outdatedNodesMap, - afterTime)); + clock, clusterInfo, sender, getBucketSpaceRepo(), getReadOnlyBucketSpaceRepo(), + cmd, outdatedNodesMap, afterTime)); parseInputData(newData, afterTime, *state, includeBucketInfo); state->mergeIntoBucketDatabases(); @@ -2599,4 +2659,192 @@ void BucketDBUpdaterTest::global_distribution_hash_falls_back_to_legacy_format_u CPPUNIT_ASSERT_EQUAL(current_hash, new_current_req.getDistributionHash()); } +namespace { + +template <typename Func> +void for_each_bucket(const BucketDatabase& db, const document::BucketSpace& space, Func&& f) { + BucketId last(0); + auto e = db.getNext(last); + while (e.valid()) { + f(space, e); + e = db.getNext(e.getBucketId()); + } +} + +template <typename Func> +void for_each_bucket(const DistributorBucketSpaceRepo& repo, Func&& f) { + for (const auto& space : repo) { + for_each_bucket(space.second->getBucketDatabase(), space.first, f); + } +} + +} + +using ConfigBuilder = vespa::config::content::core::StorDistributormanagerConfigBuilder; + +void BucketDBUpdaterTest::non_owned_buckets_moved_to_read_only_db_on_ownership_change() { + getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); + + lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition + set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity + + CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size()); + constexpr uint32_t n_buckets = 10; + completeBucketInfoGathering(initial_state, messageCount(4), n_buckets); + _sender.clear(); + + CPPUNIT_ASSERT_EQUAL(size_t(n_buckets), mutable_default_db().size()); + CPPUNIT_ASSERT_EQUAL(size_t(n_buckets), mutable_global_db().size()); + CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size()); + CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_global_db().size()); + + lib::ClusterState pending_state("distributor:2 storage:4"); + + std::unordered_set<Bucket, Bucket::hash> buckets_not_owned_in_pending_state; + for_each_bucket(mutable_repo(), [&](const auto& space, const auto& entry) { + if (!getBucketDBUpdater().getDistributorComponent() + .ownsBucketInState(pending_state, makeDocumentBucket(entry.getBucketId()))) { + buckets_not_owned_in_pending_state.insert(Bucket(space, entry.getBucketId())); + } + }); + CPPUNIT_ASSERT(!buckets_not_owned_in_pending_state.empty()); + + set_cluster_state_bundle(lib::ClusterStateBundle(pending_state, {}, true)); // Now requires activation + + const auto buckets_not_owned_per_space = (buckets_not_owned_in_pending_state.size() / 2); // 2 spaces + const auto expected_mutable_buckets = n_buckets - buckets_not_owned_per_space; + CPPUNIT_ASSERT_EQUAL(expected_mutable_buckets, mutable_default_db().size()); + CPPUNIT_ASSERT_EQUAL(expected_mutable_buckets, mutable_global_db().size()); + CPPUNIT_ASSERT_EQUAL(buckets_not_owned_per_space, read_only_default_db().size()); + CPPUNIT_ASSERT_EQUAL(buckets_not_owned_per_space, read_only_global_db().size()); + + for_each_bucket(read_only_repo(), [&](const auto& space, const auto& entry) { + CPPUNIT_ASSERT(buckets_not_owned_in_pending_state.find(Bucket(space, entry.getBucketId())) + != buckets_not_owned_in_pending_state.end()); + }); +} + +void BucketDBUpdaterTest::buckets_no_longer_available_are_not_moved_to_read_only_database() { + constexpr uint32_t n_buckets = 10; + // No ownership change, just node down. Test redundancy is 2, so removing 2 nodes will + // cause some buckets to be entirely unavailable. + trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:1 storage:4 .0.s:d .1.s:m", n_buckets, 0); + + CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size()); + CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_global_db().size()); +} + +void BucketDBUpdaterTest::non_owned_buckets_purged_when_read_only_support_is_config_disabled() { + getConfig().setAllowStaleReadsDuringClusterStateTransitions(false); + + lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition + set_cluster_state_bundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity + + CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size()); + constexpr uint32_t n_buckets = 10; + completeBucketInfoGathering(initial_state, messageCount(4), n_buckets); + _sender.clear(); + + // Nothing in read-only DB after first bulk load of buckets. + CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size()); + CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_global_db().size()); + + lib::ClusterState pending_state("distributor:2 storage:4"); + setSystemState(pending_state); + // No buckets should be moved into read only db after ownership changes. + CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size()); + CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_global_db().size()); +} + +void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition( + vespalib::stringref initial_state_str, + uint32_t initial_buckets, + uint32_t initial_expected_msgs, + vespalib::stringref pending_state_str, + uint32_t pending_buckets, + uint32_t pending_expected_msgs) +{ + getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); + lib::ClusterState initial_state(initial_state_str); + setSystemState(initial_state); + CPPUNIT_ASSERT_EQUAL(messageCount(initial_expected_msgs), _sender.commands.size()); + completeBucketInfoGathering(initial_state, messageCount(initial_expected_msgs), initial_buckets); + _sender.clear(); + + lib::ClusterState pending_state(pending_state_str); // Ownership change + set_cluster_state_bundle(lib::ClusterStateBundle(pending_state, {}, true)); + CPPUNIT_ASSERT_EQUAL(messageCount(pending_expected_msgs), _sender.commands.size()); + completeBucketInfoGathering(pending_state, messageCount(pending_expected_msgs), pending_buckets); + _sender.clear(); +} + +void BucketDBUpdaterTest::deferred_activated_state_does_not_enable_state_until_activation_received() { + constexpr uint32_t n_buckets = 10; + trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, + "version:2 distributor:1 storage:4", n_buckets, 4); + + // Version should not be switched over yet + CPPUNIT_ASSERT_EQUAL(uint32_t(1), getDistributor().getClusterStateBundle().getVersion()); + + CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_default_db().size()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_global_db().size()); + + CPPUNIT_ASSERT(!activate_cluster_state_version(2)); + + CPPUNIT_ASSERT_EQUAL(uint32_t(2), getDistributor().getClusterStateBundle().getVersion()); + CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), mutable_default_db().size()); + CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), mutable_global_db().size()); +} + +void BucketDBUpdaterTest::read_only_db_cleared_once_pending_state_is_activated() { + constexpr uint32_t n_buckets = 10; + trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:2 storage:4", n_buckets, 0); + CPPUNIT_ASSERT(!activate_cluster_state_version(2)); + + CPPUNIT_ASSERT_EQUAL(uint64_t(0), read_only_default_db().size()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), read_only_global_db().size()); +} + +void BucketDBUpdaterTest::read_only_db_is_populated_even_when_self_is_marked_down() { + constexpr uint32_t n_buckets = 10; + trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:1 .0.s:d storage:4", n_buckets, 0); + + // State not yet activated, so read-only DBs have got all the buckets we used to have. + CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_default_db().size()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_global_db().size()); + CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), read_only_default_db().size()); + CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), read_only_global_db().size()); +} + +void BucketDBUpdaterTest::activate_cluster_state_request_with_mismatching_version_returns_actual_version() { + constexpr uint32_t n_buckets = 10; + trigger_completed_but_not_yet_activated_transition("version:4 distributor:1 storage:4", n_buckets, 4, + "version:5 distributor:2 storage:4", n_buckets, 0); + + CPPUNIT_ASSERT(activate_cluster_state_version(4)); // Too old version + assert_has_activate_cluster_state_reply_with_actual_version(5); + + CPPUNIT_ASSERT(activate_cluster_state_version(6)); // More recent version than what has been observed + assert_has_activate_cluster_state_reply_with_actual_version(5); +} + +void BucketDBUpdaterTest::activate_cluster_state_request_without_pending_transition_passes_message_through() { + constexpr uint32_t n_buckets = 10; + trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, + "version:2 distributor:1 storage:4", n_buckets, 4); + // Activate version 2; no pending cluster state after this. + CPPUNIT_ASSERT(!activate_cluster_state_version(2)); + + // No pending cluster state for version 3, just passed through to be implicitly bounced by state manager. + // Note: state manager is not modelled in this test, so we just check that the message handler returns + // false (meaning "didn't take message ownership") and there's no auto-generated reply. + CPPUNIT_ASSERT(!activate_cluster_state_version(3)); + CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.replies.size()); +} + +// TODO rename distributor config to imply two phase functionlity explicitly? + } diff --git a/storage/src/tests/distributor/distributortestutil.cpp b/storage/src/tests/distributor/distributortestutil.cpp index d3496d0c9f6..3f7f2eac63a 100644 --- a/storage/src/tests/distributor/distributortestutil.cpp +++ b/storage/src/tests/distributor/distributortestutil.cpp @@ -388,6 +388,16 @@ DistributorTestUtil::getBucketSpaceRepo() const { return _distributor->getBucketSpaceRepo(); } +DistributorBucketSpaceRepo & +DistributorTestUtil::getReadOnlyBucketSpaceRepo() { + return _distributor->getReadOnlyBucketSpaceRepo(); +} + +const DistributorBucketSpaceRepo & +DistributorTestUtil::getReadOnlyBucketSpaceRepo() const { + return _distributor->getReadOnlyBucketSpaceRepo(); +} + const lib::Distribution& DistributorTestUtil::getDistribution() const { return getBucketSpaceRepo().get(makeBucketSpace()).getDistribution(); diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h index 10cc5eeaca1..420111437d2 100644 --- a/storage/src/tests/distributor/distributortestutil.h +++ b/storage/src/tests/distributor/distributortestutil.h @@ -132,6 +132,8 @@ public: const BucketDatabase& getBucketDatabase(document::BucketSpace space) const; DistributorBucketSpaceRepo &getBucketSpaceRepo(); const DistributorBucketSpaceRepo &getBucketSpaceRepo() const; + DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo(); + const DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() const; const lib::Distribution& getDistribution() const; // "End to end" distribution change trigger, which will invoke the bucket diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp index ddf88f50c36..40fe885dcb1 100644 --- a/storage/src/tests/distributor/externaloperationhandlertest.cpp +++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp @@ -4,6 +4,7 @@ #include <vespa/storage/distributor/externaloperationhandler.h> #include <vespa/storage/distributor/distributor.h> #include <vespa/storage/distributor/distributormetricsset.h> +#include <vespa/storage/distributor/operations/external/getoperation.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/update/documentupdate.h> @@ -20,8 +21,11 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture, CPPUNIT_TEST_SUITE(ExternalOperationHandlerTest); CPPUNIT_TEST(testBucketSplitMask); - CPPUNIT_TEST(testOperationRejectedOnWrongDistribution); - CPPUNIT_TEST(testOperationRejectedOnPendingWrongDistribution); + CPPUNIT_TEST(mutating_operation_wdr_bounced_on_wrong_current_distribution); + CPPUNIT_TEST(mutating_operation_busy_bounced_on_wrong_pending_distribution); + CPPUNIT_TEST(mutating_operation_busy_bounced_if_no_cluster_state_received_yet); + CPPUNIT_TEST(read_only_operation_wdr_bounced_on_wrong_current_distribution); + CPPUNIT_TEST(read_only_operation_busy_bounced_if_no_cluster_state_received_yet); CPPUNIT_TEST(reject_put_if_not_past_safe_time_point); CPPUNIT_TEST(reject_remove_if_not_past_safe_time_point); CPPUNIT_TEST(reject_update_if_not_past_safe_time_point); @@ -37,6 +41,9 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture, CPPUNIT_TEST(concurrent_get_and_mutation_do_not_conflict); CPPUNIT_TEST(sequencing_works_across_mutation_types); CPPUNIT_TEST(sequencing_can_be_explicitly_config_disabled); + CPPUNIT_TEST(gets_are_started_with_mutable_db_outside_transition_period); + CPPUNIT_TEST(gets_are_started_with_read_only_db_during_transition_period); + CPPUNIT_TEST(gets_are_busy_bounced_during_transition_period_if_stale_reads_disabled); CPPUNIT_TEST_SUITE_END(); document::BucketId findNonOwnedUserBucketInState(vespalib::stringref state); @@ -49,10 +56,13 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture, std::shared_ptr<api::UpdateCommand> makeUpdateCommand(const vespalib::string& doc_type, const vespalib::string& id) const; std::shared_ptr<api::UpdateCommand> makeUpdateCommand() const; + std::shared_ptr<api::UpdateCommand> makeUpdateCommandForUser(uint64_t id) const; std::shared_ptr<api::PutCommand> makePutCommand(const vespalib::string& doc_type, const vespalib::string& id) const; std::shared_ptr<api::RemoveCommand> makeRemoveCommand(const vespalib::string& id) const; + void verify_busy_bounced_due_to_no_active_state(std::shared_ptr<api::StorageCommand> cmd); + Operation::SP start_operation_verify_not_rejected(std::shared_ptr<api::StorageCommand> cmd); void start_operation_verify_rejected(std::shared_ptr<api::StorageCommand> cmd); @@ -80,10 +90,16 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture, const vespalib::string _dummy_id{"id:foo:testdoctype1::bar"}; + // Returns an arbitrary bucket not owned in the pending state + document::BucketId set_up_pending_cluster_state_transition(bool read_only_enabled); + protected: void testBucketSplitMask(); - void testOperationRejectedOnWrongDistribution(); - void testOperationRejectedOnPendingWrongDistribution(); + void mutating_operation_wdr_bounced_on_wrong_current_distribution(); + void mutating_operation_busy_bounced_on_wrong_pending_distribution(); + void mutating_operation_busy_bounced_if_no_cluster_state_received_yet(); + void read_only_operation_wdr_bounced_on_wrong_current_distribution(); + void read_only_operation_busy_bounced_if_no_cluster_state_received_yet(); void reject_put_if_not_past_safe_time_point(); void reject_remove_if_not_past_safe_time_point(); void reject_update_if_not_past_safe_time_point(); @@ -99,6 +115,9 @@ protected: void concurrent_get_and_mutation_do_not_conflict(); void sequencing_works_across_mutation_types(); void sequencing_can_be_explicitly_config_disabled(); + void gets_are_started_with_mutable_db_outside_transition_period(); + void gets_are_started_with_read_only_db_during_transition_period(); + void gets_are_busy_bounced_during_transition_period_if_stale_reads_disabled(); void assert_rejection_due_to_unsafe_time( std::shared_ptr<api::StorageCommand> cmd); @@ -220,6 +239,11 @@ ExternalOperationHandlerTest::makeUpdateCommand() const { return makeUpdateCommand("testdoctype1", "id:foo:testdoctype1::baz"); } +std::shared_ptr<api::UpdateCommand> +ExternalOperationHandlerTest::makeUpdateCommandForUser(uint64_t id) const { + return makeUpdateCommand("testdoctype1", vespalib::make_string("id::testdoctype1:n=%" PRIu64 ":bar", id)); +} + std::shared_ptr<api::PutCommand> ExternalOperationHandlerTest::makePutCommand( const vespalib::string& doc_type, const vespalib::string& id) const { @@ -233,10 +257,30 @@ std::shared_ptr<api::RemoveCommand> ExternalOperationHandlerTest::makeRemoveComm } void -ExternalOperationHandlerTest::testOperationRejectedOnWrongDistribution() +ExternalOperationHandlerTest::mutating_operation_wdr_bounced_on_wrong_current_distribution() { createLinks(); - std::string state("distributor:2 storage:2"); + std::string state("version:1 distributor:2 storage:2"); + setupDistributor(1, 2, state); + + document::BucketId bucket(findNonOwnedUserBucketInState(state)); + auto cmd = makeUpdateCommandForUser(bucket.withoutCountBits()); + + Operation::SP genOp; + CPPUNIT_ASSERT(getExternalOperationHandler().handleMessage(cmd, genOp)); + CPPUNIT_ASSERT(!genOp.get()); + CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); + CPPUNIT_ASSERT_EQUAL( + std::string("ReturnCode(WRONG_DISTRIBUTION, " + "version:1 distributor:2 storage:2)"), + _sender.replies[0]->getResult().toString()); +} + +void +ExternalOperationHandlerTest::read_only_operation_wdr_bounced_on_wrong_current_distribution() +{ + createLinks(); + std::string state("version:1 distributor:2 storage:2"); setupDistributor(1, 2, state); document::BucketId bucket(findNonOwnedUserBucketInState(state)); @@ -248,43 +292,65 @@ ExternalOperationHandlerTest::testOperationRejectedOnWrongDistribution() CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); CPPUNIT_ASSERT_EQUAL( std::string("ReturnCode(WRONG_DISTRIBUTION, " - "distributor:2 storage:2)"), + "version:1 distributor:2 storage:2)"), _sender.replies[0]->getResult().toString()); } void -ExternalOperationHandlerTest::testOperationRejectedOnPendingWrongDistribution() +ExternalOperationHandlerTest::mutating_operation_busy_bounced_on_wrong_pending_distribution() { createLinks(); - std::string current("distributor:2 storage:2"); - std::string pending("distributor:3 storage:3"); + std::string current("version:10 distributor:2 storage:2"); + std::string pending("version:11 distributor:3 storage:3"); setupDistributor(1, 3, current); document::BucketId b(findOwned1stNotOwned2ndInStates(current, pending)); // Trigger pending cluster state - auto stateCmd = std::make_shared<api::SetSystemStateCommand>( - lib::ClusterState(pending)); + auto stateCmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(pending)); getBucketDBUpdater().onSetSystemState(stateCmd); - auto cmd = makeGetCommandForUser(b.withoutCountBits()); + auto cmd = makeUpdateCommandForUser(b.withoutCountBits()); Operation::SP genOp; CPPUNIT_ASSERT(getExternalOperationHandler().handleMessage(cmd, genOp)); CPPUNIT_ASSERT(!genOp.get()); CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); - // Fail back with _pending_ cluster state so client can start trying - // correct distributor immediately. If that distributor has not yet - // completed processing its pending cluster state, it'll return the - // old (current) cluster state, causing the client to bounce between - // the two until the pending states have been resolved. This is pretty - // much inevitable with the current design. CPPUNIT_ASSERT_EQUAL( - std::string("ReturnCode(WRONG_DISTRIBUTION, " - "distributor:3 storage:3)"), + std::string("ReturnCode(BUSY, Currently pending cluster state transition from version 10 to 11)"), _sender.replies[0]->getResult().toString()); } +void +ExternalOperationHandlerTest::verify_busy_bounced_due_to_no_active_state(std::shared_ptr<api::StorageCommand> cmd) +{ + createLinks(); + std::string state{}; // No version --> not yet received + setupDistributor(1, 2, state); + + Operation::SP genOp; + CPPUNIT_ASSERT(getExternalOperationHandler().handleMessage(cmd, genOp)); + CPPUNIT_ASSERT(!genOp.get()); + CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); + CPPUNIT_ASSERT_EQUAL( + std::string("ReturnCode(BUSY, No cluster state activated yet)"), + _sender.replies[0]->getResult().toString()); +} + +// TODO NOT_READY is a more appropriate return code for this case, but must ensure it's +// handled gracefully and silently through the stack. BUSY is a safe bet until then. +void +ExternalOperationHandlerTest::mutating_operation_busy_bounced_if_no_cluster_state_received_yet() +{ + verify_busy_bounced_due_to_no_active_state(makeUpdateCommandForUser(12345)); +} + +void +ExternalOperationHandlerTest::read_only_operation_busy_bounced_if_no_cluster_state_received_yet() +{ + verify_busy_bounced_due_to_no_active_state(makeGetCommandForUser(12345)); +} + using TimePoint = ExternalOperationHandler::TimePoint; using namespace std::literals::chrono_literals; @@ -292,7 +358,7 @@ void ExternalOperationHandlerTest::assert_rejection_due_to_unsafe_time( std::shared_ptr<api::StorageCommand> cmd) { createLinks(); - setupDistributor(1, 2, "distributor:1 storage:1"); + setupDistributor(1, 2, "version:1 distributor:1 storage:1"); getClock().setAbsoluteTimeInSeconds(9); getExternalOperationHandler().rejectFeedBeforeTimeReached(TimePoint(10s)); @@ -327,7 +393,7 @@ void ExternalOperationHandlerTest::reject_update_if_not_past_safe_time_point() { void ExternalOperationHandlerTest::get_not_rejected_by_unsafe_time_point() { createLinks(); - setupDistributor(1, 2, "distributor:1 storage:1"); + setupDistributor(1, 2, "version:1 distributor:1 storage:1"); getClock().setAbsoluteTimeInSeconds(9); getExternalOperationHandler().rejectFeedBeforeTimeReached(TimePoint(10s)); @@ -342,7 +408,7 @@ void ExternalOperationHandlerTest::get_not_rejected_by_unsafe_time_point() { void ExternalOperationHandlerTest::mutation_not_rejected_when_safe_point_reached() { createLinks(); - setupDistributor(1, 2, "distributor:1 storage:1"); + setupDistributor(1, 2, "version:1 distributor:1 storage:1"); getClock().setAbsoluteTimeInSeconds(10); getExternalOperationHandler().rejectFeedBeforeTimeReached(TimePoint(10s)); @@ -360,7 +426,7 @@ void ExternalOperationHandlerTest::mutation_not_rejected_when_safe_point_reached void ExternalOperationHandlerTest::set_up_distributor_for_sequencing_test() { createLinks(); - setupDistributor(1, 2, "distributor:1 storage:1"); + setupDistributor(1, 2, "version:1 distributor:1 storage:1"); } Operation::SP ExternalOperationHandlerTest::start_operation_verify_not_rejected( @@ -486,6 +552,52 @@ void ExternalOperationHandlerTest::sequencing_can_be_explicitly_config_disabled( start_operation_verify_not_rejected(makeRemoveCommand(_dummy_id)); } +void ExternalOperationHandlerTest::gets_are_started_with_mutable_db_outside_transition_period() { + createLinks(); + std::string current = "version:1 distributor:1 storage:3"; + setupDistributor(1, 3, current); + getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); + + document::BucketId b(16, 1234); // Only 1 distributor (us), so doesn't matter + + auto op = start_operation_verify_not_rejected(makeGetCommandForUser(b.withoutCountBits())); + auto& get_op = dynamic_cast<GetOperation&>(*op); + const auto* expected_space = &getBucketSpaceRepo().get(document::FixedBucketSpaces::default_space()); + CPPUNIT_ASSERT_EQUAL(expected_space, &get_op.bucketSpace()); +} + +document::BucketId ExternalOperationHandlerTest::set_up_pending_cluster_state_transition(bool read_only_enabled) { + createLinks(); + std::string current = "version:123 distributor:2 storage:2"; + std::string pending = "version:321 distributor:3 storage:3"; + setupDistributor(1, 3, current); + getConfig().setAllowStaleReadsDuringClusterStateTransitions(read_only_enabled); + + // Trigger pending cluster state + auto stateCmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(pending)); + getBucketDBUpdater().onSetSystemState(stateCmd); + return findOwned1stNotOwned2ndInStates(current, pending); +} + +void ExternalOperationHandlerTest::gets_are_started_with_read_only_db_during_transition_period() { + auto non_owned_bucket = set_up_pending_cluster_state_transition(true); + + auto op = start_operation_verify_not_rejected(makeGetCommandForUser(non_owned_bucket.withoutCountBits())); + auto& get_op = dynamic_cast<GetOperation&>(*op); + const auto* expected_space = &getReadOnlyBucketSpaceRepo().get(document::FixedBucketSpaces::default_space()); + CPPUNIT_ASSERT_EQUAL(expected_space, &get_op.bucketSpace()); +} + +void ExternalOperationHandlerTest::gets_are_busy_bounced_during_transition_period_if_stale_reads_disabled() { + auto non_owned_bucket = set_up_pending_cluster_state_transition(false); + + start_operation_verify_rejected(makeGetCommandForUser(non_owned_bucket.withoutCountBits())); + CPPUNIT_ASSERT_EQUAL( + std::string("ReturnCode(BUSY, Currently pending cluster state transition from version 123 to 321)"), + _sender.replies[0]->getResult().toString()); + +} + // TODO support sequencing of RemoveLocation? It's a mutating operation, but supporting it with // the current approach is not trivial. A RemoveLocation operation covers the _entire_ bucket // sub tree under a given location, while the sequencer works on individual GIDs. Mapping the diff --git a/storage/src/tests/storageserver/bouncertest.cpp b/storage/src/tests/storageserver/bouncertest.cpp index 27c13a3707e..371c24accbc 100644 --- a/storage/src/tests/storageserver/bouncertest.cpp +++ b/storage/src/tests/storageserver/bouncertest.cpp @@ -43,6 +43,7 @@ struct BouncerTest : public CppUnit::TestFixture { void outOfBoundsConfigValuesThrowException(); void abort_request_when_derived_bucket_space_node_state_is_marked_down(); void client_operations_are_allowed_through_on_cluster_state_down_distributor(); + void cluster_state_activation_commands_are_not_bounced(); CPPUNIT_TEST_SUITE(BouncerTest); CPPUNIT_TEST(testFutureTimestamp); @@ -57,6 +58,7 @@ struct BouncerTest : public CppUnit::TestFixture { CPPUNIT_TEST(outOfBoundsConfigValuesThrowException); CPPUNIT_TEST(abort_request_when_derived_bucket_space_node_state_is_marked_down); CPPUNIT_TEST(client_operations_are_allowed_through_on_cluster_state_down_distributor); + CPPUNIT_TEST(cluster_state_activation_commands_are_not_bounced); CPPUNIT_TEST_SUITE_END(); using Priority = api::StorageMessage::Priority; @@ -368,5 +370,17 @@ void BouncerTest::client_operations_are_allowed_through_on_cluster_state_down_di CPPUNIT_ASSERT_EQUAL(uint64_t(0), _manager->metrics().unavailable_node_aborts.getValue()); } +void BouncerTest::cluster_state_activation_commands_are_not_bounced() { + tearDown(); + setUpAsNode(lib::NodeType::DISTRIBUTOR); + + auto state = makeClusterStateBundle("version:10 distributor:3 .2.s:d storage:3", {}); // Our index (2) is down + _node->getNodeStateUpdater().setClusterStateBundle(state); + + auto activate_cmd = std::make_shared<api::ActivateClusterStateVersionCommand>(11); + _upper->sendDown(activate_cmd); + assertMessageNotBounced(); +} + } // storage diff --git a/storage/src/tests/storageserver/fnet_listener_test.cpp b/storage/src/tests/storageserver/fnet_listener_test.cpp index 84051041d25..d40b230d725 100644 --- a/storage/src/tests/storageserver/fnet_listener_test.cpp +++ b/storage/src/tests/storageserver/fnet_listener_test.cpp @@ -27,6 +27,9 @@ public: CPPUNIT_TEST(set_distribution_rpc_is_immediately_failed_if_listener_is_closed); CPPUNIT_TEST(overly_large_uncompressed_bundle_size_parameter_returns_rpc_error); CPPUNIT_TEST(mismatching_uncompressed_bundle_size_parameter_returns_rpc_error); + CPPUNIT_TEST(true_deferred_activation_flag_can_be_roundtrip_encoded); + CPPUNIT_TEST(false_deferred_activation_flag_can_be_roundtrip_encoded); + CPPUNIT_TEST(activate_cluster_state_version_rpc_enqueues_command_with_version); CPPUNIT_TEST_SUITE_END(); void baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle(); @@ -35,6 +38,9 @@ public: void set_distribution_rpc_is_immediately_failed_if_listener_is_closed(); void overly_large_uncompressed_bundle_size_parameter_returns_rpc_error(); void mismatching_uncompressed_bundle_size_parameter_returns_rpc_error(); + void true_deferred_activation_flag_can_be_roundtrip_encoded(); + void false_deferred_activation_flag_can_be_roundtrip_encoded(); + void activate_cluster_state_version_rpc_enqueues_command_with_version(); }; CPPUNIT_TEST_SUITE_REGISTRATION(FNetListenerTest); @@ -54,24 +60,25 @@ struct DummyReturnHandler : FRT_IReturnHandler { FNET_Connection* GetConnection() override { return nullptr; } }; -struct Fixture { +struct FixtureBase { // TODO factor out Slobrok code to avoid need to set up live ports for unrelated tests mbus::Slobrok slobrok; vdstestlib::DirConfig config; MockOperationEnqueuer enqueuer; std::unique_ptr<FNetListener> fnet_listener; - SlimeClusterStateBundleCodec codec; DummyReturnHandler return_handler; bool request_is_detached{false}; FRT_RPCRequest* bound_request{nullptr}; - Fixture() : config(getStandardConfig(true)) { + FixtureBase() + : config(getStandardConfig(true)) + { config.getConfig("stor-server").set("node_index", "1"); addSlobrokConfig(config, slobrok); fnet_listener = std::make_unique<FNetListener>(enqueuer, config.getConfigId(), 0); } - ~Fixture() { + virtual ~FixtureBase() { // Must destroy any associated message contexts that may have refs to FRT_Request // instance _before_ we destroy the request itself. enqueuer._enqueued.clear(); @@ -79,6 +86,12 @@ struct Fixture { bound_request->SubRef(); } } +}; + +struct SetStateFixture : FixtureBase { + SlimeClusterStateBundleCodec codec; + + SetStateFixture() : FixtureBase() {} void bind_request_params(EncodedClusterStateBundle& encoded_bundle, uint32_t uncompressed_length) { bound_request = new FRT_RPCRequest(); // Naked new isn't pretty, but FRT_RPCRequest has internal refcounting @@ -123,6 +136,10 @@ struct Fixture { lib::ClusterStateBundle dummy_baseline_bundle() const { return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3")); } + + lib::ClusterStateBundle dummy_baseline_bundle_with_deferred_activation(bool deferred) const { + return lib::ClusterStateBundle(lib::ClusterState("version:123 distributor:3 storage:3"), {}, deferred); + } }; std::shared_ptr<const lib::ClusterState> state_of(vespalib::stringref state) { @@ -138,17 +155,17 @@ vespalib::string make_compressable_state_string() { ss.str().data(), ss.str().data()); } -} +} // anon namespace void FNetListenerTest::baseline_set_distribution_states_rpc_enqueues_command_with_state_bundle() { - Fixture f; + SetStateFixture f; auto baseline = f.dummy_baseline_bundle(); f.assert_request_received_and_propagated(baseline); } void FNetListenerTest::set_distribution_states_rpc_with_derived_enqueues_command_with_state_bundle() { - Fixture f; + SetStateFixture f; lib::ClusterStateBundle spaces_bundle( lib::ClusterState("version:123 distributor:3 storage:3"), {{FixedBucketSpaces::default_space(), state_of("version:123 distributor:3 storage:3 .0.s:d")}, @@ -158,7 +175,7 @@ void FNetListenerTest::set_distribution_states_rpc_with_derived_enqueues_command } void FNetListenerTest::compressed_bundle_is_transparently_uncompressed() { - Fixture f; + SetStateFixture f; auto state_str = make_compressable_state_string(); lib::ClusterStateBundle compressable_bundle{lib::ClusterState(state_str)}; @@ -171,24 +188,73 @@ void FNetListenerTest::compressed_bundle_is_transparently_uncompressed() { } void FNetListenerTest::set_distribution_rpc_is_immediately_failed_if_listener_is_closed() { - Fixture f; + SetStateFixture f; f.create_request(f.dummy_baseline_bundle()); f.fnet_listener->close(); f.assert_request_returns_error_response(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN); } void FNetListenerTest::overly_large_uncompressed_bundle_size_parameter_returns_rpc_error() { - Fixture f; + SetStateFixture f; auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle()); f.bind_request_params(encoded_bundle, FNetListener::StateBundleMaxUncompressedSize + 1); f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST); } void FNetListenerTest::mismatching_uncompressed_bundle_size_parameter_returns_rpc_error() { - Fixture f; + SetStateFixture f; auto encoded_bundle = f.codec.encode(f.dummy_baseline_bundle()); f.bind_request_params(encoded_bundle, encoded_bundle._buffer->getDataLen() + 100); f.assert_request_returns_error_response(RPCRequestWrapper::ERR_BAD_REQUEST); } +void FNetListenerTest::true_deferred_activation_flag_can_be_roundtrip_encoded() { + SetStateFixture f; + f.assert_request_received_and_propagated(f.dummy_baseline_bundle_with_deferred_activation(true)); + +} + +void FNetListenerTest::false_deferred_activation_flag_can_be_roundtrip_encoded() { + SetStateFixture f; + f.assert_request_received_and_propagated(f.dummy_baseline_bundle_with_deferred_activation(false)); +} + +struct ActivateStateFixture : FixtureBase { + ActivateStateFixture() : FixtureBase() {} + + void bind_request_params(uint32_t activate_version) { + bound_request = new FRT_RPCRequest(); // Naked new isn't pretty, but FRT_RPCRequest has internal refcounting + auto* params = bound_request->GetParams(); + params->AddInt32(activate_version); + + bound_request->SetDetachedPT(&request_is_detached); + bound_request->SetReturnHandler(&return_handler); + } + + void create_request(uint32_t activate_version) { + // Only 1 request allowed per fixture due to lifetime handling snags + assert(bound_request == nullptr); + bind_request_params(activate_version); + } + + void assert_enqueued_operation_has_activate_version(uint32_t version) { + CPPUNIT_ASSERT(bound_request != nullptr); + CPPUNIT_ASSERT(request_is_detached); + CPPUNIT_ASSERT_EQUAL(size_t(1), enqueuer._enqueued.size()); + auto& state_request = dynamic_cast<const api::ActivateClusterStateVersionCommand&>(*enqueuer._enqueued[0]); + CPPUNIT_ASSERT_EQUAL(version, state_request.version()); + } + + void assert_request_received_and_propagated(uint32_t activate_version) { + create_request(activate_version); + fnet_listener->RPC_activateClusterStateVersion(bound_request); + assert_enqueued_operation_has_activate_version(activate_version); + } +}; + +void FNetListenerTest::activate_cluster_state_version_rpc_enqueues_command_with_version() { + ActivateStateFixture f; + f.assert_request_received_and_propagated(1234567); +} + } diff --git a/storage/src/tests/storageserver/statemanagertest.cpp b/storage/src/tests/storageserver/statemanagertest.cpp index 19f414482db..cdf990fa28f 100644 --- a/storage/src/tests/storageserver/statemanagertest.cpp +++ b/storage/src/tests/storageserver/statemanagertest.cpp @@ -37,6 +37,7 @@ struct StateManagerTest : public CppUnit::TestFixture { void can_explicitly_send_get_node_state_reply(); void explicit_node_state_replying_without_pending_request_immediately_replies_on_next_request(); void immediate_node_state_replying_is_tracked_per_controller(); + void activation_command_is_bounced_with_current_cluster_state_version(); CPPUNIT_TEST_SUITE(StateManagerTest); CPPUNIT_TEST(testSystemState); @@ -45,8 +46,10 @@ struct StateManagerTest : public CppUnit::TestFixture { CPPUNIT_TEST(can_explicitly_send_get_node_state_reply); CPPUNIT_TEST(explicit_node_state_replying_without_pending_request_immediately_replies_on_next_request); CPPUNIT_TEST(immediate_node_state_replying_is_tracked_per_controller); + CPPUNIT_TEST(activation_command_is_bounced_with_current_cluster_state_version); CPPUNIT_TEST_SUITE_END(); + void force_current_cluster_state_version(uint32_t version); void mark_reported_node_state_up(); void send_down_get_node_state_request(uint16_t controller_index); void assert_ok_get_node_state_reply_sent_and_clear(); @@ -101,6 +104,12 @@ StateManagerTest::tearDown() { _metricManager.reset(); } +void StateManagerTest::force_current_cluster_state_version(uint32_t version) { + ClusterState state(*_manager->getClusterStateBundle()->getBaselineClusterState()); + state.setVersion(version); + _manager->setClusterStateBundle(lib::ClusterStateBundle(state)); +} + #define GET_ONLY_OK_REPLY(varname) \ { \ CPPUNIT_ASSERT_EQUAL(size_t(1), _upper->getNumReplies()); \ @@ -236,9 +245,7 @@ StateManagerTest::testReportedNodeState() } void StateManagerTest::current_cluster_state_version_is_included_in_host_info_json() { - ClusterState state(*_manager->getClusterStateBundle()->getBaselineClusterState()); - state.setVersion(123); - _manager->setClusterStateBundle(lib::ClusterStateBundle(state)); + force_current_cluster_state_version(123); std::string nodeInfoString(_manager->getNodeInfo()); vespalib::Memory goldenMemory(nodeInfoString); @@ -343,4 +350,21 @@ void StateManagerTest::immediate_node_state_replying_is_tracked_per_controller() CPPUNIT_ASSERT_EQUAL(size_t(0), _upper->getNumReplies()); } +void StateManagerTest::activation_command_is_bounced_with_current_cluster_state_version() { + force_current_cluster_state_version(12345); + + auto cmd = std::make_shared<api::ActivateClusterStateVersionCommand>(12340); + cmd->setTimeout(10000000); + cmd->setSourceIndex(0); + _upper->sendDown(cmd); + + CPPUNIT_ASSERT_EQUAL(size_t(1), _upper->getNumReplies()); + std::shared_ptr<api::StorageReply> reply; + GET_ONLY_OK_REPLY(reply); // Implicitly clears messages from _upper + CPPUNIT_ASSERT_EQUAL(api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_REPLY, reply->getType()); + auto& activate_reply = dynamic_cast<api::ActivateClusterStateVersionReply&>(*reply); + CPPUNIT_ASSERT_EQUAL(uint32_t(12340), activate_reply.activateVersion()); + CPPUNIT_ASSERT_EQUAL(uint32_t(12345), activate_reply.actualVersion()); +} + } // storage diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp index 44cf56fdff8..294ce56f536 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.cpp +++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp @@ -34,6 +34,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component) _enableHostInfoReporting(true), _disableBucketActivation(false), _sequenceMutatingOperations(true), + _allowStaleReadsDuringClusterStateTransitions(false), _minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED) { } @@ -144,6 +145,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist _enableHostInfoReporting = config.enableHostInfoReporting; _disableBucketActivation = config.disableBucketActivation; _sequenceMutatingOperations = config.sequenceMutatingOperations; + _allowStaleReadsDuringClusterStateTransitions = config.allowStaleReadsDuringClusterStateTransitions; _minimumReplicaCountingMode = config.minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h index 5dfc4f66cb8..8c84fef47b5 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.h +++ b/storage/src/vespa/storage/config/distributorconfiguration.h @@ -235,6 +235,13 @@ public: void setSequenceMutatingOperations(bool sequenceMutations) noexcept { _sequenceMutatingOperations = sequenceMutations; } + + bool allowStaleReadsDuringClusterStateTransitions() const noexcept { + return _allowStaleReadsDuringClusterStateTransitions; + } + void setAllowStaleReadsDuringClusterStateTransitions(bool allow) noexcept { + _allowStaleReadsDuringClusterStateTransitions = allow; + } private: DistributorConfiguration(const DistributorConfiguration& other); @@ -274,6 +281,7 @@ private: bool _enableHostInfoReporting; bool _disableBucketActivation; bool _sequenceMutatingOperations; + bool _allowStaleReadsDuringClusterStateTransitions; DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def index 89aad427ca9..d4f69073cc6 100644 --- a/storage/src/vespa/storage/config/stor-distributormanager.def +++ b/storage/src/vespa/storage/config/stor-distributormanager.def @@ -184,3 +184,10 @@ sequence_mutating_operations bool default=true ## towards a node if it has indicated that its merge queues are full or it is ## suffering from resource exhaustion. inhibit_merge_sending_on_busy_node_duration_sec int default=10 + +## If set, enables potentially stale reads during cluster state transitions where +## buckets change ownership. This also implicitly enables support for two-phase +## cluster state transitions on the distributor. +## For this option to take effect, the cluster controller must also have two-phase +## states enabled. +allow_stale_reads_during_cluster_state_transitions bool default=false diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index a223001af79..e9595b4a960 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -20,11 +20,12 @@ using document::BucketSpace; namespace storage::distributor { BucketDBUpdater::BucketDBUpdater(Distributor& owner, - DistributorBucketSpaceRepo &bucketSpaceRepo, + DistributorBucketSpaceRepo& bucketSpaceRepo, + DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, DistributorMessageSender& sender, DistributorComponentRegister& compReg) : framework::StatusReporter("bucketdb", "Bucket DB Updater"), - _distributorComponent(owner, bucketSpaceRepo, compReg, "Bucket DB Updater"), + _distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Bucket DB Updater"), _sender(sender), _transitionTimer(_distributorComponent.getClock()) { @@ -53,6 +54,13 @@ BucketDBUpdater::print(std::ostream& out, bool verbose, const std::string& inden } bool +BucketDBUpdater::shouldDeferStateEnabling() const noexcept +{ + return _distributorComponent.getDistributor().getConfig() + .allowStaleReadsDuringClusterStateTransitions(); +} + +bool BucketDBUpdater::hasPendingClusterState() const { return static_cast<bool>(_pendingClusterState); @@ -113,25 +121,35 @@ void BucketDBUpdater::removeSuperfluousBuckets( const lib::ClusterStateBundle& newState) { + const bool move_to_read_only_db = shouldDeferStateEnabling(); for (auto &elem : _distributorComponent.getBucketSpaceRepo()) { const auto &newDistribution(elem.second->getDistribution()); const auto &oldClusterState(elem.second->getClusterState()); auto &bucketDb(elem.second->getBucketDatabase()); + auto& readOnlyDb(_distributorComponent.getReadOnlyBucketSpaceRepo().get(elem.first).getBucketDatabase()); // Remove all buckets not belonging to this distributor, or // being on storage nodes that are no longer up. NodeRemover proc( oldClusterState, *newState.getDerivedClusterState(elem.first), - _distributorComponent.getBucketIdFactory(), _distributorComponent.getIndex(), newDistribution, _distributorComponent.getDistributor().getStorageNodeUpStates()); bucketDb.forEach(proc); - for (const auto & entry :proc.getBucketsToRemove()) { - bucketDb.remove(entry); + for (const auto & bucket : proc.getBucketsToRemove()) { + bucketDb.remove(bucket); + } + // TODO vec of Entry instead to avoid lookup and remove? Uses more transient memory... + for (const auto& bucket : proc.getNonOwnedBuckets()) { + if (move_to_read_only_db) { + auto db_entry = bucketDb.get(bucket); + readOnlyDb.update(db_entry); // TODO Entry move support + } + bucketDb.remove(bucket); } + } } @@ -154,6 +172,14 @@ BucketDBUpdater::completeTransitionTimer() } void +BucketDBUpdater::clearReadOnlyBucketRepoDatabases() +{ + for (auto& space : _distributorComponent.getReadOnlyBucketSpaceRepo()) { + space.second->getBucketDatabase().clear(); + } +} + +void BucketDBUpdater::storageDistributionChanged() { ensureTransitionTimerStarted(); @@ -169,6 +195,7 @@ BucketDBUpdater::storageDistributionChanged() std::move(clusterInfo), _sender, _distributorComponent.getBucketSpaceRepo(), + _distributorComponent.getReadOnlyBucketSpaceRepo(), _distributorComponent.getUniqueTimestamp()); _outdatedNodesMap = _pendingClusterState->getOutdatedNodesMap(); } @@ -176,14 +203,22 @@ BucketDBUpdater::storageDistributionChanged() void BucketDBUpdater::replyToPreviousPendingClusterStateIfAny() { - if (_pendingClusterState.get() && - _pendingClusterState->getCommand().get()) - { + if (_pendingClusterState.get() && _pendingClusterState->hasCommand()) { _distributorComponent.sendUp( std::make_shared<api::SetSystemStateReply>(*_pendingClusterState->getCommand())); } } +void +BucketDBUpdater::replyToActivationWithActualVersion( + const api::ActivateClusterStateVersionCommand& cmd, + uint32_t actualVersion) +{ + auto reply = std::make_shared<api::ActivateClusterStateVersionReply>(cmd); + reply->setActualVersion(actualVersion); + _distributorComponent.sendUp(reply); // TODO let API accept rvalues +} + bool BucketDBUpdater::onSetSystemState( const std::shared_ptr<api::SetSystemStateCommand>& cmd) @@ -214,6 +249,7 @@ BucketDBUpdater::onSetSystemState( std::move(clusterInfo), _sender, _distributorComponent.getBucketSpaceRepo(), + _distributorComponent.getReadOnlyBucketSpaceRepo(), cmd, _outdatedNodesMap, _distributorComponent.getUniqueTimestamp()); @@ -225,6 +261,39 @@ BucketDBUpdater::onSetSystemState( return true; } +bool +BucketDBUpdater::onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd) +{ + if (hasPendingClusterState() && _pendingClusterState->isVersionedTransition()) { + const auto pending_version = _pendingClusterState->clusterStateVersion(); + if (pending_version == cmd->version()) { + if (isPendingClusterStateCompleted()) { + assert(_pendingClusterState->isDeferred()); + activatePendingClusterState(); + } else { + LOG(error, "Received cluster state activation for pending version %u " + "without pending state being complete yet. This is not expected, " + "as no activation should be sent before all distributors have " + "reported that state processing is complete.", pending_version); + replyToActivationWithActualVersion(*cmd, 0); // Invalid version, will cause re-send (hopefully when completed). + return true; + } + } else { + replyToActivationWithActualVersion(*cmd, pending_version); + return true; + } + } else if (shouldDeferStateEnabling()) { + // Likely just a resend, but log warn for now to get a feel of how common it is. + LOG(warning, "Received cluster state activation command for version %u, which " + "has no corresponding pending state. Likely resent operation.", cmd->version()); + } else { + LOG(debug, "Received cluster state activation command for version %u, but distributor " + "config does not have deferred activation enabled. Treating as no-op.", cmd->version()); + } + // Fall through to next link in call chain that cares about this message. + return false; +} + BucketDBUpdater::MergeReplyGuard::~MergeReplyGuard() { if (_reply) { @@ -485,14 +554,45 @@ BucketDBUpdater::isPendingClusterStateCompleted() const void BucketDBUpdater::processCompletedPendingClusterState() { + if (_pendingClusterState->isDeferred()) { + LOG(debug, "Deferring completion of pending cluster state version %u until explicitly activated", + _pendingClusterState->clusterStateVersion()); + assert(_pendingClusterState->hasCommand()); // Deferred transitions should only ever be created by state commands. + // Sending down SetSystemState command will reach the state manager and a reply + // will be auto-sent back to the cluster controller in charge. Once this happens, + // it will send an explicit activation command once all distributors have reported + // that their pending cluster states have completed. + // A booting distributor will treat itself as "system Up" before the state has actually + // taken effect via activation. External operation handler will keep operations from + // actually being scheduled until state has been activated. The external operation handler + // needs to be explicitly aware of the case where no state has yet to be activated. + _distributorComponent.getDistributor().getMessageSender().sendDown( + _pendingClusterState->getCommand()); + _pendingClusterState->clearCommand(); + return; + } + // Distribution config change or non-deferred cluster state. Immediately activate + // the pending state without being told to do so explicitly. + activatePendingClusterState(); +} + +void +BucketDBUpdater::activatePendingClusterState() +{ _pendingClusterState->mergeIntoBucketDatabases(); - if (_pendingClusterState->getCommand().get()) { + if (_pendingClusterState->isVersionedTransition()) { + LOG(debug, "Activating pending cluster state version %u", _pendingClusterState->clusterStateVersion()); enableCurrentClusterStateBundleInDistributor(); - _distributorComponent.getDistributor().getMessageSender().sendDown( - _pendingClusterState->getCommand()); + if (_pendingClusterState->hasCommand()) { + _distributorComponent.getDistributor().getMessageSender().sendDown( + _pendingClusterState->getCommand()); + } addCurrentStateToClusterStateHistory(); } else { + LOG(debug, "Activating pending distribution config"); + // TODO distribution changes cannot currently be deferred as they are not + // initiated by the cluster controller! _distributorComponent.getDistributor().notifyDistributionChangeEnabled(); } @@ -500,13 +600,14 @@ BucketDBUpdater::processCompletedPendingClusterState() _outdatedNodesMap.clear(); sendAllQueuedBucketRechecks(); completeTransitionTimer(); + clearReadOnlyBucketRepoDatabases(); } void BucketDBUpdater::enableCurrentClusterStateBundleInDistributor() { const lib::ClusterStateBundle& state( - _pendingClusterState->getCommand()->getClusterStateBundle()); + _pendingClusterState->getNewClusterStateBundle()); LOG(debug, "BucketDBUpdater finished processing state %s", @@ -688,7 +789,7 @@ BucketDBUpdater::NodeRemover::process(BucketDatabase::Entry& e) return true; } if (!distributorOwnsBucket(bucketId)) { - _removedBuckets.push_back(bucketId); + _nonOwnedBuckets.push_back(bucketId); return true; } diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h index ea67e7ea72a..393e1e2524e 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h @@ -33,7 +33,8 @@ public: using OutdatedNodes = dbtransition::OutdatedNodes; using OutdatedNodesMap = dbtransition::OutdatedNodesMap; BucketDBUpdater(Distributor& owner, - DistributorBucketSpaceRepo &bucketSpaceRepo, + DistributorBucketSpaceRepo& bucketSpaceRepo, + DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, DistributorMessageSender& sender, DistributorComponentRegister& compReg); ~BucketDBUpdater(); @@ -43,6 +44,7 @@ public: void recheckBucketInfo(uint32_t nodeIdx, const document::Bucket& bucket); bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& cmd) override; + bool onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd) override; bool onRequestBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply> & repl) override; bool onMergeBucketReply(const std::shared_ptr<api::MergeBucketReply>& reply) override; bool onNotifyBucketChange(const std::shared_ptr<api::NotifyBucketChangeCommand>&) override; @@ -124,6 +126,7 @@ private: } }; + bool shouldDeferStateEnabling() const noexcept; bool hasPendingClusterState() const; bool pendingClusterStateAccepted(const std::shared_ptr<api::RequestBucketInfoReply>& repl); bool processSingleBucketInfoReply(const std::shared_ptr<api::RequestBucketInfoReply>& repl); @@ -131,6 +134,7 @@ private: const BucketRequest& req); bool isPendingClusterStateCompleted() const; void processCompletedPendingClusterState(); + void activatePendingClusterState(); void mergeBucketInfoWithDatabase(const std::shared_ptr<api::RequestBucketInfoReply>& repl, const BucketRequest& req); void convertBucketInfoToBucketList(const std::shared_ptr<api::RequestBucketInfoReply>& repl, @@ -141,6 +145,7 @@ private: BucketListMerger::BucketList& existing) const; void ensureTransitionTimerStarted(); void completeTransitionTimer(); + void clearReadOnlyBucketRepoDatabases(); /** * Adds all buckets contained in the bucket database * that are either contained @@ -161,6 +166,9 @@ private: void removeSuperfluousBuckets(const lib::ClusterStateBundle& newState); void replyToPreviousPendingClusterStateIfAny(); + void replyToActivationWithActualVersion( + const api::ActivateClusterStateVersionCommand& cmd, + uint32_t actualVersion); void enableCurrentClusterStateBundleInDistributor(); void addCurrentStateToClusterStateHistory(); @@ -191,30 +199,35 @@ private: public: NodeRemover(const lib::ClusterState& oldState, const lib::ClusterState& s, - [[maybe_unused]] const document::BucketIdFactory& factory, uint16_t localIndex, const lib::Distribution& distribution, const char* upStates) : _oldState(oldState), _state(s), + _nonOwnedBuckets(), + _removedBuckets(), _localIndex(localIndex), _distribution(distribution), _upStates(upStates) {} - ~NodeRemover(); + ~NodeRemover() override; bool process(BucketDatabase::Entry& e) override; void logRemove(const document::BucketId& bucketId, const char* msg) const; bool distributorOwnsBucket(const document::BucketId&) const; - const std::vector<document::BucketId>& getBucketsToRemove() const { + const std::vector<document::BucketId>& getBucketsToRemove() const noexcept { return _removedBuckets; } + const std::vector<document::BucketId>& getNonOwnedBuckets() const noexcept { + return _nonOwnedBuckets; + } private: void setCopiesInEntry(BucketDatabase::Entry& e, const std::vector<BucketCopy>& copies) const; void removeEmptyBucket(const document::BucketId& bucketId); const lib::ClusterState _oldState; const lib::ClusterState _state; + std::vector<document::BucketId> _nonOwnedBuckets; std::vector<document::BucketId> _removedBuckets; uint16_t _localIndex; diff --git a/storage/src/vespa/storage/distributor/distributor.cpp b/storage/src/vespa/storage/distributor/distributor.cpp index 1664dd0d9a1..c92dfbdc14e 100644 --- a/storage/src/vespa/storage/distributor/distributor.cpp +++ b/storage/src/vespa/storage/distributor/distributor.cpp @@ -67,15 +67,16 @@ Distributor::Distributor(DistributorComponentRegister& compReg, _compReg(compReg), _component(compReg, "distributor"), _bucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>()), + _readOnlyBucketSpaceRepo(std::make_unique<DistributorBucketSpaceRepo>()), _metrics(new DistributorMetricSet(_component.getLoadTypes()->getMetricLoadTypes())), _operationOwner(*this, _component.getClock()), _maintenanceOperationOwner(*this, _component.getClock()), _pendingMessageTracker(compReg), - _bucketDBUpdater(*this, *_bucketSpaceRepo, *this, compReg), + _bucketDBUpdater(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, *this, compReg), _distributorStatusDelegate(compReg, *this, *this), _bucketDBStatusDelegate(compReg, *this, _bucketDBUpdater), - _idealStateManager(*this, *_bucketSpaceRepo, compReg, manageActiveBucketCopies), - _externalOperationHandler(*this, *_bucketSpaceRepo, _idealStateManager, compReg), + _idealStateManager(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, compReg, manageActiveBucketCopies), + _externalOperationHandler(*this, *_bucketSpaceRepo, *_readOnlyBucketSpaceRepo, _idealStateManager, compReg), _threadPool(threadPool), _initializingIsUp(true), _doneInitializeHandler(doneInitHandler), @@ -575,16 +576,20 @@ void Distributor::propagateDefaultDistribution( std::shared_ptr<const lib::Distribution> distribution) { - _bucketSpaceRepo->get(document::FixedBucketSpaces::default_space()).setDistribution(distribution); auto global_distr = GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution); - _bucketSpaceRepo->get(document::FixedBucketSpaces::global_space()).setDistribution(std::move(global_distr)); + for (auto* repo : {_bucketSpaceRepo.get(), _readOnlyBucketSpaceRepo.get()}) { + repo->get(document::FixedBucketSpaces::default_space()).setDistribution(distribution); + repo->get(document::FixedBucketSpaces::global_space()).setDistribution(global_distr); + } } void Distributor::propagateClusterStates() { - for (auto &iter : *_bucketSpaceRepo) { - iter.second->setClusterState(_clusterStateBundle.getDerivedClusterState(iter.first)); + for (auto* repo : {_bucketSpaceRepo.get(), _readOnlyBucketSpaceRepo.get()}) { + for (auto& iter : *repo) { + iter.second->setClusterState(_clusterStateBundle.getDerivedClusterState(iter.first)); + } } } diff --git a/storage/src/vespa/storage/distributor/distributor.h b/storage/src/vespa/storage/distributor/distributor.h index fb8a9fb4299..cd24b91eba2 100644 --- a/storage/src/vespa/storage/distributor/distributor.h +++ b/storage/src/vespa/storage/distributor/distributor.h @@ -158,6 +158,13 @@ public: DistributorBucketSpaceRepo &getBucketSpaceRepo() noexcept { return *_bucketSpaceRepo; } const DistributorBucketSpaceRepo &getBucketSpaceRepo() const noexcept { return *_bucketSpaceRepo; } + DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() noexcept { + return *_readOnlyBucketSpaceRepo; + } + const DistributorBucketSpaceRepo& getReadyOnlyBucketSpaceRepo() const noexcept { + return *_readOnlyBucketSpaceRepo; + } + private: friend class Distributor_Test; friend class BucketDBUpdaterTest; @@ -244,6 +251,10 @@ private: DistributorComponentRegister& _compReg; storage::DistributorComponent _component; std::unique_ptr<DistributorBucketSpaceRepo> _bucketSpaceRepo; + // Read-only bucket space repo with DBs that only contain buckets transiently + // during cluster state transitions. Bucket set does not overlap that of _bucketSpaceRepo + // and the DBs are empty during non-transition phases. + std::unique_ptr<DistributorBucketSpaceRepo> _readOnlyBucketSpaceRepo; std::shared_ptr<DistributorMetricSet> _metrics; OperationOwner _operationOwner; diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.cpp b/storage/src/vespa/storage/distributor/distributorcomponent.cpp index d3d07350d35..9bd215b9644 100644 --- a/storage/src/vespa/storage/distributor/distributorcomponent.cpp +++ b/storage/src/vespa/storage/distributor/distributorcomponent.cpp @@ -15,16 +15,18 @@ namespace storage::distributor { DistributorComponent::DistributorComponent( DistributorInterface& distributor, - DistributorBucketSpaceRepo &bucketSpaceRepo, + DistributorBucketSpaceRepo& bucketSpaceRepo, + DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, DistributorComponentRegister& compReg, const std::string& name) : storage::DistributorComponent(compReg, name), _distributor(distributor), - _bucketSpaceRepo(bucketSpaceRepo) + _bucketSpaceRepo(bucketSpaceRepo), + _readOnlyBucketSpaceRepo(readOnlyBucketSpaceRepo) { } -DistributorComponent::~DistributorComponent() {} +DistributorComponent::~DistributorComponent() = default; void DistributorComponent::sendDown(const api::StorageMessage::SP& msg) diff --git a/storage/src/vespa/storage/distributor/distributorcomponent.h b/storage/src/vespa/storage/distributor/distributorcomponent.h index 561904cee8d..f2aea89d47c 100644 --- a/storage/src/vespa/storage/distributor/distributorcomponent.h +++ b/storage/src/vespa/storage/distributor/distributorcomponent.h @@ -29,11 +29,12 @@ class DistributorComponent : public storage::DistributorComponent { public: DistributorComponent(DistributorInterface& distributor, - DistributorBucketSpaceRepo &bucketSpaceRepo, - DistributorComponentRegister& compReg, - const std::string& name); + DistributorBucketSpaceRepo& bucketSpaceRepo, + DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, + DistributorComponentRegister& compReg, + const std::string& name); - ~DistributorComponent(); + ~DistributorComponent() override; /** * Returns the ownership status of a bucket as decided with the given @@ -153,6 +154,9 @@ public: DistributorBucketSpaceRepo &getBucketSpaceRepo() { return _bucketSpaceRepo; } const DistributorBucketSpaceRepo &getBucketSpaceRepo() const { return _bucketSpaceRepo; } + DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() { return _readOnlyBucketSpaceRepo; } + const DistributorBucketSpaceRepo& getReadOnlyBucketSpaceRepo() const { return _readOnlyBucketSpaceRepo; } + /** * Finds a bucket that has the same direct parent as the given bucket * (i.e. split one bit less), but different bit in the most used bit. @@ -179,7 +183,8 @@ private: protected: - DistributorBucketSpaceRepo &_bucketSpaceRepo; + DistributorBucketSpaceRepo& _bucketSpaceRepo; + DistributorBucketSpaceRepo& _readOnlyBucketSpaceRepo; vespalib::Lock _sync; }; diff --git a/storage/src/vespa/storage/distributor/distributormetricsset.cpp b/storage/src/vespa/storage/distributor/distributormetricsset.cpp index 927dc06182d..83923a1f00e 100644 --- a/storage/src/vespa/storage/distributor/distributormetricsset.cpp +++ b/storage/src/vespa/storage/distributor/distributormetricsset.cpp @@ -17,7 +17,7 @@ DistributorMetricSet::DistributorMetricSet(const metrics::LoadTypeSet& lt) removelocations(lt, PersistenceOperationMetricSet("removelocations"), this), gets(lt, PersistenceOperationMetricSet("gets"), this), stats(lt, PersistenceOperationMetricSet("stats"), this), - multioperations(lt, PersistenceOperationMetricSet("multioperations"), this), + getbucketlists(lt, PersistenceOperationMetricSet("getbucketlists"), this), visits(lt, VisitorMetricSet(), this), stateTransitionTime("state_transition_time", {}, "Time it takes to complete a cluster state transition. If a " diff --git a/storage/src/vespa/storage/distributor/distributormetricsset.h b/storage/src/vespa/storage/distributor/distributormetricsset.h index 5a64027f500..dfe976a89ab 100644 --- a/storage/src/vespa/storage/distributor/distributormetricsset.h +++ b/storage/src/vespa/storage/distributor/distributormetricsset.h @@ -20,7 +20,7 @@ public: metrics::LoadMetric<PersistenceOperationMetricSet> removelocations; metrics::LoadMetric<PersistenceOperationMetricSet> gets; metrics::LoadMetric<PersistenceOperationMetricSet> stats; - metrics::LoadMetric<PersistenceOperationMetricSet> multioperations; + metrics::LoadMetric<PersistenceOperationMetricSet> getbucketlists; metrics::LoadMetric<VisitorMetricSet> visits; metrics::DoubleAverageMetric stateTransitionTime; metrics::DoubleAverageMetric recoveryModeTime; diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp index b22592af327..1b88f02cac6 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp @@ -20,14 +20,18 @@ #include "distributor_bucket_space.h" #include <vespa/log/log.h> +#include <vespa/document/bucket/fixed_bucket_spaces.h> + LOG_SETUP(".distributor.manager"); namespace storage::distributor { -ExternalOperationHandler::ExternalOperationHandler(Distributor& owner, DistributorBucketSpaceRepo& bucketSpaceRepo, +ExternalOperationHandler::ExternalOperationHandler(Distributor& owner, + DistributorBucketSpaceRepo& bucketSpaceRepo, + DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, const MaintenanceOperationGenerator& gen, DistributorComponentRegister& compReg) - : DistributorComponent(owner, bucketSpaceRepo, compReg, "External operation handler"), + : DistributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "External operation handler"), _operationGenerator(gen), _rejectFeedBeforeTimeReached() // At epoch { } @@ -68,19 +72,69 @@ ExternalOperationHandler::checkSafeTimeReached(api::StorageCommand& cmd) return true; } +void ExternalOperationHandler::bounce_with_result(api::StorageCommand& cmd, const api::ReturnCode& result) { + api::StorageReply::UP reply(cmd.makeReply()); + reply->setResult(result); + sendUp(std::shared_ptr<api::StorageMessage>(reply.release())); +} + +void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageCommand& cmd) { + // Distributor ownership is equal across bucket spaces, so always send back default space state. + // This also helps client avoid getting confused by possibly observing different actual + // (derived) state strings for global/non-global document types for the same state version. + // Similarly, if we've yet to activate any version at all we send back BUSY instead + // of a suspiciously empty WrongDistributionReply. + // TOOD consider NOT_READY instead of BUSY once we're sure this won't cause any other issues. + const auto& cluster_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState(); + if (cluster_state.getVersion() != 0) { + auto cluster_state_str = cluster_state.toString(); + LOG(debug, "Got message with wrong distribution, sending back state '%s'", cluster_state_str.c_str()); + bounce_with_result(cmd, api::ReturnCode(api::ReturnCode::WRONG_DISTRIBUTION, cluster_state_str)); + } else { // Only valid for empty startup state + LOG(debug, "Got message with wrong distribution, but no cluster state activated yet. Sending back BUSY"); + bounce_with_result(cmd, api::ReturnCode(api::ReturnCode::BUSY, "No cluster state activated yet")); + } +} + +void ExternalOperationHandler::bounce_with_busy_during_state_transition( + api::StorageCommand& cmd, + const lib::ClusterState& current_state, + const lib::ClusterState& pending_state) +{ + auto status_str = vespalib::make_string("Currently pending cluster state transition" + " from version %u to %u", + current_state.getVersion(), pending_state.getVersion()); + + api::StorageReply::UP reply(cmd.makeReply()); + api::ReturnCode ret(api::ReturnCode::BUSY, status_str); + reply->setResult(ret); + sendUp(std::shared_ptr<api::StorageMessage>(reply.release())); +} + bool ExternalOperationHandler::checkTimestampMutationPreconditions(api::StorageCommand& cmd, const document::BucketId &bucketId, PersistenceOperationMetricSet& persistenceMetrics) { document::Bucket bucket(cmd.getBucket().getBucketSpace(), bucketId); - if (!checkDistribution(cmd, bucket)) { + if (!ownsBucketInCurrentState(bucket)) { LOG(debug, "Distributor manager received %s, bucket %s with wrong distribution", cmd.toString().c_str(), bucket.toString().c_str()); - + bounce_with_wrong_distribution(cmd); persistenceMetrics.failures.wrongdistributor.inc(); return false; } + + auto pending = getDistributor().checkOwnershipInPendingState(bucket); + if (!pending.isOwned()) { + // We return BUSY here instead of WrongDistributionReply to avoid clients potentially + // ping-ponging between cluster state versions during a state transition. + auto& current_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState(); + auto& pending_state = pending.getNonOwnedState(); + bounce_with_busy_during_state_transition(cmd, current_state, pending_state); + return false; + } + if (!checkSafeTimeReached(cmd)) { persistenceMetrics.failures.safe_time_not_reached.inc(); return false; @@ -111,6 +165,35 @@ bool ExternalOperationHandler::allowMutation(const SequencingHandle& handle) con return handle.valid(); } +template <typename Func> +void ExternalOperationHandler::bounce_or_invoke_read_only_op( + api::StorageCommand& cmd, + const document::Bucket& bucket, + PersistenceOperationMetricSet& metrics, + Func func) +{ + if (!ownsBucketInCurrentState(bucket)) { + LOG(debug, "Distributor manager received %s, bucket %s with wrong distribution", + cmd.toString().c_str(), bucket.toString().c_str()); + bounce_with_wrong_distribution(cmd); + metrics.failures.wrongdistributor.inc(); + return; + } + + auto pending = getDistributor().checkOwnershipInPendingState(bucket); + if (pending.isOwned()) { + func(_bucketSpaceRepo); + } else { + if (getDistributor().getConfig().allowStaleReadsDuringClusterStateTransitions()) { + func(_readOnlyBucketSpaceRepo); + } else { + auto& current_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState(); + auto& pending_state = pending.getNonOwnedState(); + bounce_with_busy_during_state_transition(cmd, current_state, pending_state); + } + } +} + IMPL_MSG_COMMAND_H(ExternalOperationHandler, Put) { auto& metrics = getMetrics().puts[cmd->getLoadType()]; @@ -186,10 +269,8 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, RemoveLocation) RemoveLocationOperation::getBucketId(*this, *cmd, bid); document::Bucket bucket(cmd->getBucket().getBucketSpace(), bid); - if (!checkDistribution(*cmd, bucket)) { - LOG(debug, "Distributor manager received %s with wrong distribution", cmd->toString().c_str()); - - getMetrics().removelocations[cmd->getLoadType()].failures.wrongdistributor.inc(); + auto& metrics = getMetrics().removelocations[cmd->getLoadType()]; + if (!checkTimestampMutationPreconditions(*cmd, bucket.getBucketId(), metrics)) { return true; } @@ -201,43 +282,38 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, RemoveLocation) IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get) { document::Bucket bucket(cmd->getBucket().getBucketSpace(), getBucketId(cmd->getDocumentId())); - if (!checkDistribution(*cmd, bucket)) { - LOG(debug, "Distributor manager received get for %s, bucket %s with wrong distribution", - cmd->getDocumentId().toString().c_str(), bucket.toString().c_str()); - - getMetrics().gets[cmd->getLoadType()].failures.wrongdistributor.inc(); - return true; - } - - _op = std::make_shared<GetOperation>(*this, _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()), - cmd, getMetrics().gets[cmd->getLoadType()]); + auto& metrics = getMetrics().gets[cmd->getLoadType()]; + bounce_or_invoke_read_only_op(*cmd, bucket, metrics, [&](auto& bucket_space_repo) { + _op = std::make_shared<GetOperation>(*this, bucket_space_repo.get(cmd->getBucket().getBucketSpace()), + cmd, metrics); + }); return true; } IMPL_MSG_COMMAND_H(ExternalOperationHandler, StatBucket) { - if (!checkDistribution(*cmd, cmd->getBucket())) { - return true; - } - auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace())); - _op = std::make_shared<StatBucketOperation>(*this, distributorBucketSpace, cmd); + auto& metrics = getMetrics().stats[cmd->getLoadType()]; + bounce_or_invoke_read_only_op(*cmd, cmd->getBucket(), metrics, [&](auto& bucket_space_repo) { + auto& bucket_space = bucket_space_repo.get(cmd->getBucket().getBucketSpace()); + _op = std::make_shared<StatBucketOperation>(*this, bucket_space, cmd); + }); return true; } IMPL_MSG_COMMAND_H(ExternalOperationHandler, GetBucketList) { - if (!checkDistribution(*cmd, cmd->getBucket())) { - return true; - } - auto bucketSpace(cmd->getBucket().getBucketSpace()); - auto &distributorBucketSpace(_bucketSpaceRepo.get(bucketSpace)); - auto &bucketDatabase(distributorBucketSpace.getBucketDatabase()); - _op = std::make_shared<StatBucketListOperation>(bucketDatabase, _operationGenerator, getIndex(), cmd); + auto& metrics = getMetrics().getbucketlists[cmd->getLoadType()]; + bounce_or_invoke_read_only_op(*cmd, cmd->getBucket(), metrics, [&](auto& bucket_space_repo) { + auto& bucket_space = bucket_space_repo.get(cmd->getBucket().getBucketSpace()); + auto& bucket_database = bucket_space.getBucketDatabase(); + _op = std::make_shared<StatBucketListOperation>(bucket_database, _operationGenerator, getIndex(), cmd); + }); return true; } IMPL_MSG_COMMAND_H(ExternalOperationHandler, CreateVisitor) { + // TODO same handling as Gets (VisitorOperation needs to change) const DistributorConfiguration& config(getDistributor().getConfig()); VisitorOperation::Config visitorConfig(config.getMinBucketsPerVisitor(), config.getMaxVisitorsPerNodePerClientVisitor()); auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace())); diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h index c198fe30159..655feb5d00c 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.h +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h @@ -37,10 +37,11 @@ public: ExternalOperationHandler(Distributor& owner, DistributorBucketSpaceRepo& bucketSpaceRepo, + DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, const MaintenanceOperationGenerator&, DistributorComponentRegister& compReg); - ~ExternalOperationHandler(); + ~ExternalOperationHandler() override; bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg, Operation::SP& operation); @@ -55,6 +56,18 @@ private: Operation::SP _op; TimePoint _rejectFeedBeforeTimeReached; + template <typename Func> + void bounce_or_invoke_read_only_op(api::StorageCommand& cmd, + const document::Bucket& bucket, + PersistenceOperationMetricSet& metrics, + Func f); + + void bounce_with_wrong_distribution(api::StorageCommand& cmd); + void bounce_with_busy_during_state_transition(api::StorageCommand& cmd, + const lib::ClusterState& current_state, + const lib::ClusterState& pending_state); + void bounce_with_result(api::StorageCommand& cmd, const api::ReturnCode& result); + bool checkSafeTimeReached(api::StorageCommand& cmd); api::ReturnCode makeSafeTimeRejectionResult(TimePoint unsafeTime); bool checkTimestampMutationPreconditions( diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.cpp b/storage/src/vespa/storage/distributor/idealstatemanager.cpp index 77b924ad351..5a1ff31e2e7 100644 --- a/storage/src/vespa/storage/distributor/idealstatemanager.cpp +++ b/storage/src/vespa/storage/distributor/idealstatemanager.cpp @@ -26,11 +26,12 @@ namespace distributor { IdealStateManager::IdealStateManager( Distributor& owner, DistributorBucketSpaceRepo& bucketSpaceRepo, + DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, DistributorComponentRegister& compReg, bool manageActiveBucketCopies) : HtmlStatusReporter("idealstateman", "Ideal state manager"), _metrics(new IdealStateMetricSet), - _distributorComponent(owner, bucketSpaceRepo, compReg, "Ideal state manager"), + _distributorComponent(owner, bucketSpaceRepo, readOnlyBucketSpaceRepo, compReg, "Ideal state manager"), _bucketSpaceRepo(bucketSpaceRepo) { _distributorComponent.registerStatusPage(*this); diff --git a/storage/src/vespa/storage/distributor/idealstatemanager.h b/storage/src/vespa/storage/distributor/idealstatemanager.h index c8be2a40ad7..3bb6d0dd757 100644 --- a/storage/src/vespa/storage/distributor/idealstatemanager.h +++ b/storage/src/vespa/storage/distributor/idealstatemanager.h @@ -37,6 +37,7 @@ public: IdealStateManager(Distributor& owner, DistributorBucketSpaceRepo& bucketSpaceRepo, + DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, DistributorComponentRegister& compReg, bool manageActiveBucketCopies); diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h index 198c588dfd1..3936f13077e 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h @@ -34,6 +34,9 @@ public: bool hasConsistentCopies() const; + // Exposed for unit testing. TODO feels a bit dirty :I + const DistributorBucketSpace& bucketSpace() const noexcept { return _bucketSpace; } + private: class GroupId { public: diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp index 5f74a82c28a..6cba7084037 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp @@ -27,7 +27,8 @@ PendingClusterState::PendingClusterState( const framework::Clock& clock, const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, - DistributorBucketSpaceRepo &bucketSpaceRepo, + DistributorBucketSpaceRepo& bucketSpaceRepo, + DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd, const OutdatedNodesMap &outdatedNodesMap, api::Timestamp creationTimestamp) @@ -40,6 +41,9 @@ PendingClusterState::PendingClusterState( _creationTimestamp(creationTimestamp), _sender(sender), _bucketSpaceRepo(bucketSpaceRepo), + _readOnlyBucketSpaceRepo(readOnlyBucketSpaceRepo), + _clusterStateVersion(_cmd->getClusterStateBundle().getVersion()), + _isVersionedTransition(true), _bucketOwnershipTransfer(false), _pendingTransitions() { @@ -51,7 +55,8 @@ PendingClusterState::PendingClusterState( const framework::Clock& clock, const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, - DistributorBucketSpaceRepo &bucketSpaceRepo, + DistributorBucketSpaceRepo& bucketSpaceRepo, + DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, api::Timestamp creationTimestamp) : _requestedNodes(clusterInfo->getStorageNodeCount()), _prevClusterStateBundle(clusterInfo->getClusterStateBundle()), @@ -61,6 +66,9 @@ PendingClusterState::PendingClusterState( _creationTimestamp(creationTimestamp), _sender(sender), _bucketSpaceRepo(bucketSpaceRepo), + _readOnlyBucketSpaceRepo(readOnlyBucketSpaceRepo), + _clusterStateVersion(0), + _isVersionedTransition(false), _bucketOwnershipTransfer(true), _pendingTransitions() { diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.h b/storage/src/vespa/storage/distributor/pendingclusterstate.h index b96ba8cbbd7..cedc0573381 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.h +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.h @@ -45,15 +45,16 @@ public: const framework::Clock& clock, const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, - DistributorBucketSpaceRepo &bucketSpaceRepo, + DistributorBucketSpaceRepo& bucketSpaceRepo, + DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd, const OutdatedNodesMap &outdatedNodesMap, api::Timestamp creationTimestamp) { - return std::unique_ptr<PendingClusterState>( - new PendingClusterState(clock, clusterInfo, sender, bucketSpaceRepo, newStateCmd, - outdatedNodesMap, - creationTimestamp)); + // Naked new due to private constructor + return std::unique_ptr<PendingClusterState>(new PendingClusterState( + clock, clusterInfo, sender, bucketSpaceRepo, readOnlyBucketSpaceRepo, + newStateCmd, outdatedNodesMap, creationTimestamp)); } /** @@ -64,16 +65,19 @@ public: const framework::Clock& clock, const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, - DistributorBucketSpaceRepo &bucketSpaceRepo, + DistributorBucketSpaceRepo& bucketSpaceRepo, + DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, api::Timestamp creationTimestamp) { - return std::unique_ptr<PendingClusterState>( - new PendingClusterState(clock, clusterInfo, sender, bucketSpaceRepo, creationTimestamp)); + // Naked new due to private constructor + return std::unique_ptr<PendingClusterState>(new PendingClusterState( + clock, clusterInfo, sender, bucketSpaceRepo, + readOnlyBucketSpaceRepo, creationTimestamp)); } PendingClusterState(const PendingClusterState &) = delete; PendingClusterState & operator = (const PendingClusterState &) = delete; - ~PendingClusterState(); + ~PendingClusterState() override; /** * Adds the info from the reply to our list of information. @@ -104,10 +108,31 @@ public: return _bucketOwnershipTransfer; } + bool hasCommand() const noexcept { + return (_cmd.get() != nullptr); + } + std::shared_ptr<api::SetSystemStateCommand> getCommand() { return _cmd; } + bool isVersionedTransition() const noexcept { + return _isVersionedTransition; + } + + uint32_t clusterStateVersion() const noexcept { + return _clusterStateVersion; + } + + bool isDeferred() const noexcept { + return (isVersionedTransition() + && _newClusterStateBundle.deferredActivation()); + } + + void clearCommand() { + _cmd.reset(); + } + const lib::ClusterStateBundle& getNewClusterStateBundle() const { return _newClusterStateBundle; } @@ -141,7 +166,8 @@ private: const framework::Clock&, const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, - DistributorBucketSpaceRepo &bucketSpaceRepo, + DistributorBucketSpaceRepo& bucketSpaceRepo, + DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, const std::shared_ptr<api::SetSystemStateCommand>& newStateCmd, const OutdatedNodesMap &outdatedNodesMap, api::Timestamp creationTimestamp); @@ -154,7 +180,8 @@ private: const framework::Clock&, const ClusterInformation::CSP& clusterInfo, DistributorMessageSender& sender, - DistributorBucketSpaceRepo &bucketSpaceRepo, + DistributorBucketSpaceRepo& bucketSpaceRepo, + DistributorBucketSpaceRepo& readOnlyBucketSpaceRepo, api::Timestamp creationTimestamp); struct BucketSpaceAndNode { @@ -204,8 +231,10 @@ private: api::Timestamp _creationTimestamp; DistributorMessageSender& _sender; - DistributorBucketSpaceRepo &_bucketSpaceRepo; - + DistributorBucketSpaceRepo& _bucketSpaceRepo; + DistributorBucketSpaceRepo& _readOnlyBucketSpaceRepo; + uint32_t _clusterStateVersion; + bool _isVersionedTransition; bool _bucketOwnershipTransfer; std::unordered_map<document::BucketSpace, std::unique_ptr<PendingBucketSpaceDbTransition>, document::BucketSpace::hash> _pendingTransitions; }; diff --git a/storage/src/vespa/storage/storageserver/bouncer.cpp b/storage/src/vespa/storage/storageserver/bouncer.cpp index 0541c7322f1..fdbfd553315 100644 --- a/storage/src/vespa/storage/storageserver/bouncer.cpp +++ b/storage/src/vespa/storage/storageserver/bouncer.cpp @@ -235,6 +235,7 @@ Bouncer::onDown(const std::shared_ptr<api::StorageMessage>& msg) case api::MessageType::SETNODESTATE_ID: case api::MessageType::GETNODESTATE_ID: case api::MessageType::SETSYSTEMSTATE_ID: + case api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_ID: case api::MessageType::NOTIFYBUCKETCHANGE_ID: // state commands are always ok return false; diff --git a/storage/src/vespa/storage/storageserver/communicationmanager.cpp b/storage/src/vespa/storage/storageserver/communicationmanager.cpp index 7fb85ef0ecc..978d434847e 100644 --- a/storage/src/vespa/storage/storageserver/communicationmanager.cpp +++ b/storage/src/vespa/storage/storageserver/communicationmanager.cpp @@ -622,20 +622,25 @@ CommunicationManager::sendDirectRPCReply( { std::string requestName(request.getMethodName()); if (requestName == "getnodestate3") { - api::GetNodeStateReply& gns(static_cast<api::GetNodeStateReply&>(*reply)); + auto& gns(dynamic_cast<api::GetNodeStateReply&>(*reply)); std::ostringstream ns; serializeNodeState(gns, ns, true, true, false); request.addReturnString(ns.str().c_str()); request.addReturnString(gns.getNodeInfo().c_str()); LOGBP(debug, "Sending getnodestate3 reply with host info '%s'.", gns.getNodeInfo().c_str()); } else if (requestName == "getnodestate2") { - api::GetNodeStateReply& gns(static_cast<api::GetNodeStateReply&>(*reply)); + auto& gns(dynamic_cast<api::GetNodeStateReply&>(*reply)); std::ostringstream ns; serializeNodeState(gns, ns, true, true, false); request.addReturnString(ns.str().c_str()); LOGBP(debug, "Sending getnodestate2 reply with no host info."); } else if (requestName == "setsystemstate2" || requestName == "setdistributionstates") { // No data to return + } else if (requestName == "activate_cluster_state_version") { + auto& activate_reply(dynamic_cast<api::ActivateClusterStateVersionReply&>(*reply)); + request.addReturnInt(activate_reply.actualVersion()); + LOGBP(debug, "sending activate_cluster_state_version reply for version %u with actual version %u ", + activate_reply.activateVersion(), activate_reply.actualVersion()); } else { request.addReturnInt(reply->getResult().getResult()); request.addReturnString(reply->getResult().getMessage().c_str()); diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/fnetlistener.cpp index e31bded772c..ec488b25714 100644 --- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp +++ b/storage/src/vespa/storage/storageserver/fnetlistener.cpp @@ -92,6 +92,11 @@ FNetListener::initRPC() rb.ParamDesc("uncompressedSize", "Uncompressed size for payload"); rb.ParamDesc("payload", "Binary Slime format payload"); //------------------------------------------------------------------------- + rb.DefineMethod("activate_cluster_state_version", "i", "i", FRT_METHOD(FNetListener::RPC_activateClusterStateVersion), this); + rb.MethodDesc("Explicitly activates an already prepared cluster state version"); + rb.ParamDesc("activate_version", "Expected cluster state version to activate"); + rb.ReturnDesc("actual_version", "Cluster state version that was prepared on the node prior to receiving RPC"); + //------------------------------------------------------------------------- rb.DefineMethod("getcurrenttime", "", "lis", FRT_METHOD(FNetListener::RPC_getCurrentTime), this); rb.MethodDesc("Get current time on this node"); rb.ReturnDesc("seconds", "Current time in seconds since epoch"); @@ -203,6 +208,7 @@ void FNetListener::RPC_setDistributionStates(FRT_RPCRequest* req) { req->SetError(RPCRequestWrapper::ERR_BAD_REQUEST, e.what()); return; } + LOG(debug, "Got state bundle %s", state_bundle->toString().c_str()); // TODO add constructor taking in shared_ptr directly instead? auto cmd = std::make_shared<api::SetSystemStateCommand>(*state_bundle); @@ -211,4 +217,20 @@ void FNetListener::RPC_setDistributionStates(FRT_RPCRequest* req) { detach_and_forward_to_enqueuer(std::move(cmd), req); } +void FNetListener::RPC_activateClusterStateVersion(FRT_RPCRequest* req) { + if (_closed) { + LOG(debug, "Not handling RPC call activate_cluster_state_version() as we have closed"); + req->SetError(RPCRequestWrapper::ERR_NODE_SHUTTING_DOWN, "Node shutting down"); + return; + } + + const uint32_t activate_version = req->GetParams()->GetValue(0)._intval32; + auto cmd = std::make_shared<api::ActivateClusterStateVersionCommand>(activate_version); + cmd->setPriority(api::StorageMessage::VERYHIGH); + + LOG(debug, "Got state activation request for version %u", activate_version); + + detach_and_forward_to_enqueuer(std::move(cmd), req); +} + } diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.h b/storage/src/vespa/storage/storageserver/fnetlistener.h index abcba18e0be..2097be15491 100644 --- a/storage/src/vespa/storage/storageserver/fnetlistener.h +++ b/storage/src/vespa/storage/storageserver/fnetlistener.h @@ -26,6 +26,7 @@ public: void RPC_setSystemState2(FRT_RPCRequest *req); void RPC_getCurrentTime(FRT_RPCRequest *req); void RPC_setDistributionStates(FRT_RPCRequest* req); + void RPC_activateClusterStateVersion(FRT_RPCRequest* req); void registerHandle(vespalib::stringref handle); void close(); diff --git a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp index 5b7e0ab4621..1f854bc724e 100644 --- a/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp +++ b/storage/src/vespa/storage/storageserver/slime_cluster_state_bundle_codec.cpp @@ -53,6 +53,9 @@ EncodedClusterStateBundle SlimeClusterStateBundleCodec::encode( { vespalib::Slime slime; Cursor& root = slime.setObject(); + if (bundle.deferredActivation()) { + root.setBool("deferred-activation", bundle.deferredActivation()); + } Cursor& states = root.setObject("states"); states.setString("baseline", serialize_state(*bundle.getBaselineClusterState())); Cursor& spaces = states.setObject("spaces"); @@ -79,6 +82,7 @@ namespace { static const Memory StatesField("states"); static const Memory BaselineField("baseline"); static const Memory SpacesField("spaces"); +static const Memory DeferredActivationField("deferred-activation"); struct StateInserter : vespalib::slime::ObjectTraverser { lib::ClusterStateBundle::BucketSpaceStateMapping& _space_states; @@ -118,8 +122,11 @@ std::shared_ptr<const lib::ClusterStateBundle> SlimeClusterStateBundleCodec::dec lib::ClusterStateBundle::BucketSpaceStateMapping space_states; StateInserter inserter(space_states); spaces.traverse(inserter); + + const bool deferred_activation = root[DeferredActivationField].asBool(); // Defaults to false if not set. + // TODO add shared_ptr constructor for baseline? - return std::make_shared<lib::ClusterStateBundle>(baseline, std::move(space_states)); + return std::make_shared<lib::ClusterStateBundle>(baseline, std::move(space_states), deferred_activation); } } diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp index 95cb5dec696..af01a880fea 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.cpp +++ b/storage/src/vespa/storage/storageserver/statemanager.cpp @@ -514,6 +514,19 @@ StateManager::onSetSystemState( return true; } +bool +StateManager::onActivateClusterStateVersion( + const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd) +{ + auto reply = std::make_shared<api::ActivateClusterStateVersionReply>(*cmd); + { + vespalib::LockGuard lock(_stateLock); + reply->setActualVersion(_systemState ? _systemState->getVersion() : 0); + } + sendUp(reply); + return true; +} + void StateManager::run(framework::ThreadHandle& thread) { diff --git a/storage/src/vespa/storage/storageserver/statemanager.h b/storage/src/vespa/storage/storageserver/statemanager.h index 0bacd41f6d9..57f0e02a136 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.h +++ b/storage/src/vespa/storage/storageserver/statemanager.h @@ -137,6 +137,7 @@ private: bool onGetNodeState(const std::shared_ptr<api::GetNodeStateCommand>&) override; bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>&) override; + bool onActivateClusterStateVersion(const std::shared_ptr<api::ActivateClusterStateVersionCommand>&) override; /** * _stateLock MUST NOT be held while calling. |