From e7fdc8f5010e18cd5599a016156a5c202df0f8db Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Thu, 20 Jun 2024 09:16:44 +0000 Subject: Handle cluster state bundle with distribution config on content node If a cluster state bundle contains distribution config, this is internally propagated via the `StateManager` component to all registered state listeners. One such state listener is `FileStorManager`, which updates the content node-internal bucket space repository. All `SetSystemStateCommand` and internal config-aware components (`StateManager` and `ChangedBucketOwnershipHandler`) now explicitly track whether the cluster controller provides distribution config, or if the internally provided config should be used (including fallback to internal config if necessary). --- storage/src/tests/bucketdb/bucketmanagertest.cpp | 29 ++--- storage/src/tests/common/testnodestateupdater.cpp | 17 ++- storage/src/tests/common/testnodestateupdater.h | 9 +- storage/src/tests/common/teststorageapp.cpp | 10 +- storage/src/tests/common/teststorageapp.h | 3 +- .../changedbucketownershiphandlertest.cpp | 54 ++++++++- .../src/tests/storageserver/statemanagertest.cpp | 123 +++++++++++++++++++-- .../src/vespa/storage/bucketdb/bucketmanager.cpp | 12 +- .../vespa/storage/common/content_bucket_space.cpp | 29 +---- .../vespa/storage/common/content_bucket_space.h | 8 -- .../src/vespa/storage/common/storagecomponent.h | 2 +- .../servicelayercomponentregisterimpl.cpp | 6 +- .../component/servicelayercomponentregisterimpl.h | 2 +- .../component/storagecomponentregisterimpl.cpp | 2 +- .../component/storagecomponentregisterimpl.h | 32 +++--- .../persistence/bucketownershipnotifier.cpp | 25 +++-- .../storage/persistence/bucketownershipnotifier.h | 20 ++-- .../persistence/filestorage/filestormanager.cpp | 18 +-- .../changedbucketownershiphandler.cpp | 90 +++++++-------- .../storageserver/changedbucketownershiphandler.h | 20 ++-- .../vespa/storage/storageserver/statemanager.cpp | 91 +++++++++++---- .../src/vespa/storage/storageserver/statemanager.h | 4 + 22 files changed, 394 insertions(+), 212 deletions(-) (limited to 'storage') diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp index f41dae89eec..a20902422b0 100644 --- a/storage/src/tests/bucketdb/bucketmanagertest.cpp +++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp @@ -1,9 +1,10 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include +#include #include #include -#include +#include #include #include #include @@ -25,7 +26,6 @@ #include #include #include -#include #include #include @@ -116,16 +116,17 @@ public: BucketManagerTest::~BucketManagerTest() = default; -#define ASSERT_DUMMYLINK_REPLY_COUNT(link, count) \ - if (link->getNumReplies() != count) { \ - std::ostringstream ost; \ - ost << "Expected there to be " << count << " replies in link, but " \ - << "found " << link->getNumReplies() << ":\n"; \ - for (uint32_t i=0; igetNumReplies(); ++i) { \ - ost << link->getReply(i)->getType() << "\n"; \ - } \ - FAIL() << ost.str(); \ +void check_dummy_link_reply_count(const DummyStorageLink& link, size_t expected_count) { + if (link.getNumReplies() != expected_count) { + std::ostringstream ost; + ost << "Expected there to be " << expected_count << " replies in link, but " + << "found " << link.getNumReplies() << ":\n"; + for (uint32_t i = 0; i < link.getNumReplies(); ++i) { + ost << link.getReply(i)->getType() << "\n"; + } + FAIL() << ost.str(); } +} void BucketManagerTest::setupTestEnvironment() { @@ -313,7 +314,7 @@ TEST_F(BucketManagerTest, DISABLED_request_bucket_info_with_state) { { LOG(info, "Waiting for response from 3 request bucket info messages"); _top->waitForMessages(3, 5); - ASSERT_DUMMYLINK_REPLY_COUNT(_top, 3); + ASSERT_NO_FATAL_FAILURE(check_dummy_link_reply_count(*_top, 3)); std::map replies; for (uint32_t i=0; i<3; ++i) { replies[_top->getReply(i)->getMsgId()] @@ -357,7 +358,7 @@ TEST_F(BucketManagerTest, request_bucket_info_with_list) { _top->sendDown(cmd); _top->waitForMessages(1, 5); - ASSERT_DUMMYLINK_REPLY_COUNT(_top, 1); + ASSERT_NO_FATAL_FAILURE(check_dummy_link_reply_count(*_top, 1)); auto reply = std::dynamic_pointer_cast(_top->getReply(0)); _top->reset(); ASSERT_TRUE(reply.get()); @@ -527,7 +528,7 @@ class ConcurrentOperationFixture { public: explicit ConcurrentOperationFixture(BucketManagerTest& self) : _self(self), - _state(std::make_shared("distributor:1 storage:1")) + _state(std::make_shared("version:2 distributor:1 storage:1")) { _self.setupTestEnvironment(); _self._top->open(); diff --git a/storage/src/tests/common/testnodestateupdater.cpp b/storage/src/tests/common/testnodestateupdater.cpp index f9671617352..5d1d7a085b9 100644 --- a/storage/src/tests/common/testnodestateupdater.cpp +++ b/storage/src/tests/common/testnodestateupdater.cpp @@ -24,18 +24,29 @@ TestNodeStateUpdater::getClusterStateBundle() const return _clusterStateBundle; } +void +TestNodeStateUpdater::patch_distribution(std::shared_ptr distribution) +{ + _clusterStateBundle = _clusterStateBundle->clone_with_new_distribution( + lib::DistributionConfigBundle::of(std::move(distribution))); +} + void TestNodeStateUpdater::setClusterState(std::shared_ptr c) { - setClusterStateBundle(std::make_shared(*c)); + setClusterStateBundle(std::make_shared(std::move(c))); } void TestNodeStateUpdater::setClusterStateBundle(std::shared_ptr clusterStateBundle) { + auto existing_distr = _clusterStateBundle->distribution_config_bundle(); _clusterStateBundle = std::move(clusterStateBundle); - for (uint32_t i = 0; i < _listeners.size(); ++i) { - _listeners[i]->handleNewState(); + if (!_clusterStateBundle->has_distribution_config() && existing_distr) { + _clusterStateBundle = _clusterStateBundle->clone_with_new_distribution(existing_distr); + } + for (auto* listener : _listeners) { + listener->handleNewState(); } } diff --git a/storage/src/tests/common/testnodestateupdater.h b/storage/src/tests/common/testnodestateupdater.h index e5418c238d5..6a00e9a2264 100644 --- a/storage/src/tests/common/testnodestateupdater.h +++ b/storage/src/tests/common/testnodestateupdater.h @@ -1,11 +1,4 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * \class storage::TestNodeStateUpdater - * \ingroup common - * - * \brief Test implementation of the node state updater. - */ - #pragma once #include @@ -14,6 +7,7 @@ namespace storage::lib { class ClusterState; class ClusterStateBundle; + class Distribution; } namespace storage { @@ -53,6 +47,7 @@ public: _current = std::make_shared(state); } + void patch_distribution(std::shared_ptr distribution); void setClusterState(std::shared_ptr c); void setClusterStateBundle(std::shared_ptr clusterStateBundle); diff --git a/storage/src/tests/common/teststorageapp.cpp b/storage/src/tests/common/teststorageapp.cpp index d811f100aec..b2b82a46850 100644 --- a/storage/src/tests/common/teststorageapp.cpp +++ b/storage/src/tests/common/teststorageapp.cpp @@ -6,10 +6,10 @@ #include #include #include -#include #include #include #include +#include #include #include #include @@ -59,6 +59,7 @@ TestStorageApp::TestStorageApp(StorageComponentRegisterImpl::UP compReg, auto distr = std::make_shared( lib::Distribution::getDefaultDistributionConfig(redundancy, nodeCount)); _compReg.setDistribution(distr); + _nodeStateUpdater.patch_distribution(distr); } TestStorageApp::~TestStorageApp() = default; @@ -69,6 +70,7 @@ TestStorageApp::setDistribution(Redundancy redundancy, NodeCount nodeCount) auto distr = std::make_shared( lib::Distribution::getDefaultDistributionConfig(redundancy, nodeCount)); _compReg.setDistribution(distr); + _nodeStateUpdater.patch_distribution(distr); } void @@ -83,6 +85,12 @@ TestStorageApp::setClusterState(const lib::ClusterState& c) _nodeStateUpdater.setClusterState(std::make_shared(c)); } +void +TestStorageApp::set_cluster_state_bundle(std::shared_ptr state_bundle) +{ + _nodeStateUpdater.setClusterStateBundle(std::move(state_bundle)); +} + namespace { NodeIndex node_index_from_config(const config::ConfigUri& uri) { diff --git a/storage/src/tests/common/teststorageapp.h b/storage/src/tests/common/teststorageapp.h index 04fa6996e15..c423761a9a2 100644 --- a/storage/src/tests/common/teststorageapp.h +++ b/storage/src/tests/common/teststorageapp.h @@ -73,6 +73,7 @@ public: void setDistribution(Redundancy, NodeCount); void setTypeRepo(std::shared_ptr repo); void setClusterState(const lib::ClusterState&); + void set_cluster_state_bundle(std::shared_ptr); // Utility functions for getting a hold of currently used bits. Practical // to avoid adding extra components in the tests. @@ -81,7 +82,7 @@ public: std::shared_ptr getTypeRepo() { return _compReg.getTypeRepo(); } const document::BucketIdFactory& getBucketIdFactory() { return _compReg.getBucketIdFactory(); } TestNodeStateUpdater& getStateUpdater() { return _nodeStateUpdater; } - std::shared_ptr & getDistribution() { return _compReg.getDistribution(); } + std::shared_ptr getDistribution() { return _compReg.getDistribution(); } TestNodeStateUpdater& getNodeStateUpdater() { return _nodeStateUpdater; } uint16_t getIndex() const { return _compReg.getIndex(); } const NodeIdentity& node_identity() const noexcept { return _node_identity; } diff --git a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp index 8982b02f2b7..66fda885df7 100644 --- a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp +++ b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp @@ -53,6 +53,7 @@ struct ChangedBucketOwnershipHandlerTest : Test { void applyDistribution(Redundancy, NodeCount); void applyClusterState(const lib::ClusterState&); + void apply_cluster_state_bundle(std::shared_ptr); document::BucketId nextOwnedBucket( uint16_t wantedOwner, @@ -84,6 +85,21 @@ struct ChangedBucketOwnershipHandlerTest : Test { return lib::ClusterState("distributor:4 storage:1 .0.s:d"); } + static std::shared_ptr make_distr_bundle(uint16_t node_count) { + return lib::DistributionConfigBundle::of(lib::Distribution::getDefaultDistributionConfig(1, node_count)); + } + + static std::shared_ptr make_state_bundle_with_config( + vespalib::stringref state_str, uint16_t node_count) + { + return std::make_shared( + std::make_shared(state_str), + lib::ClusterStateBundle::BucketSpaceStateMapping{}, + std::nullopt, + make_distr_bundle(node_count), + false); + } + void SetUp() override; }; @@ -194,6 +210,7 @@ hasOnlySetSystemStateCmdQueued(DummyStorageLink& link) { void ChangedBucketOwnershipHandlerTest::applyDistribution(Redundancy redundancy, NodeCount nodeCount) { + // TODO set distribution via state bundle instead _app->setDistribution(redundancy, nodeCount); _handler->storageDistributionChanged(); } @@ -205,6 +222,13 @@ ChangedBucketOwnershipHandlerTest::applyClusterState(const lib::ClusterState& st _handler->reloadClusterState(); } +void +ChangedBucketOwnershipHandlerTest::apply_cluster_state_bundle(std::shared_ptr state_bundle) +{ + _app->set_cluster_state_bundle(std::move(state_bundle)); + _handler->reloadClusterState(); +} + TEST_F(ChangedBucketOwnershipHandlerTest, enumerate_buckets_belonging_on_changed_nodes) { lib::ClusterState stateBefore("distributor:4 storage:1"); applyDistribution(Redundancy(1), NodeCount(4)); @@ -225,7 +249,7 @@ TEST_F(ChangedBucketOwnershipHandlerTest, enumerate_buckets_belonging_on_changed EXPECT_TRUE(hasAbortedNoneOf(cmd, node2Buckets)); // Handler must swallow abort replies - _bottom->sendUp(api::StorageMessage::SP(cmd->makeReply().release())); + _bottom->sendUp(api::StorageMessage::SP(cmd->makeReply())); EXPECT_EQ(size_t(0), _top->getNumReplies()); } @@ -312,7 +336,7 @@ TEST_F(ChangedBucketOwnershipHandlerTest, ownership_changed_on_distributor_up_ed EXPECT_TRUE(hasAbortedNoneOf(cmd, node2Buckets)); // Handler must swallow abort replies - _bottom->sendUp(api::StorageMessage::SP(cmd->makeReply().release())); + _bottom->sendUp(api::StorageMessage::SP(cmd->makeReply())); EXPECT_EQ(0, _top->getNumReplies()); } @@ -345,6 +369,32 @@ TEST_F(ChangedBucketOwnershipHandlerTest, distribution_config_change_updates_own sendAndExpectAbortedCreateBucket(2); } +TEST_F(ChangedBucketOwnershipHandlerTest, distribution_config_via_state_bundle_change_updates_ownership) { + apply_cluster_state_bundle(make_state_bundle_with_config("version:2 distributor:3 storage:1", 3)); + // Apply new distribution config containing only 1 distributor, meaning + // any messages sent from >1 must be aborted. + // This test case is a bit dodgy since the CC should never send a state with more nodes in it than + // the distribution config allows for when _it_ is responsible for also sending the config. + apply_cluster_state_bundle(make_state_bundle_with_config("version:3 distributor:3 storage:1", 1)); + sendAndExpectAbortedCreateBucket(2); +} + +TEST_F(ChangedBucketOwnershipHandlerTest, ignore_internal_config_once_state_bundle_with_config_received) { + apply_cluster_state_bundle(make_state_bundle_with_config("version:2 distributor:1 storage:3", 1)); + applyDistribution(Redundancy(1), NodeCount(3)); + // Bundle config says 1 node, internal config says 3. Trust the bundle(tm). + sendAndExpectAbortedCreateBucket(2); +} + +TEST_F(ChangedBucketOwnershipHandlerTest, revert_to_internal_config_if_distribution_no_longer_received_in_state_bundle) { + apply_cluster_state_bundle(make_state_bundle_with_config("version:2 distributor:1 storage:3", 3)); + + applyDistribution(Redundancy(1), NodeCount(1)); // not yet used + applyClusterState(lib::ClusterState("version:3 distributor:3 storage:3")); // no bundle config; revert to internal + + sendAndExpectAbortedCreateBucket(2); +} + /** * Generate and dispatch a message of the given type with the provided * arguments as if that message was sent from distributor 1. Messages will diff --git a/storage/src/tests/storageserver/statemanagertest.cpp b/storage/src/tests/storageserver/statemanagertest.cpp index 79246cb3ce1..90f13850072 100644 --- a/storage/src/tests/storageserver/statemanagertest.cpp +++ b/storage/src/tests/storageserver/statemanagertest.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include using storage::lib::NodeState; @@ -38,6 +39,23 @@ struct StateManagerTest : Test, NodeStateReporter { return cmd; } + static std::shared_ptr make_state_bundle_with_config( + vespalib::stringref state_str, uint16_t num_nodes) + { + auto state = std::make_shared(state_str); + auto distr = lib::DistributionConfigBundle::of(lib::Distribution::getDefaultDistributionConfig(1, num_nodes)); + return std::make_shared(std::move(state), + lib::ClusterStateBundle::BucketSpaceStateMapping{}, + std::nullopt, std::move(distr), false); + } + + + static std::shared_ptr make_set_state_cmd_with_config( + vespalib::stringref state_str, uint16_t num_nodes) + { + return std::make_shared(make_state_bundle_with_config(state_str, num_nodes)); + } + void get_single_reply(std::shared_ptr& reply_out); void get_only_ok_reply(std::shared_ptr& reply_out); void force_current_cluster_state_version(uint32_t version, uint16_t cc_index); @@ -56,6 +74,10 @@ struct StateManagerTest : Test, NodeStateReporter { void report(vespalib::JsonStream &) const override {} void extract_cluster_state_version_from_host_info(uint32_t& version_out); + + static vespalib::string to_string(const lib::Distribution::DistributionConfig& cfg) { + return lib::Distribution(cfg).serialized(); + } }; StateManagerTest::StateManagerTest() @@ -148,26 +170,107 @@ StateManagerTest::extract_cluster_state_version_from_host_info(uint32_t& version version_out = clusterStateVersionCursor.asLong(); } -TEST_F(StateManagerTest, cluster_state) { - std::shared_ptr reply; - // Verify initial state on startup - auto currentState = _manager->getClusterStateBundle()->getBaselineClusterState(); +TEST_F(StateManagerTest, cluster_state_and_config_has_expected_values_at_bootstrap) { + auto initial_bundle = _manager->getClusterStateBundle(); + auto currentState = initial_bundle->getBaselineClusterState(); EXPECT_EQ("cluster:d", currentState->toString(false)); EXPECT_EQ(currentState->getVersion(), 0); + // Distribution config should be equal to the config the node is running with. + ASSERT_TRUE(initial_bundle->has_distribution_config()); + EXPECT_EQ(to_string(initial_bundle->distribution_config_bundle()->config()), + _node->getComponentRegister().getDistribution()->serialized()); + auto currentNodeState = _manager->getCurrentNodeState(); EXPECT_EQ("s:d", currentNodeState->toString(false)); +} + +TEST_F(StateManagerTest, can_receive_state_bundle_without_distribution_config) { + ClusterState send_state("version:2 distributor:1 storage:4 .2.s:m"); + auto cmd = std::make_shared(send_state); + _upper->sendDown(cmd); + std::shared_ptr reply; + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); + + auto current_bundle = _manager->getClusterStateBundle(); + EXPECT_EQ(send_state, *current_bundle->getBaselineClusterState()); + // Distribution config should be unchanged from boostrap. + ASSERT_TRUE(current_bundle->has_distribution_config()); + EXPECT_EQ(to_string(current_bundle->distribution_config_bundle()->config()), + _node->getComponentRegister().getDistribution()->serialized()); + + auto current_node_state = _manager->getCurrentNodeState(); + EXPECT_EQ("s:m", current_node_state->toString(false)); +} + +TEST_F(StateManagerTest, can_receive_state_bundle_with_distribution_config) { + auto cmd = make_set_state_cmd_with_config("version:2 distributor:1 storage:4 .2.s:m", 5); + EXPECT_NE(to_string(cmd->getClusterStateBundle().distribution_config_bundle()->config()), + _node->getComponentRegister().getDistribution()->serialized()); + _upper->sendDown(cmd); + std::shared_ptr reply; + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); + + auto current_bundle = _manager->getClusterStateBundle(); + EXPECT_EQ(*current_bundle, cmd->getClusterStateBundle()); // also compares distribution configs +} - ClusterState sendState("storage:4 .2.s:m"); - auto cmd = std::make_shared(sendState); +TEST_F(StateManagerTest, receiving_cc_bundle_with_distribution_config_disables_node_distribution_config_propagation) { + auto cmd = make_set_state_cmd_with_config("version:2 distributor:1 storage:4 .2.s:m", 5); _upper->sendDown(cmd); + std::shared_ptr reply; ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); + // Explicitly setting distribution config should not propagate to the active state bundle + // since we've flipped to expecting config from the cluster controllers instead. + auto distr = std::make_shared(lib::Distribution::getDefaultDistributionConfig(2, 7)); + _node->getComponentRegister().setDistribution(distr); + + auto current_bundle = _manager->getClusterStateBundle(); + EXPECT_EQ(*current_bundle, cmd->getClusterStateBundle()); // unchanged +} + +TEST_F(StateManagerTest, internal_distribution_config_is_propagated_if_none_yet_received_from_cc) { + _upper->sendDown(make_set_state_cmd("version:10 distributor:1 storage:4", 0)); + std::shared_ptr reply; + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); + + auto expected_bundle = make_state_bundle_with_config("version:10 distributor:1 storage:4", 7); + // Explicitly set internal config + _node->getComponentRegister().setDistribution(expected_bundle->distribution_config_bundle()->default_distribution_sp()); + _manager->storageDistributionChanged(); + + auto current_bundle = _manager->getClusterStateBundle(); + EXPECT_EQ(*current_bundle, *expected_bundle); +} + +TEST_F(StateManagerTest, revert_to_internal_config_if_cc_no_longer_sends_distribution_config) { + // Initial state bundle _with_ distribution config + auto cmd = make_set_state_cmd_with_config("version:2 distributor:1 storage:4 .2.s:m", 5); + _upper->sendDown(cmd); + std::shared_ptr reply; + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); + + auto current_bundle = _manager->getClusterStateBundle(); + EXPECT_EQ(to_string(current_bundle->distribution_config_bundle()->config()), + to_string(cmd->getClusterStateBundle().distribution_config_bundle()->config())); + + // CC then sends a new bundle _without_ config + _upper->sendDown(make_set_state_cmd("version:3 distributor:1 storage:4", 0)); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); + + // Config implicitly reverted to the active internal config + current_bundle = _manager->getClusterStateBundle(); + EXPECT_EQ(to_string(current_bundle->distribution_config_bundle()->config()), + _node->getComponentRegister().getDistribution()->serialized()); - currentState = _manager->getClusterStateBundle()->getBaselineClusterState(); - EXPECT_EQ(sendState, *currentState); + // Explicitly set internal config + auto expected_bundle = make_state_bundle_with_config("version:3 distributor:1 storage:4", 7); + _node->getComponentRegister().setDistribution(expected_bundle->distribution_config_bundle()->default_distribution_sp()); + _manager->storageDistributionChanged(); - currentNodeState = _manager->getCurrentNodeState(); - EXPECT_EQ("s:m", currentNodeState->toString(false)); + // Internal config shall have taken effect, overriding that of the initial bundle + current_bundle = _manager->getClusterStateBundle(); + EXPECT_EQ(*current_bundle, *expected_bundle); } TEST_F(StateManagerTest, accept_lower_state_versions_if_strict_requirement_disabled) { diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp index 1f20a19ec51..280afe8fb91 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp @@ -526,12 +526,12 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac using RBISP = std::shared_ptr; std::map requests; - // TODO fetch distribution from bundle as well - auto distribution(_component.getBucketSpaceRepo().get(bucketSpace).getDistribution()); - auto clusterStateBundle(_component.getStateUpdater().getClusterStateBundle()); - assert(clusterStateBundle); - lib::ClusterState::CSP clusterState(clusterStateBundle->getDerivedClusterState(bucketSpace)); - assert(clusterState.get()); + auto clusterStateBundle = _component.getStateUpdater().getClusterStateBundle(); + assert(clusterStateBundle && clusterStateBundle->has_distribution_config()); + auto clusterState = clusterStateBundle->getDerivedClusterState(bucketSpace); + assert(clusterState); + const auto distribution = clusterStateBundle->bucket_space_distribution_or_nullptr(bucketSpace); + assert(distribution); const auto our_hash = distribution->getNodeGraph().getDistributionConfigHash(); diff --git a/storage/src/vespa/storage/common/content_bucket_space.cpp b/storage/src/vespa/storage/common/content_bucket_space.cpp index 92b5257b991..d66219794c7 100644 --- a/storage/src/vespa/storage/common/content_bucket_space.cpp +++ b/storage/src/vespa/storage/common/content_bucket_space.cpp @@ -10,6 +10,7 @@ ClusterStateAndDistribution::ClusterStateAndDistribution( : _cluster_state(std::move(cluster_state)), _distribution(std::move(distribution)) { + assert(_cluster_state && _distribution); } ClusterStateAndDistribution::~ClusterStateAndDistribution() = default; @@ -48,34 +49,6 @@ ContentBucketSpace::state_and_distribution() const noexcept { return _state_and_distribution; } -void -ContentBucketSpace::setClusterState(std::shared_ptr clusterState) -{ - std::lock_guard guard(_lock); - _state_and_distribution = _state_and_distribution->with_new_state(std::move(clusterState)); -} - -std::shared_ptr -ContentBucketSpace::getClusterState() const -{ - std::lock_guard guard(_lock); - return _state_and_distribution->_cluster_state; -} - -void -ContentBucketSpace::setDistribution(std::shared_ptr distribution) -{ - std::lock_guard guard(_lock); - _state_and_distribution = _state_and_distribution->with_new_distribution(std::move(distribution)); -} - -std::shared_ptr -ContentBucketSpace::getDistribution() const -{ - std::lock_guard guard(_lock); - return _state_and_distribution->_distribution; -} - bool ContentBucketSpace::getNodeUpInLastNodeStateSeenByProvider() const { diff --git a/storage/src/vespa/storage/common/content_bucket_space.h b/storage/src/vespa/storage/common/content_bucket_space.h index eb48640c97b..67ee4209d35 100644 --- a/storage/src/vespa/storage/common/content_bucket_space.h +++ b/storage/src/vespa/storage/common/content_bucket_space.h @@ -54,14 +54,6 @@ public: void set_state_and_distribution(std::shared_ptr state_and_distr) noexcept; [[nodiscard]] std::shared_ptr state_and_distribution() const noexcept; - // TODO deprecate; only use atomic state+distribution setter - void setClusterState(std::shared_ptr clusterState); - // TODO deprecate; only use atomic state+distribution getter - std::shared_ptr getClusterState() const; - // TODO deprecate; only use atomic state+distribution setter - void setDistribution(std::shared_ptr distribution); - // TODO deprecate; only use atomic state+distribution getter - std::shared_ptr getDistribution() const; bool getNodeUpInLastNodeStateSeenByProvider() const; void setNodeUpInLastNodeStateSeenByProvider(bool nodeUpInLastNodeStateSeenByProvider); diff --git a/storage/src/vespa/storage/common/storagecomponent.h b/storage/src/vespa/storage/common/storagecomponent.h index 677d5652fe3..ba12fc666a9 100644 --- a/storage/src/vespa/storage/common/storagecomponent.h +++ b/storage/src/vespa/storage/common/storagecomponent.h @@ -61,7 +61,7 @@ public: const std::shared_ptr fieldSetRepo; }; using UP = std::unique_ptr; - using DistributionSP = std::shared_ptr; + using DistributionSP = std::shared_ptr; /** * Node type is supposed to be set immediately, and never be updated. diff --git a/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp b/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp index ab1cbf0b4d7..4d1daa329fd 100644 --- a/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp +++ b/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp @@ -23,11 +23,9 @@ ServiceLayerComponentRegisterImpl::registerServiceLayerComponent(ServiceLayerMan } void -ServiceLayerComponentRegisterImpl::setDistribution(std::shared_ptr distribution) +ServiceLayerComponentRegisterImpl::setDistribution(std::shared_ptr distribution) { - _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).setDistribution(distribution); - auto global_distr = lib::GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution); - _bucketSpaceRepo.get(document::FixedBucketSpaces::global_space()).setDistribution(global_distr); + // TODO remove this override entirely? StorageComponentRegisterImpl::setDistribution(distribution); } diff --git a/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.h b/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.h index 1589192b92e..3478c0b3b9c 100644 --- a/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.h +++ b/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.h @@ -34,7 +34,7 @@ public: } void registerServiceLayerComponent(ServiceLayerManagedComponent&) override; - void setDistribution(std::shared_ptr distribution) override; + void setDistribution(std::shared_ptr distribution) override; }; } // storage diff --git a/storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.cpp b/storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.cpp index bd0853a3524..ab1d80dc0b9 100644 --- a/storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.cpp +++ b/storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.cpp @@ -91,7 +91,7 @@ StorageComponentRegisterImpl::setBucketIdFactory(const document::BucketIdFactory } void -StorageComponentRegisterImpl::setDistribution(std::shared_ptr distribution) +StorageComponentRegisterImpl::setDistribution(std::shared_ptr distribution) { std::lock_guard lock(_componentLock); _distribution = distribution; diff --git a/storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h b/storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h index abb60051fe1..271b38399b6 100644 --- a/storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h +++ b/storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h @@ -21,16 +21,16 @@ class StorageComponentRegisterImpl { using BucketspacesConfig = vespa::config::content::core::internal::InternalBucketspacesType; - std::mutex _componentLock; - std::vector _components; - vespalib::string _clusterName; - const lib::NodeType* _nodeType; - uint16_t _index; + std::mutex _componentLock; + std::vector _components; + vespalib::string _clusterName; + const lib::NodeType* _nodeType; + uint16_t _index; std::shared_ptr _docTypeRepo; - document::BucketIdFactory _bucketIdFactory; - std::shared_ptr _distribution; - NodeStateUpdater* _nodeStateUpdater; - BucketspacesConfig _bucketSpacesConfig; + document::BucketIdFactory _bucketIdFactory; + std::shared_ptr _distribution; + NodeStateUpdater* _nodeStateUpdater; + BucketspacesConfig _bucketSpacesConfig; public: using UP = std::unique_ptr; @@ -38,12 +38,12 @@ public: StorageComponentRegisterImpl(); ~StorageComponentRegisterImpl() override; - const lib::NodeType& getNodeType() const { return *_nodeType; } - uint16_t getIndex() const { return _index; } - std::shared_ptr getTypeRepo() { return _docTypeRepo; } - const document::BucketIdFactory& getBucketIdFactory() { return _bucketIdFactory; } - std::shared_ptr & getDistribution() { return _distribution; } - NodeStateUpdater& getNodeStateUpdater() { return *_nodeStateUpdater; } + [[nodiscard]] const lib::NodeType& getNodeType() const noexcept { return *_nodeType; } + [[nodiscard]] uint16_t getIndex() const noexcept { return _index; } + [[nodiscard]] std::shared_ptr getTypeRepo() const noexcept { return _docTypeRepo; } + [[nodiscard]] const document::BucketIdFactory& getBucketIdFactory() const noexcept { return _bucketIdFactory; } + [[nodiscard]] const std::shared_ptr& getDistribution() const noexcept { return _distribution; } + [[nodiscard]] NodeStateUpdater& getNodeStateUpdater() noexcept { return *_nodeStateUpdater; } void registerStorageComponent(StorageComponent&) override; @@ -51,7 +51,7 @@ public: virtual void setNodeStateUpdater(NodeStateUpdater& updater); virtual void setDocumentTypeRepo(std::shared_ptr); virtual void setBucketIdFactory(const document::BucketIdFactory&); - virtual void setDistribution(std::shared_ptr); + virtual void setDistribution(std::shared_ptr); virtual void setBucketSpacesConfig(const BucketspacesConfig&); }; diff --git a/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp b/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp index d5de11c7d6f..f571f6272b8 100644 --- a/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp +++ b/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp @@ -16,13 +16,14 @@ using document::BucketSpace; namespace storage { uint16_t -BucketOwnershipNotifier::getOwnerDistributorForBucket(const document::Bucket &bucket) const +BucketOwnershipNotifier::getOwnerDistributorForBucket(const document::Bucket& bucket) const { try { - // TODO use state updater bundle for everything? - auto distribution(_component.getBucketSpaceRepo().get(bucket.getBucketSpace()).getDistribution()); - const auto clusterStateBundle = _component.getStateUpdater().getClusterStateBundle(); - const auto &clusterState = *clusterStateBundle->getDerivedClusterState(bucket.getBucketSpace()); + const auto state_bundle = _component.getStateUpdater().getClusterStateBundle(); + assert(state_bundle && state_bundle->has_distribution_config()); + const auto* distribution = state_bundle->distribution_config_bundle()->bucket_space_distribution_or_nullptr_raw(bucket.getBucketSpace()); + assert(distribution); + const auto& clusterState = *state_bundle->getDerivedClusterState(bucket.getBucketSpace()); return (distribution->getIdealDistributorNode(clusterState, bucket.getBucketId())); // If we get exceptions there aren't any distributors, so they'll have // to explicitly fetch all bucket info eventually anyway. @@ -39,7 +40,7 @@ BucketOwnershipNotifier::getOwnerDistributorForBucket(const document::Bucket &bu } bool -BucketOwnershipNotifier::distributorOwns(uint16_t distributor, const document::Bucket &bucket) const +BucketOwnershipNotifier::distributorOwns(uint16_t distributor, const document::Bucket& bucket) const { return (distributor == getOwnerDistributorForBucket(bucket)); } @@ -47,7 +48,7 @@ BucketOwnershipNotifier::distributorOwns(uint16_t distributor, const document::B void BucketOwnershipNotifier::sendNotifyBucketToDistributor( uint16_t distributorIndex, - const document::Bucket &bucket, + const document::Bucket& bucket, const api::BucketInfo& infoToSend) { if (!infoToSend.valid()) { @@ -71,7 +72,7 @@ BucketOwnershipNotifier::sendNotifyBucketToDistributor( } void -BucketOwnershipNotifier::logNotification(const document::Bucket &bucket, +BucketOwnershipNotifier::logNotification(const document::Bucket& bucket, uint16_t sourceIndex, uint16_t currentOwnerIndex, const api::BucketInfo& newInfo) @@ -88,7 +89,7 @@ BucketOwnershipNotifier::logNotification(const document::Bucket &bucket, void BucketOwnershipNotifier::notifyIfOwnershipChanged( - const document::Bucket &bucket, + const document::Bucket& bucket, uint16_t sourceIndex, const api::BucketInfo& infoToSend) { @@ -111,7 +112,7 @@ BucketOwnershipNotifier::notifyIfOwnershipChanged( void BucketOwnershipNotifier::sendNotifyBucketToCurrentOwner( - const document::Bucket &bucket, + const document::Bucket& bucket, const api::BucketInfo& infoToSend) { uint16_t distributor(getOwnerDistributorForBucket(bucket)); @@ -134,7 +135,7 @@ NotificationGuard::~NotificationGuard() } void -NotificationGuard::notifyIfOwnershipChanged(const document::Bucket &bucket, +NotificationGuard::notifyIfOwnershipChanged(const document::Bucket& bucket, uint16_t sourceIndex, const api::BucketInfo& infoToSend) { @@ -142,7 +143,7 @@ NotificationGuard::notifyIfOwnershipChanged(const document::Bucket &bucket, } void -NotificationGuard::notifyAlways(const document::Bucket &bucket, +NotificationGuard::notifyAlways(const document::Bucket& bucket, const api::BucketInfo& infoToSend) { BucketToCheck bc(bucket, 0xffff, infoToSend); diff --git a/storage/src/vespa/storage/persistence/bucketownershipnotifier.h b/storage/src/vespa/storage/persistence/bucketownershipnotifier.h index fc3d9209f5f..93a2c1a2104 100644 --- a/storage/src/vespa/storage/persistence/bucketownershipnotifier.h +++ b/storage/src/vespa/storage/persistence/bucketownershipnotifier.h @@ -11,29 +11,29 @@ namespace storage { class BucketOwnershipNotifier { - const ServiceLayerComponent & _component; - MessageSender & _sender; + const ServiceLayerComponent& _component; + MessageSender& _sender; public: BucketOwnershipNotifier(const ServiceLayerComponent& component, MessageSender& sender) : _component(component), _sender(sender) {} - bool distributorOwns(uint16_t distributor, const document::Bucket &bucket) const; - void notifyIfOwnershipChanged(const document::Bucket &bucket, uint16_t sourceIndex, const api::BucketInfo& infoToSend); - void sendNotifyBucketToCurrentOwner(const document::Bucket &bucket, const api::BucketInfo& infoToSend); + [[nodiscard]] bool distributorOwns(uint16_t distributor, const document::Bucket& bucket) const; + void notifyIfOwnershipChanged(const document::Bucket& bucket, uint16_t sourceIndex, const api::BucketInfo& infoToSend); + void sendNotifyBucketToCurrentOwner(const document::Bucket& bucket, const api::BucketInfo& infoToSend); private: enum IndexMeta { FAILED_TO_RESOLVE = 0xffff }; - void sendNotifyBucketToDistributor(uint16_t distributorIndex, const document::Bucket &bucket, + void sendNotifyBucketToDistributor(uint16_t distributorIndex, const document::Bucket& bucket, const api::BucketInfo& infoToSend); // Returns either index or FAILED_TO_RESOLVE uint16_t getOwnerDistributorForBucket(const document::Bucket &bucket) const; - void logNotification(const document::Bucket &bucket, uint16_t sourceIndex, + void logNotification(const document::Bucket& bucket, uint16_t sourceIndex, uint16_t currentOwnerIndex, const api::BucketInfo& newInfo); }; @@ -60,7 +60,7 @@ class NotificationGuard BucketOwnershipNotifier& _notifier; std::vector _bucketsToCheck; public: - NotificationGuard(BucketOwnershipNotifier& notifier) + explicit NotificationGuard(BucketOwnershipNotifier& notifier) : _notifier(notifier), _bucketsToCheck() {} @@ -69,8 +69,8 @@ public: ~NotificationGuard(); - void notifyIfOwnershipChanged(const document::Bucket &bucket, uint16_t sourceIndex, const api::BucketInfo& infoToSend); - void notifyAlways(const document::Bucket &bucket, const api::BucketInfo& infoToSend); + void notifyIfOwnershipChanged(const document::Bucket& bucket, uint16_t sourceIndex, const api::BucketInfo& infoToSend); + void notifyAlways(const document::Bucket& bucket, const api::BucketInfo& infoToSend); }; } // storage diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 495497d507d..e44490388e2 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -887,9 +887,9 @@ bool FileStorManager::maintenance_in_all_spaces(const lib::Node& node) const noexcept { for (const auto& elem : _component.getBucketSpaceRepo()) { - const ContentBucketSpace& bucket_space = *elem.second; - auto derived_cluster_state = bucket_space.getClusterState(); - if (!derived_cluster_state->getNodeState(node).getState().oneOf("m")) { + const auto space_state_and_distr = elem.second->state_and_distribution(); + const auto& derived_cluster_state = space_state_and_distr->cluster_state(); + if (!derived_cluster_state.getNodeState(node).getState().oneOf("m")) { return false; } } @@ -915,8 +915,7 @@ FileStorManager::maybe_log_received_cluster_state() { if (LOG_WOULD_LOG(debug)) { auto cluster_state_bundle = _component.getStateUpdater().getClusterStateBundle(); - auto baseline_state = cluster_state_bundle->getBaselineClusterState(); - LOG(debug, "FileStorManager received baseline cluster state '%s'", baseline_state->toString().c_str()); + LOG(debug, "FileStorManager received baseline cluster state '%s'", cluster_state_bundle->toString().c_str()); } } @@ -951,16 +950,17 @@ FileStorManager::updateState() void FileStorManager::storageDistributionChanged() { - updateState(); } void FileStorManager::propagateClusterStates() { auto clusterStateBundle = _component.getStateUpdater().getClusterStateBundle(); - for (const auto &elem : _component.getBucketSpaceRepo()) { - // TODO also distribution! bundle and repo must be 1-1 - elem.second->setClusterState(clusterStateBundle->getDerivedClusterState(elem.first)); + assert(clusterStateBundle->has_distribution_config()); + for (const auto& elem : _component.getBucketSpaceRepo()) { + elem.second->set_state_and_distribution(std::make_shared( + clusterStateBundle->getDerivedClusterState(elem.first), + clusterStateBundle->bucket_space_distribution_or_nullptr(elem.first))); } } diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp index 25829f3d391..8702cb1272b 100644 --- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp +++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp @@ -31,10 +31,11 @@ ChangedBucketOwnershipHandler::ChangedBucketOwnershipHandler( _stateLock(), _currentState(), // Not set yet, so ownership will not be valid _currentOwnership(std::make_shared( - _component.getBucketSpaceRepo(), _currentState)), + _currentState, lib::DistributionConfigBundle::of(_component.getDistribution()))), _abortQueuedAndPendingOnStateChange(false), _abortMutatingIdealStateOps(false), - _abortMutatingExternalLoadOps(false) + _abortMutatingExternalLoadOps(false), + _receiving_distribution_config_from_cc(false) { on_configure(bootstrap_config); _component.registerMetric(_metrics); @@ -61,16 +62,29 @@ ChangedBucketOwnershipHandler::reloadClusterState() { std::lock_guard guard(_stateLock); const auto clusterStateBundle = _component.getStateUpdater().getClusterStateBundle(); - setCurrentOwnershipWithStateNoLock(*clusterStateBundle); + setCurrentOwnershipWithStateNoLock(std::move(clusterStateBundle)); } void -ChangedBucketOwnershipHandler::setCurrentOwnershipWithStateNoLock( - const lib::ClusterStateBundle& newState) +ChangedBucketOwnershipHandler::setCurrentOwnershipWithStateNoLock(std::shared_ptr new_state) { - _currentState = std::make_shared(newState); - _currentOwnership = std::make_shared( - _component.getBucketSpaceRepo(), _currentState); + LOG(debug, "Setting new ownership state bundle: %s", new_state->toString().c_str()); + std::shared_ptr distributions; + _currentState = std::move(new_state); + // This partially duplicates distribution config fallback logic from StateManager, but that's because + // this component has the same approach to intercepting both state commands and distribution config + // changes. + // `new_state` can come straight from a SetSystemStateCommand, which may or may not have a state bundle + // with distribution config strapped to it. + if (_currentState->has_distribution_config()) { + distributions = _currentState->distribution_config_bundle(); + } else { + distributions = lib::DistributionConfigBundle::of(_component.getDistribution()); + LOG(debug, "No distribution config in bundle; using current host config of '%s'", + distributions->default_distribution().getNodeGraph().getDistributionConfigHash().c_str()); + } + _receiving_distribution_config_from_cc = _currentState->has_distribution_config(); + _currentOwnership = std::make_shared(_currentState, std::move(distributions)); } namespace { @@ -98,17 +112,11 @@ ChangedBucketOwnershipHandler::Metrics::Metrics(metrics::MetricSet* owner) {} ChangedBucketOwnershipHandler::Metrics::~Metrics() = default; -ChangedBucketOwnershipHandler::OwnershipState::OwnershipState(const ContentBucketSpaceRepo &contentBucketSpaceRepo, - std::shared_ptr state) - : _distributions(), - _state(std::move(state)) +ChangedBucketOwnershipHandler::OwnershipState::OwnershipState(std::shared_ptr state, + std::shared_ptr distributions) + : _state(std::move(state)), + _distributions(std::move(distributions)) { - for (const auto &elem : contentBucketSpaceRepo) { - auto distribution = elem.second->getDistribution(); - if (distribution) { - _distributions.emplace(elem.first, std::move(distribution)); - } - } } @@ -123,19 +131,15 @@ ChangedBucketOwnershipHandler::OwnershipState::getBaselineState() const } uint16_t -ChangedBucketOwnershipHandler::OwnershipState::ownerOf( - const document::Bucket& bucket) const +ChangedBucketOwnershipHandler::OwnershipState::ownerOf(const document::Bucket& bucket) const { - auto distributionItr = _distributions.find(bucket.getBucketSpace()); - assert(distributionItr != _distributions.end()); - const auto &distribution = *distributionItr->second; - const auto &derivedState = *_state->getDerivedClusterState(bucket.getBucketSpace()); + const auto* distribution = _distributions->bucket_space_distribution_or_nullptr_raw(bucket.getBucketSpace()); + assert(distribution); + const auto& derivedState = *_state->getDerivedClusterState(bucket.getBucketSpace()); try { - return distribution.getIdealDistributorNode(derivedState, bucket.getBucketId()); + return distribution->getIdealDistributorNode(derivedState, bucket.getBucketId()); } catch (lib::TooFewBucketBitsInUseException& e) { - LOGBP(debug, - "Too few bucket bits used for %s to be assigned to " - "a distributor.", + LOGBP(debug, "Too few bucket bits used for %s to be assigned to a distributor.", bucket.toString().c_str()); } catch (lib::NoDistributorsAvailableException& e) { LOGBP(warning, @@ -144,11 +148,9 @@ ChangedBucketOwnershipHandler::OwnershipState::ownerOf( "for available distributors before reaching this code path! " "Cluster state is '%s', distribution is '%s'", derivedState.toString().c_str(), - distribution.toString().c_str()); + distribution->toString().c_str()); } catch (const std::exception& e) { - LOG(error, - "Got unknown exception while resolving distributor: %s", - e.what()); + LOG(error, "Got unknown exception while resolving distributor: %s", e.what()); } return FAILED_TO_RESOLVE; } @@ -162,9 +164,7 @@ ChangedBucketOwnershipHandler::OwnershipState::storageNodeUp(document::BucketSpa } void -ChangedBucketOwnershipHandler::logTransition( - const lib::ClusterState& currentState, - const lib::ClusterState& newState) const +ChangedBucketOwnershipHandler::logTransition(const lib::ClusterState& currentState, const lib::ClusterState& newState) { LOG(debug, "State transition '%s' -> '%s' changes distributor bucket ownership, " @@ -269,7 +269,7 @@ public: { std::lock_guard guard(_owner._stateLock); old_ownership = _owner._currentOwnership; - _owner.setCurrentOwnershipWithStateNoLock(_command->getClusterStateBundle()); + _owner.setCurrentOwnershipWithStateNoLock(_command->cluster_state_bundle_ptr()); new_ownership = _owner._currentOwnership; } assert(new_ownership->valid()); @@ -287,7 +287,7 @@ public: new_ownership->getBaselineState().toString().c_str()); return _owner.sendDown(_command);; } - _owner.logTransition(old_ownership->getBaselineState(), new_ownership->getBaselineState()); + logTransition(old_ownership->getBaselineState(), new_ownership->getBaselineState()); metrics::MetricTimer duration_timer; auto predicate = _owner.makeLazyAbortPredicate(old_ownership, new_ownership); @@ -327,17 +327,19 @@ ChangedBucketOwnershipHandler::onSetSystemState( * Invoked whenever a distribution config change happens and is called in the * context of the config updater thread (which is why we have to lock). */ + // TODO remove this when there are no more state bundles without distribution config void ChangedBucketOwnershipHandler::storageDistributionChanged() { std::lock_guard guard(_stateLock); - _currentOwnership = std::make_shared( - _component.getBucketSpaceRepo(), _currentState); + if (!_receiving_distribution_config_from_cc) { + _currentOwnership = std::make_shared( + _currentState, lib::DistributionConfigBundle::of(_component.getDistribution())); + } } bool -ChangedBucketOwnershipHandler::isMutatingIdealStateOperation( - const api::StorageMessage& msg) const +ChangedBucketOwnershipHandler::isMutatingIdealStateOperation(const api::StorageMessage& msg) { switch (msg.getType().getId()) { case api::MessageType::CREATEBUCKET_ID: @@ -357,8 +359,7 @@ ChangedBucketOwnershipHandler::isMutatingIdealStateOperation( bool -ChangedBucketOwnershipHandler::isMutatingExternalOperation( - const api::StorageMessage& msg) const +ChangedBucketOwnershipHandler::isMutatingExternalOperation(const api::StorageMessage& msg) { switch (msg.getType().getId()) { case api::MessageType::PUT_ID: @@ -418,8 +419,7 @@ ChangedBucketOwnershipHandler::abortOperation(api::StorageCommand& cmd) } bool -ChangedBucketOwnershipHandler::isMutatingCommandAndNeedsChecking( - const api::StorageMessage& msg) const +ChangedBucketOwnershipHandler::isMutatingCommandAndNeedsChecking(const api::StorageMessage& msg) const { if (enabledIdealStateAborting() && isMutatingIdealStateOperation(msg)) { return true; diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h index 801534385f7..2c32704d64b 100644 --- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h +++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h @@ -25,6 +25,7 @@ namespace lib { class ClusterState; class ClusterStateBundle; class Distribution; + class DistributionConfigBundle; } /** @@ -77,20 +78,20 @@ public: */ class OwnershipState { using BucketSpace = document::BucketSpace; - std::unordered_map, BucketSpace::hash> _distributions; std::shared_ptr _state; + std::shared_ptr _distributions; public: using SP = std::shared_ptr; using CSP = std::shared_ptr; - OwnershipState(const ContentBucketSpaceRepo &contentBucketSpaceRepo, - std::shared_ptr state); + OwnershipState(std::shared_ptr state, + std::shared_ptr distributions); ~OwnershipState(); static const uint16_t FAILED_TO_RESOLVE = 0xffff; [[nodiscard]] bool valid() const noexcept { - return (!_distributions.empty() && _state); + return (_distributions && _state); } /** @@ -124,20 +125,21 @@ private: std::atomic _abortQueuedAndPendingOnStateChange; std::atomic _abortMutatingIdealStateOps; std::atomic _abortMutatingExternalLoadOps; + bool _receiving_distribution_config_from_cc; std::unique_ptr makeLazyAbortPredicate( const OwnershipState::CSP& oldOwnership, const OwnershipState::CSP& newOwnership) const; - void logTransition(const lib::ClusterState& currentState, - const lib::ClusterState& newState) const; + static void logTransition(const lib::ClusterState& currentState, + const lib::ClusterState& newState); /** * Creates a new immutable OwnershipState based on the current distribution * and the provided cluster state and assigns it to _currentOwnership. */ - void setCurrentOwnershipWithStateNoLock(const lib::ClusterStateBundle&); + void setCurrentOwnershipWithStateNoLock(std::shared_ptr); /** * Grabs _stateLock and returns a shared_ptr to the current ownership @@ -147,9 +149,9 @@ private: bool isMutatingCommandAndNeedsChecking(const api::StorageMessage&) const; - bool isMutatingIdealStateOperation(const api::StorageMessage&) const; + static bool isMutatingIdealStateOperation(const api::StorageMessage&); - bool isMutatingExternalOperation(const api::StorageMessage&) const; + static bool isMutatingExternalOperation(const api::StorageMessage&); /** * Returns whether the operation in cmd has a bucket whose ownership in * the current cluster state does not match the distributor marked as diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp index a2106dce8d2..560982fb6c0 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.cpp +++ b/storage/src/vespa/storage/storageserver/statemanager.cpp @@ -29,7 +29,18 @@ using vespalib::make_string_short::fmt; namespace storage { namespace { - constexpr vespalib::duration MAX_TIMEOUT = 600s; +constexpr vespalib::duration MAX_TIMEOUT = 600s; + +[[nodiscard]] std::shared_ptr +make_bootstrap_state_bundle(std::shared_ptr config) { + return std::make_shared( + std::make_shared(), + lib::ClusterStateBundle::BucketSpaceStateMapping{}, + std::nullopt, + lib::DistributionConfigBundle::of(std::move(config)), + false); +} + } struct StateManager::StateManagerMetrics : metrics::MetricSet { @@ -62,7 +73,8 @@ StateManager::StateManager(StorageComponentRegister& compReg, _listenerLock(), _nodeState(std::make_shared(_component.getNodeType(), lib::State::DOWN)), _nextNodeState(), - _systemState(std::make_shared(lib::ClusterState())), + _configured_distribution(_component.getDistribution()), + _systemState(make_bootstrap_state_bundle(_configured_distribution)), _nextSystemState(), _reported_host_info_cluster_state_version(0), _stateListeners(), @@ -81,6 +93,7 @@ StateManager::StateManager(StorageComponentRegister& compReg, _noThreadTestMode(testMode), _grabbedExternalLock(false), _require_strictly_increasing_cluster_state_versions(false), + _receiving_distribution_config_from_cc(false), _notifyingListeners(false), _requested_almost_immediate_node_state_replies(false) { @@ -170,8 +183,7 @@ lib::NodeState::CSP StateManager::getCurrentNodeState() const { std::lock_guard lock(_stateLock); - return std::make_shared - (_systemState->getBaselineClusterState()->getNodeState(thisNode())); + return std::make_shared(_systemState->getBaselineClusterState()->getNodeState(thisNode())); } std::shared_ptr @@ -181,6 +193,28 @@ StateManager::getClusterStateBundle() const return _systemState; } +// TODO remove when distribution config is only received from cluster controller +void +StateManager::storageDistributionChanged() +{ + { + std::lock_guard lock(_stateLock); + _configured_distribution = _component.getDistribution(); + if (_receiving_distribution_config_from_cc) { + return; // nothing more to do + } + // Avoid losing any pending state if this callback happens in the middle of a + // state update. This edge case is practically impossible to unit test today... + const auto patch_state = _nextSystemState ? _nextSystemState : _systemState; + _nextSystemState = patch_state->clone_with_new_distribution( + lib::DistributionConfigBundle::of(_configured_distribution)); + } + // We've assembled a new state bundle based on the (non-distribution carrying) state + // bundle from the cluster controller and our own internal config. Propagate it as one + // unit to the internal components. + notifyStateListeners(); +} + void StateManager::addStateListener(StateListener& listener) { @@ -316,12 +350,12 @@ using BucketSpaceToTransitionString = std::unordered_map; void -considerInsertDerivedTransition(const lib::State ¤tBaseline, - const lib::State &newBaseline, - const lib::State ¤tDerived, - const lib::State &newDerived, - const document::BucketSpace &bucketSpace, - BucketSpaceToTransitionString &transitions) +considerInsertDerivedTransition(const lib::State& currentBaseline, + const lib::State& newBaseline, + const lib::State& currentDerived, + const lib::State& newDerived, + const document::BucketSpace& bucketSpace, + BucketSpaceToTransitionString& transitions) { bool considerDerivedTransition = ((currentDerived != newDerived) && ((currentDerived != currentBaseline) || (newDerived != newBaseline))); @@ -333,28 +367,28 @@ considerInsertDerivedTransition(const lib::State ¤tBaseline, } BucketSpaceToTransitionString -calculateDerivedClusterStateTransitions(const ClusterStateBundle ¤tState, - const ClusterStateBundle &newState, +calculateDerivedClusterStateTransitions(const ClusterStateBundle& currentState, + const ClusterStateBundle& newState, const lib::Node node) { BucketSpaceToTransitionString result; - const lib::State ¤tBaseline = currentState.getBaselineClusterState()->getNodeState(node).getState(); - const lib::State &newBaseline = newState.getBaselineClusterState()->getNodeState(node).getState(); - for (const auto &entry : currentState.getDerivedClusterStates()) { - const lib::State ¤tDerived = entry.second->getNodeState(node).getState(); - const lib::State &newDerived = newState.getDerivedClusterState(entry.first)->getNodeState(node).getState(); + const lib::State& currentBaseline = currentState.getBaselineClusterState()->getNodeState(node).getState(); + const lib::State& newBaseline = newState.getBaselineClusterState()->getNodeState(node).getState(); + for (const auto& entry : currentState.getDerivedClusterStates()) { + const lib::State& currentDerived = entry.second->getNodeState(node).getState(); + const lib::State& newDerived = newState.getDerivedClusterState(entry.first)->getNodeState(node).getState(); considerInsertDerivedTransition(currentBaseline, newBaseline, currentDerived, newDerived, entry.first, result); } - for (const auto &entry : newState.getDerivedClusterStates()) { - const lib::State &newDerived = entry.second->getNodeState(node).getState(); - const lib::State ¤tDerived = currentState.getDerivedClusterState(entry.first)->getNodeState(node).getState(); + for (const auto& entry : newState.getDerivedClusterStates()) { + const lib::State& newDerived = entry.second->getNodeState(node).getState(); + const lib::State& currentDerived = currentState.getDerivedClusterState(entry.first)->getNodeState(node).getState(); considerInsertDerivedTransition(currentBaseline, newBaseline, currentDerived, newDerived, entry.first, result); } return result; } vespalib::string -transitionsToString(const BucketSpaceToTransitionString &transitions) +transitionsToString(const BucketSpaceToTransitionString& transitions) { if (transitions.empty()) { return ""; @@ -362,7 +396,7 @@ transitionsToString(const BucketSpaceToTransitionString &transitions) vespalib::asciistream stream; stream << "["; bool first = true; - for (const auto &entry : transitions) { + for (const auto& entry : transitions) { if (!first) { stream << ", "; } @@ -435,7 +469,7 @@ StateManager::onGetNodeState(const api::GetNodeStateCommand::SP& cmd) } void -StateManager::mark_controller_as_having_observed_explicit_node_state(const std::unique_lock &, uint16_t controller_index) { +StateManager::mark_controller_as_having_observed_explicit_node_state(const std::unique_lock&, uint16_t controller_index) { _controllers_observed_explicit_node_state.emplace(controller_index); } @@ -483,7 +517,16 @@ StateManager::try_set_cluster_state_bundle(std::shared_ptrhas_distribution_config(); + if (!c->has_distribution_config()) { + LOG(debug, "Next state bundle '%s' does not have distribution config; patching in existing config '%s'", + c->toString().c_str(), _configured_distribution->getNodeGraph().getDistributionConfigHash().c_str()); + _nextSystemState = c->clone_with_new_distribution(lib::DistributionConfigBundle::of(_configured_distribution)); + } else { + LOG(debug, "Next state bundle is '%s'", c->toString().c_str()); + // TODO print what's changed in distribution config? + _nextSystemState = std::move(c); + } } notifyStateListeners(); return std::nullopt; diff --git a/storage/src/vespa/storage/storageserver/statemanager.h b/storage/src/vespa/storage/storageserver/statemanager.h index d116f968731..d7a139f7ced 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.h +++ b/storage/src/vespa/storage/storageserver/statemanager.h @@ -55,6 +55,7 @@ class StateManager : public NodeStateUpdater, std::mutex _listenerLock; std::shared_ptr _nodeState; std::shared_ptr _nextNodeState; + std::shared_ptr _configured_distribution; // From config system, not from CC std::shared_ptr _systemState; std::shared_ptr _nextSystemState; uint32_t _reported_host_info_cluster_state_version; @@ -78,6 +79,7 @@ class StateManager : public NodeStateUpdater, bool _noThreadTestMode; bool _grabbedExternalLock; bool _require_strictly_increasing_cluster_state_versions; + bool _receiving_distribution_config_from_cc; std::atomic _notifyingListeners; std::atomic _requested_almost_immediate_node_state_replies; @@ -102,6 +104,8 @@ public: lib::NodeState::CSP getCurrentNodeState() const override; std::shared_ptr getClusterStateBundle() const override; + void storageDistributionChanged() override; + void addStateListener(StateListener&) override; void removeStateListener(StateListener&) override; -- cgit v1.2.3