diff options
author | Tor Brede Vekterli <vekterli@vespa.ai> | 2024-06-20 09:16:44 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@vespa.ai> | 2024-06-24 14:01:26 +0000 |
commit | e7fdc8f5010e18cd5599a016156a5c202df0f8db (patch) | |
tree | ccc2915d5e6135578b73f2f421b3b2b0766df37b /storage | |
parent | efd20bf3e2556975d177ceaaf9a1908cb7239fe1 (diff) |
Handle cluster state bundle with distribution config on content node
If a cluster state bundle contains distribution config, this is
internally propagated via the `StateManager` component to all registered
state listeners. One such state listener is `FileStorManager`, which
updates the content node-internal bucket space repository.
All `SetSystemStateCommand` and internal config-aware components
(`StateManager` and `ChangedBucketOwnershipHandler`) now explicitly
track whether the cluster controller provides distribution config,
or if the internally provided config should be used (including
fallback to internal config if necessary).
Diffstat (limited to 'storage')
22 files changed, 394 insertions, 212 deletions
diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp index f41dae89eec..a20902422b0 100644 --- a/storage/src/tests/bucketdb/bucketmanagertest.cpp +++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp @@ -1,9 +1,10 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <tests/common/dummystoragelink.h> +#include <tests/common/storage_config_set.h> #include <tests/common/testhelper.h> #include <tests/common/teststorageapp.h> -#include <tests/common/storage_config_set.h> +#include <vespa/config-stor-distribution.h> #include <vespa/config/helper/configgetter.hpp> #include <vespa/document/config/config-documenttypes.h> #include <vespa/document/datatype/documenttype.h> @@ -25,7 +26,6 @@ #include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/testkit/test_path.h> -#include <vespa/config-stor-distribution.h> #include <future> #include <vespa/log/log.h> @@ -116,16 +116,17 @@ public: BucketManagerTest::~BucketManagerTest() = default; -#define ASSERT_DUMMYLINK_REPLY_COUNT(link, count) \ - if (link->getNumReplies() != count) { \ - std::ostringstream ost; \ - ost << "Expected there to be " << count << " replies in link, but " \ - << "found " << link->getNumReplies() << ":\n"; \ - for (uint32_t i=0; i<link->getNumReplies(); ++i) { \ - ost << link->getReply(i)->getType() << "\n"; \ - } \ - FAIL() << ost.str(); \ +void check_dummy_link_reply_count(const DummyStorageLink& link, size_t expected_count) { + if (link.getNumReplies() != expected_count) { + std::ostringstream ost; + ost << "Expected there to be " << expected_count << " replies in link, but " + << "found " << link.getNumReplies() << ":\n"; + for (uint32_t i = 0; i < link.getNumReplies(); ++i) { + ost << link.getReply(i)->getType() << "\n"; + } + FAIL() << ost.str(); } +} void BucketManagerTest::setupTestEnvironment() { @@ -313,7 +314,7 @@ TEST_F(BucketManagerTest, DISABLED_request_bucket_info_with_state) { { LOG(info, "Waiting for response from 3 request bucket info messages"); _top->waitForMessages(3, 5); - ASSERT_DUMMYLINK_REPLY_COUNT(_top, 3); + ASSERT_NO_FATAL_FAILURE(check_dummy_link_reply_count(*_top, 3)); std::map<uint64_t, api::RequestBucketInfoReply::SP> replies; for (uint32_t i=0; i<3; ++i) { replies[_top->getReply(i)->getMsgId()] @@ -357,7 +358,7 @@ TEST_F(BucketManagerTest, request_bucket_info_with_list) { _top->sendDown(cmd); _top->waitForMessages(1, 5); - ASSERT_DUMMYLINK_REPLY_COUNT(_top, 1); + ASSERT_NO_FATAL_FAILURE(check_dummy_link_reply_count(*_top, 1)); auto reply = std::dynamic_pointer_cast<api::RequestBucketInfoReply>(_top->getReply(0)); _top->reset(); ASSERT_TRUE(reply.get()); @@ -527,7 +528,7 @@ class ConcurrentOperationFixture { public: explicit ConcurrentOperationFixture(BucketManagerTest& self) : _self(self), - _state(std::make_shared<lib::ClusterState>("distributor:1 storage:1")) + _state(std::make_shared<lib::ClusterState>("version:2 distributor:1 storage:1")) { _self.setupTestEnvironment(); _self._top->open(); diff --git a/storage/src/tests/common/testnodestateupdater.cpp b/storage/src/tests/common/testnodestateupdater.cpp index f9671617352..5d1d7a085b9 100644 --- a/storage/src/tests/common/testnodestateupdater.cpp +++ b/storage/src/tests/common/testnodestateupdater.cpp @@ -25,17 +25,28 @@ TestNodeStateUpdater::getClusterStateBundle() const } void +TestNodeStateUpdater::patch_distribution(std::shared_ptr<const lib::Distribution> distribution) +{ + _clusterStateBundle = _clusterStateBundle->clone_with_new_distribution( + lib::DistributionConfigBundle::of(std::move(distribution))); +} + +void TestNodeStateUpdater::setClusterState(std::shared_ptr<const lib::ClusterState> c) { - setClusterStateBundle(std::make_shared<const lib::ClusterStateBundle>(*c)); + setClusterStateBundle(std::make_shared<const lib::ClusterStateBundle>(std::move(c))); } void TestNodeStateUpdater::setClusterStateBundle(std::shared_ptr<const lib::ClusterStateBundle> clusterStateBundle) { + auto existing_distr = _clusterStateBundle->distribution_config_bundle(); _clusterStateBundle = std::move(clusterStateBundle); - for (uint32_t i = 0; i < _listeners.size(); ++i) { - _listeners[i]->handleNewState(); + if (!_clusterStateBundle->has_distribution_config() && existing_distr) { + _clusterStateBundle = _clusterStateBundle->clone_with_new_distribution(existing_distr); + } + for (auto* listener : _listeners) { + listener->handleNewState(); } } diff --git a/storage/src/tests/common/testnodestateupdater.h b/storage/src/tests/common/testnodestateupdater.h index e5418c238d5..6a00e9a2264 100644 --- a/storage/src/tests/common/testnodestateupdater.h +++ b/storage/src/tests/common/testnodestateupdater.h @@ -1,11 +1,4 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * \class storage::TestNodeStateUpdater - * \ingroup common - * - * \brief Test implementation of the node state updater. - */ - #pragma once #include <vespa/storage/common/nodestateupdater.h> @@ -14,6 +7,7 @@ namespace storage::lib { class ClusterState; class ClusterStateBundle; + class Distribution; } namespace storage { @@ -53,6 +47,7 @@ public: _current = std::make_shared<lib::NodeState>(state); } + void patch_distribution(std::shared_ptr<const lib::Distribution> distribution); void setClusterState(std::shared_ptr<const lib::ClusterState> c); void setClusterStateBundle(std::shared_ptr<const lib::ClusterStateBundle> clusterStateBundle); diff --git a/storage/src/tests/common/teststorageapp.cpp b/storage/src/tests/common/teststorageapp.cpp index d811f100aec..b2b82a46850 100644 --- a/storage/src/tests/common/teststorageapp.cpp +++ b/storage/src/tests/common/teststorageapp.cpp @@ -6,10 +6,10 @@ #include <vespa/storage/config/config-stor-distributormanager.h> #include <vespa/storage/config/config-stor-visitordispatcher.h> #include <vespa/config-stor-distribution.h> -#include <vespa/config-fleetcontroller.h> #include <vespa/persistence/dummyimpl/dummypersistence.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vdslib/state/clusterstate.h> +#include <vespa/vdslib/state/cluster_state_bundle.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/time.h> #include <vespa/config/subscription/configuri.h> @@ -59,6 +59,7 @@ TestStorageApp::TestStorageApp(StorageComponentRegisterImpl::UP compReg, auto distr = std::make_shared<lib::Distribution>( lib::Distribution::getDefaultDistributionConfig(redundancy, nodeCount)); _compReg.setDistribution(distr); + _nodeStateUpdater.patch_distribution(distr); } TestStorageApp::~TestStorageApp() = default; @@ -69,6 +70,7 @@ TestStorageApp::setDistribution(Redundancy redundancy, NodeCount nodeCount) auto distr = std::make_shared<lib::Distribution>( lib::Distribution::getDefaultDistributionConfig(redundancy, nodeCount)); _compReg.setDistribution(distr); + _nodeStateUpdater.patch_distribution(distr); } void @@ -83,6 +85,12 @@ TestStorageApp::setClusterState(const lib::ClusterState& c) _nodeStateUpdater.setClusterState(std::make_shared<lib::ClusterState>(c)); } +void +TestStorageApp::set_cluster_state_bundle(std::shared_ptr<const lib::ClusterStateBundle> state_bundle) +{ + _nodeStateUpdater.setClusterStateBundle(std::move(state_bundle)); +} + namespace { NodeIndex node_index_from_config(const config::ConfigUri& uri) { diff --git a/storage/src/tests/common/teststorageapp.h b/storage/src/tests/common/teststorageapp.h index 04fa6996e15..c423761a9a2 100644 --- a/storage/src/tests/common/teststorageapp.h +++ b/storage/src/tests/common/teststorageapp.h @@ -73,6 +73,7 @@ public: void setDistribution(Redundancy, NodeCount); void setTypeRepo(std::shared_ptr<const document::DocumentTypeRepo> repo); void setClusterState(const lib::ClusterState&); + void set_cluster_state_bundle(std::shared_ptr<const lib::ClusterStateBundle>); // Utility functions for getting a hold of currently used bits. Practical // to avoid adding extra components in the tests. @@ -81,7 +82,7 @@ public: std::shared_ptr<const document::DocumentTypeRepo> getTypeRepo() { return _compReg.getTypeRepo(); } const document::BucketIdFactory& getBucketIdFactory() { return _compReg.getBucketIdFactory(); } TestNodeStateUpdater& getStateUpdater() { return _nodeStateUpdater; } - std::shared_ptr<lib::Distribution> & getDistribution() { return _compReg.getDistribution(); } + std::shared_ptr<const lib::Distribution> getDistribution() { return _compReg.getDistribution(); } TestNodeStateUpdater& getNodeStateUpdater() { return _nodeStateUpdater; } uint16_t getIndex() const { return _compReg.getIndex(); } const NodeIdentity& node_identity() const noexcept { return _node_identity; } diff --git a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp index 8982b02f2b7..66fda885df7 100644 --- a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp +++ b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp @@ -53,6 +53,7 @@ struct ChangedBucketOwnershipHandlerTest : Test { void applyDistribution(Redundancy, NodeCount); void applyClusterState(const lib::ClusterState&); + void apply_cluster_state_bundle(std::shared_ptr<const lib::ClusterStateBundle>); document::BucketId nextOwnedBucket( uint16_t wantedOwner, @@ -84,6 +85,21 @@ struct ChangedBucketOwnershipHandlerTest : Test { return lib::ClusterState("distributor:4 storage:1 .0.s:d"); } + static std::shared_ptr<const lib::DistributionConfigBundle> make_distr_bundle(uint16_t node_count) { + return lib::DistributionConfigBundle::of(lib::Distribution::getDefaultDistributionConfig(1, node_count)); + } + + static std::shared_ptr<const lib::ClusterStateBundle> make_state_bundle_with_config( + vespalib::stringref state_str, uint16_t node_count) + { + return std::make_shared<const lib::ClusterStateBundle>( + std::make_shared<const lib::ClusterState>(state_str), + lib::ClusterStateBundle::BucketSpaceStateMapping{}, + std::nullopt, + make_distr_bundle(node_count), + false); + } + void SetUp() override; }; @@ -194,6 +210,7 @@ hasOnlySetSystemStateCmdQueued(DummyStorageLink& link) { void ChangedBucketOwnershipHandlerTest::applyDistribution(Redundancy redundancy, NodeCount nodeCount) { + // TODO set distribution via state bundle instead _app->setDistribution(redundancy, nodeCount); _handler->storageDistributionChanged(); } @@ -205,6 +222,13 @@ ChangedBucketOwnershipHandlerTest::applyClusterState(const lib::ClusterState& st _handler->reloadClusterState(); } +void +ChangedBucketOwnershipHandlerTest::apply_cluster_state_bundle(std::shared_ptr<const lib::ClusterStateBundle> state_bundle) +{ + _app->set_cluster_state_bundle(std::move(state_bundle)); + _handler->reloadClusterState(); +} + TEST_F(ChangedBucketOwnershipHandlerTest, enumerate_buckets_belonging_on_changed_nodes) { lib::ClusterState stateBefore("distributor:4 storage:1"); applyDistribution(Redundancy(1), NodeCount(4)); @@ -225,7 +249,7 @@ TEST_F(ChangedBucketOwnershipHandlerTest, enumerate_buckets_belonging_on_changed EXPECT_TRUE(hasAbortedNoneOf(cmd, node2Buckets)); // Handler must swallow abort replies - _bottom->sendUp(api::StorageMessage::SP(cmd->makeReply().release())); + _bottom->sendUp(api::StorageMessage::SP(cmd->makeReply())); EXPECT_EQ(size_t(0), _top->getNumReplies()); } @@ -312,7 +336,7 @@ TEST_F(ChangedBucketOwnershipHandlerTest, ownership_changed_on_distributor_up_ed EXPECT_TRUE(hasAbortedNoneOf(cmd, node2Buckets)); // Handler must swallow abort replies - _bottom->sendUp(api::StorageMessage::SP(cmd->makeReply().release())); + _bottom->sendUp(api::StorageMessage::SP(cmd->makeReply())); EXPECT_EQ(0, _top->getNumReplies()); } @@ -345,6 +369,32 @@ TEST_F(ChangedBucketOwnershipHandlerTest, distribution_config_change_updates_own sendAndExpectAbortedCreateBucket(2); } +TEST_F(ChangedBucketOwnershipHandlerTest, distribution_config_via_state_bundle_change_updates_ownership) { + apply_cluster_state_bundle(make_state_bundle_with_config("version:2 distributor:3 storage:1", 3)); + // Apply new distribution config containing only 1 distributor, meaning + // any messages sent from >1 must be aborted. + // This test case is a bit dodgy since the CC should never send a state with more nodes in it than + // the distribution config allows for when _it_ is responsible for also sending the config. + apply_cluster_state_bundle(make_state_bundle_with_config("version:3 distributor:3 storage:1", 1)); + sendAndExpectAbortedCreateBucket(2); +} + +TEST_F(ChangedBucketOwnershipHandlerTest, ignore_internal_config_once_state_bundle_with_config_received) { + apply_cluster_state_bundle(make_state_bundle_with_config("version:2 distributor:1 storage:3", 1)); + applyDistribution(Redundancy(1), NodeCount(3)); + // Bundle config says 1 node, internal config says 3. Trust the bundle(tm). + sendAndExpectAbortedCreateBucket(2); +} + +TEST_F(ChangedBucketOwnershipHandlerTest, revert_to_internal_config_if_distribution_no_longer_received_in_state_bundle) { + apply_cluster_state_bundle(make_state_bundle_with_config("version:2 distributor:1 storage:3", 3)); + + applyDistribution(Redundancy(1), NodeCount(1)); // not yet used + applyClusterState(lib::ClusterState("version:3 distributor:3 storage:3")); // no bundle config; revert to internal + + sendAndExpectAbortedCreateBucket(2); +} + /** * Generate and dispatch a message of the given type with the provided * arguments as if that message was sent from distributor 1. Messages will diff --git a/storage/src/tests/storageserver/statemanagertest.cpp b/storage/src/tests/storageserver/statemanagertest.cpp index 79246cb3ce1..90f13850072 100644 --- a/storage/src/tests/storageserver/statemanagertest.cpp +++ b/storage/src/tests/storageserver/statemanagertest.cpp @@ -10,6 +10,7 @@ #include <vespa/vdslib/state/clusterstate.h> #include <vespa/storage/storageserver/statemanager.h> #include <vespa/vespalib/data/slime/slime.h> +#include <vespa/config-stor-distribution.h> #include <vespa/vespalib/gtest/gtest.h> using storage::lib::NodeState; @@ -38,6 +39,23 @@ struct StateManagerTest : Test, NodeStateReporter { return cmd; } + static std::shared_ptr<const lib::ClusterStateBundle> make_state_bundle_with_config( + vespalib::stringref state_str, uint16_t num_nodes) + { + auto state = std::make_shared<const ClusterState>(state_str); + auto distr = lib::DistributionConfigBundle::of(lib::Distribution::getDefaultDistributionConfig(1, num_nodes)); + return std::make_shared<lib::ClusterStateBundle>(std::move(state), + lib::ClusterStateBundle::BucketSpaceStateMapping{}, + std::nullopt, std::move(distr), false); + } + + + static std::shared_ptr<api::SetSystemStateCommand> make_set_state_cmd_with_config( + vespalib::stringref state_str, uint16_t num_nodes) + { + return std::make_shared<api::SetSystemStateCommand>(make_state_bundle_with_config(state_str, num_nodes)); + } + void get_single_reply(std::shared_ptr<api::StorageReply>& reply_out); void get_only_ok_reply(std::shared_ptr<api::StorageReply>& reply_out); void force_current_cluster_state_version(uint32_t version, uint16_t cc_index); @@ -56,6 +74,10 @@ struct StateManagerTest : Test, NodeStateReporter { void report(vespalib::JsonStream &) const override {} void extract_cluster_state_version_from_host_info(uint32_t& version_out); + + static vespalib::string to_string(const lib::Distribution::DistributionConfig& cfg) { + return lib::Distribution(cfg).serialized(); + } }; StateManagerTest::StateManagerTest() @@ -148,26 +170,107 @@ StateManagerTest::extract_cluster_state_version_from_host_info(uint32_t& version version_out = clusterStateVersionCursor.asLong(); } -TEST_F(StateManagerTest, cluster_state) { - std::shared_ptr<api::StorageReply> reply; - // Verify initial state on startup - auto currentState = _manager->getClusterStateBundle()->getBaselineClusterState(); +TEST_F(StateManagerTest, cluster_state_and_config_has_expected_values_at_bootstrap) { + auto initial_bundle = _manager->getClusterStateBundle(); + auto currentState = initial_bundle->getBaselineClusterState(); EXPECT_EQ("cluster:d", currentState->toString(false)); EXPECT_EQ(currentState->getVersion(), 0); + // Distribution config should be equal to the config the node is running with. + ASSERT_TRUE(initial_bundle->has_distribution_config()); + EXPECT_EQ(to_string(initial_bundle->distribution_config_bundle()->config()), + _node->getComponentRegister().getDistribution()->serialized()); + auto currentNodeState = _manager->getCurrentNodeState(); EXPECT_EQ("s:d", currentNodeState->toString(false)); +} + +TEST_F(StateManagerTest, can_receive_state_bundle_without_distribution_config) { + ClusterState send_state("version:2 distributor:1 storage:4 .2.s:m"); + auto cmd = std::make_shared<api::SetSystemStateCommand>(send_state); + _upper->sendDown(cmd); + std::shared_ptr<api::StorageReply> reply; + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); + + auto current_bundle = _manager->getClusterStateBundle(); + EXPECT_EQ(send_state, *current_bundle->getBaselineClusterState()); + // Distribution config should be unchanged from boostrap. + ASSERT_TRUE(current_bundle->has_distribution_config()); + EXPECT_EQ(to_string(current_bundle->distribution_config_bundle()->config()), + _node->getComponentRegister().getDistribution()->serialized()); + + auto current_node_state = _manager->getCurrentNodeState(); + EXPECT_EQ("s:m", current_node_state->toString(false)); +} + +TEST_F(StateManagerTest, can_receive_state_bundle_with_distribution_config) { + auto cmd = make_set_state_cmd_with_config("version:2 distributor:1 storage:4 .2.s:m", 5); + EXPECT_NE(to_string(cmd->getClusterStateBundle().distribution_config_bundle()->config()), + _node->getComponentRegister().getDistribution()->serialized()); + _upper->sendDown(cmd); + std::shared_ptr<api::StorageReply> reply; + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); + + auto current_bundle = _manager->getClusterStateBundle(); + EXPECT_EQ(*current_bundle, cmd->getClusterStateBundle()); // also compares distribution configs +} - ClusterState sendState("storage:4 .2.s:m"); - auto cmd = std::make_shared<api::SetSystemStateCommand>(sendState); +TEST_F(StateManagerTest, receiving_cc_bundle_with_distribution_config_disables_node_distribution_config_propagation) { + auto cmd = make_set_state_cmd_with_config("version:2 distributor:1 storage:4 .2.s:m", 5); _upper->sendDown(cmd); + std::shared_ptr<api::StorageReply> reply; ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); + // Explicitly setting distribution config should not propagate to the active state bundle + // since we've flipped to expecting config from the cluster controllers instead. + auto distr = std::make_shared<lib::Distribution>(lib::Distribution::getDefaultDistributionConfig(2, 7)); + _node->getComponentRegister().setDistribution(distr); + + auto current_bundle = _manager->getClusterStateBundle(); + EXPECT_EQ(*current_bundle, cmd->getClusterStateBundle()); // unchanged +} + +TEST_F(StateManagerTest, internal_distribution_config_is_propagated_if_none_yet_received_from_cc) { + _upper->sendDown(make_set_state_cmd("version:10 distributor:1 storage:4", 0)); + std::shared_ptr<api::StorageReply> reply; + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); + + auto expected_bundle = make_state_bundle_with_config("version:10 distributor:1 storage:4", 7); + // Explicitly set internal config + _node->getComponentRegister().setDistribution(expected_bundle->distribution_config_bundle()->default_distribution_sp()); + _manager->storageDistributionChanged(); + + auto current_bundle = _manager->getClusterStateBundle(); + EXPECT_EQ(*current_bundle, *expected_bundle); +} + +TEST_F(StateManagerTest, revert_to_internal_config_if_cc_no_longer_sends_distribution_config) { + // Initial state bundle _with_ distribution config + auto cmd = make_set_state_cmd_with_config("version:2 distributor:1 storage:4 .2.s:m", 5); + _upper->sendDown(cmd); + std::shared_ptr<api::StorageReply> reply; + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); + + auto current_bundle = _manager->getClusterStateBundle(); + EXPECT_EQ(to_string(current_bundle->distribution_config_bundle()->config()), + to_string(cmd->getClusterStateBundle().distribution_config_bundle()->config())); + + // CC then sends a new bundle _without_ config + _upper->sendDown(make_set_state_cmd("version:3 distributor:1 storage:4", 0)); + ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply)); + + // Config implicitly reverted to the active internal config + current_bundle = _manager->getClusterStateBundle(); + EXPECT_EQ(to_string(current_bundle->distribution_config_bundle()->config()), + _node->getComponentRegister().getDistribution()->serialized()); - currentState = _manager->getClusterStateBundle()->getBaselineClusterState(); - EXPECT_EQ(sendState, *currentState); + // Explicitly set internal config + auto expected_bundle = make_state_bundle_with_config("version:3 distributor:1 storage:4", 7); + _node->getComponentRegister().setDistribution(expected_bundle->distribution_config_bundle()->default_distribution_sp()); + _manager->storageDistributionChanged(); - currentNodeState = _manager->getCurrentNodeState(); - EXPECT_EQ("s:m", currentNodeState->toString(false)); + // Internal config shall have taken effect, overriding that of the initial bundle + current_bundle = _manager->getClusterStateBundle(); + EXPECT_EQ(*current_bundle, *expected_bundle); } TEST_F(StateManagerTest, accept_lower_state_versions_if_strict_requirement_disabled) { diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp index 1f20a19ec51..280afe8fb91 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp @@ -526,12 +526,12 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac using RBISP = std::shared_ptr<api::RequestBucketInfoCommand>; std::map<uint16_t, RBISP> requests; - // TODO fetch distribution from bundle as well - auto distribution(_component.getBucketSpaceRepo().get(bucketSpace).getDistribution()); - auto clusterStateBundle(_component.getStateUpdater().getClusterStateBundle()); - assert(clusterStateBundle); - lib::ClusterState::CSP clusterState(clusterStateBundle->getDerivedClusterState(bucketSpace)); - assert(clusterState.get()); + auto clusterStateBundle = _component.getStateUpdater().getClusterStateBundle(); + assert(clusterStateBundle && clusterStateBundle->has_distribution_config()); + auto clusterState = clusterStateBundle->getDerivedClusterState(bucketSpace); + assert(clusterState); + const auto distribution = clusterStateBundle->bucket_space_distribution_or_nullptr(bucketSpace); + assert(distribution); const auto our_hash = distribution->getNodeGraph().getDistributionConfigHash(); diff --git a/storage/src/vespa/storage/common/content_bucket_space.cpp b/storage/src/vespa/storage/common/content_bucket_space.cpp index 92b5257b991..d66219794c7 100644 --- a/storage/src/vespa/storage/common/content_bucket_space.cpp +++ b/storage/src/vespa/storage/common/content_bucket_space.cpp @@ -10,6 +10,7 @@ ClusterStateAndDistribution::ClusterStateAndDistribution( : _cluster_state(std::move(cluster_state)), _distribution(std::move(distribution)) { + assert(_cluster_state && _distribution); } ClusterStateAndDistribution::~ClusterStateAndDistribution() = default; @@ -48,34 +49,6 @@ ContentBucketSpace::state_and_distribution() const noexcept { return _state_and_distribution; } -void -ContentBucketSpace::setClusterState(std::shared_ptr<const lib::ClusterState> clusterState) -{ - std::lock_guard guard(_lock); - _state_and_distribution = _state_and_distribution->with_new_state(std::move(clusterState)); -} - -std::shared_ptr<const lib::ClusterState> -ContentBucketSpace::getClusterState() const -{ - std::lock_guard guard(_lock); - return _state_and_distribution->_cluster_state; -} - -void -ContentBucketSpace::setDistribution(std::shared_ptr<const lib::Distribution> distribution) -{ - std::lock_guard guard(_lock); - _state_and_distribution = _state_and_distribution->with_new_distribution(std::move(distribution)); -} - -std::shared_ptr<const lib::Distribution> -ContentBucketSpace::getDistribution() const -{ - std::lock_guard guard(_lock); - return _state_and_distribution->_distribution; -} - bool ContentBucketSpace::getNodeUpInLastNodeStateSeenByProvider() const { diff --git a/storage/src/vespa/storage/common/content_bucket_space.h b/storage/src/vespa/storage/common/content_bucket_space.h index eb48640c97b..67ee4209d35 100644 --- a/storage/src/vespa/storage/common/content_bucket_space.h +++ b/storage/src/vespa/storage/common/content_bucket_space.h @@ -54,14 +54,6 @@ public: void set_state_and_distribution(std::shared_ptr<const ClusterStateAndDistribution> state_and_distr) noexcept; [[nodiscard]] std::shared_ptr<const ClusterStateAndDistribution> state_and_distribution() const noexcept; - // TODO deprecate; only use atomic state+distribution setter - void setClusterState(std::shared_ptr<const lib::ClusterState> clusterState); - // TODO deprecate; only use atomic state+distribution getter - std::shared_ptr<const lib::ClusterState> getClusterState() const; - // TODO deprecate; only use atomic state+distribution setter - void setDistribution(std::shared_ptr<const lib::Distribution> distribution); - // TODO deprecate; only use atomic state+distribution getter - std::shared_ptr<const lib::Distribution> getDistribution() const; bool getNodeUpInLastNodeStateSeenByProvider() const; void setNodeUpInLastNodeStateSeenByProvider(bool nodeUpInLastNodeStateSeenByProvider); diff --git a/storage/src/vespa/storage/common/storagecomponent.h b/storage/src/vespa/storage/common/storagecomponent.h index 677d5652fe3..ba12fc666a9 100644 --- a/storage/src/vespa/storage/common/storagecomponent.h +++ b/storage/src/vespa/storage/common/storagecomponent.h @@ -61,7 +61,7 @@ public: const std::shared_ptr<const document::FieldSetRepo> fieldSetRepo; }; using UP = std::unique_ptr<StorageComponent>; - using DistributionSP = std::shared_ptr<lib::Distribution>; + using DistributionSP = std::shared_ptr<const lib::Distribution>; /** * Node type is supposed to be set immediately, and never be updated. diff --git a/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp b/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp index ab1cbf0b4d7..4d1daa329fd 100644 --- a/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp +++ b/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp @@ -23,11 +23,9 @@ ServiceLayerComponentRegisterImpl::registerServiceLayerComponent(ServiceLayerMan } void -ServiceLayerComponentRegisterImpl::setDistribution(std::shared_ptr<lib::Distribution> distribution) +ServiceLayerComponentRegisterImpl::setDistribution(std::shared_ptr<const lib::Distribution> distribution) { - _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).setDistribution(distribution); - auto global_distr = lib::GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution); - _bucketSpaceRepo.get(document::FixedBucketSpaces::global_space()).setDistribution(global_distr); + // TODO remove this override entirely? StorageComponentRegisterImpl::setDistribution(distribution); } diff --git a/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.h b/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.h index 1589192b92e..3478c0b3b9c 100644 --- a/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.h +++ b/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.h @@ -34,7 +34,7 @@ public: } void registerServiceLayerComponent(ServiceLayerManagedComponent&) override; - void setDistribution(std::shared_ptr<lib::Distribution> distribution) override; + void setDistribution(std::shared_ptr<const lib::Distribution> distribution) override; }; } // storage diff --git a/storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.cpp b/storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.cpp index bd0853a3524..ab1d80dc0b9 100644 --- a/storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.cpp +++ b/storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.cpp @@ -91,7 +91,7 @@ StorageComponentRegisterImpl::setBucketIdFactory(const document::BucketIdFactory } void -StorageComponentRegisterImpl::setDistribution(std::shared_ptr<lib::Distribution> distribution) +StorageComponentRegisterImpl::setDistribution(std::shared_ptr<const lib::Distribution> distribution) { std::lock_guard lock(_componentLock); _distribution = distribution; diff --git a/storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h b/storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h index abb60051fe1..271b38399b6 100644 --- a/storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h +++ b/storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h @@ -21,16 +21,16 @@ class StorageComponentRegisterImpl { using BucketspacesConfig = vespa::config::content::core::internal::InternalBucketspacesType; - std::mutex _componentLock; - std::vector<StorageComponent*> _components; - vespalib::string _clusterName; - const lib::NodeType* _nodeType; - uint16_t _index; + std::mutex _componentLock; + std::vector<StorageComponent*> _components; + vespalib::string _clusterName; + const lib::NodeType* _nodeType; + uint16_t _index; std::shared_ptr<const document::DocumentTypeRepo> _docTypeRepo; - document::BucketIdFactory _bucketIdFactory; - std::shared_ptr<lib::Distribution> _distribution; - NodeStateUpdater* _nodeStateUpdater; - BucketspacesConfig _bucketSpacesConfig; + document::BucketIdFactory _bucketIdFactory; + std::shared_ptr<const lib::Distribution> _distribution; + NodeStateUpdater* _nodeStateUpdater; + BucketspacesConfig _bucketSpacesConfig; public: using UP = std::unique_ptr<StorageComponentRegisterImpl>; @@ -38,12 +38,12 @@ public: StorageComponentRegisterImpl(); ~StorageComponentRegisterImpl() override; - const lib::NodeType& getNodeType() const { return *_nodeType; } - uint16_t getIndex() const { return _index; } - std::shared_ptr<const document::DocumentTypeRepo> getTypeRepo() { return _docTypeRepo; } - const document::BucketIdFactory& getBucketIdFactory() { return _bucketIdFactory; } - std::shared_ptr<lib::Distribution> & getDistribution() { return _distribution; } - NodeStateUpdater& getNodeStateUpdater() { return *_nodeStateUpdater; } + [[nodiscard]] const lib::NodeType& getNodeType() const noexcept { return *_nodeType; } + [[nodiscard]] uint16_t getIndex() const noexcept { return _index; } + [[nodiscard]] std::shared_ptr<const document::DocumentTypeRepo> getTypeRepo() const noexcept { return _docTypeRepo; } + [[nodiscard]] const document::BucketIdFactory& getBucketIdFactory() const noexcept { return _bucketIdFactory; } + [[nodiscard]] const std::shared_ptr<const lib::Distribution>& getDistribution() const noexcept { return _distribution; } + [[nodiscard]] NodeStateUpdater& getNodeStateUpdater() noexcept { return *_nodeStateUpdater; } void registerStorageComponent(StorageComponent&) override; @@ -51,7 +51,7 @@ public: virtual void setNodeStateUpdater(NodeStateUpdater& updater); virtual void setDocumentTypeRepo(std::shared_ptr<const document::DocumentTypeRepo>); virtual void setBucketIdFactory(const document::BucketIdFactory&); - virtual void setDistribution(std::shared_ptr<lib::Distribution>); + virtual void setDistribution(std::shared_ptr<const lib::Distribution>); virtual void setBucketSpacesConfig(const BucketspacesConfig&); }; diff --git a/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp b/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp index d5de11c7d6f..f571f6272b8 100644 --- a/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp +++ b/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp @@ -16,13 +16,14 @@ using document::BucketSpace; namespace storage { uint16_t -BucketOwnershipNotifier::getOwnerDistributorForBucket(const document::Bucket &bucket) const +BucketOwnershipNotifier::getOwnerDistributorForBucket(const document::Bucket& bucket) const { try { - // TODO use state updater bundle for everything? - auto distribution(_component.getBucketSpaceRepo().get(bucket.getBucketSpace()).getDistribution()); - const auto clusterStateBundle = _component.getStateUpdater().getClusterStateBundle(); - const auto &clusterState = *clusterStateBundle->getDerivedClusterState(bucket.getBucketSpace()); + const auto state_bundle = _component.getStateUpdater().getClusterStateBundle(); + assert(state_bundle && state_bundle->has_distribution_config()); + const auto* distribution = state_bundle->distribution_config_bundle()->bucket_space_distribution_or_nullptr_raw(bucket.getBucketSpace()); + assert(distribution); + const auto& clusterState = *state_bundle->getDerivedClusterState(bucket.getBucketSpace()); return (distribution->getIdealDistributorNode(clusterState, bucket.getBucketId())); // If we get exceptions there aren't any distributors, so they'll have // to explicitly fetch all bucket info eventually anyway. @@ -39,7 +40,7 @@ BucketOwnershipNotifier::getOwnerDistributorForBucket(const document::Bucket &bu } bool -BucketOwnershipNotifier::distributorOwns(uint16_t distributor, const document::Bucket &bucket) const +BucketOwnershipNotifier::distributorOwns(uint16_t distributor, const document::Bucket& bucket) const { return (distributor == getOwnerDistributorForBucket(bucket)); } @@ -47,7 +48,7 @@ BucketOwnershipNotifier::distributorOwns(uint16_t distributor, const document::B void BucketOwnershipNotifier::sendNotifyBucketToDistributor( uint16_t distributorIndex, - const document::Bucket &bucket, + const document::Bucket& bucket, const api::BucketInfo& infoToSend) { if (!infoToSend.valid()) { @@ -71,7 +72,7 @@ BucketOwnershipNotifier::sendNotifyBucketToDistributor( } void -BucketOwnershipNotifier::logNotification(const document::Bucket &bucket, +BucketOwnershipNotifier::logNotification(const document::Bucket& bucket, uint16_t sourceIndex, uint16_t currentOwnerIndex, const api::BucketInfo& newInfo) @@ -88,7 +89,7 @@ BucketOwnershipNotifier::logNotification(const document::Bucket &bucket, void BucketOwnershipNotifier::notifyIfOwnershipChanged( - const document::Bucket &bucket, + const document::Bucket& bucket, uint16_t sourceIndex, const api::BucketInfo& infoToSend) { @@ -111,7 +112,7 @@ BucketOwnershipNotifier::notifyIfOwnershipChanged( void BucketOwnershipNotifier::sendNotifyBucketToCurrentOwner( - const document::Bucket &bucket, + const document::Bucket& bucket, const api::BucketInfo& infoToSend) { uint16_t distributor(getOwnerDistributorForBucket(bucket)); @@ -134,7 +135,7 @@ NotificationGuard::~NotificationGuard() } void -NotificationGuard::notifyIfOwnershipChanged(const document::Bucket &bucket, +NotificationGuard::notifyIfOwnershipChanged(const document::Bucket& bucket, uint16_t sourceIndex, const api::BucketInfo& infoToSend) { @@ -142,7 +143,7 @@ NotificationGuard::notifyIfOwnershipChanged(const document::Bucket &bucket, } void -NotificationGuard::notifyAlways(const document::Bucket &bucket, +NotificationGuard::notifyAlways(const document::Bucket& bucket, const api::BucketInfo& infoToSend) { BucketToCheck bc(bucket, 0xffff, infoToSend); diff --git a/storage/src/vespa/storage/persistence/bucketownershipnotifier.h b/storage/src/vespa/storage/persistence/bucketownershipnotifier.h index fc3d9209f5f..93a2c1a2104 100644 --- a/storage/src/vespa/storage/persistence/bucketownershipnotifier.h +++ b/storage/src/vespa/storage/persistence/bucketownershipnotifier.h @@ -11,29 +11,29 @@ namespace storage { class BucketOwnershipNotifier { - const ServiceLayerComponent & _component; - MessageSender & _sender; + const ServiceLayerComponent& _component; + MessageSender& _sender; public: BucketOwnershipNotifier(const ServiceLayerComponent& component, MessageSender& sender) : _component(component), _sender(sender) {} - bool distributorOwns(uint16_t distributor, const document::Bucket &bucket) const; - void notifyIfOwnershipChanged(const document::Bucket &bucket, uint16_t sourceIndex, const api::BucketInfo& infoToSend); - void sendNotifyBucketToCurrentOwner(const document::Bucket &bucket, const api::BucketInfo& infoToSend); + [[nodiscard]] bool distributorOwns(uint16_t distributor, const document::Bucket& bucket) const; + void notifyIfOwnershipChanged(const document::Bucket& bucket, uint16_t sourceIndex, const api::BucketInfo& infoToSend); + void sendNotifyBucketToCurrentOwner(const document::Bucket& bucket, const api::BucketInfo& infoToSend); private: enum IndexMeta { FAILED_TO_RESOLVE = 0xffff }; - void sendNotifyBucketToDistributor(uint16_t distributorIndex, const document::Bucket &bucket, + void sendNotifyBucketToDistributor(uint16_t distributorIndex, const document::Bucket& bucket, const api::BucketInfo& infoToSend); // Returns either index or FAILED_TO_RESOLVE uint16_t getOwnerDistributorForBucket(const document::Bucket &bucket) const; - void logNotification(const document::Bucket &bucket, uint16_t sourceIndex, + void logNotification(const document::Bucket& bucket, uint16_t sourceIndex, uint16_t currentOwnerIndex, const api::BucketInfo& newInfo); }; @@ -60,7 +60,7 @@ class NotificationGuard BucketOwnershipNotifier& _notifier; std::vector<BucketToCheck> _bucketsToCheck; public: - NotificationGuard(BucketOwnershipNotifier& notifier) + explicit NotificationGuard(BucketOwnershipNotifier& notifier) : _notifier(notifier), _bucketsToCheck() {} @@ -69,8 +69,8 @@ public: ~NotificationGuard(); - void notifyIfOwnershipChanged(const document::Bucket &bucket, uint16_t sourceIndex, const api::BucketInfo& infoToSend); - void notifyAlways(const document::Bucket &bucket, const api::BucketInfo& infoToSend); + void notifyIfOwnershipChanged(const document::Bucket& bucket, uint16_t sourceIndex, const api::BucketInfo& infoToSend); + void notifyAlways(const document::Bucket& bucket, const api::BucketInfo& infoToSend); }; } // storage diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 495497d507d..e44490388e2 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -887,9 +887,9 @@ bool FileStorManager::maintenance_in_all_spaces(const lib::Node& node) const noexcept { for (const auto& elem : _component.getBucketSpaceRepo()) { - const ContentBucketSpace& bucket_space = *elem.second; - auto derived_cluster_state = bucket_space.getClusterState(); - if (!derived_cluster_state->getNodeState(node).getState().oneOf("m")) { + const auto space_state_and_distr = elem.second->state_and_distribution(); + const auto& derived_cluster_state = space_state_and_distr->cluster_state(); + if (!derived_cluster_state.getNodeState(node).getState().oneOf("m")) { return false; } } @@ -915,8 +915,7 @@ FileStorManager::maybe_log_received_cluster_state() { if (LOG_WOULD_LOG(debug)) { auto cluster_state_bundle = _component.getStateUpdater().getClusterStateBundle(); - auto baseline_state = cluster_state_bundle->getBaselineClusterState(); - LOG(debug, "FileStorManager received baseline cluster state '%s'", baseline_state->toString().c_str()); + LOG(debug, "FileStorManager received baseline cluster state '%s'", cluster_state_bundle->toString().c_str()); } } @@ -951,16 +950,17 @@ FileStorManager::updateState() void FileStorManager::storageDistributionChanged() { - updateState(); } void FileStorManager::propagateClusterStates() { auto clusterStateBundle = _component.getStateUpdater().getClusterStateBundle(); - for (const auto &elem : _component.getBucketSpaceRepo()) { - // TODO also distribution! bundle and repo must be 1-1 - elem.second->setClusterState(clusterStateBundle->getDerivedClusterState(elem.first)); + assert(clusterStateBundle->has_distribution_config()); + for (const auto& elem : _component.getBucketSpaceRepo()) { + elem.second->set_state_and_distribution(std::make_shared<ClusterStateAndDistribution>( + clusterStateBundle->getDerivedClusterState(elem.first), + clusterStateBundle->bucket_space_distribution_or_nullptr(elem.first))); } } diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp index 25829f3d391..8702cb1272b 100644 --- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp +++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp @@ -31,10 +31,11 @@ ChangedBucketOwnershipHandler::ChangedBucketOwnershipHandler( _stateLock(), _currentState(), // Not set yet, so ownership will not be valid _currentOwnership(std::make_shared<OwnershipState>( - _component.getBucketSpaceRepo(), _currentState)), + _currentState, lib::DistributionConfigBundle::of(_component.getDistribution()))), _abortQueuedAndPendingOnStateChange(false), _abortMutatingIdealStateOps(false), - _abortMutatingExternalLoadOps(false) + _abortMutatingExternalLoadOps(false), + _receiving_distribution_config_from_cc(false) { on_configure(bootstrap_config); _component.registerMetric(_metrics); @@ -61,16 +62,29 @@ ChangedBucketOwnershipHandler::reloadClusterState() { std::lock_guard guard(_stateLock); const auto clusterStateBundle = _component.getStateUpdater().getClusterStateBundle(); - setCurrentOwnershipWithStateNoLock(*clusterStateBundle); + setCurrentOwnershipWithStateNoLock(std::move(clusterStateBundle)); } void -ChangedBucketOwnershipHandler::setCurrentOwnershipWithStateNoLock( - const lib::ClusterStateBundle& newState) +ChangedBucketOwnershipHandler::setCurrentOwnershipWithStateNoLock(std::shared_ptr<const lib::ClusterStateBundle> new_state) { - _currentState = std::make_shared<const lib::ClusterStateBundle>(newState); - _currentOwnership = std::make_shared<const OwnershipState>( - _component.getBucketSpaceRepo(), _currentState); + LOG(debug, "Setting new ownership state bundle: %s", new_state->toString().c_str()); + std::shared_ptr<const lib::DistributionConfigBundle> distributions; + _currentState = std::move(new_state); + // This partially duplicates distribution config fallback logic from StateManager, but that's because + // this component has the same approach to intercepting both state commands and distribution config + // changes. + // `new_state` can come straight from a SetSystemStateCommand, which may or may not have a state bundle + // with distribution config strapped to it. + if (_currentState->has_distribution_config()) { + distributions = _currentState->distribution_config_bundle(); + } else { + distributions = lib::DistributionConfigBundle::of(_component.getDistribution()); + LOG(debug, "No distribution config in bundle; using current host config of '%s'", + distributions->default_distribution().getNodeGraph().getDistributionConfigHash().c_str()); + } + _receiving_distribution_config_from_cc = _currentState->has_distribution_config(); + _currentOwnership = std::make_shared<const OwnershipState>(_currentState, std::move(distributions)); } namespace { @@ -98,17 +112,11 @@ ChangedBucketOwnershipHandler::Metrics::Metrics(metrics::MetricSet* owner) {} ChangedBucketOwnershipHandler::Metrics::~Metrics() = default; -ChangedBucketOwnershipHandler::OwnershipState::OwnershipState(const ContentBucketSpaceRepo &contentBucketSpaceRepo, - std::shared_ptr<const lib::ClusterStateBundle> state) - : _distributions(), - _state(std::move(state)) +ChangedBucketOwnershipHandler::OwnershipState::OwnershipState(std::shared_ptr<const lib::ClusterStateBundle> state, + std::shared_ptr<const lib::DistributionConfigBundle> distributions) + : _state(std::move(state)), + _distributions(std::move(distributions)) { - for (const auto &elem : contentBucketSpaceRepo) { - auto distribution = elem.second->getDistribution(); - if (distribution) { - _distributions.emplace(elem.first, std::move(distribution)); - } - } } @@ -123,19 +131,15 @@ ChangedBucketOwnershipHandler::OwnershipState::getBaselineState() const } uint16_t -ChangedBucketOwnershipHandler::OwnershipState::ownerOf( - const document::Bucket& bucket) const +ChangedBucketOwnershipHandler::OwnershipState::ownerOf(const document::Bucket& bucket) const { - auto distributionItr = _distributions.find(bucket.getBucketSpace()); - assert(distributionItr != _distributions.end()); - const auto &distribution = *distributionItr->second; - const auto &derivedState = *_state->getDerivedClusterState(bucket.getBucketSpace()); + const auto* distribution = _distributions->bucket_space_distribution_or_nullptr_raw(bucket.getBucketSpace()); + assert(distribution); + const auto& derivedState = *_state->getDerivedClusterState(bucket.getBucketSpace()); try { - return distribution.getIdealDistributorNode(derivedState, bucket.getBucketId()); + return distribution->getIdealDistributorNode(derivedState, bucket.getBucketId()); } catch (lib::TooFewBucketBitsInUseException& e) { - LOGBP(debug, - "Too few bucket bits used for %s to be assigned to " - "a distributor.", + LOGBP(debug, "Too few bucket bits used for %s to be assigned to a distributor.", bucket.toString().c_str()); } catch (lib::NoDistributorsAvailableException& e) { LOGBP(warning, @@ -144,11 +148,9 @@ ChangedBucketOwnershipHandler::OwnershipState::ownerOf( "for available distributors before reaching this code path! " "Cluster state is '%s', distribution is '%s'", derivedState.toString().c_str(), - distribution.toString().c_str()); + distribution->toString().c_str()); } catch (const std::exception& e) { - LOG(error, - "Got unknown exception while resolving distributor: %s", - e.what()); + LOG(error, "Got unknown exception while resolving distributor: %s", e.what()); } return FAILED_TO_RESOLVE; } @@ -162,9 +164,7 @@ ChangedBucketOwnershipHandler::OwnershipState::storageNodeUp(document::BucketSpa } void -ChangedBucketOwnershipHandler::logTransition( - const lib::ClusterState& currentState, - const lib::ClusterState& newState) const +ChangedBucketOwnershipHandler::logTransition(const lib::ClusterState& currentState, const lib::ClusterState& newState) { LOG(debug, "State transition '%s' -> '%s' changes distributor bucket ownership, " @@ -269,7 +269,7 @@ public: { std::lock_guard guard(_owner._stateLock); old_ownership = _owner._currentOwnership; - _owner.setCurrentOwnershipWithStateNoLock(_command->getClusterStateBundle()); + _owner.setCurrentOwnershipWithStateNoLock(_command->cluster_state_bundle_ptr()); new_ownership = _owner._currentOwnership; } assert(new_ownership->valid()); @@ -287,7 +287,7 @@ public: new_ownership->getBaselineState().toString().c_str()); return _owner.sendDown(_command);; } - _owner.logTransition(old_ownership->getBaselineState(), new_ownership->getBaselineState()); + logTransition(old_ownership->getBaselineState(), new_ownership->getBaselineState()); metrics::MetricTimer duration_timer; auto predicate = _owner.makeLazyAbortPredicate(old_ownership, new_ownership); @@ -327,17 +327,19 @@ ChangedBucketOwnershipHandler::onSetSystemState( * Invoked whenever a distribution config change happens and is called in the * context of the config updater thread (which is why we have to lock). */ + // TODO remove this when there are no more state bundles without distribution config void ChangedBucketOwnershipHandler::storageDistributionChanged() { std::lock_guard guard(_stateLock); - _currentOwnership = std::make_shared<OwnershipState>( - _component.getBucketSpaceRepo(), _currentState); + if (!_receiving_distribution_config_from_cc) { + _currentOwnership = std::make_shared<OwnershipState>( + _currentState, lib::DistributionConfigBundle::of(_component.getDistribution())); + } } bool -ChangedBucketOwnershipHandler::isMutatingIdealStateOperation( - const api::StorageMessage& msg) const +ChangedBucketOwnershipHandler::isMutatingIdealStateOperation(const api::StorageMessage& msg) { switch (msg.getType().getId()) { case api::MessageType::CREATEBUCKET_ID: @@ -357,8 +359,7 @@ ChangedBucketOwnershipHandler::isMutatingIdealStateOperation( bool -ChangedBucketOwnershipHandler::isMutatingExternalOperation( - const api::StorageMessage& msg) const +ChangedBucketOwnershipHandler::isMutatingExternalOperation(const api::StorageMessage& msg) { switch (msg.getType().getId()) { case api::MessageType::PUT_ID: @@ -418,8 +419,7 @@ ChangedBucketOwnershipHandler::abortOperation(api::StorageCommand& cmd) } bool -ChangedBucketOwnershipHandler::isMutatingCommandAndNeedsChecking( - const api::StorageMessage& msg) const +ChangedBucketOwnershipHandler::isMutatingCommandAndNeedsChecking(const api::StorageMessage& msg) const { if (enabledIdealStateAborting() && isMutatingIdealStateOperation(msg)) { return true; diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h index 801534385f7..2c32704d64b 100644 --- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h +++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h @@ -25,6 +25,7 @@ namespace lib { class ClusterState; class ClusterStateBundle; class Distribution; + class DistributionConfigBundle; } /** @@ -77,20 +78,20 @@ public: */ class OwnershipState { using BucketSpace = document::BucketSpace; - std::unordered_map<BucketSpace, std::shared_ptr<const lib::Distribution>, BucketSpace::hash> _distributions; std::shared_ptr<const lib::ClusterStateBundle> _state; + std::shared_ptr<const lib::DistributionConfigBundle> _distributions; public: using SP = std::shared_ptr<OwnershipState>; using CSP = std::shared_ptr<const OwnershipState>; - OwnershipState(const ContentBucketSpaceRepo &contentBucketSpaceRepo, - std::shared_ptr<const lib::ClusterStateBundle> state); + OwnershipState(std::shared_ptr<const lib::ClusterStateBundle> state, + std::shared_ptr<const lib::DistributionConfigBundle> distributions); ~OwnershipState(); static const uint16_t FAILED_TO_RESOLVE = 0xffff; [[nodiscard]] bool valid() const noexcept { - return (!_distributions.empty() && _state); + return (_distributions && _state); } /** @@ -124,20 +125,21 @@ private: std::atomic<bool> _abortQueuedAndPendingOnStateChange; std::atomic<bool> _abortMutatingIdealStateOps; std::atomic<bool> _abortMutatingExternalLoadOps; + bool _receiving_distribution_config_from_cc; std::unique_ptr<AbortBucketOperationsCommand::AbortPredicate> makeLazyAbortPredicate( const OwnershipState::CSP& oldOwnership, const OwnershipState::CSP& newOwnership) const; - void logTransition(const lib::ClusterState& currentState, - const lib::ClusterState& newState) const; + static void logTransition(const lib::ClusterState& currentState, + const lib::ClusterState& newState); /** * Creates a new immutable OwnershipState based on the current distribution * and the provided cluster state and assigns it to _currentOwnership. */ - void setCurrentOwnershipWithStateNoLock(const lib::ClusterStateBundle&); + void setCurrentOwnershipWithStateNoLock(std::shared_ptr<const lib::ClusterStateBundle>); /** * Grabs _stateLock and returns a shared_ptr to the current ownership @@ -147,9 +149,9 @@ private: bool isMutatingCommandAndNeedsChecking(const api::StorageMessage&) const; - bool isMutatingIdealStateOperation(const api::StorageMessage&) const; + static bool isMutatingIdealStateOperation(const api::StorageMessage&); - bool isMutatingExternalOperation(const api::StorageMessage&) const; + static bool isMutatingExternalOperation(const api::StorageMessage&); /** * Returns whether the operation in cmd has a bucket whose ownership in * the current cluster state does not match the distributor marked as diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp index a2106dce8d2..560982fb6c0 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.cpp +++ b/storage/src/vespa/storage/storageserver/statemanager.cpp @@ -29,7 +29,18 @@ using vespalib::make_string_short::fmt; namespace storage { namespace { - constexpr vespalib::duration MAX_TIMEOUT = 600s; +constexpr vespalib::duration MAX_TIMEOUT = 600s; + +[[nodiscard]] std::shared_ptr<const lib::ClusterStateBundle> +make_bootstrap_state_bundle(std::shared_ptr<const lib::Distribution> config) { + return std::make_shared<const lib::ClusterStateBundle>( + std::make_shared<lib::ClusterState>(), + lib::ClusterStateBundle::BucketSpaceStateMapping{}, + std::nullopt, + lib::DistributionConfigBundle::of(std::move(config)), + false); +} + } struct StateManager::StateManagerMetrics : metrics::MetricSet { @@ -62,7 +73,8 @@ StateManager::StateManager(StorageComponentRegister& compReg, _listenerLock(), _nodeState(std::make_shared<lib::NodeState>(_component.getNodeType(), lib::State::DOWN)), _nextNodeState(), - _systemState(std::make_shared<const ClusterStateBundle>(lib::ClusterState())), + _configured_distribution(_component.getDistribution()), + _systemState(make_bootstrap_state_bundle(_configured_distribution)), _nextSystemState(), _reported_host_info_cluster_state_version(0), _stateListeners(), @@ -81,6 +93,7 @@ StateManager::StateManager(StorageComponentRegister& compReg, _noThreadTestMode(testMode), _grabbedExternalLock(false), _require_strictly_increasing_cluster_state_versions(false), + _receiving_distribution_config_from_cc(false), _notifyingListeners(false), _requested_almost_immediate_node_state_replies(false) { @@ -170,8 +183,7 @@ lib::NodeState::CSP StateManager::getCurrentNodeState() const { std::lock_guard lock(_stateLock); - return std::make_shared<const lib::NodeState> - (_systemState->getBaselineClusterState()->getNodeState(thisNode())); + return std::make_shared<const lib::NodeState>(_systemState->getBaselineClusterState()->getNodeState(thisNode())); } std::shared_ptr<const lib::ClusterStateBundle> @@ -181,6 +193,28 @@ StateManager::getClusterStateBundle() const return _systemState; } +// TODO remove when distribution config is only received from cluster controller +void +StateManager::storageDistributionChanged() +{ + { + std::lock_guard lock(_stateLock); + _configured_distribution = _component.getDistribution(); + if (_receiving_distribution_config_from_cc) { + return; // nothing more to do + } + // Avoid losing any pending state if this callback happens in the middle of a + // state update. This edge case is practically impossible to unit test today... + const auto patch_state = _nextSystemState ? _nextSystemState : _systemState; + _nextSystemState = patch_state->clone_with_new_distribution( + lib::DistributionConfigBundle::of(_configured_distribution)); + } + // We've assembled a new state bundle based on the (non-distribution carrying) state + // bundle from the cluster controller and our own internal config. Propagate it as one + // unit to the internal components. + notifyStateListeners(); +} + void StateManager::addStateListener(StateListener& listener) { @@ -316,12 +350,12 @@ using BucketSpaceToTransitionString = std::unordered_map<document::BucketSpace, document::BucketSpace::hash>; void -considerInsertDerivedTransition(const lib::State ¤tBaseline, - const lib::State &newBaseline, - const lib::State ¤tDerived, - const lib::State &newDerived, - const document::BucketSpace &bucketSpace, - BucketSpaceToTransitionString &transitions) +considerInsertDerivedTransition(const lib::State& currentBaseline, + const lib::State& newBaseline, + const lib::State& currentDerived, + const lib::State& newDerived, + const document::BucketSpace& bucketSpace, + BucketSpaceToTransitionString& transitions) { bool considerDerivedTransition = ((currentDerived != newDerived) && ((currentDerived != currentBaseline) || (newDerived != newBaseline))); @@ -333,28 +367,28 @@ considerInsertDerivedTransition(const lib::State ¤tBaseline, } BucketSpaceToTransitionString -calculateDerivedClusterStateTransitions(const ClusterStateBundle ¤tState, - const ClusterStateBundle &newState, +calculateDerivedClusterStateTransitions(const ClusterStateBundle& currentState, + const ClusterStateBundle& newState, const lib::Node node) { BucketSpaceToTransitionString result; - const lib::State ¤tBaseline = currentState.getBaselineClusterState()->getNodeState(node).getState(); - const lib::State &newBaseline = newState.getBaselineClusterState()->getNodeState(node).getState(); - for (const auto &entry : currentState.getDerivedClusterStates()) { - const lib::State ¤tDerived = entry.second->getNodeState(node).getState(); - const lib::State &newDerived = newState.getDerivedClusterState(entry.first)->getNodeState(node).getState(); + const lib::State& currentBaseline = currentState.getBaselineClusterState()->getNodeState(node).getState(); + const lib::State& newBaseline = newState.getBaselineClusterState()->getNodeState(node).getState(); + for (const auto& entry : currentState.getDerivedClusterStates()) { + const lib::State& currentDerived = entry.second->getNodeState(node).getState(); + const lib::State& newDerived = newState.getDerivedClusterState(entry.first)->getNodeState(node).getState(); considerInsertDerivedTransition(currentBaseline, newBaseline, currentDerived, newDerived, entry.first, result); } - for (const auto &entry : newState.getDerivedClusterStates()) { - const lib::State &newDerived = entry.second->getNodeState(node).getState(); - const lib::State ¤tDerived = currentState.getDerivedClusterState(entry.first)->getNodeState(node).getState(); + for (const auto& entry : newState.getDerivedClusterStates()) { + const lib::State& newDerived = entry.second->getNodeState(node).getState(); + const lib::State& currentDerived = currentState.getDerivedClusterState(entry.first)->getNodeState(node).getState(); considerInsertDerivedTransition(currentBaseline, newBaseline, currentDerived, newDerived, entry.first, result); } return result; } vespalib::string -transitionsToString(const BucketSpaceToTransitionString &transitions) +transitionsToString(const BucketSpaceToTransitionString& transitions) { if (transitions.empty()) { return ""; @@ -362,7 +396,7 @@ transitionsToString(const BucketSpaceToTransitionString &transitions) vespalib::asciistream stream; stream << "["; bool first = true; - for (const auto &entry : transitions) { + for (const auto& entry : transitions) { if (!first) { stream << ", "; } @@ -435,7 +469,7 @@ StateManager::onGetNodeState(const api::GetNodeStateCommand::SP& cmd) } void -StateManager::mark_controller_as_having_observed_explicit_node_state(const std::unique_lock<std::mutex> &, uint16_t controller_index) { +StateManager::mark_controller_as_having_observed_explicit_node_state(const std::unique_lock<std::mutex>&, uint16_t controller_index) { _controllers_observed_explicit_node_state.emplace(controller_index); } @@ -483,7 +517,16 @@ StateManager::try_set_cluster_state_bundle(std::shared_ptr<const ClusterStateBun } } _last_accepted_cluster_state_time = now; - _nextSystemState = std::move(c); + _receiving_distribution_config_from_cc = c->has_distribution_config(); + if (!c->has_distribution_config()) { + LOG(debug, "Next state bundle '%s' does not have distribution config; patching in existing config '%s'", + c->toString().c_str(), _configured_distribution->getNodeGraph().getDistributionConfigHash().c_str()); + _nextSystemState = c->clone_with_new_distribution(lib::DistributionConfigBundle::of(_configured_distribution)); + } else { + LOG(debug, "Next state bundle is '%s'", c->toString().c_str()); + // TODO print what's changed in distribution config? + _nextSystemState = std::move(c); + } } notifyStateListeners(); return std::nullopt; diff --git a/storage/src/vespa/storage/storageserver/statemanager.h b/storage/src/vespa/storage/storageserver/statemanager.h index d116f968731..d7a139f7ced 100644 --- a/storage/src/vespa/storage/storageserver/statemanager.h +++ b/storage/src/vespa/storage/storageserver/statemanager.h @@ -55,6 +55,7 @@ class StateManager : public NodeStateUpdater, std::mutex _listenerLock; std::shared_ptr<lib::NodeState> _nodeState; std::shared_ptr<lib::NodeState> _nextNodeState; + std::shared_ptr<const lib::Distribution> _configured_distribution; // From config system, not from CC std::shared_ptr<const ClusterStateBundle> _systemState; std::shared_ptr<const ClusterStateBundle> _nextSystemState; uint32_t _reported_host_info_cluster_state_version; @@ -78,6 +79,7 @@ class StateManager : public NodeStateUpdater, bool _noThreadTestMode; bool _grabbedExternalLock; bool _require_strictly_increasing_cluster_state_versions; + bool _receiving_distribution_config_from_cc; std::atomic<bool> _notifyingListeners; std::atomic<bool> _requested_almost_immediate_node_state_replies; @@ -102,6 +104,8 @@ public: lib::NodeState::CSP getCurrentNodeState() const override; std::shared_ptr<const ClusterStateBundle> getClusterStateBundle() const override; + void storageDistributionChanged() override; + void addStateListener(StateListener&) override; void removeStateListener(StateListener&) override; |