diff options
Diffstat (limited to 'storage/src/tests/distributor/externaloperationhandlertest.cpp')
-rw-r--r-- | storage/src/tests/distributor/externaloperationhandlertest.cpp | 114 |
1 files changed, 95 insertions, 19 deletions
diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp index ddf88f50c36..ebf9d4e3bc8 100644 --- a/storage/src/tests/distributor/externaloperationhandlertest.cpp +++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp @@ -4,6 +4,7 @@ #include <vespa/storage/distributor/externaloperationhandler.h> #include <vespa/storage/distributor/distributor.h> #include <vespa/storage/distributor/distributormetricsset.h> +#include <vespa/storage/distributor/operations/external/getoperation.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/update/documentupdate.h> @@ -20,8 +21,9 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture, CPPUNIT_TEST_SUITE(ExternalOperationHandlerTest); CPPUNIT_TEST(testBucketSplitMask); - CPPUNIT_TEST(testOperationRejectedOnWrongDistribution); - CPPUNIT_TEST(testOperationRejectedOnPendingWrongDistribution); + CPPUNIT_TEST(mutating_operation_wdr_bounced_on_wrong_current_distribution); + CPPUNIT_TEST(mutating_operation_busy_bounced_on_wrong_pending_distribution); + CPPUNIT_TEST(read_only_operation_wdr_bounced_on_wrong_current_distribution); CPPUNIT_TEST(reject_put_if_not_past_safe_time_point); CPPUNIT_TEST(reject_remove_if_not_past_safe_time_point); CPPUNIT_TEST(reject_update_if_not_past_safe_time_point); @@ -37,6 +39,9 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture, CPPUNIT_TEST(concurrent_get_and_mutation_do_not_conflict); CPPUNIT_TEST(sequencing_works_across_mutation_types); CPPUNIT_TEST(sequencing_can_be_explicitly_config_disabled); + CPPUNIT_TEST(gets_are_started_with_mutable_db_outside_transition_period); + CPPUNIT_TEST(gets_are_started_with_read_only_db_during_transition_period); + CPPUNIT_TEST(gets_are_busy_bounced_during_transition_period_if_stale_reads_disabled); CPPUNIT_TEST_SUITE_END(); document::BucketId findNonOwnedUserBucketInState(vespalib::stringref state); @@ -49,6 +54,7 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture, std::shared_ptr<api::UpdateCommand> makeUpdateCommand(const vespalib::string& doc_type, const vespalib::string& id) const; std::shared_ptr<api::UpdateCommand> makeUpdateCommand() const; + std::shared_ptr<api::UpdateCommand> makeUpdateCommandForUser(uint64_t id) const; std::shared_ptr<api::PutCommand> makePutCommand(const vespalib::string& doc_type, const vespalib::string& id) const; std::shared_ptr<api::RemoveCommand> makeRemoveCommand(const vespalib::string& id) const; @@ -80,10 +86,14 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture, const vespalib::string _dummy_id{"id:foo:testdoctype1::bar"}; + // Returns an arbitrary bucket not owned in the pending state + document::BucketId set_up_pending_cluster_state_transition(bool read_only_enabled); + protected: void testBucketSplitMask(); - void testOperationRejectedOnWrongDistribution(); - void testOperationRejectedOnPendingWrongDistribution(); + void mutating_operation_wdr_bounced_on_wrong_current_distribution(); + void read_only_operation_wdr_bounced_on_wrong_current_distribution(); + void mutating_operation_busy_bounced_on_wrong_pending_distribution(); void reject_put_if_not_past_safe_time_point(); void reject_remove_if_not_past_safe_time_point(); void reject_update_if_not_past_safe_time_point(); @@ -99,6 +109,9 @@ protected: void concurrent_get_and_mutation_do_not_conflict(); void sequencing_works_across_mutation_types(); void sequencing_can_be_explicitly_config_disabled(); + void gets_are_started_with_mutable_db_outside_transition_period(); + void gets_are_started_with_read_only_db_during_transition_period(); + void gets_are_busy_bounced_during_transition_period_if_stale_reads_disabled(); void assert_rejection_due_to_unsafe_time( std::shared_ptr<api::StorageCommand> cmd); @@ -220,6 +233,11 @@ ExternalOperationHandlerTest::makeUpdateCommand() const { return makeUpdateCommand("testdoctype1", "id:foo:testdoctype1::baz"); } +std::shared_ptr<api::UpdateCommand> +ExternalOperationHandlerTest::makeUpdateCommandForUser(uint64_t id) const { + return makeUpdateCommand("testdoctype1", vespalib::make_string("id::testdoctype1:n=%" PRIu64 ":bar", id)); +} + std::shared_ptr<api::PutCommand> ExternalOperationHandlerTest::makePutCommand( const vespalib::string& doc_type, const vespalib::string& id) const { @@ -233,7 +251,27 @@ std::shared_ptr<api::RemoveCommand> ExternalOperationHandlerTest::makeRemoveComm } void -ExternalOperationHandlerTest::testOperationRejectedOnWrongDistribution() +ExternalOperationHandlerTest::mutating_operation_wdr_bounced_on_wrong_current_distribution() +{ + createLinks(); + std::string state("distributor:2 storage:2"); + setupDistributor(1, 2, state); + + document::BucketId bucket(findNonOwnedUserBucketInState(state)); + auto cmd = makeUpdateCommandForUser(bucket.withoutCountBits()); + + Operation::SP genOp; + CPPUNIT_ASSERT(getExternalOperationHandler().handleMessage(cmd, genOp)); + CPPUNIT_ASSERT(!genOp.get()); + CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); + CPPUNIT_ASSERT_EQUAL( + std::string("ReturnCode(WRONG_DISTRIBUTION, " + "distributor:2 storage:2)"), + _sender.replies[0]->getResult().toString()); +} + +void +ExternalOperationHandlerTest::read_only_operation_wdr_bounced_on_wrong_current_distribution() { createLinks(); std::string state("distributor:2 storage:2"); @@ -253,35 +291,27 @@ ExternalOperationHandlerTest::testOperationRejectedOnWrongDistribution() } void -ExternalOperationHandlerTest::testOperationRejectedOnPendingWrongDistribution() +ExternalOperationHandlerTest::mutating_operation_busy_bounced_on_wrong_pending_distribution() { createLinks(); - std::string current("distributor:2 storage:2"); - std::string pending("distributor:3 storage:3"); + std::string current("version:10 distributor:2 storage:2"); + std::string pending("version:11 distributor:3 storage:3"); setupDistributor(1, 3, current); document::BucketId b(findOwned1stNotOwned2ndInStates(current, pending)); // Trigger pending cluster state - auto stateCmd = std::make_shared<api::SetSystemStateCommand>( - lib::ClusterState(pending)); + auto stateCmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(pending)); getBucketDBUpdater().onSetSystemState(stateCmd); - auto cmd = makeGetCommandForUser(b.withoutCountBits()); + auto cmd = makeUpdateCommandForUser(b.withoutCountBits()); Operation::SP genOp; CPPUNIT_ASSERT(getExternalOperationHandler().handleMessage(cmd, genOp)); CPPUNIT_ASSERT(!genOp.get()); CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); - // Fail back with _pending_ cluster state so client can start trying - // correct distributor immediately. If that distributor has not yet - // completed processing its pending cluster state, it'll return the - // old (current) cluster state, causing the client to bounce between - // the two until the pending states have been resolved. This is pretty - // much inevitable with the current design. CPPUNIT_ASSERT_EQUAL( - std::string("ReturnCode(WRONG_DISTRIBUTION, " - "distributor:3 storage:3)"), + std::string("ReturnCode(BUSY, Currently pending cluster state transition from version 10 to 11)"), _sender.replies[0]->getResult().toString()); } @@ -486,6 +516,52 @@ void ExternalOperationHandlerTest::sequencing_can_be_explicitly_config_disabled( start_operation_verify_not_rejected(makeRemoveCommand(_dummy_id)); } +void ExternalOperationHandlerTest::gets_are_started_with_mutable_db_outside_transition_period() { + createLinks(); + std::string current = "distributor:1 storage:3"; + setupDistributor(1, 3, current); + getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); + + document::BucketId b(16, 1234); // Only 1 distributor (us), so doesn't matter + + auto op = start_operation_verify_not_rejected(makeGetCommandForUser(b.withoutCountBits())); + auto& get_op = dynamic_cast<GetOperation&>(*op); + const auto* expected_space = &getBucketSpaceRepo().get(document::FixedBucketSpaces::default_space()); + CPPUNIT_ASSERT_EQUAL(expected_space, &get_op.bucketSpace()); +} + +document::BucketId ExternalOperationHandlerTest::set_up_pending_cluster_state_transition(bool read_only_enabled) { + createLinks(); + std::string current = "version:123 distributor:2 storage:2"; + std::string pending = "version:321 distributor:3 storage:3"; + setupDistributor(1, 3, current); + getConfig().setAllowStaleReadsDuringClusterStateTransitions(read_only_enabled); + + // Trigger pending cluster state + auto stateCmd = std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(pending)); + getBucketDBUpdater().onSetSystemState(stateCmd); + return findOwned1stNotOwned2ndInStates(current, pending); +} + +void ExternalOperationHandlerTest::gets_are_started_with_read_only_db_during_transition_period() { + auto non_owned_bucket = set_up_pending_cluster_state_transition(true); + + auto op = start_operation_verify_not_rejected(makeGetCommandForUser(non_owned_bucket.withoutCountBits())); + auto& get_op = dynamic_cast<GetOperation&>(*op); + const auto* expected_space = &getReadOnlyBucketSpaceRepo().get(document::FixedBucketSpaces::default_space()); + CPPUNIT_ASSERT_EQUAL(expected_space, &get_op.bucketSpace()); +} + +void ExternalOperationHandlerTest::gets_are_busy_bounced_during_transition_period_if_stale_reads_disabled() { + auto non_owned_bucket = set_up_pending_cluster_state_transition(false); + + start_operation_verify_rejected(makeGetCommandForUser(non_owned_bucket.withoutCountBits())); + CPPUNIT_ASSERT_EQUAL( + std::string("ReturnCode(BUSY, Currently pending cluster state transition from version 123 to 321)"), + _sender.replies[0]->getResult().toString()); + +} + // TODO support sequencing of RemoveLocation? It's a mutating operation, but supporting it with // the current approach is not trivial. A RemoveLocation operation covers the _entire_ bucket // sub tree under a given location, while the sequencer works on individual GIDs. Mapping the |