diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-15 16:05:22 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-15 16:05:22 +0000 |
commit | 00804dfe908b6a337f88c42ea3def0e1f1397474 (patch) | |
tree | e4e58c7a54e830fc5a237fe0be945e1f62f40492 /storage/src/tests | |
parent | 1b959abd3224c00f0347a8078dc333abfdd3ce9f (diff) |
Properly handle non-owned vs. missing buckets
Bonus: no more spurious "we have removed buckets" log messages caused
by ownership changes.
Also ensure that we BUSY-bounce operations in `ExternalOperationHandler`
when there is no actual state to send back in a `WrongDistributionReply`.
Diffstat (limited to 'storage/src/tests')
3 files changed, 199 insertions, 31 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index fd9512b2bd6..0a82fadbf68 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -114,19 +114,12 @@ class BucketDBUpdaterTest : public CppUnit::TestFixture, CPPUNIT_TEST(batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted); CPPUNIT_TEST(global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection); CPPUNIT_TEST(non_owned_buckets_moved_to_read_only_db_on_ownership_change); + CPPUNIT_TEST(buckets_no_longer_available_are_not_moved_to_read_only_database); CPPUNIT_TEST(non_owned_buckets_purged_when_read_only_support_is_config_disabled); + CPPUNIT_TEST(deferred_activated_state_does_not_enable_state_until_activation_received); + CPPUNIT_TEST(read_only_db_cleared_once_pending_state_is_activated); CPPUNIT_TEST_SUITE_END(); - /* - * TODO tests - * - buckets moved to read only db on ownership change - * - even when self is down in pending state - * - buckets NOT moved to read only db on content node down/maintenance - * - read only db cleared when cluster state activated - * - explicit cluster state activation path - * - legacy implicit cluster state activation support - */ - public: BucketDBUpdaterTest(); @@ -188,7 +181,10 @@ protected: void batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted(); void global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection(); void non_owned_buckets_moved_to_read_only_db_on_ownership_change(); + void buckets_no_longer_available_are_not_moved_to_read_only_database(); void non_owned_buckets_purged_when_read_only_support_is_config_disabled(); + void deferred_activated_state_does_not_enable_state_until_activation_received(); + void read_only_db_cleared_once_pending_state_is_activated(); auto &defaultDistributorBucketSpace() { return getBucketSpaceRepo().get(makeBucketSpace()); } @@ -215,6 +211,19 @@ protected: // cluster state component (just not by operations), so it would not have the expected semantics. DistributorBucketSpaceRepo& read_only_repo() noexcept { return getReadOnlyBucketSpaceRepo(); } + BucketDatabase& mutable_default_db() noexcept { + return mutable_repo().get(FixedBucketSpaces::default_space()).getBucketDatabase(); + } + BucketDatabase& mutable_global_db() noexcept { + return mutable_repo().get(FixedBucketSpaces::global_space()).getBucketDatabase(); + } + BucketDatabase& read_only_default_db() noexcept { + return read_only_repo().get(FixedBucketSpaces::default_space()).getBucketDatabase(); + } + BucketDatabase& read_only_global_db() noexcept { + return read_only_repo().get(FixedBucketSpaces::global_space()).getBucketDatabase(); + } + static std::string getNodeList(std::vector<uint16_t> nodes, size_t count); std::string getNodeList(std::vector<uint16_t> nodes); @@ -228,6 +237,10 @@ protected: return messagesPerBucketSpace * _bucketSpaces.size(); } + void trigger_completed_but_not_yet_activated_transition( + vespalib::stringref initial_state, uint32_t initial_buckets, uint32_t initial_expected_msgs, + vespalib::stringref pending_state, uint32_t pending_buckets, uint32_t pending_expected_msgs); + public: using OutdatedNodesMap = dbtransition::OutdatedNodesMap; void setUp() override { @@ -391,8 +404,7 @@ public: void setSystemState(const lib::ClusterState& state) { const size_t sizeBeforeState = _sender.commands.size(); getBucketDBUpdater().onSetSystemState( - std::shared_ptr<api::SetSystemStateCommand>( - new api::SetSystemStateCommand(state))); + std::make_shared<api::SetSystemStateCommand>(state)); // A lot of test logic has the assumption that all messages sent as a // result of cluster state changes will be in increasing index order // (for simplicity, not because this is required for correctness). @@ -401,6 +413,18 @@ public: sortSentMessagesByIndex(_sender, sizeBeforeState); } + void setClusterStateBundle(const lib::ClusterStateBundle& state) { + const size_t sizeBeforeState = _sender.commands.size(); + getBucketDBUpdater().onSetSystemState( + std::make_shared<api::SetSystemStateCommand>(state)); + sortSentMessagesByIndex(_sender, sizeBeforeState); + } + + void activateClusterStateVersion(uint32_t version) { + getBucketDBUpdater().onActivateClusterStateVersion( + std::make_shared<api::ActivateClusterStateVersionCommand>(version)); + } + void completeBucketInfoGathering(const lib::ClusterState& state, size_t expectedMsgs, uint32_t bucketCount = 1, @@ -2640,7 +2664,7 @@ void BucketDBUpdaterTest::non_owned_buckets_moved_to_read_only_db_on_ownership_c getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition - setSystemState(initial_state); + setClusterStateBundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size()); constexpr uint32_t n_buckets = 10; @@ -2665,9 +2689,9 @@ void BucketDBUpdaterTest::non_owned_buckets_moved_to_read_only_db_on_ownership_c }); CPPUNIT_ASSERT(!buckets_not_owned_in_pending_state.empty()); - setSystemState(pending_state); + setClusterStateBundle(lib::ClusterStateBundle(pending_state, {}, true)); // Now requires activation - CPPUNIT_ASSERT_EQUAL(size_t(n_buckets - buckets_not_owned_in_pending_state.size()), default_db.size()); + CPPUNIT_ASSERT_EQUAL(n_buckets - buckets_not_owned_in_pending_state.size(), default_db.size()); CPPUNIT_ASSERT_EQUAL(buckets_not_owned_in_pending_state.size(), read_only_db.size()); // TODO replace with gmock unordered set equality matcher @@ -2679,6 +2703,26 @@ void BucketDBUpdaterTest::non_owned_buckets_moved_to_read_only_db_on_ownership_c // TODO check global space too! } +// TODO dedupe setup stuff +void BucketDBUpdaterTest::buckets_no_longer_available_are_not_moved_to_read_only_database() { + getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); + lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition + setClusterStateBundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity + + CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size()); + constexpr uint32_t n_buckets = 10; + completeBucketInfoGathering(initial_state, messageCount(4), n_buckets); + _sender.clear(); + + // No ownership change, just node down. Test redundancy is 2, so removing 2 nodes will + // cause some buckets to be entirely unavailable. + lib::ClusterState pending_state("distributor:1 storage:4 .0.s:d .1.s:m"); + setClusterStateBundle(lib::ClusterStateBundle(pending_state, {}, true)); + + CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size()); +} + +// TODO must ensure this works correctly with cluster controller! void BucketDBUpdaterTest::non_owned_buckets_purged_when_read_only_support_is_config_disabled() { getConfig().setAllowStaleReadsDuringClusterStateTransitions(false); @@ -2690,13 +2734,79 @@ void BucketDBUpdaterTest::non_owned_buckets_purged_when_read_only_support_is_con completeBucketInfoGathering(initial_state, messageCount(4), n_buckets); _sender.clear(); - auto& read_only_db = read_only_repo().get(makeBucketSpace()).getBucketDatabase(); - CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_db.size()); + CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size()); lib::ClusterState pending_state("distributor:2 storage:4"); setSystemState(pending_state); // No buckets should be moved into read only db - CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_db.size()); + CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size()); +} + +void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition( + vespalib::stringref initial_state_str, + uint32_t initial_buckets, + uint32_t initial_expected_msgs, + vespalib::stringref pending_state_str, + uint32_t pending_buckets, + uint32_t pending_expected_msgs) +{ + getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); + lib::ClusterState initial_state(initial_state_str); + setSystemState(initial_state); + // TODO decouple expected message count + CPPUNIT_ASSERT_EQUAL(messageCount(initial_expected_msgs), _sender.commands.size()); + completeBucketInfoGathering(initial_state, messageCount(initial_expected_msgs), initial_buckets); + _sender.clear(); + + lib::ClusterState pending_state(pending_state_str); // Ownership change + setClusterStateBundle(lib::ClusterStateBundle(pending_state, {}, true)); + CPPUNIT_ASSERT_EQUAL(messageCount(pending_expected_msgs), _sender.commands.size()); + completeBucketInfoGathering(pending_state, messageCount(pending_expected_msgs), pending_buckets); + _sender.clear(); +} + +void BucketDBUpdaterTest::deferred_activated_state_does_not_enable_state_until_activation_received() { + constexpr uint32_t n_buckets = 10; + trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, + "version:2 distributor:1 storage:4", n_buckets, 4); + + // Version should not be switched over yet + CPPUNIT_ASSERT_EQUAL(uint32_t(1), getDistributor().getClusterStateBundle().getVersion()); + + CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_default_db().size()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_global_db().size()); + + activateClusterStateVersion(2); + + CPPUNIT_ASSERT_EQUAL(uint32_t(2), getDistributor().getClusterStateBundle().getVersion()); + CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), mutable_default_db().size()); + CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), mutable_global_db().size()); } +void BucketDBUpdaterTest::read_only_db_cleared_once_pending_state_is_activated() { + constexpr uint32_t n_buckets = 10; + trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:2 storage:4", n_buckets, 0); + activateClusterStateVersion(2); + + CPPUNIT_ASSERT_EQUAL(uint64_t(0), read_only_default_db().size()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), read_only_global_db().size()); +} + +/* + * TODO tests + * - [X] buckets moved to read only db on ownership change + * - even when self is down in pending state + * - [X] buckets NOT moved to read only db on content node down/maintenance + * - [X] read only db cleared when cluster state activated + * - explicit cluster state activation path + * - legacy implicit cluster state activation support + * - return activation ACK if already activated + * - buckets merged on explicit activation + * - version check for activation msg + * - deferred bundle with non-deferred config? + */ + +// TODO reconcile default deferred activation state of bundle between Java and C++! + } diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp index ebf9d4e3bc8..76917f856f0 100644 --- a/storage/src/tests/distributor/externaloperationhandlertest.cpp +++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp @@ -23,7 +23,9 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture, CPPUNIT_TEST(testBucketSplitMask); CPPUNIT_TEST(mutating_operation_wdr_bounced_on_wrong_current_distribution); CPPUNIT_TEST(mutating_operation_busy_bounced_on_wrong_pending_distribution); + CPPUNIT_TEST(mutating_operation_busy_bounced_if_no_cluster_state_received_yet); CPPUNIT_TEST(read_only_operation_wdr_bounced_on_wrong_current_distribution); + CPPUNIT_TEST(read_only_operation_busy_bounced_if_no_cluster_state_received_yet); CPPUNIT_TEST(reject_put_if_not_past_safe_time_point); CPPUNIT_TEST(reject_remove_if_not_past_safe_time_point); CPPUNIT_TEST(reject_update_if_not_past_safe_time_point); @@ -59,6 +61,8 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture, const vespalib::string& id) const; std::shared_ptr<api::RemoveCommand> makeRemoveCommand(const vespalib::string& id) const; + void verify_busy_bounced_due_to_no_active_state(std::shared_ptr<api::StorageCommand> cmd); + Operation::SP start_operation_verify_not_rejected(std::shared_ptr<api::StorageCommand> cmd); void start_operation_verify_rejected(std::shared_ptr<api::StorageCommand> cmd); @@ -92,8 +96,10 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture, protected: void testBucketSplitMask(); void mutating_operation_wdr_bounced_on_wrong_current_distribution(); - void read_only_operation_wdr_bounced_on_wrong_current_distribution(); void mutating_operation_busy_bounced_on_wrong_pending_distribution(); + void mutating_operation_busy_bounced_if_no_cluster_state_received_yet(); + void read_only_operation_wdr_bounced_on_wrong_current_distribution(); + void read_only_operation_busy_bounced_if_no_cluster_state_received_yet(); void reject_put_if_not_past_safe_time_point(); void reject_remove_if_not_past_safe_time_point(); void reject_update_if_not_past_safe_time_point(); @@ -254,7 +260,7 @@ void ExternalOperationHandlerTest::mutating_operation_wdr_bounced_on_wrong_current_distribution() { createLinks(); - std::string state("distributor:2 storage:2"); + std::string state("version:1 distributor:2 storage:2"); setupDistributor(1, 2, state); document::BucketId bucket(findNonOwnedUserBucketInState(state)); @@ -266,7 +272,7 @@ ExternalOperationHandlerTest::mutating_operation_wdr_bounced_on_wrong_current_di CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); CPPUNIT_ASSERT_EQUAL( std::string("ReturnCode(WRONG_DISTRIBUTION, " - "distributor:2 storage:2)"), + "version:1 distributor:2 storage:2)"), _sender.replies[0]->getResult().toString()); } @@ -274,7 +280,7 @@ void ExternalOperationHandlerTest::read_only_operation_wdr_bounced_on_wrong_current_distribution() { createLinks(); - std::string state("distributor:2 storage:2"); + std::string state("version:1 distributor:2 storage:2"); setupDistributor(1, 2, state); document::BucketId bucket(findNonOwnedUserBucketInState(state)); @@ -286,7 +292,7 @@ ExternalOperationHandlerTest::read_only_operation_wdr_bounced_on_wrong_current_d CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); CPPUNIT_ASSERT_EQUAL( std::string("ReturnCode(WRONG_DISTRIBUTION, " - "distributor:2 storage:2)"), + "version:1 distributor:2 storage:2)"), _sender.replies[0]->getResult().toString()); } @@ -315,6 +321,34 @@ ExternalOperationHandlerTest::mutating_operation_busy_bounced_on_wrong_pending_d _sender.replies[0]->getResult().toString()); } +void +ExternalOperationHandlerTest::verify_busy_bounced_due_to_no_active_state(std::shared_ptr<api::StorageCommand> cmd) +{ + createLinks(); + std::string state{}; // No version --> not yet received + setupDistributor(1, 2, state); + + Operation::SP genOp; + CPPUNIT_ASSERT(getExternalOperationHandler().handleMessage(cmd, genOp)); + CPPUNIT_ASSERT(!genOp.get()); + CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); + CPPUNIT_ASSERT_EQUAL( + std::string("ReturnCode(BUSY, No cluster state activated yet)"), + _sender.replies[0]->getResult().toString()); +} + +void +ExternalOperationHandlerTest::mutating_operation_busy_bounced_if_no_cluster_state_received_yet() +{ + verify_busy_bounced_due_to_no_active_state(makeUpdateCommandForUser(12345)); +} + +void +ExternalOperationHandlerTest::read_only_operation_busy_bounced_if_no_cluster_state_received_yet() +{ + verify_busy_bounced_due_to_no_active_state(makeGetCommandForUser(12345)); +} + using TimePoint = ExternalOperationHandler::TimePoint; using namespace std::literals::chrono_literals; @@ -322,7 +356,7 @@ void ExternalOperationHandlerTest::assert_rejection_due_to_unsafe_time( std::shared_ptr<api::StorageCommand> cmd) { createLinks(); - setupDistributor(1, 2, "distributor:1 storage:1"); + setupDistributor(1, 2, "version:1 distributor:1 storage:1"); getClock().setAbsoluteTimeInSeconds(9); getExternalOperationHandler().rejectFeedBeforeTimeReached(TimePoint(10s)); @@ -357,7 +391,7 @@ void ExternalOperationHandlerTest::reject_update_if_not_past_safe_time_point() { void ExternalOperationHandlerTest::get_not_rejected_by_unsafe_time_point() { createLinks(); - setupDistributor(1, 2, "distributor:1 storage:1"); + setupDistributor(1, 2, "version:1 distributor:1 storage:1"); getClock().setAbsoluteTimeInSeconds(9); getExternalOperationHandler().rejectFeedBeforeTimeReached(TimePoint(10s)); @@ -372,7 +406,7 @@ void ExternalOperationHandlerTest::get_not_rejected_by_unsafe_time_point() { void ExternalOperationHandlerTest::mutation_not_rejected_when_safe_point_reached() { createLinks(); - setupDistributor(1, 2, "distributor:1 storage:1"); + setupDistributor(1, 2, "version:1 distributor:1 storage:1"); getClock().setAbsoluteTimeInSeconds(10); getExternalOperationHandler().rejectFeedBeforeTimeReached(TimePoint(10s)); @@ -390,7 +424,7 @@ void ExternalOperationHandlerTest::mutation_not_rejected_when_safe_point_reached void ExternalOperationHandlerTest::set_up_distributor_for_sequencing_test() { createLinks(); - setupDistributor(1, 2, "distributor:1 storage:1"); + setupDistributor(1, 2, "version:1 distributor:1 storage:1"); } Operation::SP ExternalOperationHandlerTest::start_operation_verify_not_rejected( @@ -518,7 +552,7 @@ void ExternalOperationHandlerTest::sequencing_can_be_explicitly_config_disabled( void ExternalOperationHandlerTest::gets_are_started_with_mutable_db_outside_transition_period() { createLinks(); - std::string current = "distributor:1 storage:3"; + std::string current = "version:1 distributor:1 storage:3"; setupDistributor(1, 3, current); getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); diff --git a/storage/src/tests/storageserver/statemanagertest.cpp b/storage/src/tests/storageserver/statemanagertest.cpp index 19f414482db..cdf990fa28f 100644 --- a/storage/src/tests/storageserver/statemanagertest.cpp +++ b/storage/src/tests/storageserver/statemanagertest.cpp @@ -37,6 +37,7 @@ struct StateManagerTest : public CppUnit::TestFixture { void can_explicitly_send_get_node_state_reply(); void explicit_node_state_replying_without_pending_request_immediately_replies_on_next_request(); void immediate_node_state_replying_is_tracked_per_controller(); + void activation_command_is_bounced_with_current_cluster_state_version(); CPPUNIT_TEST_SUITE(StateManagerTest); CPPUNIT_TEST(testSystemState); @@ -45,8 +46,10 @@ struct StateManagerTest : public CppUnit::TestFixture { CPPUNIT_TEST(can_explicitly_send_get_node_state_reply); CPPUNIT_TEST(explicit_node_state_replying_without_pending_request_immediately_replies_on_next_request); CPPUNIT_TEST(immediate_node_state_replying_is_tracked_per_controller); + CPPUNIT_TEST(activation_command_is_bounced_with_current_cluster_state_version); CPPUNIT_TEST_SUITE_END(); + void force_current_cluster_state_version(uint32_t version); void mark_reported_node_state_up(); void send_down_get_node_state_request(uint16_t controller_index); void assert_ok_get_node_state_reply_sent_and_clear(); @@ -101,6 +104,12 @@ StateManagerTest::tearDown() { _metricManager.reset(); } +void StateManagerTest::force_current_cluster_state_version(uint32_t version) { + ClusterState state(*_manager->getClusterStateBundle()->getBaselineClusterState()); + state.setVersion(version); + _manager->setClusterStateBundle(lib::ClusterStateBundle(state)); +} + #define GET_ONLY_OK_REPLY(varname) \ { \ CPPUNIT_ASSERT_EQUAL(size_t(1), _upper->getNumReplies()); \ @@ -236,9 +245,7 @@ StateManagerTest::testReportedNodeState() } void StateManagerTest::current_cluster_state_version_is_included_in_host_info_json() { - ClusterState state(*_manager->getClusterStateBundle()->getBaselineClusterState()); - state.setVersion(123); - _manager->setClusterStateBundle(lib::ClusterStateBundle(state)); + force_current_cluster_state_version(123); std::string nodeInfoString(_manager->getNodeInfo()); vespalib::Memory goldenMemory(nodeInfoString); @@ -343,4 +350,21 @@ void StateManagerTest::immediate_node_state_replying_is_tracked_per_controller() CPPUNIT_ASSERT_EQUAL(size_t(0), _upper->getNumReplies()); } +void StateManagerTest::activation_command_is_bounced_with_current_cluster_state_version() { + force_current_cluster_state_version(12345); + + auto cmd = std::make_shared<api::ActivateClusterStateVersionCommand>(12340); + cmd->setTimeout(10000000); + cmd->setSourceIndex(0); + _upper->sendDown(cmd); + + CPPUNIT_ASSERT_EQUAL(size_t(1), _upper->getNumReplies()); + std::shared_ptr<api::StorageReply> reply; + GET_ONLY_OK_REPLY(reply); // Implicitly clears messages from _upper + CPPUNIT_ASSERT_EQUAL(api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_REPLY, reply->getType()); + auto& activate_reply = dynamic_cast<api::ActivateClusterStateVersionReply&>(*reply); + CPPUNIT_ASSERT_EQUAL(uint32_t(12340), activate_reply.activateVersion()); + CPPUNIT_ASSERT_EQUAL(uint32_t(12345), activate_reply.actualVersion()); +} + } // storage |