diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-13 13:37:23 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-14 14:44:29 +0000 |
commit | 4a18ca637cff723bcc45acc425689a69bcf4db66 (patch) | |
tree | 4c91b293de883cf16a0d8f024536498fce2da70a /storage | |
parent | b97b0a3cf981c20ac1c5b7733116c5a218aa2c9b (diff) |
Move non-owned buckets to read-only DB and allow use for read-only ops
Diffstat (limited to 'storage')
12 files changed, 321 insertions, 66 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index bad6e80de47..6c70988cbc6 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -26,6 +26,7 @@ using document::test::makeDocumentBucket; using document::test::makeBucketSpace; using document::BucketSpace; using document::FixedBucketSpaces; +using document::BucketId; namespace storage::distributor { @@ -113,6 +114,7 @@ class BucketDBUpdaterTest : public CppUnit::TestFixture, CPPUNIT_TEST(batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted); CPPUNIT_TEST(global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection); CPPUNIT_TEST(non_owned_buckets_moved_to_read_only_db_on_ownership_change); + CPPUNIT_TEST(non_owned_buckets_purged_when_read_only_support_is_config_disabled); CPPUNIT_TEST_SUITE_END(); /* @@ -186,6 +188,7 @@ protected: void batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted(); void global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection(); void non_owned_buckets_moved_to_read_only_db_on_ownership_change(); + void non_owned_buckets_purged_when_read_only_support_is_config_disabled(); auto &defaultDistributorBucketSpace() { return getBucketSpaceRepo().get(makeBucketSpace()); } @@ -243,7 +246,7 @@ public: uint32_t bucketCount, uint32_t invalidBucketCount = 0) { - RequestBucketInfoReply* sreply = new RequestBucketInfoReply(cmd); + auto sreply = std::make_shared<RequestBucketInfoReply>(cmd); sreply->setAddress(storageAddress(storageIndex)); api::RequestBucketInfoReply::EntryVector &vec = sreply->getBucketInfo(); @@ -276,7 +279,7 @@ public: } } - return std::shared_ptr<api::RequestBucketInfoReply>(sreply); + return sreply; } void fakeBucketReply(const lib::ClusterState &state, @@ -619,19 +622,17 @@ public: } }; - auto createPendingStateFixtureForStateChange( + std::unique_ptr<PendingClusterStateFixture> createPendingStateFixtureForStateChange( const std::string& oldClusterState, const std::string& newClusterState) { - return std::make_unique<PendingClusterStateFixture>( - *this, oldClusterState, newClusterState); + return std::make_unique<PendingClusterStateFixture>(*this, oldClusterState, newClusterState); } - auto createPendingStateFixtureForDistributionChange( + std::unique_ptr<PendingClusterStateFixture> createPendingStateFixtureForDistributionChange( const std::string& oldClusterState) { - return std::make_unique<PendingClusterStateFixture>( - *this, oldClusterState); + return std::make_unique<PendingClusterStateFixture>(*this, oldClusterState); } }; @@ -639,8 +640,8 @@ CPPUNIT_TEST_SUITE_REGISTRATION(BucketDBUpdaterTest); BucketDBUpdaterTest::BucketDBUpdaterTest() : CppUnit::TestFixture(), - DistributorTestUtil(), - _bucketSpaces() + DistributorTestUtil(), + _bucketSpaces() { } @@ -2617,8 +2618,83 @@ void BucketDBUpdaterTest::global_distribution_hash_falls_back_to_legacy_format_u CPPUNIT_ASSERT_EQUAL(current_hash, new_current_req.getDistributionHash()); } +namespace { + +template <typename Func> +void for_each_bucket(const BucketDatabase& db, Func f) { + BucketId last(0); + auto e = db.getNext(last); + while (e.valid()) { + f(e); + e = db.getNext(e.getBucketId()); + } +} + +} + +using ConfigBuilder = vespa::config::content::core::StorDistributormanagerConfigBuilder; + void BucketDBUpdaterTest::non_owned_buckets_moved_to_read_only_db_on_ownership_change() { + getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); + + lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition + setSystemState(initial_state); + + CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size()); + constexpr uint32_t n_buckets = 10; + completeBucketInfoGathering(initial_state, messageCount(4), n_buckets); + _sender.clear(); + + auto& default_db = mutable_repo().get(makeBucketSpace()).getBucketDatabase(); + auto& read_only_db = read_only_repo().get(makeBucketSpace()).getBucketDatabase(); + + CPPUNIT_ASSERT_EQUAL(size_t(n_buckets), default_db.size()); + CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_db.size()); + + lib::ClusterState pending_state("distributor:2 storage:4"); + + // TODO iterate over buckets with spaces instead + std::unordered_set<BucketId, BucketId::hash> buckets_not_owned_in_pending_state; + for_each_bucket(default_db, [&](const auto& entry) { + if (!getBucketDBUpdater().getDistributorComponent() + .ownsBucketInState(pending_state, makeDocumentBucket(entry.getBucketId()))) { + buckets_not_owned_in_pending_state.insert(entry.getBucketId()); + } + }); + CPPUNIT_ASSERT(!buckets_not_owned_in_pending_state.empty()); + + setSystemState(pending_state); + + CPPUNIT_ASSERT_EQUAL(size_t(n_buckets - buckets_not_owned_in_pending_state.size()), default_db.size()); + CPPUNIT_ASSERT_EQUAL(buckets_not_owned_in_pending_state.size(), read_only_db.size()); + + // TODO replace with gmock unordered set equality matcher + for_each_bucket(read_only_db, [&](const auto& entry) { + CPPUNIT_ASSERT(buckets_not_owned_in_pending_state.find(entry.getBucketId()) + != buckets_not_owned_in_pending_state.end()); + }); + + // TODO check global space too! +} + +void BucketDBUpdaterTest::non_owned_buckets_purged_when_read_only_support_is_config_disabled() { + getConfig().setAllowStaleReadsDuringClusterStateTransitions(false); + + lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition + setSystemState(initial_state); + + CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size()); + constexpr uint32_t n_buckets = 10; + completeBucketInfoGathering(initial_state, messageCount(4), n_buckets); + _sender.clear(); + + auto& read_only_db = read_only_repo().get(makeBucketSpace()).getBucketDatabase(); + CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_db.size()); + lib::ClusterState pending_state("distributor:2 storage:4"); + setSystemState(pending_state); + // No buckets should be moved into read only db + CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_db.size()); } } diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp index ddf88f50c36..ebf9d4e3bc8 100644 --- a/storage/src/tests/distributor/externaloperationhandlertest.cpp +++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp @@ -4,6 +4,7 @@ #include <vespa/storage/distributor/externaloperationhandler.h> #include <vespa/storage/distributor/distributor.h> #include <vespa/storage/distributor/distributormetricsset.h> +#include <vespa/storage/distributor/operations/external/getoperation.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/update/documentupdate.h> @@ -20,8 +21,9 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture, CPPUNIT_TEST_SUITE(ExternalOperationHandlerTest); CPPUNIT_TEST(testBucketSplitMask); - CPPUNIT_TEST(testOperationRejectedOnWrongDistribution); - CPPUNIT_TEST(testOperationRejectedOnPendingWrongDistribution); + CPPUNIT_TEST(mutating_operation_wdr_bounced_on_wrong_current_distribution); + CPPUNIT_TEST(mutating_operation_busy_bounced_on_wrong_pending_distribution); + CPPUNIT_TEST(read_only_operation_wdr_bounced_on_wrong_current_distribution); CPPUNIT_TEST(reject_put_if_not_past_safe_time_point); CPPUNIT_TEST(reject_remove_if_not_past_safe_time_point); CPPUNIT_TEST(reject_update_if_not_past_safe_time_point); @@ -37,6 +39,9 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture, CPPUNIT_TEST(concurrent_get_and_mutation_do_not_conflict); CPPUNIT_TEST(sequencing_works_across_mutation_types); CPPUNIT_TEST(sequencing_can_be_explicitly_config_disabled); + CPPUNIT_TEST(gets_are_started_with_mutable_db_outside_transition_period); + CPPUNIT_TEST(gets_are_started_with_read_only_db_during_transition_period); + CPPUNIT_TEST(gets_are_busy_bounced_during_transition_period_if_stale_reads_disabled); CPPUNIT_TEST_SUITE_END(); document::BucketId findNonOwnedUserBucketInState(vespalib::stringref state); @@ -49,6 +54,7 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture, std::shared_ptr<api::UpdateCommand> makeUpdateCommand(const vespalib::string& doc_type, const vespalib::string& id) const; std::shared_ptr<api::UpdateCommand> makeUpdateCommand() const; + std::shared_ptr<api::UpdateCommand> makeUpdateCommandForUser(uint64_t id) const; std::shared_ptr<api::PutCommand> makePutCommand(const vespalib::string& doc_type, const vespalib::string& id) const; std::shared_ptr<api::RemoveCommand> makeRemoveCommand(const vespalib::string& id) const; @@ -80,10 +86,14 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture, const vespalib::string _dummy_id{"id:foo:testdoctype1::bar"}; + // Returns an arbitrary bucket not owned in the pending state + document::BucketId set_up_pending_cluster_state_transition(bool read_only_enabled); + protected: void testBucketSplitMask(); - void testOperationRejectedOnWrongDistribution(); - void testOperationRejectedOnPendingWrongDistribution(); + void mutating_operation_wdr_bounced_on_wrong_current_distribution(); + void read_only_operation_wdr_bounced_on_wrong_current_distribution(); + void mutating_operation_busy_bounced_on_wrong_pending_distribution(); void reject_put_if_not_past_safe_time_point(); void reject_remove_if_not_past_safe_time_point(); void reject_update_if_not_past_safe_time_point(); @@ -99,6 +109,9 @@ protected: void concurrent_get_and_mutation_do_not_conflict(); void sequencing_works_across_mutation_types(); void sequencing_can_be_explicitly_config_disabled(); + void gets_are_started_with_mutable_db_outside_transition_period(); + void gets_are_started_with_read_only_db_during_transition_period(); + void gets_are_busy_bounced_during_transition_period_if_stale_reads_disabled(); void assert_rejection_due_to_unsafe_time( std::shared_ptr<api::StorageCommand> cmd); @@ -220,6 +233,11 @@ ExternalOperationHandlerTest::makeUpdateCommand() const { return makeUpdateCommand("testdoctype1", "id:foo:testdoctype1::baz"); } +std::shared_ptr<api::UpdateCommand> +ExternalOperationHandlerTest::makeUpdateCommandForUser(uint64_t id) const { + return makeUpdateCommand("testdoctype1", vespalib::make_string("id::testdoctype1:n=%" PRIu64 ":bar", id)); +} + std::shared_ptr<api::PutCommand> ExternalOperationHandlerTest::makePutCommand( const vespalib::string& doc_type, const vespalib::string& id) const { @@ -233,7 +251,27 @@ std::shared_ptr<api::RemoveCommand> ExternalOperationHandlerTest::makeRemoveComm } void -ExternalOperationHandlerTest::testOperationRejectedOnWrongDistribution() +ExternalOperationHandlerTest::mutating_operation_wdr_bounced_on_wrong_current_distribution() +{ + createLinks(); + std::string state("distributor:2 storage:2"); + setupDistributor(1, 2, state); + + document::BucketId bucket(findNonOwnedUserBucketInState(state)); + auto cmd = makeUpdateCommandForUser(bucket.withoutCountBits()); + + Operation::SP genOp; + CPPUNIT_ASSERT(getExternalOperationHandler().handleMessage(cmd, genOp)); + CPPUNIT_ASSERT(!genOp.get()); + CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); + CPPUNIT_ASSERT_EQUAL( + std::string("ReturnCode(WRONG_DISTRIBUTION, " + "distributor:2 storage:2)"), + _sender.replies[0]->getResult().toString()); +} + +void +ExternalOperationHandlerTest::read_only_operation_wdr_bounced_on_wrong_current_distribution() { createLinks(); std::string state("distributor:2 storage:2"); @@ -253,35 +291,27 @@ ExternalOperationHandlerTest::testOperationRejectedOnWrongDistribution() } void -ExternalOperationHandlerTest::testOperationRejectedOnPendingWrongDistribution() +ExternalOperationHandlerTest::mutating_operation_busy_bounced_on_wrong_pending_distribution() { createLinks(); - std::string current("distributor:2 storage:2"); - std::string pending("distributor:3 storage:3"); + std::string current("version:10 distributor:2 storage:2"); + std::string pending("version:11 distributor:3 storage:3"); setupDistributor(1, 3, current); document::BucketId b(findOwned1stNotOwned2ndInStates(current, pending)); // Trigger pending cluster state - auto stateCmd = std::make_shared<api::SetSystemStateCommand>( - lib::ClusterState(pending)); + auto stateCmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(pending)); getBucketDBUpdater().onSetSystemState(stateCmd); - auto cmd = makeGetCommandForUser(b.withoutCountBits()); + auto cmd = makeUpdateCommandForUser(b.withoutCountBits()); Operation::SP genOp; CPPUNIT_ASSERT(getExternalOperationHandler().handleMessage(cmd, genOp)); CPPUNIT_ASSERT(!genOp.get()); CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); - // Fail back with _pending_ cluster state so client can start trying - // correct distributor immediately. If that distributor has not yet - // completed processing its pending cluster state, it'll return the - // old (current) cluster state, causing the client to bounce between - // the two until the pending states have been resolved. This is pretty - // much inevitable with the current design. CPPUNIT_ASSERT_EQUAL( - std::string("ReturnCode(WRONG_DISTRIBUTION, " - "distributor:3 storage:3)"), + std::string("ReturnCode(BUSY, Currently pending cluster state transition from version 10 to 11)"), _sender.replies[0]->getResult().toString()); } @@ -486,6 +516,52 @@ void ExternalOperationHandlerTest::sequencing_can_be_explicitly_config_disabled( start_operation_verify_not_rejected(makeRemoveCommand(_dummy_id)); } +void ExternalOperationHandlerTest::gets_are_started_with_mutable_db_outside_transition_period() { + createLinks(); + std::string current = "distributor:1 storage:3"; + setupDistributor(1, 3, current); + getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); + + document::BucketId b(16, 1234); // Only 1 distributor (us), so doesn't matter + + auto op = start_operation_verify_not_rejected(makeGetCommandForUser(b.withoutCountBits())); + auto& get_op = dynamic_cast<GetOperation&>(*op); + const auto* expected_space = &getBucketSpaceRepo().get(document::FixedBucketSpaces::default_space()); + CPPUNIT_ASSERT_EQUAL(expected_space, &get_op.bucketSpace()); +} + +document::BucketId ExternalOperationHandlerTest::set_up_pending_cluster_state_transition(bool read_only_enabled) { + createLinks(); + std::string current = "version:123 distributor:2 storage:2"; + std::string pending = "version:321 distributor:3 storage:3"; + setupDistributor(1, 3, current); + getConfig().setAllowStaleReadsDuringClusterStateTransitions(read_only_enabled); + + // Trigger pending cluster state + auto stateCmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(pending)); + getBucketDBUpdater().onSetSystemState(stateCmd); + return findOwned1stNotOwned2ndInStates(current, pending); +} + +void ExternalOperationHandlerTest::gets_are_started_with_read_only_db_during_transition_period() { + auto non_owned_bucket = set_up_pending_cluster_state_transition(true); + + auto op = start_operation_verify_not_rejected(makeGetCommandForUser(non_owned_bucket.withoutCountBits())); + auto& get_op = dynamic_cast<GetOperation&>(*op); + const auto* expected_space = &getReadOnlyBucketSpaceRepo().get(document::FixedBucketSpaces::default_space()); + CPPUNIT_ASSERT_EQUAL(expected_space, &get_op.bucketSpace()); +} + +void ExternalOperationHandlerTest::gets_are_busy_bounced_during_transition_period_if_stale_reads_disabled() { + auto non_owned_bucket = set_up_pending_cluster_state_transition(false); + + start_operation_verify_rejected(makeGetCommandForUser(non_owned_bucket.withoutCountBits())); + CPPUNIT_ASSERT_EQUAL( + std::string("ReturnCode(BUSY, Currently pending cluster state transition from version 123 to 321)"), + _sender.replies[0]->getResult().toString()); + +} + // TODO support sequencing of RemoveLocation? It's a mutating operation, but supporting it with // the current approach is not trivial. A RemoveLocation operation covers the _entire_ bucket // sub tree under a given location, while the sequencer works on individual GIDs. Mapping the diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp index 44cf56fdff8..294ce56f536 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.cpp +++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp @@ -34,6 +34,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component) _enableHostInfoReporting(true), _disableBucketActivation(false), _sequenceMutatingOperations(true), + _allowStaleReadsDuringClusterStateTransitions(false), _minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED) { } @@ -144,6 +145,7 @@ DistributorConfiguration::configure(const vespa::config::content::core::StorDist _enableHostInfoReporting = config.enableHostInfoReporting; _disableBucketActivation = config.disableBucketActivation; _sequenceMutatingOperations = config.sequenceMutatingOperations; + _allowStaleReadsDuringClusterStateTransitions = config.allowStaleReadsDuringClusterStateTransitions; _minimumReplicaCountingMode = config.minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h index 5dfc4f66cb8..8c84fef47b5 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.h +++ b/storage/src/vespa/storage/config/distributorconfiguration.h @@ -235,6 +235,13 @@ public: void setSequenceMutatingOperations(bool sequenceMutations) noexcept { _sequenceMutatingOperations = sequenceMutations; } + + bool allowStaleReadsDuringClusterStateTransitions() const noexcept { + return _allowStaleReadsDuringClusterStateTransitions; + } + void setAllowStaleReadsDuringClusterStateTransitions(bool allow) noexcept { + _allowStaleReadsDuringClusterStateTransitions = allow; + } private: DistributorConfiguration(const DistributorConfiguration& other); @@ -274,6 +281,7 @@ private: bool _enableHostInfoReporting; bool _disableBucketActivation; bool _sequenceMutatingOperations; + bool _allowStaleReadsDuringClusterStateTransitions; DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/config/stor-distributormanager.def b/storage/src/vespa/storage/config/stor-distributormanager.def index 89aad427ca9..d4f69073cc6 100644 --- a/storage/src/vespa/storage/config/stor-distributormanager.def +++ b/storage/src/vespa/storage/config/stor-distributormanager.def @@ -184,3 +184,10 @@ sequence_mutating_operations bool default=true ## towards a node if it has indicated that its merge queues are full or it is ## suffering from resource exhaustion. inhibit_merge_sending_on_busy_node_duration_sec int default=10 + +## If set, enables potentially stale reads during cluster state transitions where +## buckets change ownership. This also implicitly enables support for two-phase +## cluster state transitions on the distributor. +## For this option to take effect, the cluster controller must also have two-phase +## states enabled. +allow_stale_reads_during_cluster_state_transitions bool default=false diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index 97028c20191..9c6ec4a7f0e 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -114,24 +114,33 @@ void BucketDBUpdater::removeSuperfluousBuckets( const lib::ClusterStateBundle& newState) { + // TODO too many indirections to get at config. + const bool move_to_read_only_db = _distributorComponent.getDistributor().getConfig() + .allowStaleReadsDuringClusterStateTransitions(); for (auto &elem : _distributorComponent.getBucketSpaceRepo()) { const auto &newDistribution(elem.second->getDistribution()); const auto &oldClusterState(elem.second->getClusterState()); auto &bucketDb(elem.second->getBucketDatabase()); + auto& readOnlyDb(_distributorComponent.getReadOnlyBucketSpaceRepo().get(elem.first).getBucketDatabase()); // Remove all buckets not belonging to this distributor, or // being on storage nodes that are no longer up. NodeRemover proc( oldClusterState, *newState.getDerivedClusterState(elem.first), - _distributorComponent.getBucketIdFactory(), _distributorComponent.getIndex(), newDistribution, _distributorComponent.getDistributor().getStorageNodeUpStates()); bucketDb.forEach(proc); - for (const auto & entry :proc.getBucketsToRemove()) { - bucketDb.remove(entry); + // TODO vec of Entry instead to avoid lookup and remove? Uses more transient memory... + for (const auto & bucket : proc.getBucketsToRemove()) { + if (move_to_read_only_db) { + // TODO explicit transfer function? + auto db_entry = bucketDb.get(bucket); + readOnlyDb.update(db_entry); // TODO Entry move support + } + bucketDb.remove(bucket); } } } @@ -202,7 +211,6 @@ BucketDBUpdater::onSetSystemState( } ensureTransitionTimerStarted(); - // TODO removeSuperfluousBuckets(cmd->getClusterStateBundle()); replyToPreviousPendingClusterStateIfAny(); diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h index a31b62d4e9b..18893edaeda 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h @@ -192,7 +192,6 @@ private: public: NodeRemover(const lib::ClusterState& oldState, const lib::ClusterState& s, - [[maybe_unused]] const document::BucketIdFactory& factory, uint16_t localIndex, const lib::Distribution& distribution, const char* upStates) @@ -202,7 +201,7 @@ private: _distribution(distribution), _upStates(upStates) {} - ~NodeRemover(); + ~NodeRemover() override; bool process(BucketDatabase::Entry& e) override; void logRemove(const document::BucketId& bucketId, const char* msg) const; bool distributorOwnsBucket(const document::BucketId&) const; diff --git a/storage/src/vespa/storage/distributor/distributormetricsset.cpp b/storage/src/vespa/storage/distributor/distributormetricsset.cpp index 927dc06182d..83923a1f00e 100644 --- a/storage/src/vespa/storage/distributor/distributormetricsset.cpp +++ b/storage/src/vespa/storage/distributor/distributormetricsset.cpp @@ -17,7 +17,7 @@ DistributorMetricSet::DistributorMetricSet(const metrics::LoadTypeSet& lt) removelocations(lt, PersistenceOperationMetricSet("removelocations"), this), gets(lt, PersistenceOperationMetricSet("gets"), this), stats(lt, PersistenceOperationMetricSet("stats"), this), - multioperations(lt, PersistenceOperationMetricSet("multioperations"), this), + getbucketlists(lt, PersistenceOperationMetricSet("getbucketlists"), this), visits(lt, VisitorMetricSet(), this), stateTransitionTime("state_transition_time", {}, "Time it takes to complete a cluster state transition. If a " diff --git a/storage/src/vespa/storage/distributor/distributormetricsset.h b/storage/src/vespa/storage/distributor/distributormetricsset.h index 5a64027f500..dfe976a89ab 100644 --- a/storage/src/vespa/storage/distributor/distributormetricsset.h +++ b/storage/src/vespa/storage/distributor/distributormetricsset.h @@ -20,7 +20,7 @@ public: metrics::LoadMetric<PersistenceOperationMetricSet> removelocations; metrics::LoadMetric<PersistenceOperationMetricSet> gets; metrics::LoadMetric<PersistenceOperationMetricSet> stats; - metrics::LoadMetric<PersistenceOperationMetricSet> multioperations; + metrics::LoadMetric<PersistenceOperationMetricSet> getbucketlists; metrics::LoadMetric<VisitorMetricSet> visits; metrics::DoubleAverageMetric stateTransitionTime; metrics::DoubleAverageMetric recoveryModeTime; diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp index 447a5bb156c..5d3261a71f7 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp @@ -20,6 +20,8 @@ #include "distributor_bucket_space.h" #include <vespa/log/log.h> +#include <vespa/document/bucket/fixed_bucket_spaces.h> + LOG_SETUP(".distributor.manager"); namespace storage::distributor { @@ -70,19 +72,60 @@ ExternalOperationHandler::checkSafeTimeReached(api::StorageCommand& cmd) return true; } +void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageCommand& cmd) { + // Distributor ownership is equal across cluster states, so always send back default state. + // This also helps client avoid getting confused by possibly observing different actual + // states for global/non-global document types for the same state version. + auto cluster_state_str = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()) + .getClusterState().toString(); + LOG(debug, "Got message with wrong distribution, sending back state '%s'", + cluster_state_str.c_str()); + + api::StorageReply::UP reply(cmd.makeReply()); + api::ReturnCode ret(api::ReturnCode::WRONG_DISTRIBUTION, cluster_state_str); + reply->setResult(ret); + sendUp(std::shared_ptr<api::StorageMessage>(reply.release())); +} + +void ExternalOperationHandler::bounce_with_busy_during_state_transition( + api::StorageCommand& cmd, + const lib::ClusterState& current_state, + const lib::ClusterState& pending_state) +{ + auto status_str = vespalib::make_string("Currently pending cluster state transition" + " from version %u to %u", + current_state.getVersion(), pending_state.getVersion()); + + api::StorageReply::UP reply(cmd.makeReply()); + api::ReturnCode ret(api::ReturnCode::BUSY, status_str); + reply->setResult(ret); + sendUp(std::shared_ptr<api::StorageMessage>(reply.release())); +} + bool ExternalOperationHandler::checkTimestampMutationPreconditions(api::StorageCommand& cmd, const document::BucketId &bucketId, PersistenceOperationMetricSet& persistenceMetrics) { document::Bucket bucket(cmd.getBucket().getBucketSpace(), bucketId); - if (!checkDistribution(cmd, bucket)) { + if (!ownsBucketInCurrentState(bucket)) { LOG(debug, "Distributor manager received %s, bucket %s with wrong distribution", cmd.toString().c_str(), bucket.toString().c_str()); - + bounce_with_wrong_distribution(cmd); persistenceMetrics.failures.wrongdistributor.inc(); return false; } + + auto pending = getDistributor().checkOwnershipInPendingState(bucket); + if (!pending.isOwned()) { + // We return BUSY here instead of WrongDistributionReply to avoid clients potentially + // ping-ponging between cluster state versions during a state transition. + auto& current_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState(); + auto& pending_state = pending.getNonOwnedState(); + bounce_with_busy_during_state_transition(cmd, current_state, pending_state); + return false; + } + if (!checkSafeTimeReached(cmd)) { persistenceMetrics.failures.safe_time_not_reached.inc(); return false; @@ -113,6 +156,35 @@ bool ExternalOperationHandler::allowMutation(const SequencingHandle& handle) con return handle.valid(); } +template <typename Func> +void ExternalOperationHandler::bounce_or_invoke_read_only_op( + api::StorageCommand& cmd, + const document::Bucket& bucket, + PersistenceOperationMetricSet& metrics, + Func func) +{ + if (!ownsBucketInCurrentState(bucket)) { + LOG(debug, "Distributor manager received %s, bucket %s with wrong distribution", + cmd.toString().c_str(), bucket.toString().c_str()); + bounce_with_wrong_distribution(cmd); + metrics.failures.wrongdistributor.inc(); + return; + } + + auto pending = getDistributor().checkOwnershipInPendingState(bucket); + if (pending.isOwned()) { + func(_bucketSpaceRepo); + } else { + if (getDistributor().getConfig().allowStaleReadsDuringClusterStateTransitions()) { + func(_readOnlyBucketSpaceRepo); + } else { + auto& current_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState(); + auto& pending_state = pending.getNonOwnedState(); + bounce_with_busy_during_state_transition(cmd, current_state, pending_state); + } + } +} + IMPL_MSG_COMMAND_H(ExternalOperationHandler, Put) { auto& metrics = getMetrics().puts[cmd->getLoadType()]; @@ -188,10 +260,8 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, RemoveLocation) RemoveLocationOperation::getBucketId(*this, *cmd, bid); document::Bucket bucket(cmd->getBucket().getBucketSpace(), bid); - if (!checkDistribution(*cmd, bucket)) { - LOG(debug, "Distributor manager received %s with wrong distribution", cmd->toString().c_str()); - - getMetrics().removelocations[cmd->getLoadType()].failures.wrongdistributor.inc(); + auto& metrics = getMetrics().removelocations[cmd->getLoadType()]; + if (!checkTimestampMutationPreconditions(*cmd, bucket.getBucketId(), metrics)) { return true; } @@ -203,43 +273,38 @@ IMPL_MSG_COMMAND_H(ExternalOperationHandler, RemoveLocation) IMPL_MSG_COMMAND_H(ExternalOperationHandler, Get) { document::Bucket bucket(cmd->getBucket().getBucketSpace(), getBucketId(cmd->getDocumentId())); - if (!checkDistribution(*cmd, bucket)) { - LOG(debug, "Distributor manager received get for %s, bucket %s with wrong distribution", - cmd->getDocumentId().toString().c_str(), bucket.toString().c_str()); - - getMetrics().gets[cmd->getLoadType()].failures.wrongdistributor.inc(); - return true; - } - - _op = std::make_shared<GetOperation>(*this, _bucketSpaceRepo.get(cmd->getBucket().getBucketSpace()), - cmd, getMetrics().gets[cmd->getLoadType()]); + auto& metrics = getMetrics().gets[cmd->getLoadType()]; + bounce_or_invoke_read_only_op(*cmd, bucket, metrics, [&](auto& bucket_space_repo) { + _op = std::make_shared<GetOperation>(*this, bucket_space_repo.get(cmd->getBucket().getBucketSpace()), + cmd, metrics); + }); return true; } IMPL_MSG_COMMAND_H(ExternalOperationHandler, StatBucket) { - if (!checkDistribution(*cmd, cmd->getBucket())) { - return true; - } - auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace())); - _op = std::make_shared<StatBucketOperation>(*this, distributorBucketSpace, cmd); + auto& metrics = getMetrics().stats[cmd->getLoadType()]; + bounce_or_invoke_read_only_op(*cmd, cmd->getBucket(), metrics, [&](auto& bucket_space_repo) { + auto& bucket_space = bucket_space_repo.get(cmd->getBucket().getBucketSpace()); + _op = std::make_shared<StatBucketOperation>(*this, bucket_space, cmd); + }); return true; } IMPL_MSG_COMMAND_H(ExternalOperationHandler, GetBucketList) { - if (!checkDistribution(*cmd, cmd->getBucket())) { - return true; - } - auto bucketSpace(cmd->getBucket().getBucketSpace()); - auto &distributorBucketSpace(_bucketSpaceRepo.get(bucketSpace)); - auto &bucketDatabase(distributorBucketSpace.getBucketDatabase()); - _op = std::make_shared<StatBucketListOperation>(bucketDatabase, _operationGenerator, getIndex(), cmd); + auto& metrics = getMetrics().getbucketlists[cmd->getLoadType()]; + bounce_or_invoke_read_only_op(*cmd, cmd->getBucket(), metrics, [&](auto& bucket_space_repo) { + auto& bucket_space = bucket_space_repo.get(cmd->getBucket().getBucketSpace()); + auto& bucket_database = bucket_space.getBucketDatabase(); + _op = std::make_shared<StatBucketListOperation>(bucket_database, _operationGenerator, getIndex(), cmd); + }); return true; } IMPL_MSG_COMMAND_H(ExternalOperationHandler, CreateVisitor) { + // TODO same handling as Gets (VisitorOperation needs to change) const DistributorConfiguration& config(getDistributor().getConfig()); VisitorOperation::Config visitorConfig(config.getMinBucketsPerVisitor(), config.getMaxVisitorsPerNodePerClientVisitor()); auto &distributorBucketSpace(_bucketSpaceRepo.get(cmd->getBucket().getBucketSpace())); diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h index 67fff2a9f39..bd51a914ea8 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.h +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h @@ -41,7 +41,7 @@ public: const MaintenanceOperationGenerator&, DistributorComponentRegister& compReg); - ~ExternalOperationHandler(); + ~ExternalOperationHandler() override; bool handleMessage(const std::shared_ptr<api::StorageMessage>& msg, Operation::SP& operation); @@ -56,6 +56,17 @@ private: Operation::SP _op; TimePoint _rejectFeedBeforeTimeReached; + template <typename Func> + void bounce_or_invoke_read_only_op(api::StorageCommand& cmd, + const document::Bucket& bucket, + PersistenceOperationMetricSet& metrics, + Func f); + + void bounce_with_wrong_distribution(api::StorageCommand& cmd); + void bounce_with_busy_during_state_transition(api::StorageCommand& cmd, + const lib::ClusterState& current_state, + const lib::ClusterState& pending_state); + bool checkSafeTimeReached(api::StorageCommand& cmd); api::ReturnCode makeSafeTimeRejectionResult(TimePoint unsafeTime); bool checkTimestampMutationPreconditions( diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h index 198c588dfd1..3936f13077e 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h @@ -34,6 +34,9 @@ public: bool hasConsistentCopies() const; + // Exposed for unit testing. TODO feels a bit dirty :I + const DistributorBucketSpace& bucketSpace() const noexcept { return _bucketSpace; } + private: class GroupId { public: |