aboutsummaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@vespa.ai>2024-06-20 09:16:44 +0000
committerTor Brede Vekterli <vekterli@vespa.ai>2024-06-24 14:01:26 +0000
commite7fdc8f5010e18cd5599a016156a5c202df0f8db (patch)
treeccc2915d5e6135578b73f2f421b3b2b0766df37b /storage
parentefd20bf3e2556975d177ceaaf9a1908cb7239fe1 (diff)
Handle cluster state bundle with distribution config on content node
If a cluster state bundle contains distribution config, this is internally propagated via the `StateManager` component to all registered state listeners. One such state listener is `FileStorManager`, which updates the content node-internal bucket space repository. All `SetSystemStateCommand` and internal config-aware components (`StateManager` and `ChangedBucketOwnershipHandler`) now explicitly track whether the cluster controller provides distribution config, or if the internally provided config should be used (including fallback to internal config if necessary).
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/bucketdb/bucketmanagertest.cpp29
-rw-r--r--storage/src/tests/common/testnodestateupdater.cpp17
-rw-r--r--storage/src/tests/common/testnodestateupdater.h9
-rw-r--r--storage/src/tests/common/teststorageapp.cpp10
-rw-r--r--storage/src/tests/common/teststorageapp.h3
-rw-r--r--storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp54
-rw-r--r--storage/src/tests/storageserver/statemanagertest.cpp123
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketmanager.cpp12
-rw-r--r--storage/src/vespa/storage/common/content_bucket_space.cpp29
-rw-r--r--storage/src/vespa/storage/common/content_bucket_space.h8
-rw-r--r--storage/src/vespa/storage/common/storagecomponent.h2
-rw-r--r--storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp6
-rw-r--r--storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.h2
-rw-r--r--storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.cpp2
-rw-r--r--storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h32
-rw-r--r--storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp25
-rw-r--r--storage/src/vespa/storage/persistence/bucketownershipnotifier.h20
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp18
-rw-r--r--storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp90
-rw-r--r--storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h20
-rw-r--r--storage/src/vespa/storage/storageserver/statemanager.cpp91
-rw-r--r--storage/src/vespa/storage/storageserver/statemanager.h4
22 files changed, 394 insertions, 212 deletions
diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp
index f41dae89eec..a20902422b0 100644
--- a/storage/src/tests/bucketdb/bucketmanagertest.cpp
+++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp
@@ -1,9 +1,10 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <tests/common/dummystoragelink.h>
+#include <tests/common/storage_config_set.h>
#include <tests/common/testhelper.h>
#include <tests/common/teststorageapp.h>
-#include <tests/common/storage_config_set.h>
+#include <vespa/config-stor-distribution.h>
#include <vespa/config/helper/configgetter.hpp>
#include <vespa/document/config/config-documenttypes.h>
#include <vespa/document/datatype/documenttype.h>
@@ -25,7 +26,6 @@
#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/stllike/asciistream.h>
#include <vespa/vespalib/testkit/test_path.h>
-#include <vespa/config-stor-distribution.h>
#include <future>
#include <vespa/log/log.h>
@@ -116,16 +116,17 @@ public:
BucketManagerTest::~BucketManagerTest() = default;
-#define ASSERT_DUMMYLINK_REPLY_COUNT(link, count) \
- if (link->getNumReplies() != count) { \
- std::ostringstream ost; \
- ost << "Expected there to be " << count << " replies in link, but " \
- << "found " << link->getNumReplies() << ":\n"; \
- for (uint32_t i=0; i<link->getNumReplies(); ++i) { \
- ost << link->getReply(i)->getType() << "\n"; \
- } \
- FAIL() << ost.str(); \
+void check_dummy_link_reply_count(const DummyStorageLink& link, size_t expected_count) {
+ if (link.getNumReplies() != expected_count) {
+ std::ostringstream ost;
+ ost << "Expected there to be " << expected_count << " replies in link, but "
+ << "found " << link.getNumReplies() << ":\n";
+ for (uint32_t i = 0; i < link.getNumReplies(); ++i) {
+ ost << link.getReply(i)->getType() << "\n";
+ }
+ FAIL() << ost.str();
}
+}
void BucketManagerTest::setupTestEnvironment()
{
@@ -313,7 +314,7 @@ TEST_F(BucketManagerTest, DISABLED_request_bucket_info_with_state) {
{
LOG(info, "Waiting for response from 3 request bucket info messages");
_top->waitForMessages(3, 5);
- ASSERT_DUMMYLINK_REPLY_COUNT(_top, 3);
+ ASSERT_NO_FATAL_FAILURE(check_dummy_link_reply_count(*_top, 3));
std::map<uint64_t, api::RequestBucketInfoReply::SP> replies;
for (uint32_t i=0; i<3; ++i) {
replies[_top->getReply(i)->getMsgId()]
@@ -357,7 +358,7 @@ TEST_F(BucketManagerTest, request_bucket_info_with_list) {
_top->sendDown(cmd);
_top->waitForMessages(1, 5);
- ASSERT_DUMMYLINK_REPLY_COUNT(_top, 1);
+ ASSERT_NO_FATAL_FAILURE(check_dummy_link_reply_count(*_top, 1));
auto reply = std::dynamic_pointer_cast<api::RequestBucketInfoReply>(_top->getReply(0));
_top->reset();
ASSERT_TRUE(reply.get());
@@ -527,7 +528,7 @@ class ConcurrentOperationFixture {
public:
explicit ConcurrentOperationFixture(BucketManagerTest& self)
: _self(self),
- _state(std::make_shared<lib::ClusterState>("distributor:1 storage:1"))
+ _state(std::make_shared<lib::ClusterState>("version:2 distributor:1 storage:1"))
{
_self.setupTestEnvironment();
_self._top->open();
diff --git a/storage/src/tests/common/testnodestateupdater.cpp b/storage/src/tests/common/testnodestateupdater.cpp
index f9671617352..5d1d7a085b9 100644
--- a/storage/src/tests/common/testnodestateupdater.cpp
+++ b/storage/src/tests/common/testnodestateupdater.cpp
@@ -25,17 +25,28 @@ TestNodeStateUpdater::getClusterStateBundle() const
}
void
+TestNodeStateUpdater::patch_distribution(std::shared_ptr<const lib::Distribution> distribution)
+{
+ _clusterStateBundle = _clusterStateBundle->clone_with_new_distribution(
+ lib::DistributionConfigBundle::of(std::move(distribution)));
+}
+
+void
TestNodeStateUpdater::setClusterState(std::shared_ptr<const lib::ClusterState> c)
{
- setClusterStateBundle(std::make_shared<const lib::ClusterStateBundle>(*c));
+ setClusterStateBundle(std::make_shared<const lib::ClusterStateBundle>(std::move(c)));
}
void
TestNodeStateUpdater::setClusterStateBundle(std::shared_ptr<const lib::ClusterStateBundle> clusterStateBundle)
{
+ auto existing_distr = _clusterStateBundle->distribution_config_bundle();
_clusterStateBundle = std::move(clusterStateBundle);
- for (uint32_t i = 0; i < _listeners.size(); ++i) {
- _listeners[i]->handleNewState();
+ if (!_clusterStateBundle->has_distribution_config() && existing_distr) {
+ _clusterStateBundle = _clusterStateBundle->clone_with_new_distribution(existing_distr);
+ }
+ for (auto* listener : _listeners) {
+ listener->handleNewState();
}
}
diff --git a/storage/src/tests/common/testnodestateupdater.h b/storage/src/tests/common/testnodestateupdater.h
index e5418c238d5..6a00e9a2264 100644
--- a/storage/src/tests/common/testnodestateupdater.h
+++ b/storage/src/tests/common/testnodestateupdater.h
@@ -1,11 +1,4 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/**
- * \class storage::TestNodeStateUpdater
- * \ingroup common
- *
- * \brief Test implementation of the node state updater.
- */
-
#pragma once
#include <vespa/storage/common/nodestateupdater.h>
@@ -14,6 +7,7 @@
namespace storage::lib {
class ClusterState;
class ClusterStateBundle;
+ class Distribution;
}
namespace storage {
@@ -53,6 +47,7 @@ public:
_current = std::make_shared<lib::NodeState>(state);
}
+ void patch_distribution(std::shared_ptr<const lib::Distribution> distribution);
void setClusterState(std::shared_ptr<const lib::ClusterState> c);
void setClusterStateBundle(std::shared_ptr<const lib::ClusterStateBundle> clusterStateBundle);
diff --git a/storage/src/tests/common/teststorageapp.cpp b/storage/src/tests/common/teststorageapp.cpp
index d811f100aec..b2b82a46850 100644
--- a/storage/src/tests/common/teststorageapp.cpp
+++ b/storage/src/tests/common/teststorageapp.cpp
@@ -6,10 +6,10 @@
#include <vespa/storage/config/config-stor-distributormanager.h>
#include <vespa/storage/config/config-stor-visitordispatcher.h>
#include <vespa/config-stor-distribution.h>
-#include <vespa/config-fleetcontroller.h>
#include <vespa/persistence/dummyimpl/dummypersistence.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vdslib/state/clusterstate.h>
+#include <vespa/vdslib/state/cluster_state_bundle.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/time.h>
#include <vespa/config/subscription/configuri.h>
@@ -59,6 +59,7 @@ TestStorageApp::TestStorageApp(StorageComponentRegisterImpl::UP compReg,
auto distr = std::make_shared<lib::Distribution>(
lib::Distribution::getDefaultDistributionConfig(redundancy, nodeCount));
_compReg.setDistribution(distr);
+ _nodeStateUpdater.patch_distribution(distr);
}
TestStorageApp::~TestStorageApp() = default;
@@ -69,6 +70,7 @@ TestStorageApp::setDistribution(Redundancy redundancy, NodeCount nodeCount)
auto distr = std::make_shared<lib::Distribution>(
lib::Distribution::getDefaultDistributionConfig(redundancy, nodeCount));
_compReg.setDistribution(distr);
+ _nodeStateUpdater.patch_distribution(distr);
}
void
@@ -83,6 +85,12 @@ TestStorageApp::setClusterState(const lib::ClusterState& c)
_nodeStateUpdater.setClusterState(std::make_shared<lib::ClusterState>(c));
}
+void
+TestStorageApp::set_cluster_state_bundle(std::shared_ptr<const lib::ClusterStateBundle> state_bundle)
+{
+ _nodeStateUpdater.setClusterStateBundle(std::move(state_bundle));
+}
+
namespace {
NodeIndex node_index_from_config(const config::ConfigUri& uri) {
diff --git a/storage/src/tests/common/teststorageapp.h b/storage/src/tests/common/teststorageapp.h
index 04fa6996e15..c423761a9a2 100644
--- a/storage/src/tests/common/teststorageapp.h
+++ b/storage/src/tests/common/teststorageapp.h
@@ -73,6 +73,7 @@ public:
void setDistribution(Redundancy, NodeCount);
void setTypeRepo(std::shared_ptr<const document::DocumentTypeRepo> repo);
void setClusterState(const lib::ClusterState&);
+ void set_cluster_state_bundle(std::shared_ptr<const lib::ClusterStateBundle>);
// Utility functions for getting a hold of currently used bits. Practical
// to avoid adding extra components in the tests.
@@ -81,7 +82,7 @@ public:
std::shared_ptr<const document::DocumentTypeRepo> getTypeRepo() { return _compReg.getTypeRepo(); }
const document::BucketIdFactory& getBucketIdFactory() { return _compReg.getBucketIdFactory(); }
TestNodeStateUpdater& getStateUpdater() { return _nodeStateUpdater; }
- std::shared_ptr<lib::Distribution> & getDistribution() { return _compReg.getDistribution(); }
+ std::shared_ptr<const lib::Distribution> getDistribution() { return _compReg.getDistribution(); }
TestNodeStateUpdater& getNodeStateUpdater() { return _nodeStateUpdater; }
uint16_t getIndex() const { return _compReg.getIndex(); }
const NodeIdentity& node_identity() const noexcept { return _node_identity; }
diff --git a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
index 8982b02f2b7..66fda885df7 100644
--- a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
+++ b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
@@ -53,6 +53,7 @@ struct ChangedBucketOwnershipHandlerTest : Test {
void applyDistribution(Redundancy, NodeCount);
void applyClusterState(const lib::ClusterState&);
+ void apply_cluster_state_bundle(std::shared_ptr<const lib::ClusterStateBundle>);
document::BucketId nextOwnedBucket(
uint16_t wantedOwner,
@@ -84,6 +85,21 @@ struct ChangedBucketOwnershipHandlerTest : Test {
return lib::ClusterState("distributor:4 storage:1 .0.s:d");
}
+ static std::shared_ptr<const lib::DistributionConfigBundle> make_distr_bundle(uint16_t node_count) {
+ return lib::DistributionConfigBundle::of(lib::Distribution::getDefaultDistributionConfig(1, node_count));
+ }
+
+ static std::shared_ptr<const lib::ClusterStateBundle> make_state_bundle_with_config(
+ vespalib::stringref state_str, uint16_t node_count)
+ {
+ return std::make_shared<const lib::ClusterStateBundle>(
+ std::make_shared<const lib::ClusterState>(state_str),
+ lib::ClusterStateBundle::BucketSpaceStateMapping{},
+ std::nullopt,
+ make_distr_bundle(node_count),
+ false);
+ }
+
void SetUp() override;
};
@@ -194,6 +210,7 @@ hasOnlySetSystemStateCmdQueued(DummyStorageLink& link) {
void
ChangedBucketOwnershipHandlerTest::applyDistribution(Redundancy redundancy, NodeCount nodeCount)
{
+ // TODO set distribution via state bundle instead
_app->setDistribution(redundancy, nodeCount);
_handler->storageDistributionChanged();
}
@@ -205,6 +222,13 @@ ChangedBucketOwnershipHandlerTest::applyClusterState(const lib::ClusterState& st
_handler->reloadClusterState();
}
+void
+ChangedBucketOwnershipHandlerTest::apply_cluster_state_bundle(std::shared_ptr<const lib::ClusterStateBundle> state_bundle)
+{
+ _app->set_cluster_state_bundle(std::move(state_bundle));
+ _handler->reloadClusterState();
+}
+
TEST_F(ChangedBucketOwnershipHandlerTest, enumerate_buckets_belonging_on_changed_nodes) {
lib::ClusterState stateBefore("distributor:4 storage:1");
applyDistribution(Redundancy(1), NodeCount(4));
@@ -225,7 +249,7 @@ TEST_F(ChangedBucketOwnershipHandlerTest, enumerate_buckets_belonging_on_changed
EXPECT_TRUE(hasAbortedNoneOf(cmd, node2Buckets));
// Handler must swallow abort replies
- _bottom->sendUp(api::StorageMessage::SP(cmd->makeReply().release()));
+ _bottom->sendUp(api::StorageMessage::SP(cmd->makeReply()));
EXPECT_EQ(size_t(0), _top->getNumReplies());
}
@@ -312,7 +336,7 @@ TEST_F(ChangedBucketOwnershipHandlerTest, ownership_changed_on_distributor_up_ed
EXPECT_TRUE(hasAbortedNoneOf(cmd, node2Buckets));
// Handler must swallow abort replies
- _bottom->sendUp(api::StorageMessage::SP(cmd->makeReply().release()));
+ _bottom->sendUp(api::StorageMessage::SP(cmd->makeReply()));
EXPECT_EQ(0, _top->getNumReplies());
}
@@ -345,6 +369,32 @@ TEST_F(ChangedBucketOwnershipHandlerTest, distribution_config_change_updates_own
sendAndExpectAbortedCreateBucket(2);
}
+TEST_F(ChangedBucketOwnershipHandlerTest, distribution_config_via_state_bundle_change_updates_ownership) {
+ apply_cluster_state_bundle(make_state_bundle_with_config("version:2 distributor:3 storage:1", 3));
+ // Apply new distribution config containing only 1 distributor, meaning
+ // any messages sent from >1 must be aborted.
+ // This test case is a bit dodgy since the CC should never send a state with more nodes in it than
+ // the distribution config allows for when _it_ is responsible for also sending the config.
+ apply_cluster_state_bundle(make_state_bundle_with_config("version:3 distributor:3 storage:1", 1));
+ sendAndExpectAbortedCreateBucket(2);
+}
+
+TEST_F(ChangedBucketOwnershipHandlerTest, ignore_internal_config_once_state_bundle_with_config_received) {
+ apply_cluster_state_bundle(make_state_bundle_with_config("version:2 distributor:1 storage:3", 1));
+ applyDistribution(Redundancy(1), NodeCount(3));
+ // Bundle config says 1 node, internal config says 3. Trust the bundle(tm).
+ sendAndExpectAbortedCreateBucket(2);
+}
+
+TEST_F(ChangedBucketOwnershipHandlerTest, revert_to_internal_config_if_distribution_no_longer_received_in_state_bundle) {
+ apply_cluster_state_bundle(make_state_bundle_with_config("version:2 distributor:1 storage:3", 3));
+
+ applyDistribution(Redundancy(1), NodeCount(1)); // not yet used
+ applyClusterState(lib::ClusterState("version:3 distributor:3 storage:3")); // no bundle config; revert to internal
+
+ sendAndExpectAbortedCreateBucket(2);
+}
+
/**
* Generate and dispatch a message of the given type with the provided
* arguments as if that message was sent from distributor 1. Messages will
diff --git a/storage/src/tests/storageserver/statemanagertest.cpp b/storage/src/tests/storageserver/statemanagertest.cpp
index 79246cb3ce1..90f13850072 100644
--- a/storage/src/tests/storageserver/statemanagertest.cpp
+++ b/storage/src/tests/storageserver/statemanagertest.cpp
@@ -10,6 +10,7 @@
#include <vespa/vdslib/state/clusterstate.h>
#include <vespa/storage/storageserver/statemanager.h>
#include <vespa/vespalib/data/slime/slime.h>
+#include <vespa/config-stor-distribution.h>
#include <vespa/vespalib/gtest/gtest.h>
using storage::lib::NodeState;
@@ -38,6 +39,23 @@ struct StateManagerTest : Test, NodeStateReporter {
return cmd;
}
+ static std::shared_ptr<const lib::ClusterStateBundle> make_state_bundle_with_config(
+ vespalib::stringref state_str, uint16_t num_nodes)
+ {
+ auto state = std::make_shared<const ClusterState>(state_str);
+ auto distr = lib::DistributionConfigBundle::of(lib::Distribution::getDefaultDistributionConfig(1, num_nodes));
+ return std::make_shared<lib::ClusterStateBundle>(std::move(state),
+ lib::ClusterStateBundle::BucketSpaceStateMapping{},
+ std::nullopt, std::move(distr), false);
+ }
+
+
+ static std::shared_ptr<api::SetSystemStateCommand> make_set_state_cmd_with_config(
+ vespalib::stringref state_str, uint16_t num_nodes)
+ {
+ return std::make_shared<api::SetSystemStateCommand>(make_state_bundle_with_config(state_str, num_nodes));
+ }
+
void get_single_reply(std::shared_ptr<api::StorageReply>& reply_out);
void get_only_ok_reply(std::shared_ptr<api::StorageReply>& reply_out);
void force_current_cluster_state_version(uint32_t version, uint16_t cc_index);
@@ -56,6 +74,10 @@ struct StateManagerTest : Test, NodeStateReporter {
void report(vespalib::JsonStream &) const override {}
void extract_cluster_state_version_from_host_info(uint32_t& version_out);
+
+ static vespalib::string to_string(const lib::Distribution::DistributionConfig& cfg) {
+ return lib::Distribution(cfg).serialized();
+ }
};
StateManagerTest::StateManagerTest()
@@ -148,26 +170,107 @@ StateManagerTest::extract_cluster_state_version_from_host_info(uint32_t& version
version_out = clusterStateVersionCursor.asLong();
}
-TEST_F(StateManagerTest, cluster_state) {
- std::shared_ptr<api::StorageReply> reply;
- // Verify initial state on startup
- auto currentState = _manager->getClusterStateBundle()->getBaselineClusterState();
+TEST_F(StateManagerTest, cluster_state_and_config_has_expected_values_at_bootstrap) {
+ auto initial_bundle = _manager->getClusterStateBundle();
+ auto currentState = initial_bundle->getBaselineClusterState();
EXPECT_EQ("cluster:d", currentState->toString(false));
EXPECT_EQ(currentState->getVersion(), 0);
+ // Distribution config should be equal to the config the node is running with.
+ ASSERT_TRUE(initial_bundle->has_distribution_config());
+ EXPECT_EQ(to_string(initial_bundle->distribution_config_bundle()->config()),
+ _node->getComponentRegister().getDistribution()->serialized());
+
auto currentNodeState = _manager->getCurrentNodeState();
EXPECT_EQ("s:d", currentNodeState->toString(false));
+}
+
+TEST_F(StateManagerTest, can_receive_state_bundle_without_distribution_config) {
+ ClusterState send_state("version:2 distributor:1 storage:4 .2.s:m");
+ auto cmd = std::make_shared<api::SetSystemStateCommand>(send_state);
+ _upper->sendDown(cmd);
+ std::shared_ptr<api::StorageReply> reply;
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
+
+ auto current_bundle = _manager->getClusterStateBundle();
+ EXPECT_EQ(send_state, *current_bundle->getBaselineClusterState());
+ // Distribution config should be unchanged from bootstrap.
+ ASSERT_TRUE(current_bundle->has_distribution_config());
+ EXPECT_EQ(to_string(current_bundle->distribution_config_bundle()->config()),
+ _node->getComponentRegister().getDistribution()->serialized());
+
+ auto current_node_state = _manager->getCurrentNodeState();
+ EXPECT_EQ("s:m", current_node_state->toString(false));
+}
+
+TEST_F(StateManagerTest, can_receive_state_bundle_with_distribution_config) {
+ auto cmd = make_set_state_cmd_with_config("version:2 distributor:1 storage:4 .2.s:m", 5);
+ EXPECT_NE(to_string(cmd->getClusterStateBundle().distribution_config_bundle()->config()),
+ _node->getComponentRegister().getDistribution()->serialized());
+ _upper->sendDown(cmd);
+ std::shared_ptr<api::StorageReply> reply;
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
+
+ auto current_bundle = _manager->getClusterStateBundle();
+ EXPECT_EQ(*current_bundle, cmd->getClusterStateBundle()); // also compares distribution configs
+}
- ClusterState sendState("storage:4 .2.s:m");
- auto cmd = std::make_shared<api::SetSystemStateCommand>(sendState);
+TEST_F(StateManagerTest, receiving_cc_bundle_with_distribution_config_disables_node_distribution_config_propagation) {
+ auto cmd = make_set_state_cmd_with_config("version:2 distributor:1 storage:4 .2.s:m", 5);
_upper->sendDown(cmd);
+ std::shared_ptr<api::StorageReply> reply;
ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
+ // Explicitly setting distribution config should not propagate to the active state bundle
+ // since we've flipped to expecting config from the cluster controllers instead.
+ auto distr = std::make_shared<lib::Distribution>(lib::Distribution::getDefaultDistributionConfig(2, 7));
+ _node->getComponentRegister().setDistribution(distr);
+
+ auto current_bundle = _manager->getClusterStateBundle();
+ EXPECT_EQ(*current_bundle, cmd->getClusterStateBundle()); // unchanged
+}
+
+TEST_F(StateManagerTest, internal_distribution_config_is_propagated_if_none_yet_received_from_cc) {
+ _upper->sendDown(make_set_state_cmd("version:10 distributor:1 storage:4", 0));
+ std::shared_ptr<api::StorageReply> reply;
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
+
+ auto expected_bundle = make_state_bundle_with_config("version:10 distributor:1 storage:4", 7);
+ // Explicitly set internal config
+ _node->getComponentRegister().setDistribution(expected_bundle->distribution_config_bundle()->default_distribution_sp());
+ _manager->storageDistributionChanged();
+
+ auto current_bundle = _manager->getClusterStateBundle();
+ EXPECT_EQ(*current_bundle, *expected_bundle);
+}
+
+TEST_F(StateManagerTest, revert_to_internal_config_if_cc_no_longer_sends_distribution_config) {
+ // Initial state bundle _with_ distribution config
+ auto cmd = make_set_state_cmd_with_config("version:2 distributor:1 storage:4 .2.s:m", 5);
+ _upper->sendDown(cmd);
+ std::shared_ptr<api::StorageReply> reply;
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
+
+ auto current_bundle = _manager->getClusterStateBundle();
+ EXPECT_EQ(to_string(current_bundle->distribution_config_bundle()->config()),
+ to_string(cmd->getClusterStateBundle().distribution_config_bundle()->config()));
+
+ // CC then sends a new bundle _without_ config
+ _upper->sendDown(make_set_state_cmd("version:3 distributor:1 storage:4", 0));
+ ASSERT_NO_FATAL_FAILURE(get_only_ok_reply(reply));
+
+ // Config implicitly reverted to the active internal config
+ current_bundle = _manager->getClusterStateBundle();
+ EXPECT_EQ(to_string(current_bundle->distribution_config_bundle()->config()),
+ _node->getComponentRegister().getDistribution()->serialized());
- currentState = _manager->getClusterStateBundle()->getBaselineClusterState();
- EXPECT_EQ(sendState, *currentState);
+ // Explicitly set internal config
+ auto expected_bundle = make_state_bundle_with_config("version:3 distributor:1 storage:4", 7);
+ _node->getComponentRegister().setDistribution(expected_bundle->distribution_config_bundle()->default_distribution_sp());
+ _manager->storageDistributionChanged();
- currentNodeState = _manager->getCurrentNodeState();
- EXPECT_EQ("s:m", currentNodeState->toString(false));
+ // Internal config shall have taken effect, overriding that of the initial bundle
+ current_bundle = _manager->getClusterStateBundle();
+ EXPECT_EQ(*current_bundle, *expected_bundle);
}
TEST_F(StateManagerTest, accept_lower_state_versions_if_strict_requirement_disabled) {
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
index 1f20a19ec51..280afe8fb91 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
@@ -526,12 +526,12 @@ BucketManager::processRequestBucketInfoCommands(document::BucketSpace bucketSpac
using RBISP = std::shared_ptr<api::RequestBucketInfoCommand>;
std::map<uint16_t, RBISP> requests;
- // TODO fetch distribution from bundle as well
- auto distribution(_component.getBucketSpaceRepo().get(bucketSpace).getDistribution());
- auto clusterStateBundle(_component.getStateUpdater().getClusterStateBundle());
- assert(clusterStateBundle);
- lib::ClusterState::CSP clusterState(clusterStateBundle->getDerivedClusterState(bucketSpace));
- assert(clusterState.get());
+ auto clusterStateBundle = _component.getStateUpdater().getClusterStateBundle();
+ assert(clusterStateBundle && clusterStateBundle->has_distribution_config());
+ auto clusterState = clusterStateBundle->getDerivedClusterState(bucketSpace);
+ assert(clusterState);
+ const auto distribution = clusterStateBundle->bucket_space_distribution_or_nullptr(bucketSpace);
+ assert(distribution);
const auto our_hash = distribution->getNodeGraph().getDistributionConfigHash();
diff --git a/storage/src/vespa/storage/common/content_bucket_space.cpp b/storage/src/vespa/storage/common/content_bucket_space.cpp
index 92b5257b991..d66219794c7 100644
--- a/storage/src/vespa/storage/common/content_bucket_space.cpp
+++ b/storage/src/vespa/storage/common/content_bucket_space.cpp
@@ -10,6 +10,7 @@ ClusterStateAndDistribution::ClusterStateAndDistribution(
: _cluster_state(std::move(cluster_state)),
_distribution(std::move(distribution))
{
+ assert(_cluster_state && _distribution);
}
ClusterStateAndDistribution::~ClusterStateAndDistribution() = default;
@@ -48,34 +49,6 @@ ContentBucketSpace::state_and_distribution() const noexcept {
return _state_and_distribution;
}
-void
-ContentBucketSpace::setClusterState(std::shared_ptr<const lib::ClusterState> clusterState)
-{
- std::lock_guard guard(_lock);
- _state_and_distribution = _state_and_distribution->with_new_state(std::move(clusterState));
-}
-
-std::shared_ptr<const lib::ClusterState>
-ContentBucketSpace::getClusterState() const
-{
- std::lock_guard guard(_lock);
- return _state_and_distribution->_cluster_state;
-}
-
-void
-ContentBucketSpace::setDistribution(std::shared_ptr<const lib::Distribution> distribution)
-{
- std::lock_guard guard(_lock);
- _state_and_distribution = _state_and_distribution->with_new_distribution(std::move(distribution));
-}
-
-std::shared_ptr<const lib::Distribution>
-ContentBucketSpace::getDistribution() const
-{
- std::lock_guard guard(_lock);
- return _state_and_distribution->_distribution;
-}
-
bool
ContentBucketSpace::getNodeUpInLastNodeStateSeenByProvider() const
{
diff --git a/storage/src/vespa/storage/common/content_bucket_space.h b/storage/src/vespa/storage/common/content_bucket_space.h
index eb48640c97b..67ee4209d35 100644
--- a/storage/src/vespa/storage/common/content_bucket_space.h
+++ b/storage/src/vespa/storage/common/content_bucket_space.h
@@ -54,14 +54,6 @@ public:
void set_state_and_distribution(std::shared_ptr<const ClusterStateAndDistribution> state_and_distr) noexcept;
[[nodiscard]] std::shared_ptr<const ClusterStateAndDistribution> state_and_distribution() const noexcept;
- // TODO deprecate; only use atomic state+distribution setter
- void setClusterState(std::shared_ptr<const lib::ClusterState> clusterState);
- // TODO deprecate; only use atomic state+distribution getter
- std::shared_ptr<const lib::ClusterState> getClusterState() const;
- // TODO deprecate; only use atomic state+distribution setter
- void setDistribution(std::shared_ptr<const lib::Distribution> distribution);
- // TODO deprecate; only use atomic state+distribution getter
- std::shared_ptr<const lib::Distribution> getDistribution() const;
bool getNodeUpInLastNodeStateSeenByProvider() const;
void setNodeUpInLastNodeStateSeenByProvider(bool nodeUpInLastNodeStateSeenByProvider);
diff --git a/storage/src/vespa/storage/common/storagecomponent.h b/storage/src/vespa/storage/common/storagecomponent.h
index 677d5652fe3..ba12fc666a9 100644
--- a/storage/src/vespa/storage/common/storagecomponent.h
+++ b/storage/src/vespa/storage/common/storagecomponent.h
@@ -61,7 +61,7 @@ public:
const std::shared_ptr<const document::FieldSetRepo> fieldSetRepo;
};
using UP = std::unique_ptr<StorageComponent>;
- using DistributionSP = std::shared_ptr<lib::Distribution>;
+ using DistributionSP = std::shared_ptr<const lib::Distribution>;
/**
* Node type is supposed to be set immediately, and never be updated.
diff --git a/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp b/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp
index ab1cbf0b4d7..4d1daa329fd 100644
--- a/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp
+++ b/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.cpp
@@ -23,11 +23,9 @@ ServiceLayerComponentRegisterImpl::registerServiceLayerComponent(ServiceLayerMan
}
void
-ServiceLayerComponentRegisterImpl::setDistribution(std::shared_ptr<lib::Distribution> distribution)
+ServiceLayerComponentRegisterImpl::setDistribution(std::shared_ptr<const lib::Distribution> distribution)
{
- _bucketSpaceRepo.get(document::FixedBucketSpaces::default_space()).setDistribution(distribution);
- auto global_distr = lib::GlobalBucketSpaceDistributionConverter::convert_to_global(*distribution);
- _bucketSpaceRepo.get(document::FixedBucketSpaces::global_space()).setDistribution(global_distr);
+ // TODO remove this override entirely?
StorageComponentRegisterImpl::setDistribution(distribution);
}
diff --git a/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.h b/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.h
index 1589192b92e..3478c0b3b9c 100644
--- a/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.h
+++ b/storage/src/vespa/storage/frameworkimpl/component/servicelayercomponentregisterimpl.h
@@ -34,7 +34,7 @@ public:
}
void registerServiceLayerComponent(ServiceLayerManagedComponent&) override;
- void setDistribution(std::shared_ptr<lib::Distribution> distribution) override;
+ void setDistribution(std::shared_ptr<const lib::Distribution> distribution) override;
};
} // storage
diff --git a/storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.cpp b/storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.cpp
index bd0853a3524..ab1d80dc0b9 100644
--- a/storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.cpp
+++ b/storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.cpp
@@ -91,7 +91,7 @@ StorageComponentRegisterImpl::setBucketIdFactory(const document::BucketIdFactory
}
void
-StorageComponentRegisterImpl::setDistribution(std::shared_ptr<lib::Distribution> distribution)
+StorageComponentRegisterImpl::setDistribution(std::shared_ptr<const lib::Distribution> distribution)
{
std::lock_guard lock(_componentLock);
_distribution = distribution;
diff --git a/storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h b/storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h
index abb60051fe1..271b38399b6 100644
--- a/storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h
+++ b/storage/src/vespa/storage/frameworkimpl/component/storagecomponentregisterimpl.h
@@ -21,16 +21,16 @@ class StorageComponentRegisterImpl
{
using BucketspacesConfig = vespa::config::content::core::internal::InternalBucketspacesType;
- std::mutex _componentLock;
- std::vector<StorageComponent*> _components;
- vespalib::string _clusterName;
- const lib::NodeType* _nodeType;
- uint16_t _index;
+ std::mutex _componentLock;
+ std::vector<StorageComponent*> _components;
+ vespalib::string _clusterName;
+ const lib::NodeType* _nodeType;
+ uint16_t _index;
std::shared_ptr<const document::DocumentTypeRepo> _docTypeRepo;
- document::BucketIdFactory _bucketIdFactory;
- std::shared_ptr<lib::Distribution> _distribution;
- NodeStateUpdater* _nodeStateUpdater;
- BucketspacesConfig _bucketSpacesConfig;
+ document::BucketIdFactory _bucketIdFactory;
+ std::shared_ptr<const lib::Distribution> _distribution;
+ NodeStateUpdater* _nodeStateUpdater;
+ BucketspacesConfig _bucketSpacesConfig;
public:
using UP = std::unique_ptr<StorageComponentRegisterImpl>;
@@ -38,12 +38,12 @@ public:
StorageComponentRegisterImpl();
~StorageComponentRegisterImpl() override;
- const lib::NodeType& getNodeType() const { return *_nodeType; }
- uint16_t getIndex() const { return _index; }
- std::shared_ptr<const document::DocumentTypeRepo> getTypeRepo() { return _docTypeRepo; }
- const document::BucketIdFactory& getBucketIdFactory() { return _bucketIdFactory; }
- std::shared_ptr<lib::Distribution> & getDistribution() { return _distribution; }
- NodeStateUpdater& getNodeStateUpdater() { return *_nodeStateUpdater; }
+ [[nodiscard]] const lib::NodeType& getNodeType() const noexcept { return *_nodeType; }
+ [[nodiscard]] uint16_t getIndex() const noexcept { return _index; }
+ [[nodiscard]] std::shared_ptr<const document::DocumentTypeRepo> getTypeRepo() const noexcept { return _docTypeRepo; }
+ [[nodiscard]] const document::BucketIdFactory& getBucketIdFactory() const noexcept { return _bucketIdFactory; }
+ [[nodiscard]] const std::shared_ptr<const lib::Distribution>& getDistribution() const noexcept { return _distribution; }
+ [[nodiscard]] NodeStateUpdater& getNodeStateUpdater() noexcept { return *_nodeStateUpdater; }
void registerStorageComponent(StorageComponent&) override;
@@ -51,7 +51,7 @@ public:
virtual void setNodeStateUpdater(NodeStateUpdater& updater);
virtual void setDocumentTypeRepo(std::shared_ptr<const document::DocumentTypeRepo>);
virtual void setBucketIdFactory(const document::BucketIdFactory&);
- virtual void setDistribution(std::shared_ptr<lib::Distribution>);
+ virtual void setDistribution(std::shared_ptr<const lib::Distribution>);
virtual void setBucketSpacesConfig(const BucketspacesConfig&);
};
diff --git a/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp b/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp
index d5de11c7d6f..f571f6272b8 100644
--- a/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp
+++ b/storage/src/vespa/storage/persistence/bucketownershipnotifier.cpp
@@ -16,13 +16,14 @@ using document::BucketSpace;
namespace storage {
uint16_t
-BucketOwnershipNotifier::getOwnerDistributorForBucket(const document::Bucket &bucket) const
+BucketOwnershipNotifier::getOwnerDistributorForBucket(const document::Bucket& bucket) const
{
try {
- // TODO use state updater bundle for everything?
- auto distribution(_component.getBucketSpaceRepo().get(bucket.getBucketSpace()).getDistribution());
- const auto clusterStateBundle = _component.getStateUpdater().getClusterStateBundle();
- const auto &clusterState = *clusterStateBundle->getDerivedClusterState(bucket.getBucketSpace());
+ const auto state_bundle = _component.getStateUpdater().getClusterStateBundle();
+ assert(state_bundle && state_bundle->has_distribution_config());
+ const auto* distribution = state_bundle->distribution_config_bundle()->bucket_space_distribution_or_nullptr_raw(bucket.getBucketSpace());
+ assert(distribution);
+ const auto& clusterState = *state_bundle->getDerivedClusterState(bucket.getBucketSpace());
return (distribution->getIdealDistributorNode(clusterState, bucket.getBucketId()));
// If we get exceptions there aren't any distributors, so they'll have
// to explicitly fetch all bucket info eventually anyway.
@@ -39,7 +40,7 @@ BucketOwnershipNotifier::getOwnerDistributorForBucket(const document::Bucket &bu
}
// Returns true iff `distributor` equals the index that getOwnerDistributorForBucket()
// resolves for `bucket`. This hunk only changes the reference-declarator spacing.
bool
-BucketOwnershipNotifier::distributorOwns(uint16_t distributor, const document::Bucket &bucket) const
+BucketOwnershipNotifier::distributorOwns(uint16_t distributor, const document::Bucket& bucket) const
{
return (distributor == getOwnerDistributorForBucket(bucket));
}
@@ -47,7 +48,7 @@ BucketOwnershipNotifier::distributorOwns(uint16_t distributor, const document::B
void
BucketOwnershipNotifier::sendNotifyBucketToDistributor(
uint16_t distributorIndex,
- const document::Bucket &bucket,
+ const document::Bucket& bucket,
const api::BucketInfo& infoToSend)
{
if (!infoToSend.valid()) {
@@ -71,7 +72,7 @@ BucketOwnershipNotifier::sendNotifyBucketToDistributor(
}
void
-BucketOwnershipNotifier::logNotification(const document::Bucket &bucket,
+BucketOwnershipNotifier::logNotification(const document::Bucket& bucket,
uint16_t sourceIndex,
uint16_t currentOwnerIndex,
const api::BucketInfo& newInfo)
@@ -88,7 +89,7 @@ BucketOwnershipNotifier::logNotification(const document::Bucket &bucket,
void
BucketOwnershipNotifier::notifyIfOwnershipChanged(
- const document::Bucket &bucket,
+ const document::Bucket& bucket,
uint16_t sourceIndex,
const api::BucketInfo& infoToSend)
{
@@ -111,7 +112,7 @@ BucketOwnershipNotifier::notifyIfOwnershipChanged(
void
BucketOwnershipNotifier::sendNotifyBucketToCurrentOwner(
- const document::Bucket &bucket,
+ const document::Bucket& bucket,
const api::BucketInfo& infoToSend)
{
uint16_t distributor(getOwnerDistributorForBucket(bucket));
@@ -134,7 +135,7 @@ NotificationGuard::~NotificationGuard()
}
void
-NotificationGuard::notifyIfOwnershipChanged(const document::Bucket &bucket,
+NotificationGuard::notifyIfOwnershipChanged(const document::Bucket& bucket,
uint16_t sourceIndex,
const api::BucketInfo& infoToSend)
{
@@ -142,7 +143,7 @@ NotificationGuard::notifyIfOwnershipChanged(const document::Bucket &bucket,
}
void
-NotificationGuard::notifyAlways(const document::Bucket &bucket,
+NotificationGuard::notifyAlways(const document::Bucket& bucket,
const api::BucketInfo& infoToSend)
{
BucketToCheck bc(bucket, 0xffff, infoToSend);
diff --git a/storage/src/vespa/storage/persistence/bucketownershipnotifier.h b/storage/src/vespa/storage/persistence/bucketownershipnotifier.h
index fc3d9209f5f..93a2c1a2104 100644
--- a/storage/src/vespa/storage/persistence/bucketownershipnotifier.h
+++ b/storage/src/vespa/storage/persistence/bucketownershipnotifier.h
@@ -11,29 +11,29 @@ namespace storage {
// Resolves which distributor owns a bucket and sends bucket info notifications through
// the MessageSender it was constructed with. Holds both collaborators by reference.
class BucketOwnershipNotifier
{
- const ServiceLayerComponent & _component;
- MessageSender & _sender;
+ const ServiceLayerComponent& _component;
+ MessageSender& _sender;
public:
BucketOwnershipNotifier(const ServiceLayerComponent& component, MessageSender& sender)
: _component(component),
_sender(sender)
{}
- bool distributorOwns(uint16_t distributor, const document::Bucket &bucket) const;
- void notifyIfOwnershipChanged(const document::Bucket &bucket, uint16_t sourceIndex, const api::BucketInfo& infoToSend);
- void sendNotifyBucketToCurrentOwner(const document::Bucket &bucket, const api::BucketInfo& infoToSend);
+ [[nodiscard]] bool distributorOwns(uint16_t distributor, const document::Bucket& bucket) const;
+ void notifyIfOwnershipChanged(const document::Bucket& bucket, uint16_t sourceIndex, const api::BucketInfo& infoToSend);
+ void sendNotifyBucketToCurrentOwner(const document::Bucket& bucket, const api::BucketInfo& infoToSend);
private:
enum IndexMeta {
FAILED_TO_RESOLVE = 0xffff
};
- void sendNotifyBucketToDistributor(uint16_t distributorIndex, const document::Bucket &bucket,
+ void sendNotifyBucketToDistributor(uint16_t distributorIndex, const document::Bucket& bucket,
const api::BucketInfo& infoToSend);
// Returns either index or FAILED_TO_RESOLVE
// Brought in line with the rest of this hunk: `Bucket& bucket` spacing, and [[nodiscard]]
// because the return value is the only result of the call.
- uint16_t getOwnerDistributorForBucket(const document::Bucket &bucket) const;
+ [[nodiscard]] uint16_t getOwnerDistributorForBucket(const document::Bucket& bucket) const;
- void logNotification(const document::Bucket &bucket, uint16_t sourceIndex,
+ void logNotification(const document::Bucket& bucket, uint16_t sourceIndex,
uint16_t currentOwnerIndex, const api::BucketInfo& newInfo);
};
@@ -60,7 +60,7 @@ class NotificationGuard
BucketOwnershipNotifier& _notifier;
std::vector<BucketToCheck> _bucketsToCheck;
public:
- NotificationGuard(BucketOwnershipNotifier& notifier)
+ explicit NotificationGuard(BucketOwnershipNotifier& notifier)
: _notifier(notifier),
_bucketsToCheck()
{}
@@ -69,8 +69,8 @@ public:
~NotificationGuard();
- void notifyIfOwnershipChanged(const document::Bucket &bucket, uint16_t sourceIndex, const api::BucketInfo& infoToSend);
- void notifyAlways(const document::Bucket &bucket, const api::BucketInfo& infoToSend);
+ void notifyIfOwnershipChanged(const document::Bucket& bucket, uint16_t sourceIndex, const api::BucketInfo& infoToSend);
+ void notifyAlways(const document::Bucket& bucket, const api::BucketInfo& infoToSend);
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 495497d507d..e44490388e2 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -887,9 +887,9 @@ bool
FileStorManager::maintenance_in_all_spaces(const lib::Node& node) const noexcept
{
for (const auto& elem : _component.getBucketSpaceRepo()) {
- const ContentBucketSpace& bucket_space = *elem.second;
- auto derived_cluster_state = bucket_space.getClusterState();
- if (!derived_cluster_state->getNodeState(node).getState().oneOf("m")) {
+ const auto space_state_and_distr = elem.second->state_and_distribution();
+ const auto& derived_cluster_state = space_state_and_distr->cluster_state();
+ if (!derived_cluster_state.getNodeState(node).getState().oneOf("m")) {
return false;
}
}
@@ -915,8 +915,7 @@ FileStorManager::maybe_log_received_cluster_state()
{
// Only fetch and stringify the bundle when debug logging is enabled.
if (LOG_WOULD_LOG(debug)) {
auto cluster_state_bundle = _component.getStateUpdater().getClusterStateBundle();
- auto baseline_state = cluster_state_bundle->getBaselineClusterState();
- LOG(debug, "FileStorManager received baseline cluster state '%s'", baseline_state->toString().c_str());
// The whole bundle is stringified now, not just the baseline state, so the message says so.
+ LOG(debug, "FileStorManager received cluster state bundle '%s'", cluster_state_bundle->toString().c_str());
}
}
@@ -951,16 +950,17 @@ FileStorManager::updateState()
// Distribution-change callback. This hunk removes the updateState() call, leaving the body
// intentionally empty. Per the commit description, distribution config now reaches this
// component through StateManager's state listener notifications instead.
void
FileStorManager::storageDistributionChanged()
{
- updateState();
}
// Pushes the current cluster state bundle into every bucket space of the component's
// bucket space repo. After this change each space receives a single ClusterStateAndDistribution
// object built from its derived cluster state and its distribution, instead of the state alone.
void
FileStorManager::propagateClusterStates()
{
auto clusterStateBundle = _component.getStateUpdater().getClusterStateBundle();
- for (const auto &elem : _component.getBucketSpaceRepo()) {
- // TODO also distribution! bundle and repo must be 1-1
- elem.second->setClusterState(clusterStateBundle->getDerivedClusterState(elem.first));
+ assert(clusterStateBundle->has_distribution_config()); // the bundle is required to carry distribution config here
+ for (const auto& elem : _component.getBucketSpaceRepo()) {
// NOTE(review): the `_or_nullptr` suffix suggests the distribution may be null for a space
// that is missing from the bundle; confirm ClusterStateAndDistribution tolerates that.
+ elem.second->set_state_and_distribution(std::make_shared<ClusterStateAndDistribution>(
+ clusterStateBundle->getDerivedClusterState(elem.first),
+ clusterStateBundle->bucket_space_distribution_or_nullptr(elem.first)));
}
}
diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp
index 25829f3d391..8702cb1272b 100644
--- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp
+++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp
@@ -31,10 +31,11 @@ ChangedBucketOwnershipHandler::ChangedBucketOwnershipHandler(
_stateLock(),
_currentState(), // Not set yet, so ownership will not be valid
_currentOwnership(std::make_shared<OwnershipState>(
- _component.getBucketSpaceRepo(), _currentState)),
+ _currentState, lib::DistributionConfigBundle::of(_component.getDistribution()))),
_abortQueuedAndPendingOnStateChange(false),
_abortMutatingIdealStateOps(false),
- _abortMutatingExternalLoadOps(false)
+ _abortMutatingExternalLoadOps(false),
+ _receiving_distribution_config_from_cc(false)
{
on_configure(bootstrap_config);
_component.registerMetric(_metrics);
@@ -61,16 +62,29 @@ ChangedBucketOwnershipHandler::reloadClusterState()
{
// Take _stateLock before swapping the ownership state; the callee expects it to be held.
std::lock_guard guard(_stateLock);
- const auto clusterStateBundle = _component.getStateUpdater().getClusterStateBundle();
- setCurrentOwnershipWithStateNoLock(*clusterStateBundle);
// Deliberately not `const`: std::move on a const shared_ptr silently falls back to a copy
// (an extra atomic refcount round trip) instead of transferring ownership.
+ auto clusterStateBundle = _component.getStateUpdater().getClusterStateBundle();
+ setCurrentOwnershipWithStateNoLock(std::move(clusterStateBundle));
}
// Installs `new_state` as _currentState and rebuilds _currentOwnership from it.
// Distribution config is taken from the bundle when it carries one; otherwise it is derived
// from the component's own distribution. Does not lock: the callers visible in this file
// take _stateLock before calling.
void
-ChangedBucketOwnershipHandler::setCurrentOwnershipWithStateNoLock(
- const lib::ClusterStateBundle& newState)
+ChangedBucketOwnershipHandler::setCurrentOwnershipWithStateNoLock(std::shared_ptr<const lib::ClusterStateBundle> new_state)
{
- _currentState = std::make_shared<const lib::ClusterStateBundle>(newState);
- _currentOwnership = std::make_shared<const OwnershipState>(
- _component.getBucketSpaceRepo(), _currentState);
+ LOG(debug, "Setting new ownership state bundle: %s", new_state->toString().c_str());
+ std::shared_ptr<const lib::DistributionConfigBundle> distributions;
+ _currentState = std::move(new_state); // new_state is moved-from below this line; use _currentState
+ // This partially duplicates distribution config fallback logic from StateManager, but that's because
+ // this component has the same approach to intercepting both state commands and distribution config
+ // changes.
+ // `new_state` can come straight from a SetSystemStateCommand, which may or may not have a state bundle
+ // with distribution config strapped to it.
+ if (_currentState->has_distribution_config()) {
+ distributions = _currentState->distribution_config_bundle();
+ } else {
+ distributions = lib::DistributionConfigBundle::of(_component.getDistribution());
+ LOG(debug, "No distribution config in bundle; using current host config of '%s'",
+ distributions->default_distribution().getNodeGraph().getDistributionConfigHash().c_str());
+ }
// Remembered so that storageDistributionChanged() can skip host config updates while the
// bundle itself supplies the distribution config.
+ _receiving_distribution_config_from_cc = _currentState->has_distribution_config();
+ _currentOwnership = std::make_shared<const OwnershipState>(_currentState, std::move(distributions));
}
namespace {
@@ -98,17 +112,11 @@ ChangedBucketOwnershipHandler::Metrics::Metrics(metrics::MetricSet* owner)
{}
ChangedBucketOwnershipHandler::Metrics::~Metrics() = default;
// Stores the given cluster state bundle and distribution config bundle as-is. The removed
// code instead copied one distribution per bucket space out of a ContentBucketSpaceRepo.
// The initializer order (_state, then _distributions) follows the member declaration order
// in the class definition.
-ChangedBucketOwnershipHandler::OwnershipState::OwnershipState(const ContentBucketSpaceRepo &contentBucketSpaceRepo,
- std::shared_ptr<const lib::ClusterStateBundle> state)
- : _distributions(),
- _state(std::move(state))
+ChangedBucketOwnershipHandler::OwnershipState::OwnershipState(std::shared_ptr<const lib::ClusterStateBundle> state,
+ std::shared_ptr<const lib::DistributionConfigBundle> distributions)
+ : _state(std::move(state)),
+ _distributions(std::move(distributions))
{
- for (const auto &elem : contentBucketSpaceRepo) {
- auto distribution = elem.second->getDistribution();
- if (distribution) {
- _distributions.emplace(elem.first, std::move(distribution));
- }
- }
}
@@ -123,19 +131,15 @@ ChangedBucketOwnershipHandler::OwnershipState::getBaselineState() const
}
uint16_t
-ChangedBucketOwnershipHandler::OwnershipState::ownerOf(
- const document::Bucket& bucket) const
+ChangedBucketOwnershipHandler::OwnershipState::ownerOf(const document::Bucket& bucket) const
{
- auto distributionItr = _distributions.find(bucket.getBucketSpace());
- assert(distributionItr != _distributions.end());
- const auto &distribution = *distributionItr->second;
- const auto &derivedState = *_state->getDerivedClusterState(bucket.getBucketSpace());
+ const auto* distribution = _distributions->bucket_space_distribution_or_nullptr_raw(bucket.getBucketSpace());
+ assert(distribution);
+ const auto& derivedState = *_state->getDerivedClusterState(bucket.getBucketSpace());
try {
- return distribution.getIdealDistributorNode(derivedState, bucket.getBucketId());
+ return distribution->getIdealDistributorNode(derivedState, bucket.getBucketId());
} catch (lib::TooFewBucketBitsInUseException& e) {
- LOGBP(debug,
- "Too few bucket bits used for %s to be assigned to "
- "a distributor.",
+ LOGBP(debug, "Too few bucket bits used for %s to be assigned to a distributor.",
bucket.toString().c_str());
} catch (lib::NoDistributorsAvailableException& e) {
LOGBP(warning,
@@ -144,11 +148,9 @@ ChangedBucketOwnershipHandler::OwnershipState::ownerOf(
"for available distributors before reaching this code path! "
"Cluster state is '%s', distribution is '%s'",
derivedState.toString().c_str(),
- distribution.toString().c_str());
+ distribution->toString().c_str());
} catch (const std::exception& e) {
- LOG(error,
- "Got unknown exception while resolving distributor: %s",
- e.what());
+ LOG(error, "Got unknown exception while resolving distributor: %s", e.what());
}
return FAILED_TO_RESOLVE;
}
@@ -162,9 +164,7 @@ ChangedBucketOwnershipHandler::OwnershipState::storageNodeUp(document::BucketSpa
}
void
-ChangedBucketOwnershipHandler::logTransition(
- const lib::ClusterState& currentState,
- const lib::ClusterState& newState) const
+ChangedBucketOwnershipHandler::logTransition(const lib::ClusterState& currentState, const lib::ClusterState& newState)
{
LOG(debug,
"State transition '%s' -> '%s' changes distributor bucket ownership, "
@@ -269,7 +269,7 @@ public:
{
std::lock_guard guard(_owner._stateLock);
old_ownership = _owner._currentOwnership;
- _owner.setCurrentOwnershipWithStateNoLock(_command->getClusterStateBundle());
+ _owner.setCurrentOwnershipWithStateNoLock(_command->cluster_state_bundle_ptr());
new_ownership = _owner._currentOwnership;
}
assert(new_ownership->valid());
@@ -287,7 +287,7 @@ public:
new_ownership->getBaselineState().toString().c_str());
return _owner.sendDown(_command);;
}
- _owner.logTransition(old_ownership->getBaselineState(), new_ownership->getBaselineState());
+ logTransition(old_ownership->getBaselineState(), new_ownership->getBaselineState());
metrics::MetricTimer duration_timer;
auto predicate = _owner.makeLazyAbortPredicate(old_ownership, new_ownership);
@@ -327,17 +327,19 @@ ChangedBucketOwnershipHandler::onSetSystemState(
* Invoked whenever a distribution config change happens and is called in the
* context of the config updater thread (which is why we have to lock).
*/
+ // TODO remove this when there are no more state bundles without distribution config
void
ChangedBucketOwnershipHandler::storageDistributionChanged()
{
std::lock_guard guard(_stateLock);
- _currentOwnership = std::make_shared<OwnershipState>(
- _component.getBucketSpaceRepo(), _currentState);
// Host config only drives ownership while state bundles arrive without distribution config.
// When the flag is set the callback is a no-op and the current ownership is left untouched.
+ if (!_receiving_distribution_config_from_cc) {
+ _currentOwnership = std::make_shared<OwnershipState>(
+ _currentState, lib::DistributionConfigBundle::of(_component.getDistribution()));
+ }
}
bool
-ChangedBucketOwnershipHandler::isMutatingIdealStateOperation(
- const api::StorageMessage& msg) const
+ChangedBucketOwnershipHandler::isMutatingIdealStateOperation(const api::StorageMessage& msg)
{
switch (msg.getType().getId()) {
case api::MessageType::CREATEBUCKET_ID:
@@ -357,8 +359,7 @@ ChangedBucketOwnershipHandler::isMutatingIdealStateOperation(
bool
-ChangedBucketOwnershipHandler::isMutatingExternalOperation(
- const api::StorageMessage& msg) const
+ChangedBucketOwnershipHandler::isMutatingExternalOperation(const api::StorageMessage& msg)
{
switch (msg.getType().getId()) {
case api::MessageType::PUT_ID:
@@ -418,8 +419,7 @@ ChangedBucketOwnershipHandler::abortOperation(api::StorageCommand& cmd)
}
bool
-ChangedBucketOwnershipHandler::isMutatingCommandAndNeedsChecking(
- const api::StorageMessage& msg) const
+ChangedBucketOwnershipHandler::isMutatingCommandAndNeedsChecking(const api::StorageMessage& msg) const
{
if (enabledIdealStateAborting() && isMutatingIdealStateOperation(msg)) {
return true;
diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h
index 801534385f7..2c32704d64b 100644
--- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h
+++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h
@@ -25,6 +25,7 @@ namespace lib {
class ClusterState;
class ClusterStateBundle;
class Distribution;
+ class DistributionConfigBundle;
}
/**
@@ -77,20 +78,20 @@ public:
*/
class OwnershipState {
using BucketSpace = document::BucketSpace;
- std::unordered_map<BucketSpace, std::shared_ptr<const lib::Distribution>, BucketSpace::hash> _distributions;
std::shared_ptr<const lib::ClusterStateBundle> _state;
+ std::shared_ptr<const lib::DistributionConfigBundle> _distributions;
public:
using SP = std::shared_ptr<OwnershipState>;
using CSP = std::shared_ptr<const OwnershipState>;
- OwnershipState(const ContentBucketSpaceRepo &contentBucketSpaceRepo,
- std::shared_ptr<const lib::ClusterStateBundle> state);
+ OwnershipState(std::shared_ptr<const lib::ClusterStateBundle> state,
+ std::shared_ptr<const lib::DistributionConfigBundle> distributions);
~OwnershipState();
static const uint16_t FAILED_TO_RESOLVE = 0xffff;
[[nodiscard]] bool valid() const noexcept {
- return (!_distributions.empty() && _state);
+ return (_distributions && _state);
}
/**
@@ -124,20 +125,21 @@ private:
std::atomic<bool> _abortQueuedAndPendingOnStateChange;
std::atomic<bool> _abortMutatingIdealStateOps;
std::atomic<bool> _abortMutatingExternalLoadOps;
+ bool _receiving_distribution_config_from_cc;
std::unique_ptr<AbortBucketOperationsCommand::AbortPredicate>
makeLazyAbortPredicate(
const OwnershipState::CSP& oldOwnership,
const OwnershipState::CSP& newOwnership) const;
- void logTransition(const lib::ClusterState& currentState,
- const lib::ClusterState& newState) const;
+ static void logTransition(const lib::ClusterState& currentState,
+ const lib::ClusterState& newState);
/**
* Creates a new immutable OwnershipState based on the current distribution
* and the provided cluster state and assigns it to _currentOwnership.
*/
- void setCurrentOwnershipWithStateNoLock(const lib::ClusterStateBundle&);
+ void setCurrentOwnershipWithStateNoLock(std::shared_ptr<const lib::ClusterStateBundle>);
/**
* Grabs _stateLock and returns a shared_ptr to the current ownership
@@ -147,9 +149,9 @@ private:
bool isMutatingCommandAndNeedsChecking(const api::StorageMessage&) const;
- bool isMutatingIdealStateOperation(const api::StorageMessage&) const;
+ static bool isMutatingIdealStateOperation(const api::StorageMessage&);
- bool isMutatingExternalOperation(const api::StorageMessage&) const;
+ static bool isMutatingExternalOperation(const api::StorageMessage&);
/**
* Returns whether the operation in cmd has a bucket whose ownership in
* the current cluster state does not match the distributor marked as
diff --git a/storage/src/vespa/storage/storageserver/statemanager.cpp b/storage/src/vespa/storage/storageserver/statemanager.cpp
index a2106dce8d2..560982fb6c0 100644
--- a/storage/src/vespa/storage/storageserver/statemanager.cpp
+++ b/storage/src/vespa/storage/storageserver/statemanager.cpp
@@ -29,7 +29,18 @@ using vespalib::make_string_short::fmt;
namespace storage {
namespace {
- constexpr vespalib::duration MAX_TIMEOUT = 600s;
+constexpr vespalib::duration MAX_TIMEOUT = 600s;
+
// Builds the state bundle StateManager starts out with: a default-constructed baseline
// ClusterState, an empty bucket space state mapping, and a distribution config bundle
// derived from `config`.
// NOTE(review): the meaning of the `std::nullopt` and `false` arguments is defined by the
// ClusterStateBundle constructor, which is not visible here; confirm against its declaration.
+[[nodiscard]] std::shared_ptr<const lib::ClusterStateBundle>
+make_bootstrap_state_bundle(std::shared_ptr<const lib::Distribution> config) {
+ return std::make_shared<const lib::ClusterStateBundle>(
+ std::make_shared<lib::ClusterState>(),
+ lib::ClusterStateBundle::BucketSpaceStateMapping{},
+ std::nullopt,
+ lib::DistributionConfigBundle::of(std::move(config)),
+ false);
+}
+
}
struct StateManager::StateManagerMetrics : metrics::MetricSet {
@@ -62,7 +73,8 @@ StateManager::StateManager(StorageComponentRegister& compReg,
_listenerLock(),
_nodeState(std::make_shared<lib::NodeState>(_component.getNodeType(), lib::State::DOWN)),
_nextNodeState(),
- _systemState(std::make_shared<const ClusterStateBundle>(lib::ClusterState())),
+ _configured_distribution(_component.getDistribution()),
+ _systemState(make_bootstrap_state_bundle(_configured_distribution)),
_nextSystemState(),
_reported_host_info_cluster_state_version(0),
_stateListeners(),
@@ -81,6 +93,7 @@ StateManager::StateManager(StorageComponentRegister& compReg,
_noThreadTestMode(testMode),
_grabbedExternalLock(false),
_require_strictly_increasing_cluster_state_versions(false),
+ _receiving_distribution_config_from_cc(false),
_notifyingListeners(false),
_requested_almost_immediate_node_state_replies(false)
{
@@ -170,8 +183,7 @@ lib::NodeState::CSP
StateManager::getCurrentNodeState() const
{
std::lock_guard lock(_stateLock);
- return std::make_shared<const lib::NodeState>
- (_systemState->getBaselineClusterState()->getNodeState(thisNode()));
+ return std::make_shared<const lib::NodeState>(_systemState->getBaselineClusterState()->getNodeState(thisNode()));
}
std::shared_ptr<const lib::ClusterStateBundle>
@@ -181,6 +193,28 @@ StateManager::getClusterStateBundle() const
return _systemState;
}
+// TODO remove when distribution config is only received from cluster controller
// Callback for a changed host-provided distribution. Always refreshes
// _configured_distribution. A new pending state bundle is only built and published to the
// state listeners when the cluster controller is not the one supplying distribution config.
+void
+StateManager::storageDistributionChanged()
+{
+ {
+ std::lock_guard lock(_stateLock);
+ _configured_distribution = _component.getDistribution();
+ if (_receiving_distribution_config_from_cc) {
+ return; // nothing more to do
+ }
+ // Avoid losing any pending state if this callback happens in the middle of a
+ // state update. This edge case is practically impossible to unit test today...
+ const auto patch_state = _nextSystemState ? _nextSystemState : _systemState;
+ _nextSystemState = patch_state->clone_with_new_distribution(
+ lib::DistributionConfigBundle::of(_configured_distribution));
+ }
// The inner scope above has ended, so _stateLock is released before listeners are notified.
+ // We've assembled a new state bundle based on the (non-distribution carrying) state
+ // bundle from the cluster controller and our own internal config. Propagate it as one
+ // unit to the internal components.
+ notifyStateListeners();
+}
+
void
StateManager::addStateListener(StateListener& listener)
{
@@ -316,12 +350,12 @@ using BucketSpaceToTransitionString = std::unordered_map<document::BucketSpace,
document::BucketSpace::hash>;
void
-considerInsertDerivedTransition(const lib::State &currentBaseline,
- const lib::State &newBaseline,
- const lib::State &currentDerived,
- const lib::State &newDerived,
- const document::BucketSpace &bucketSpace,
- BucketSpaceToTransitionString &transitions)
+considerInsertDerivedTransition(const lib::State& currentBaseline,
+ const lib::State& newBaseline,
+ const lib::State& currentDerived,
+ const lib::State& newDerived,
+ const document::BucketSpace& bucketSpace,
+ BucketSpaceToTransitionString& transitions)
{
bool considerDerivedTransition = ((currentDerived != newDerived) &&
((currentDerived != currentBaseline) || (newDerived != newBaseline)));
@@ -333,28 +367,28 @@ considerInsertDerivedTransition(const lib::State &currentBaseline,
}
// Collects, per bucket space, the transitions of `node`'s derived state between
// `currentState` and `newState`. Whether a given space gets an entry is decided by
// considerInsertDerivedTransition(). This hunk only changes reference-declarator spacing.
BucketSpaceToTransitionString
-calculateDerivedClusterStateTransitions(const ClusterStateBundle &currentState,
- const ClusterStateBundle &newState,
+calculateDerivedClusterStateTransitions(const ClusterStateBundle& currentState,
+ const ClusterStateBundle& newState,
const lib::Node node)
{
BucketSpaceToTransitionString result;
- const lib::State &currentBaseline = currentState.getBaselineClusterState()->getNodeState(node).getState();
- const lib::State &newBaseline = newState.getBaselineClusterState()->getNodeState(node).getState();
- for (const auto &entry : currentState.getDerivedClusterStates()) {
- const lib::State &currentDerived = entry.second->getNodeState(node).getState();
- const lib::State &newDerived = newState.getDerivedClusterState(entry.first)->getNodeState(node).getState();
+ const lib::State& currentBaseline = currentState.getBaselineClusterState()->getNodeState(node).getState();
+ const lib::State& newBaseline = newState.getBaselineClusterState()->getNodeState(node).getState();
// First pass: every bucket space that has a derived state in the current bundle.
+ for (const auto& entry : currentState.getDerivedClusterStates()) {
+ const lib::State& currentDerived = entry.second->getNodeState(node).getState();
+ const lib::State& newDerived = newState.getDerivedClusterState(entry.first)->getNodeState(node).getState();
considerInsertDerivedTransition(currentBaseline, newBaseline, currentDerived, newDerived, entry.first, result);
}
- for (const auto &entry : newState.getDerivedClusterStates()) {
- const lib::State &newDerived = entry.second->getNodeState(node).getState();
- const lib::State &currentDerived = currentState.getDerivedClusterState(entry.first)->getNodeState(node).getState();
// Second pass: every bucket space that has a derived state in the new bundle. Presumably
// this covers spaces that only exist in the new bundle; verify how duplicates are handled.
+ for (const auto& entry : newState.getDerivedClusterStates()) {
+ const lib::State& newDerived = entry.second->getNodeState(node).getState();
+ const lib::State& currentDerived = currentState.getDerivedClusterState(entry.first)->getNodeState(node).getState();
considerInsertDerivedTransition(currentBaseline, newBaseline, currentDerived, newDerived, entry.first, result);
}
return result;
}
vespalib::string
-transitionsToString(const BucketSpaceToTransitionString &transitions)
+transitionsToString(const BucketSpaceToTransitionString& transitions)
{
if (transitions.empty()) {
return "";
@@ -362,7 +396,7 @@ transitionsToString(const BucketSpaceToTransitionString &transitions)
vespalib::asciistream stream;
stream << "[";
bool first = true;
- for (const auto &entry : transitions) {
+ for (const auto& entry : transitions) {
if (!first) {
stream << ", ";
}
@@ -435,7 +469,7 @@ StateManager::onGetNodeState(const api::GetNodeStateCommand::SP& cmd)
}
// Records `controller_index` in _controllers_observed_explicit_node_state.
// The unnamed unique_lock parameter is not used in the body; presumably it exists so callers
// must show they hold a lock. Confirm which mutex is expected.
void
-StateManager::mark_controller_as_having_observed_explicit_node_state(const std::unique_lock<std::mutex> &, uint16_t controller_index) {
+StateManager::mark_controller_as_having_observed_explicit_node_state(const std::unique_lock<std::mutex>&, uint16_t controller_index) {
_controllers_observed_explicit_node_state.emplace(controller_index);
}
@@ -483,7 +517,16 @@ StateManager::try_set_cluster_state_bundle(std::shared_ptr<const ClusterStateBun
}
}
_last_accepted_cluster_state_time = now;
- _nextSystemState = std::move(c);
+ _receiving_distribution_config_from_cc = c->has_distribution_config();
+ if (!c->has_distribution_config()) {
+ LOG(debug, "Next state bundle '%s' does not have distribution config; patching in existing config '%s'",
+ c->toString().c_str(), _configured_distribution->getNodeGraph().getDistributionConfigHash().c_str());
+ _nextSystemState = c->clone_with_new_distribution(lib::DistributionConfigBundle::of(_configured_distribution));
+ } else {
+ LOG(debug, "Next state bundle is '%s'", c->toString().c_str());
+ // TODO print what's changed in distribution config?
+ _nextSystemState = std::move(c);
+ }
}
notifyStateListeners();
return std::nullopt;
diff --git a/storage/src/vespa/storage/storageserver/statemanager.h b/storage/src/vespa/storage/storageserver/statemanager.h
index d116f968731..d7a139f7ced 100644
--- a/storage/src/vespa/storage/storageserver/statemanager.h
+++ b/storage/src/vespa/storage/storageserver/statemanager.h
@@ -55,6 +55,7 @@ class StateManager : public NodeStateUpdater,
std::mutex _listenerLock;
std::shared_ptr<lib::NodeState> _nodeState;
std::shared_ptr<lib::NodeState> _nextNodeState;
+ std::shared_ptr<const lib::Distribution> _configured_distribution; // From config system, not from CC
std::shared_ptr<const ClusterStateBundle> _systemState;
std::shared_ptr<const ClusterStateBundle> _nextSystemState;
uint32_t _reported_host_info_cluster_state_version;
@@ -78,6 +79,7 @@ class StateManager : public NodeStateUpdater,
bool _noThreadTestMode;
bool _grabbedExternalLock;
bool _require_strictly_increasing_cluster_state_versions;
+ bool _receiving_distribution_config_from_cc;
std::atomic<bool> _notifyingListeners;
std::atomic<bool> _requested_almost_immediate_node_state_replies;
@@ -102,6 +104,8 @@ public:
lib::NodeState::CSP getCurrentNodeState() const override;
std::shared_ptr<const ClusterStateBundle> getClusterStateBundle() const override;
+ void storageDistributionChanged() override;
+
void addStateListener(StateListener&) override;
void removeStateListener(StateListener&) override;