// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include #include #include #include #include #include #include #include #include using document::test::makeDocumentBucket; namespace storage::distributor { class ExternalOperationHandlerTest : public CppUnit::TestFixture, public DistributorTestUtil { document::TestDocMan _testDocMan; CPPUNIT_TEST_SUITE(ExternalOperationHandlerTest); CPPUNIT_TEST(testBucketSplitMask); CPPUNIT_TEST(mutating_operation_wdr_bounced_on_wrong_current_distribution); CPPUNIT_TEST(mutating_operation_busy_bounced_on_wrong_pending_distribution); CPPUNIT_TEST(mutating_operation_busy_bounced_if_no_cluster_state_received_yet); CPPUNIT_TEST(read_only_operation_wdr_bounced_on_wrong_current_distribution); CPPUNIT_TEST(read_only_operation_busy_bounced_if_no_cluster_state_received_yet); CPPUNIT_TEST(reject_put_if_not_past_safe_time_point); CPPUNIT_TEST(reject_remove_if_not_past_safe_time_point); CPPUNIT_TEST(reject_update_if_not_past_safe_time_point); CPPUNIT_TEST(get_not_rejected_by_unsafe_time_point); CPPUNIT_TEST(mutation_not_rejected_when_safe_point_reached); CPPUNIT_TEST(reject_put_with_concurrent_mutation_to_same_id); CPPUNIT_TEST(do_not_reject_put_operations_to_different_ids); CPPUNIT_TEST(reject_remove_with_concurrent_mutation_to_same_id); CPPUNIT_TEST(do_not_reject_remove_operations_to_different_ids); CPPUNIT_TEST(reject_update_with_concurrent_mutation_to_same_id); CPPUNIT_TEST(do_not_reject_update_operations_to_different_ids); CPPUNIT_TEST(operation_destruction_allows_new_mutations_for_id); CPPUNIT_TEST(concurrent_get_and_mutation_do_not_conflict); CPPUNIT_TEST(sequencing_works_across_mutation_types); CPPUNIT_TEST(sequencing_can_be_explicitly_config_disabled); CPPUNIT_TEST(gets_are_started_with_mutable_db_outside_transition_period); CPPUNIT_TEST(gets_are_started_with_read_only_db_during_transition_period); CPPUNIT_TEST(gets_are_busy_bounced_during_transition_period_if_stale_reads_disabled); CPPUNIT_TEST_SUITE_END(); document::BucketId findNonOwnedUserBucketInState(vespalib::stringref state); document::BucketId findOwned1stNotOwned2ndInStates( vespalib::stringref state1, vespalib::stringref state2); std::shared_ptr makeGetCommandForUser(uint64_t id) const; std::shared_ptr makeGetCommand(const vespalib::string& id) const; std::shared_ptr makeUpdateCommand(const vespalib::string& doc_type, const vespalib::string& id) const; std::shared_ptr makeUpdateCommand() const; std::shared_ptr makeUpdateCommandForUser(uint64_t id) const; std::shared_ptr makePutCommand(const vespalib::string& doc_type, const vespalib::string& id) const; std::shared_ptr makeRemoveCommand(const vespalib::string& id) const; void verify_busy_bounced_due_to_no_active_state(std::shared_ptr cmd); Operation::SP start_operation_verify_not_rejected(std::shared_ptr cmd); void start_operation_verify_rejected(std::shared_ptr cmd); int64_t safe_time_not_reached_metric_count( const metrics::LoadMetric& metrics) const { return metrics[documentapi::LoadType::DEFAULT].failures .safe_time_not_reached.getLongValue("count"); } int64_t safe_time_not_reached_metric_count(const metrics::LoadMetric& metrics) const { return metrics[documentapi::LoadType::DEFAULT].failures.safe_time_not_reached.getLongValue("count"); } int64_t concurrent_mutatations_metric_count( const metrics::LoadMetric& metrics) const { return metrics[documentapi::LoadType::DEFAULT].failures .concurrent_mutations.getLongValue("count"); } int64_t concurrent_mutatations_metric_count(const metrics::LoadMetric& metrics) const { return metrics[documentapi::LoadType::DEFAULT].failures.concurrent_mutations.getLongValue("count"); } void set_up_distributor_for_sequencing_test(); const vespalib::string _dummy_id{"id:foo:testdoctype1::bar"}; // Returns an arbitrary bucket not owned in the pending state document::BucketId set_up_pending_cluster_state_transition(bool read_only_enabled); protected: void testBucketSplitMask(); void mutating_operation_wdr_bounced_on_wrong_current_distribution(); void mutating_operation_busy_bounced_on_wrong_pending_distribution(); void mutating_operation_busy_bounced_if_no_cluster_state_received_yet(); void read_only_operation_wdr_bounced_on_wrong_current_distribution(); void read_only_operation_busy_bounced_if_no_cluster_state_received_yet(); void reject_put_if_not_past_safe_time_point(); void reject_remove_if_not_past_safe_time_point(); void reject_update_if_not_past_safe_time_point(); void get_not_rejected_by_unsafe_time_point(); void mutation_not_rejected_when_safe_point_reached(); void reject_put_with_concurrent_mutation_to_same_id(); void do_not_reject_put_operations_to_different_ids(); void reject_remove_with_concurrent_mutation_to_same_id(); void do_not_reject_remove_operations_to_different_ids(); void reject_update_with_concurrent_mutation_to_same_id(); void do_not_reject_update_operations_to_different_ids(); void operation_destruction_allows_new_mutations_for_id(); void concurrent_get_and_mutation_do_not_conflict(); void sequencing_works_across_mutation_types(); void sequencing_can_be_explicitly_config_disabled(); void gets_are_started_with_mutable_db_outside_transition_period(); void gets_are_started_with_read_only_db_during_transition_period(); void gets_are_busy_bounced_during_transition_period_if_stale_reads_disabled(); void assert_rejection_due_to_unsafe_time( std::shared_ptr cmd); void assert_second_command_rejected_due_to_concurrent_mutation( std::shared_ptr cmd1, std::shared_ptr cmd2, const vespalib::string& expected_id_in_message); void assert_second_command_not_rejected_due_to_concurrent_mutation( std::shared_ptr cmd1, std::shared_ptr cmd2); public: void tearDown() override { close(); } }; CPPUNIT_TEST_SUITE_REGISTRATION(ExternalOperationHandlerTest); using document::DocumentId; void ExternalOperationHandlerTest::testBucketSplitMask() { { createLinks(); getDirConfig().getConfig("stor-distributormanager").set("minsplitcount", "16"); CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 0xffff), getExternalOperationHandler().getBucketId(document::DocumentId( vespalib::make_string("userdoc:ns:%d::", 0xffff)) ).stripUnused()); CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 0), getExternalOperationHandler().getBucketId(document::DocumentId( vespalib::make_string("userdoc:ns:%d::", 0x10000)) ).stripUnused()); CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 0xffff), getExternalOperationHandler().getBucketId(document::DocumentId( vespalib::make_string("userdoc:ns:%d::", 0xffff)) ).stripUnused()); CPPUNIT_ASSERT_EQUAL(document::BucketId(16, 0x100), getExternalOperationHandler().getBucketId(document::DocumentId( vespalib::make_string("userdoc:ns:%d::", 0x100)) ).stripUnused()); close(); } { getDirConfig().getConfig("stor-distributormanager").set("minsplitcount", "20"); createLinks(); CPPUNIT_ASSERT_EQUAL(document::BucketId(20, 0x11111), getExternalOperationHandler().getBucketId(document::DocumentId( vespalib::make_string("userdoc:ns:%d::", 0x111111)) ).stripUnused()); CPPUNIT_ASSERT_EQUAL(document::BucketId(20, 0x22222), getExternalOperationHandler().getBucketId(document::DocumentId( vespalib::make_string("userdoc:ns:%d::", 0x222222)) ).stripUnused()); } } document::BucketId ExternalOperationHandlerTest::findNonOwnedUserBucketInState( vespalib::stringref statestr) { lib::ClusterState state(statestr); for (uint64_t i = 1; i < 1000; ++i) { document::BucketId bucket(32, i); if (!getExternalOperationHandler().ownsBucketInState(state, makeDocumentBucket(bucket))) { return bucket; } } throw std::runtime_error("no appropriate bucket found"); } document::BucketId ExternalOperationHandlerTest::findOwned1stNotOwned2ndInStates( vespalib::stringref statestr1, vespalib::stringref statestr2) { lib::ClusterState state1(statestr1); lib::ClusterState state2(statestr2); for (uint64_t i = 1; i < 1000; ++i) { document::BucketId bucket(32, i); if (getExternalOperationHandler().ownsBucketInState(state1, makeDocumentBucket(bucket)) && !getExternalOperationHandler().ownsBucketInState(state2, makeDocumentBucket(bucket))) { return bucket; } } throw std::runtime_error("no appropriate bucket found"); } std::shared_ptr ExternalOperationHandlerTest::makeGetCommand(const vespalib::string& id) const { return std::make_shared(makeDocumentBucket(document::BucketId(0)), DocumentId(id), "[all]"); } std::shared_ptr ExternalOperationHandlerTest::makeGetCommandForUser(uint64_t id) const { DocumentId docId(document::UserDocIdString(vespalib::make_string("userdoc:foo:%" PRIu64 ":bar", id))); return std::make_shared(makeDocumentBucket(document::BucketId(0)), docId, "[all]"); } std::shared_ptr ExternalOperationHandlerTest::makeUpdateCommand( const vespalib::string& doc_type, const vespalib::string& id) const { auto update = std::make_shared( _testDocMan.getTypeRepo(), *_testDocMan.getTypeRepo().getDocumentType(doc_type), document::DocumentId(id)); return std::make_shared( makeDocumentBucket(document::BucketId(0)), std::move(update), api::Timestamp(0)); } std::shared_ptr ExternalOperationHandlerTest::makeUpdateCommand() const { return makeUpdateCommand("testdoctype1", "id:foo:testdoctype1::baz"); } std::shared_ptr ExternalOperationHandlerTest::makeUpdateCommandForUser(uint64_t id) const { return makeUpdateCommand("testdoctype1", vespalib::make_string("id::testdoctype1:n=%" PRIu64 ":bar", id)); } std::shared_ptr ExternalOperationHandlerTest::makePutCommand( const vespalib::string& doc_type, const vespalib::string& id) const { auto doc = _testDocMan.createDocument(doc_type, id); return std::make_shared( makeDocumentBucket(document::BucketId(0)), std::move(doc), api::Timestamp(0)); } std::shared_ptr ExternalOperationHandlerTest::makeRemoveCommand(const vespalib::string& id) const { return std::make_shared(makeDocumentBucket(document::BucketId(0)), DocumentId(id), api::Timestamp(0)); } void ExternalOperationHandlerTest::mutating_operation_wdr_bounced_on_wrong_current_distribution() { createLinks(); std::string state("version:1 distributor:2 storage:2"); setupDistributor(1, 2, state); document::BucketId bucket(findNonOwnedUserBucketInState(state)); auto cmd = makeUpdateCommandForUser(bucket.withoutCountBits()); Operation::SP genOp; CPPUNIT_ASSERT(getExternalOperationHandler().handleMessage(cmd, genOp)); CPPUNIT_ASSERT(!genOp.get()); CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); CPPUNIT_ASSERT_EQUAL( std::string("ReturnCode(WRONG_DISTRIBUTION, " "version:1 distributor:2 storage:2)"), _sender.replies[0]->getResult().toString()); } void ExternalOperationHandlerTest::read_only_operation_wdr_bounced_on_wrong_current_distribution() { createLinks(); std::string state("version:1 distributor:2 storage:2"); setupDistributor(1, 2, state); document::BucketId bucket(findNonOwnedUserBucketInState(state)); auto cmd = makeGetCommandForUser(bucket.withoutCountBits()); Operation::SP genOp; CPPUNIT_ASSERT(getExternalOperationHandler().handleMessage(cmd, genOp)); CPPUNIT_ASSERT(!genOp.get()); CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); CPPUNIT_ASSERT_EQUAL( std::string("ReturnCode(WRONG_DISTRIBUTION, " "version:1 distributor:2 storage:2)"), _sender.replies[0]->getResult().toString()); } void ExternalOperationHandlerTest::mutating_operation_busy_bounced_on_wrong_pending_distribution() { createLinks(); std::string current("version:10 distributor:2 storage:2"); std::string pending("version:11 distributor:3 storage:3"); setupDistributor(1, 3, current); document::BucketId b(findOwned1stNotOwned2ndInStates(current, pending)); // Trigger pending cluster state auto stateCmd = std::make_shared(lib::ClusterState(pending)); getBucketDBUpdater().onSetSystemState(stateCmd); auto cmd = makeUpdateCommandForUser(b.withoutCountBits()); Operation::SP genOp; CPPUNIT_ASSERT(getExternalOperationHandler().handleMessage(cmd, genOp)); CPPUNIT_ASSERT(!genOp.get()); CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); CPPUNIT_ASSERT_EQUAL( std::string("ReturnCode(BUSY, Currently pending cluster state transition from version 10 to 11)"), _sender.replies[0]->getResult().toString()); } void ExternalOperationHandlerTest::verify_busy_bounced_due_to_no_active_state(std::shared_ptr cmd) { createLinks(); std::string state{}; // No version --> not yet received setupDistributor(1, 2, state); Operation::SP genOp; CPPUNIT_ASSERT(getExternalOperationHandler().handleMessage(cmd, genOp)); CPPUNIT_ASSERT(!genOp.get()); CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); CPPUNIT_ASSERT_EQUAL( std::string("ReturnCode(BUSY, No cluster state activated yet)"), _sender.replies[0]->getResult().toString()); } void ExternalOperationHandlerTest::mutating_operation_busy_bounced_if_no_cluster_state_received_yet() { verify_busy_bounced_due_to_no_active_state(makeUpdateCommandForUser(12345)); } void ExternalOperationHandlerTest::read_only_operation_busy_bounced_if_no_cluster_state_received_yet() { verify_busy_bounced_due_to_no_active_state(makeGetCommandForUser(12345)); } using TimePoint = ExternalOperationHandler::TimePoint; using namespace std::literals::chrono_literals; void ExternalOperationHandlerTest::assert_rejection_due_to_unsafe_time( std::shared_ptr cmd) { createLinks(); setupDistributor(1, 2, "version:1 distributor:1 storage:1"); getClock().setAbsoluteTimeInSeconds(9); getExternalOperationHandler().rejectFeedBeforeTimeReached(TimePoint(10s)); Operation::SP generated; getExternalOperationHandler().handleMessage(cmd, generated); CPPUNIT_ASSERT(generated.get() == nullptr); CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); CPPUNIT_ASSERT_EQUAL( std::string("ReturnCode(STALE_TIMESTAMP, " "Operation received at time 9, which is before " "bucket ownership transfer safe time of 10)"), _sender.replies[0]->getResult().toString()); } void ExternalOperationHandlerTest::reject_put_if_not_past_safe_time_point() { assert_rejection_due_to_unsafe_time(makePutCommand("foo", "id:foo:testdoctype1::bar")); CPPUNIT_ASSERT_EQUAL(int64_t(1), safe_time_not_reached_metric_count( getDistributor().getMetrics().puts)); } void ExternalOperationHandlerTest::reject_remove_if_not_past_safe_time_point() { assert_rejection_due_to_unsafe_time(makeRemoveCommand("id:foo:testdoctype1::bar")); CPPUNIT_ASSERT_EQUAL(int64_t(1), safe_time_not_reached_metric_count( getDistributor().getMetrics().removes)); } void ExternalOperationHandlerTest::reject_update_if_not_past_safe_time_point() { assert_rejection_due_to_unsafe_time(makeUpdateCommand()); CPPUNIT_ASSERT_EQUAL(int64_t(1), safe_time_not_reached_metric_count( getDistributor().getMetrics().updates)); } void ExternalOperationHandlerTest::get_not_rejected_by_unsafe_time_point() { createLinks(); setupDistributor(1, 2, "version:1 distributor:1 storage:1"); getClock().setAbsoluteTimeInSeconds(9); getExternalOperationHandler().rejectFeedBeforeTimeReached(TimePoint(10s)); Operation::SP generated; getExternalOperationHandler().handleMessage( makeGetCommandForUser(0), generated); CPPUNIT_ASSERT(generated.get() != nullptr); CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.replies.size()); CPPUNIT_ASSERT_EQUAL(int64_t(0), safe_time_not_reached_metric_count( getDistributor().getMetrics().gets)); } void ExternalOperationHandlerTest::mutation_not_rejected_when_safe_point_reached() { createLinks(); setupDistributor(1, 2, "version:1 distributor:1 storage:1"); getClock().setAbsoluteTimeInSeconds(10); getExternalOperationHandler().rejectFeedBeforeTimeReached(TimePoint(10s)); Operation::SP generated; DocumentId id("id:foo:testdoctype1::bar"); getExternalOperationHandler().handleMessage( std::make_shared( makeDocumentBucket(document::BucketId(0)), id, api::Timestamp(0)), generated); CPPUNIT_ASSERT(generated.get() != nullptr); CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.replies.size()); CPPUNIT_ASSERT_EQUAL(int64_t(0), safe_time_not_reached_metric_count( getDistributor().getMetrics().removes)); } void ExternalOperationHandlerTest::set_up_distributor_for_sequencing_test() { createLinks(); setupDistributor(1, 2, "version:1 distributor:1 storage:1"); } Operation::SP ExternalOperationHandlerTest::start_operation_verify_not_rejected( std::shared_ptr cmd) { Operation::SP generated; _sender.replies.clear(); getExternalOperationHandler().handleMessage(cmd, generated); CPPUNIT_ASSERT(generated.get() != nullptr); CPPUNIT_ASSERT_EQUAL(size_t(0), _sender.replies.size()); return generated; } void ExternalOperationHandlerTest::start_operation_verify_rejected( std::shared_ptr cmd) { Operation::SP generated; _sender.replies.clear(); getExternalOperationHandler().handleMessage(cmd, generated); CPPUNIT_ASSERT(generated.get() == nullptr); CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); } void ExternalOperationHandlerTest::assert_second_command_rejected_due_to_concurrent_mutation( std::shared_ptr cmd1, std::shared_ptr cmd2, const vespalib::string& expected_id_in_message) { set_up_distributor_for_sequencing_test(); // Must hold ref to started operation, or sequencing handle will be released. Operation::SP generated1 = start_operation_verify_not_rejected(std::move(cmd1)); start_operation_verify_rejected(std::move(cmd2)); // TODO reconsider BUSY return code. Need something transient and non-noisy CPPUNIT_ASSERT_EQUAL( std::string(vespalib::make_string( "ReturnCode(BUSY, A mutating operation for document " "'%s' is already in progress)", expected_id_in_message.c_str())), _sender.replies[0]->getResult().toString()); } void ExternalOperationHandlerTest::assert_second_command_not_rejected_due_to_concurrent_mutation( std::shared_ptr cmd1, std::shared_ptr cmd2) { set_up_distributor_for_sequencing_test(); Operation::SP generated1 = start_operation_verify_not_rejected(std::move(cmd1)); start_operation_verify_not_rejected(std::move(cmd2)); } void ExternalOperationHandlerTest::reject_put_with_concurrent_mutation_to_same_id() { assert_second_command_rejected_due_to_concurrent_mutation( makePutCommand("testdoctype1", _dummy_id), makePutCommand("testdoctype1", _dummy_id), _dummy_id); CPPUNIT_ASSERT_EQUAL(int64_t(1), concurrent_mutatations_metric_count(getDistributor().getMetrics().puts)); } void ExternalOperationHandlerTest::do_not_reject_put_operations_to_different_ids() { assert_second_command_not_rejected_due_to_concurrent_mutation( makePutCommand("testdoctype1", "id:foo:testdoctype1::baz"), makePutCommand("testdoctype1", "id:foo:testdoctype1::foo")); CPPUNIT_ASSERT_EQUAL(int64_t(0), concurrent_mutatations_metric_count(getDistributor().getMetrics().puts)); } void ExternalOperationHandlerTest::reject_remove_with_concurrent_mutation_to_same_id() { assert_second_command_rejected_due_to_concurrent_mutation( makeRemoveCommand(_dummy_id), makeRemoveCommand(_dummy_id), _dummy_id); CPPUNIT_ASSERT_EQUAL(int64_t(1), concurrent_mutatations_metric_count(getDistributor().getMetrics().removes)); } void ExternalOperationHandlerTest::do_not_reject_remove_operations_to_different_ids() { assert_second_command_not_rejected_due_to_concurrent_mutation( makeRemoveCommand("id:foo:testdoctype1::baz"), makeRemoveCommand("id:foo:testdoctype1::foo")); CPPUNIT_ASSERT_EQUAL(int64_t(0), concurrent_mutatations_metric_count(getDistributor().getMetrics().removes)); } void ExternalOperationHandlerTest::reject_update_with_concurrent_mutation_to_same_id() { assert_second_command_rejected_due_to_concurrent_mutation( makeUpdateCommand("testdoctype1", _dummy_id), makeUpdateCommand("testdoctype1", _dummy_id), _dummy_id); CPPUNIT_ASSERT_EQUAL(int64_t(1), concurrent_mutatations_metric_count(getDistributor().getMetrics().updates)); } void ExternalOperationHandlerTest::do_not_reject_update_operations_to_different_ids() { assert_second_command_not_rejected_due_to_concurrent_mutation( makeUpdateCommand("testdoctype1", "id:foo:testdoctype1::baz"), makeUpdateCommand("testdoctype1", "id:foo:testdoctype1::foo")); CPPUNIT_ASSERT_EQUAL(int64_t(0), concurrent_mutatations_metric_count(getDistributor().getMetrics().updates)); } void ExternalOperationHandlerTest::operation_destruction_allows_new_mutations_for_id() { set_up_distributor_for_sequencing_test(); Operation::SP generated = start_operation_verify_not_rejected(makeRemoveCommand(_dummy_id)); generated.reset(); // Implicitly release sequencing handle start_operation_verify_not_rejected(makeRemoveCommand(_dummy_id)); } void ExternalOperationHandlerTest::concurrent_get_and_mutation_do_not_conflict() { set_up_distributor_for_sequencing_test(); Operation::SP generated1 = start_operation_verify_not_rejected(makeRemoveCommand(_dummy_id)); start_operation_verify_not_rejected(makeGetCommand(_dummy_id)); } void ExternalOperationHandlerTest::sequencing_works_across_mutation_types() { set_up_distributor_for_sequencing_test(); Operation::SP generated = start_operation_verify_not_rejected(makePutCommand("testdoctype1", _dummy_id)); start_operation_verify_rejected(makeRemoveCommand(_dummy_id)); start_operation_verify_rejected(makeUpdateCommand("testdoctype1", _dummy_id)); } void ExternalOperationHandlerTest::sequencing_can_be_explicitly_config_disabled() { set_up_distributor_for_sequencing_test(); // Should be able to modify config after links have been created, i.e. this is a live config. getConfig().setSequenceMutatingOperations(false); Operation::SP generated = start_operation_verify_not_rejected(makeRemoveCommand(_dummy_id)); // Sequencing is disabled, so concurrent op is not rejected. start_operation_verify_not_rejected(makeRemoveCommand(_dummy_id)); } void ExternalOperationHandlerTest::gets_are_started_with_mutable_db_outside_transition_period() { createLinks(); std::string current = "version:1 distributor:1 storage:3"; setupDistributor(1, 3, current); getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); document::BucketId b(16, 1234); // Only 1 distributor (us), so doesn't matter auto op = start_operation_verify_not_rejected(makeGetCommandForUser(b.withoutCountBits())); auto& get_op = dynamic_cast(*op); const auto* expected_space = &getBucketSpaceRepo().get(document::FixedBucketSpaces::default_space()); CPPUNIT_ASSERT_EQUAL(expected_space, &get_op.bucketSpace()); } document::BucketId ExternalOperationHandlerTest::set_up_pending_cluster_state_transition(bool read_only_enabled) { createLinks(); std::string current = "version:123 distributor:2 storage:2"; std::string pending = "version:321 distributor:3 storage:3"; setupDistributor(1, 3, current); getConfig().setAllowStaleReadsDuringClusterStateTransitions(read_only_enabled); // Trigger pending cluster state auto stateCmd = std::make_shared(lib::ClusterState(pending)); getBucketDBUpdater().onSetSystemState(stateCmd); return findOwned1stNotOwned2ndInStates(current, pending); } void ExternalOperationHandlerTest::gets_are_started_with_read_only_db_during_transition_period() { auto non_owned_bucket = set_up_pending_cluster_state_transition(true); auto op = start_operation_verify_not_rejected(makeGetCommandForUser(non_owned_bucket.withoutCountBits())); auto& get_op = dynamic_cast(*op); const auto* expected_space = &getReadOnlyBucketSpaceRepo().get(document::FixedBucketSpaces::default_space()); CPPUNIT_ASSERT_EQUAL(expected_space, &get_op.bucketSpace()); } void ExternalOperationHandlerTest::gets_are_busy_bounced_during_transition_period_if_stale_reads_disabled() { auto non_owned_bucket = set_up_pending_cluster_state_transition(false); start_operation_verify_rejected(makeGetCommandForUser(non_owned_bucket.withoutCountBits())); CPPUNIT_ASSERT_EQUAL( std::string("ReturnCode(BUSY, Currently pending cluster state transition from version 123 to 321)"), _sender.replies[0]->getResult().toString()); } // TODO support sequencing of RemoveLocation? It's a mutating operation, but supporting it with // the current approach is not trivial. A RemoveLocation operation covers the _entire_ bucket // sub tree under a given location, while the sequencer works on individual GIDs. Mapping the // former to the latter is not trivial unless we introduce higher level "location" mutation // pseudo-locks in the sequencer. I.e. if we get a RemoveLocation with id.user==123456, this // prevents any handles from being acquired to any GID under location BucketId(32, 123456). }