diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-15 16:05:22 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2019-03-15 16:05:22 +0000 |
commit | 00804dfe908b6a337f88c42ea3def0e1f1397474 (patch) | |
tree | e4e58c7a54e830fc5a237fe0be945e1f62f40492 /storage | |
parent | 1b959abd3224c00f0347a8078dc333abfdd3ce9f (diff) |
Properly handle non-owned vs. missing buckets
Bonus: no more spurious "we have removed buckets" log messages caused
by ownership changes.
Also ensure that we BUSY-bounce operations in `ExternalOperationHandler`
when there is no actual state to send back in a `WrongDistributionReply`.
Diffstat (limited to 'storage')
9 files changed, 250 insertions, 52 deletions
diff --git a/storage/src/tests/distributor/bucketdbupdatertest.cpp b/storage/src/tests/distributor/bucketdbupdatertest.cpp index fd9512b2bd6..0a82fadbf68 100644 --- a/storage/src/tests/distributor/bucketdbupdatertest.cpp +++ b/storage/src/tests/distributor/bucketdbupdatertest.cpp @@ -114,19 +114,12 @@ class BucketDBUpdaterTest : public CppUnit::TestFixture, CPPUNIT_TEST(batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted); CPPUNIT_TEST(global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection); CPPUNIT_TEST(non_owned_buckets_moved_to_read_only_db_on_ownership_change); + CPPUNIT_TEST(buckets_no_longer_available_are_not_moved_to_read_only_database); CPPUNIT_TEST(non_owned_buckets_purged_when_read_only_support_is_config_disabled); + CPPUNIT_TEST(deferred_activated_state_does_not_enable_state_until_activation_received); + CPPUNIT_TEST(read_only_db_cleared_once_pending_state_is_activated); CPPUNIT_TEST_SUITE_END(); - /* - * TODO tests - * - buckets moved to read only db on ownership change - * - even when self is down in pending state - * - buckets NOT moved to read only db on content node down/maintenance - * - read only db cleared when cluster state activated - * - explicit cluster state activation path - * - legacy implicit cluster state activation support - */ - public: BucketDBUpdaterTest(); @@ -188,7 +181,10 @@ protected: void batch_update_from_distributor_change_does_not_mark_diverging_replicas_as_trusted(); void global_distribution_hash_falls_back_to_legacy_format_upon_request_rejection(); void non_owned_buckets_moved_to_read_only_db_on_ownership_change(); + void buckets_no_longer_available_are_not_moved_to_read_only_database(); void non_owned_buckets_purged_when_read_only_support_is_config_disabled(); + void deferred_activated_state_does_not_enable_state_until_activation_received(); + void read_only_db_cleared_once_pending_state_is_activated(); auto &defaultDistributorBucketSpace() { return getBucketSpaceRepo().get(makeBucketSpace()); } @@ -215,6 +211,19 @@ protected: // cluster state component (just not by operations), so it would not have the expected semantics. DistributorBucketSpaceRepo& read_only_repo() noexcept { return getReadOnlyBucketSpaceRepo(); } + BucketDatabase& mutable_default_db() noexcept { + return mutable_repo().get(FixedBucketSpaces::default_space()).getBucketDatabase(); + } + BucketDatabase& mutable_global_db() noexcept { + return mutable_repo().get(FixedBucketSpaces::global_space()).getBucketDatabase(); + } + BucketDatabase& read_only_default_db() noexcept { + return read_only_repo().get(FixedBucketSpaces::default_space()).getBucketDatabase(); + } + BucketDatabase& read_only_global_db() noexcept { + return read_only_repo().get(FixedBucketSpaces::global_space()).getBucketDatabase(); + } + static std::string getNodeList(std::vector<uint16_t> nodes, size_t count); std::string getNodeList(std::vector<uint16_t> nodes); @@ -228,6 +237,10 @@ protected: return messagesPerBucketSpace * _bucketSpaces.size(); } + void trigger_completed_but_not_yet_activated_transition( + vespalib::stringref initial_state, uint32_t initial_buckets, uint32_t initial_expected_msgs, + vespalib::stringref pending_state, uint32_t pending_buckets, uint32_t pending_expected_msgs); + public: using OutdatedNodesMap = dbtransition::OutdatedNodesMap; void setUp() override { @@ -391,8 +404,7 @@ public: void setSystemState(const lib::ClusterState& state) { const size_t sizeBeforeState = _sender.commands.size(); getBucketDBUpdater().onSetSystemState( - std::shared_ptr<api::SetSystemStateCommand>( - new api::SetSystemStateCommand(state))); + std::make_shared<api::SetSystemStateCommand>(state)); // A lot of test logic has the assumption that all messages sent as a // result of cluster state changes will be in increasing index order // (for simplicity, not because this is required for correctness). @@ -401,6 +413,18 @@ public: sortSentMessagesByIndex(_sender, sizeBeforeState); } + void setClusterStateBundle(const lib::ClusterStateBundle& state) { + const size_t sizeBeforeState = _sender.commands.size(); + getBucketDBUpdater().onSetSystemState( + std::make_shared<api::SetSystemStateCommand>(state)); + sortSentMessagesByIndex(_sender, sizeBeforeState); + } + + void activateClusterStateVersion(uint32_t version) { + getBucketDBUpdater().onActivateClusterStateVersion( + std::make_shared<api::ActivateClusterStateVersionCommand>(version)); + } + void completeBucketInfoGathering(const lib::ClusterState& state, size_t expectedMsgs, uint32_t bucketCount = 1, @@ -2640,7 +2664,7 @@ void BucketDBUpdaterTest::non_owned_buckets_moved_to_read_only_db_on_ownership_c getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition - setSystemState(initial_state); + setClusterStateBundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size()); constexpr uint32_t n_buckets = 10; @@ -2665,9 +2689,9 @@ void BucketDBUpdaterTest::non_owned_buckets_moved_to_read_only_db_on_ownership_c }); CPPUNIT_ASSERT(!buckets_not_owned_in_pending_state.empty()); - setSystemState(pending_state); + setClusterStateBundle(lib::ClusterStateBundle(pending_state, {}, true)); // Now requires activation - CPPUNIT_ASSERT_EQUAL(size_t(n_buckets - buckets_not_owned_in_pending_state.size()), default_db.size()); + CPPUNIT_ASSERT_EQUAL(n_buckets - buckets_not_owned_in_pending_state.size(), default_db.size()); CPPUNIT_ASSERT_EQUAL(buckets_not_owned_in_pending_state.size(), read_only_db.size()); // TODO replace with gmock unordered set equality matcher @@ -2679,6 +2703,26 @@ void BucketDBUpdaterTest::non_owned_buckets_moved_to_read_only_db_on_ownership_c // TODO check global space too! } +// TODO dedupe setup stuff +void BucketDBUpdaterTest::buckets_no_longer_available_are_not_moved_to_read_only_database() { + getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); + lib::ClusterState initial_state("distributor:1 storage:4"); // All buckets owned by us by definition + setClusterStateBundle(lib::ClusterStateBundle(initial_state, {}, false)); // Skip activation step for simplicity + + CPPUNIT_ASSERT_EQUAL(messageCount(4), _sender.commands.size()); + constexpr uint32_t n_buckets = 10; + completeBucketInfoGathering(initial_state, messageCount(4), n_buckets); + _sender.clear(); + + // No ownership change, just node down. Test redundancy is 2, so removing 2 nodes will + // cause some buckets to be entirely unavailable. + lib::ClusterState pending_state("distributor:1 storage:4 .0.s:d .1.s:m"); + setClusterStateBundle(lib::ClusterStateBundle(pending_state, {}, true)); + + CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size()); +} + +// TODO must ensure this works correctly with cluster controller! void BucketDBUpdaterTest::non_owned_buckets_purged_when_read_only_support_is_config_disabled() { getConfig().setAllowStaleReadsDuringClusterStateTransitions(false); @@ -2690,13 +2734,79 @@ void BucketDBUpdaterTest::non_owned_buckets_purged_when_read_only_support_is_con completeBucketInfoGathering(initial_state, messageCount(4), n_buckets); _sender.clear(); - auto& read_only_db = read_only_repo().get(makeBucketSpace()).getBucketDatabase(); - CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_db.size()); + CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size()); lib::ClusterState pending_state("distributor:2 storage:4"); setSystemState(pending_state); // No buckets should be moved into read only db - CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_db.size()); + CPPUNIT_ASSERT_EQUAL(size_t(0), read_only_default_db().size()); +} + +void BucketDBUpdaterTest::trigger_completed_but_not_yet_activated_transition( + vespalib::stringref initial_state_str, + uint32_t initial_buckets, + uint32_t initial_expected_msgs, + vespalib::stringref pending_state_str, + uint32_t pending_buckets, + uint32_t pending_expected_msgs) +{ + getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); + lib::ClusterState initial_state(initial_state_str); + setSystemState(initial_state); + // TODO decouple expected message count + CPPUNIT_ASSERT_EQUAL(messageCount(initial_expected_msgs), _sender.commands.size()); + completeBucketInfoGathering(initial_state, messageCount(initial_expected_msgs), initial_buckets); + _sender.clear(); + + lib::ClusterState pending_state(pending_state_str); // Ownership change + setClusterStateBundle(lib::ClusterStateBundle(pending_state, {}, true)); + CPPUNIT_ASSERT_EQUAL(messageCount(pending_expected_msgs), _sender.commands.size()); + completeBucketInfoGathering(pending_state, messageCount(pending_expected_msgs), pending_buckets); + _sender.clear(); +} + +void BucketDBUpdaterTest::deferred_activated_state_does_not_enable_state_until_activation_received() { + constexpr uint32_t n_buckets = 10; + trigger_completed_but_not_yet_activated_transition("version:1 distributor:2 storage:4", 0, 4, + "version:2 distributor:1 storage:4", n_buckets, 4); + + // Version should not be switched over yet + CPPUNIT_ASSERT_EQUAL(uint32_t(1), getDistributor().getClusterStateBundle().getVersion()); + + CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_default_db().size()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), mutable_global_db().size()); + + activateClusterStateVersion(2); + + CPPUNIT_ASSERT_EQUAL(uint32_t(2), getDistributor().getClusterStateBundle().getVersion()); + CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), mutable_default_db().size()); + CPPUNIT_ASSERT_EQUAL(uint64_t(n_buckets), mutable_global_db().size()); } +void BucketDBUpdaterTest::read_only_db_cleared_once_pending_state_is_activated() { + constexpr uint32_t n_buckets = 10; + trigger_completed_but_not_yet_activated_transition("version:1 distributor:1 storage:4", n_buckets, 4, + "version:2 distributor:2 storage:4", n_buckets, 0); + activateClusterStateVersion(2); + + CPPUNIT_ASSERT_EQUAL(uint64_t(0), read_only_default_db().size()); + CPPUNIT_ASSERT_EQUAL(uint64_t(0), read_only_global_db().size()); +} + +/* + * TODO tests + * - [X] buckets moved to read only db on ownership change + * - even when self is down in pending state + * - [X] buckets NOT moved to read only db on content node down/maintenance + * - [X] read only db cleared when cluster state activated + * - explicit cluster state activation path + * - legacy implicit cluster state activation support + * - return activation ACK if already activated + * - buckets merged on explicit activation + * - version check for activation msg + * - deferred bundle with non-deferred config? + */ + +// TODO reconcile default deferred activation state of bundle between Java and C++! + } diff --git a/storage/src/tests/distributor/externaloperationhandlertest.cpp b/storage/src/tests/distributor/externaloperationhandlertest.cpp index ebf9d4e3bc8..76917f856f0 100644 --- a/storage/src/tests/distributor/externaloperationhandlertest.cpp +++ b/storage/src/tests/distributor/externaloperationhandlertest.cpp @@ -23,7 +23,9 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture, CPPUNIT_TEST(testBucketSplitMask); CPPUNIT_TEST(mutating_operation_wdr_bounced_on_wrong_current_distribution); CPPUNIT_TEST(mutating_operation_busy_bounced_on_wrong_pending_distribution); + CPPUNIT_TEST(mutating_operation_busy_bounced_if_no_cluster_state_received_yet); CPPUNIT_TEST(read_only_operation_wdr_bounced_on_wrong_current_distribution); + CPPUNIT_TEST(read_only_operation_busy_bounced_if_no_cluster_state_received_yet); CPPUNIT_TEST(reject_put_if_not_past_safe_time_point); CPPUNIT_TEST(reject_remove_if_not_past_safe_time_point); CPPUNIT_TEST(reject_update_if_not_past_safe_time_point); @@ -59,6 +61,8 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture, const vespalib::string& id) const; std::shared_ptr<api::RemoveCommand> makeRemoveCommand(const vespalib::string& id) const; + void verify_busy_bounced_due_to_no_active_state(std::shared_ptr<api::StorageCommand> cmd); + Operation::SP start_operation_verify_not_rejected(std::shared_ptr<api::StorageCommand> cmd); void start_operation_verify_rejected(std::shared_ptr<api::StorageCommand> cmd); @@ -92,8 +96,10 @@ class ExternalOperationHandlerTest : public CppUnit::TestFixture, protected: void testBucketSplitMask(); void mutating_operation_wdr_bounced_on_wrong_current_distribution(); - void read_only_operation_wdr_bounced_on_wrong_current_distribution(); void mutating_operation_busy_bounced_on_wrong_pending_distribution(); + void mutating_operation_busy_bounced_if_no_cluster_state_received_yet(); + void read_only_operation_wdr_bounced_on_wrong_current_distribution(); + void read_only_operation_busy_bounced_if_no_cluster_state_received_yet(); void reject_put_if_not_past_safe_time_point(); void reject_remove_if_not_past_safe_time_point(); void reject_update_if_not_past_safe_time_point(); @@ -254,7 +260,7 @@ void ExternalOperationHandlerTest::mutating_operation_wdr_bounced_on_wrong_current_distribution() { createLinks(); - std::string state("distributor:2 storage:2"); + std::string state("version:1 distributor:2 storage:2"); setupDistributor(1, 2, state); document::BucketId bucket(findNonOwnedUserBucketInState(state)); @@ -266,7 +272,7 @@ ExternalOperationHandlerTest::mutating_operation_wdr_bounced_on_wrong_current_di CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); CPPUNIT_ASSERT_EQUAL( std::string("ReturnCode(WRONG_DISTRIBUTION, " - "distributor:2 storage:2)"), + "version:1 distributor:2 storage:2)"), _sender.replies[0]->getResult().toString()); } @@ -274,7 +280,7 @@ void ExternalOperationHandlerTest::read_only_operation_wdr_bounced_on_wrong_current_distribution() { createLinks(); - std::string state("distributor:2 storage:2"); + std::string state("version:1 distributor:2 storage:2"); setupDistributor(1, 2, state); document::BucketId bucket(findNonOwnedUserBucketInState(state)); @@ -286,7 +292,7 @@ ExternalOperationHandlerTest::read_only_operation_wdr_bounced_on_wrong_current_d CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); CPPUNIT_ASSERT_EQUAL( std::string("ReturnCode(WRONG_DISTRIBUTION, " - "distributor:2 storage:2)"), + "version:1 distributor:2 storage:2)"), _sender.replies[0]->getResult().toString()); } @@ -315,6 +321,34 @@ ExternalOperationHandlerTest::mutating_operation_busy_bounced_on_wrong_pending_d _sender.replies[0]->getResult().toString()); } +void +ExternalOperationHandlerTest::verify_busy_bounced_due_to_no_active_state(std::shared_ptr<api::StorageCommand> cmd) +{ + createLinks(); + std::string state{}; // No version --> not yet received + setupDistributor(1, 2, state); + + Operation::SP genOp; + CPPUNIT_ASSERT(getExternalOperationHandler().handleMessage(cmd, genOp)); + CPPUNIT_ASSERT(!genOp.get()); + CPPUNIT_ASSERT_EQUAL(size_t(1), _sender.replies.size()); + CPPUNIT_ASSERT_EQUAL( + std::string("ReturnCode(BUSY, No cluster state activated yet)"), + _sender.replies[0]->getResult().toString()); +} + +void +ExternalOperationHandlerTest::mutating_operation_busy_bounced_if_no_cluster_state_received_yet() +{ + verify_busy_bounced_due_to_no_active_state(makeUpdateCommandForUser(12345)); +} + +void +ExternalOperationHandlerTest::read_only_operation_busy_bounced_if_no_cluster_state_received_yet() +{ + verify_busy_bounced_due_to_no_active_state(makeGetCommandForUser(12345)); +} + using TimePoint = ExternalOperationHandler::TimePoint; using namespace std::literals::chrono_literals; @@ -322,7 +356,7 @@ void ExternalOperationHandlerTest::assert_rejection_due_to_unsafe_time( std::shared_ptr<api::StorageCommand> cmd) { createLinks(); - setupDistributor(1, 2, "distributor:1 storage:1"); + setupDistributor(1, 2, "version:1 distributor:1 storage:1"); getClock().setAbsoluteTimeInSeconds(9); getExternalOperationHandler().rejectFeedBeforeTimeReached(TimePoint(10s)); @@ -357,7 +391,7 @@ void ExternalOperationHandlerTest::reject_update_if_not_past_safe_time_point() { void ExternalOperationHandlerTest::get_not_rejected_by_unsafe_time_point() { createLinks(); - setupDistributor(1, 2, "distributor:1 storage:1"); + setupDistributor(1, 2, "version:1 distributor:1 storage:1"); getClock().setAbsoluteTimeInSeconds(9); getExternalOperationHandler().rejectFeedBeforeTimeReached(TimePoint(10s)); @@ -372,7 +406,7 @@ void ExternalOperationHandlerTest::get_not_rejected_by_unsafe_time_point() { void ExternalOperationHandlerTest::mutation_not_rejected_when_safe_point_reached() { createLinks(); - setupDistributor(1, 2, "distributor:1 storage:1"); + setupDistributor(1, 2, "version:1 distributor:1 storage:1"); getClock().setAbsoluteTimeInSeconds(10); getExternalOperationHandler().rejectFeedBeforeTimeReached(TimePoint(10s)); @@ -390,7 +424,7 @@ void ExternalOperationHandlerTest::mutation_not_rejected_when_safe_point_reached void ExternalOperationHandlerTest::set_up_distributor_for_sequencing_test() { createLinks(); - setupDistributor(1, 2, "distributor:1 storage:1"); + setupDistributor(1, 2, "version:1 distributor:1 storage:1"); } Operation::SP ExternalOperationHandlerTest::start_operation_verify_not_rejected( @@ -518,7 +552,7 @@ void ExternalOperationHandlerTest::sequencing_can_be_explicitly_config_disabled( void ExternalOperationHandlerTest::gets_are_started_with_mutable_db_outside_transition_period() { createLinks(); - std::string current = "distributor:1 storage:3"; + std::string current = "version:1 distributor:1 storage:3"; setupDistributor(1, 3, current); getConfig().setAllowStaleReadsDuringClusterStateTransitions(true); diff --git a/storage/src/tests/storageserver/statemanagertest.cpp b/storage/src/tests/storageserver/statemanagertest.cpp index 19f414482db..cdf990fa28f 100644 --- a/storage/src/tests/storageserver/statemanagertest.cpp +++ b/storage/src/tests/storageserver/statemanagertest.cpp @@ -37,6 +37,7 @@ struct StateManagerTest : public CppUnit::TestFixture { void can_explicitly_send_get_node_state_reply(); void explicit_node_state_replying_without_pending_request_immediately_replies_on_next_request(); void immediate_node_state_replying_is_tracked_per_controller(); + void activation_command_is_bounced_with_current_cluster_state_version(); CPPUNIT_TEST_SUITE(StateManagerTest); CPPUNIT_TEST(testSystemState); @@ -45,8 +46,10 @@ struct StateManagerTest : public CppUnit::TestFixture { CPPUNIT_TEST(can_explicitly_send_get_node_state_reply); CPPUNIT_TEST(explicit_node_state_replying_without_pending_request_immediately_replies_on_next_request); CPPUNIT_TEST(immediate_node_state_replying_is_tracked_per_controller); + CPPUNIT_TEST(activation_command_is_bounced_with_current_cluster_state_version); CPPUNIT_TEST_SUITE_END(); + void force_current_cluster_state_version(uint32_t version); void mark_reported_node_state_up(); void send_down_get_node_state_request(uint16_t controller_index); void assert_ok_get_node_state_reply_sent_and_clear(); @@ -101,6 +104,12 @@ StateManagerTest::tearDown() { _metricManager.reset(); } +void StateManagerTest::force_current_cluster_state_version(uint32_t version) { + ClusterState state(*_manager->getClusterStateBundle()->getBaselineClusterState()); + state.setVersion(version); + _manager->setClusterStateBundle(lib::ClusterStateBundle(state)); +} + #define GET_ONLY_OK_REPLY(varname) \ { \ CPPUNIT_ASSERT_EQUAL(size_t(1), _upper->getNumReplies()); \ @@ -236,9 +245,7 @@ StateManagerTest::testReportedNodeState() } void StateManagerTest::current_cluster_state_version_is_included_in_host_info_json() { - ClusterState state(*_manager->getClusterStateBundle()->getBaselineClusterState()); - state.setVersion(123); - _manager->setClusterStateBundle(lib::ClusterStateBundle(state)); + force_current_cluster_state_version(123); std::string nodeInfoString(_manager->getNodeInfo()); vespalib::Memory goldenMemory(nodeInfoString); @@ -343,4 +350,21 @@ void StateManagerTest::immediate_node_state_replying_is_tracked_per_controller() CPPUNIT_ASSERT_EQUAL(size_t(0), _upper->getNumReplies()); } +void StateManagerTest::activation_command_is_bounced_with_current_cluster_state_version() { + force_current_cluster_state_version(12345); + + auto cmd = std::make_shared<api::ActivateClusterStateVersionCommand>(12340); + cmd->setTimeout(10000000); + cmd->setSourceIndex(0); + _upper->sendDown(cmd); + + CPPUNIT_ASSERT_EQUAL(size_t(1), _upper->getNumReplies()); + std::shared_ptr<api::StorageReply> reply; + GET_ONLY_OK_REPLY(reply); // Implicitly clears messages from _upper + CPPUNIT_ASSERT_EQUAL(api::MessageType::ACTIVATE_CLUSTER_STATE_VERSION_REPLY, reply->getType()); + auto& activate_reply = dynamic_cast<api::ActivateClusterStateVersionReply&>(*reply); + CPPUNIT_ASSERT_EQUAL(uint32_t(12340), activate_reply.activateVersion()); + CPPUNIT_ASSERT_EQUAL(uint32_t(12345), activate_reply.actualVersion()); +} + } // storage diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp index 20ec67fdb14..202455098db 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.cpp +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.cpp @@ -121,7 +121,6 @@ void BucketDBUpdater::removeSuperfluousBuckets( const lib::ClusterStateBundle& newState) { - // TODO too many indirections to get at config. const bool move_to_read_only_db = _distributorComponent.getDistributor().getConfig() .allowStaleReadsDuringClusterStateTransitions(); for (auto &elem : _distributorComponent.getBucketSpaceRepo()) { @@ -140,15 +139,18 @@ BucketDBUpdater::removeSuperfluousBuckets( _distributorComponent.getDistributor().getStorageNodeUpStates()); bucketDb.forEach(proc); - // TODO vec of Entry instead to avoid lookup and remove? Uses more transient memory... for (const auto & bucket : proc.getBucketsToRemove()) { + bucketDb.remove(bucket); + } + // TODO vec of Entry instead to avoid lookup and remove? Uses more transient memory... + for (const auto& bucket : proc.getNonOwnedBuckets()) { if (move_to_read_only_db) { - // TODO explicit transfer function? auto db_entry = bucketDb.get(bucket); readOnlyDb.update(db_entry); // TODO Entry move support } bucketDb.remove(bucket); } + } } @@ -546,11 +548,17 @@ void BucketDBUpdater::processCompletedPendingClusterState() { if (_pendingClusterState->isDeferred()) { + LOG(debug, "Deferring completion of pending cluster state version %u until explicitly activated", + _pendingClusterState->clusterStateVersion()); assert(_pendingClusterState->hasCommand()); // Deferred transitions should only ever be created by state commands. // Sending down SetSystemState command will reach the state manager and a reply // will be auto-sent back to the cluster controller in charge. Once this happens, // it will send an explicit activation command once all distributors have reported // that their pending cluster states have completed. + // A booting distributor will treat itself as "system Up" before the state has actually + // taken effect via activation. External operation handler will keep operations from + // actually being scheduled until state has been activated. The external operation handler + // needs to be explicitly aware of the case where no state has yet to be activated. _distributorComponent.getDistributor().getMessageSender().sendDown( _pendingClusterState->getCommand()); _pendingClusterState->clearCommand(); @@ -567,6 +575,7 @@ BucketDBUpdater::activatePendingClusterState() _pendingClusterState->mergeIntoBucketDatabases(); if (_pendingClusterState->isVersionedTransition()) { + LOG(debug, "Activating pending cluster state version %u", _pendingClusterState->clusterStateVersion()); enableCurrentClusterStateBundleInDistributor(); if (_pendingClusterState->hasCommand()) { _distributorComponent.getDistributor().getMessageSender().sendDown( @@ -574,6 +583,7 @@ BucketDBUpdater::activatePendingClusterState() } addCurrentStateToClusterStateHistory(); } else { + LOG(debug, "Activating pending distribution config"); // TODO distribution changes cannot currently be deferred as they are not // initiated by the cluster controller! _distributorComponent.getDistributor().notifyDistributionChangeEnabled(); @@ -583,13 +593,16 @@ BucketDBUpdater::activatePendingClusterState() _outdatedNodesMap.clear(); sendAllQueuedBucketRechecks(); completeTransitionTimer(); + for (auto& space : _distributorComponent.getReadOnlyBucketSpaceRepo()) { + space.second->getBucketDatabase().clear(); + } } void BucketDBUpdater::enableCurrentClusterStateBundleInDistributor() { const lib::ClusterStateBundle& state( - _pendingClusterState->getCommand()->getClusterStateBundle()); + _pendingClusterState->getNewClusterStateBundle()); LOG(debug, "BucketDBUpdater finished processing state %s", @@ -771,7 +784,7 @@ BucketDBUpdater::NodeRemover::process(BucketDatabase::Entry& e) return true; } if (!distributorOwnsBucket(bucketId)) { - _removedBuckets.push_back(bucketId); + _nonOwnedBuckets.push_back(bucketId); return true; } diff --git a/storage/src/vespa/storage/distributor/bucketdbupdater.h b/storage/src/vespa/storage/distributor/bucketdbupdater.h index 973495d2007..ee98e398f6d 100644 --- a/storage/src/vespa/storage/distributor/bucketdbupdater.h +++ b/storage/src/vespa/storage/distributor/bucketdbupdater.h @@ -203,6 +203,8 @@ private: const char* upStates) : _oldState(oldState), _state(s), + _nonOwnedBuckets(), + _removedBuckets(), _localIndex(localIndex), _distribution(distribution), _upStates(upStates) {} @@ -212,15 +214,19 @@ private: void logRemove(const document::BucketId& bucketId, const char* msg) const; bool distributorOwnsBucket(const document::BucketId&) const; - const std::vector<document::BucketId>& getBucketsToRemove() const { + const std::vector<document::BucketId>& getBucketsToRemove() const noexcept { return _removedBuckets; } + const std::vector<document::BucketId>& getNonOwnedBuckets() const noexcept { + return _nonOwnedBuckets; + } private: void setCopiesInEntry(BucketDatabase::Entry& e, const std::vector<BucketCopy>& copies) const; void removeEmptyBucket(const document::BucketId& bucketId); const lib::ClusterState _oldState; const lib::ClusterState _state; + std::vector<document::BucketId> _nonOwnedBuckets; std::vector<document::BucketId> _removedBuckets; uint16_t _localIndex; diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp index 5d3261a71f7..7d0681df88d 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.cpp +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.cpp @@ -72,19 +72,27 @@ ExternalOperationHandler::checkSafeTimeReached(api::StorageCommand& cmd) return true; } +void ExternalOperationHandler::bounce_with_result(api::StorageCommand& cmd, const api::ReturnCode& result) { + api::StorageReply::UP reply(cmd.makeReply()); + reply->setResult(result); + sendUp(std::shared_ptr<api::StorageMessage>(reply.release())); +} + void ExternalOperationHandler::bounce_with_wrong_distribution(api::StorageCommand& cmd) { // Distributor ownership is equal across cluster states, so always send back default state. // This also helps client avoid getting confused by possibly observing different actual - // states for global/non-global document types for the same state version. - auto cluster_state_str = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()) - .getClusterState().toString(); - LOG(debug, "Got message with wrong distribution, sending back state '%s'", - cluster_state_str.c_str()); - - api::StorageReply::UP reply(cmd.makeReply()); - api::ReturnCode ret(api::ReturnCode::WRONG_DISTRIBUTION, cluster_state_str); - reply->setResult(ret); - sendUp(std::shared_ptr<api::StorageMessage>(reply.release())); + // (derived) state strings for global/non-global document types for the same state version. + // Similarly, if we've yet to activate any version at all we send back BUSY instead + // of a suspiciously empty WrongDistributionReply. + const auto& cluster_state = _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).getClusterState(); + if (cluster_state.getVersion() != 0) { + auto cluster_state_str = cluster_state.toString(); + LOG(debug, "Got message with wrong distribution, sending back state '%s'", cluster_state_str.c_str()); + bounce_with_result(cmd, api::ReturnCode(api::ReturnCode::WRONG_DISTRIBUTION, cluster_state_str)); + } else { // Only valid for empty startup state + LOG(debug, "Got message with wrong distribution, but no cluster state activated yet. Sending back BUSY"); + bounce_with_result(cmd, api::ReturnCode(api::ReturnCode::BUSY, "No cluster state activated yet")); + } } void ExternalOperationHandler::bounce_with_busy_during_state_transition( diff --git a/storage/src/vespa/storage/distributor/externaloperationhandler.h b/storage/src/vespa/storage/distributor/externaloperationhandler.h index bd51a914ea8..655feb5d00c 100644 --- a/storage/src/vespa/storage/distributor/externaloperationhandler.h +++ b/storage/src/vespa/storage/distributor/externaloperationhandler.h @@ -66,6 +66,7 @@ private: void bounce_with_busy_during_state_transition(api::StorageCommand& cmd, const lib::ClusterState& current_state, const lib::ClusterState& pending_state); + void bounce_with_result(api::StorageCommand& cmd, const api::ReturnCode& result); bool checkSafeTimeReached(api::StorageCommand& cmd); api::ReturnCode makeSafeTimeRejectionResult(TimePoint unsafeTime); diff --git a/storage/src/vespa/storage/storageserver/fnetlistener.cpp b/storage/src/vespa/storage/storageserver/fnetlistener.cpp index 3654b17a2a7..4e9fdc99a8a 100644 --- a/storage/src/vespa/storage/storageserver/fnetlistener.cpp +++ b/storage/src/vespa/storage/storageserver/fnetlistener.cpp @@ -208,6 +208,7 @@ void FNetListener::RPC_setDistributionStates(FRT_RPCRequest* req) { req->SetError(RPCRequestWrapper::ERR_BAD_REQUEST, e.what()); return; } + LOG(info, "Got state bundle %s", state_bundle->toString().c_str()); // TODO // TODO add constructor taking in shared_ptr directly instead? auto cmd = std::make_shared<api::SetSystemStateCommand>(*state_bundle); diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp index 02b9a765dd8..af01a880fea 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.cpp +++ b/storage/src/vespa/storage/storageserver/statemanager.cpp @@ -518,11 +518,12 @@ bool StateManager::onActivateClusterStateVersion( const std::shared_ptr<api::ActivateClusterStateVersionCommand>& cmd) { - // TODO we probably don't want to invoke listeners here? but just bounce with - // currently activated bundle version? - // Must ensure that layer above (i.e. distributor) maintains strict operation - // ordering. - sendUp(std::make_shared<api::ActivateClusterStateVersionReply>(*cmd)); + auto reply = std::make_shared<api::ActivateClusterStateVersionReply>(*cmd); + { + vespalib::LockGuard lock(_stateLock); + reply->setActualVersion(_systemState ? _systemState->getVersion() : 0); + } + sendUp(reply); return true; } |