diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-10-24 22:12:02 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-10-24 22:12:02 +0200 |
commit | 3858da05eaf7eb58b5536a59a01dd4154d174424 (patch) | |
tree | 17fc31e28da6460dd562ae44f4c78b484f11ef9f | |
parent | bc3b03f9189b2b575f1c3b1bfe0c2a79bab555ce (diff) | |
parent | 7e2bb2cbcaae5bf5eadb2ee765bb7280a458f9e7 (diff) |
Merge pull request #29086 from vespa-engine/vekterli/rewire-more-service-layer-component-configs
Rewire more service layer component configs
31 files changed, 354 insertions, 251 deletions
diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp index f46ff867fcc..91e901c7254 100644 --- a/storage/src/tests/bucketdb/bucketmanagertest.cpp +++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp @@ -1,29 +1,29 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <tests/common/dummystoragelink.h> +#include <tests/common/testhelper.h> +#include <tests/common/teststorageapp.h> #include <vespa/config/helper/configgetter.hpp> -#include <vespa/document/config/documenttypes_config_fwd.h> #include <vespa/document/config/config-documenttypes.h> +#include <vespa/document/config/documenttypes_config_fwd.h> #include <vespa/document/datatype/documenttype.h> #include <vespa/document/fieldvalue/document.h> -#include <vespa/document/update/documentupdate.h> #include <vespa/document/repo/documenttyperepo.h> -#include <vespa/document/test/make_document_bucket.h> #include <vespa/document/test/make_bucket_space.h> +#include <vespa/document/test/make_document_bucket.h> +#include <vespa/document/update/documentupdate.h> +#include <vespa/metrics/updatehook.h> #include <vespa/storage/bucketdb/bucketmanager.h> #include <vespa/storage/common/global_bucket_space_distribution_converter.h> #include <vespa/storage/persistence/filestorage/filestormanager.h> +#include <vespa/storageapi/message/bucketsplitting.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/state.h> -#include <vespa/storageapi/message/bucketsplitting.h> -#include <vespa/metrics/updatehook.h> -#include <tests/common/teststorageapp.h> -#include <tests/common/dummystoragelink.h> -#include <tests/common/testhelper.h> -#include <vespa/vdslib/state/random.h> #include <vespa/vdslib/distribution/distribution.h> #include <vespa/vdslib/state/clusterstate.h> -#include <vespa/vespalib/stllike/asciistream.h> +#include <vespa/vdslib/state/random.h> #include <vespa/vespalib/gtest/gtest.h> +#include <vespa/vespalib/stllike/asciistream.h> #include <future> #include <vespa/log/log.h> @@ -148,7 +148,9 @@ void BucketManagerTest::setupTestEnvironment(bool fakePersistenceLayer, bool noD _node->setTypeRepo(repo); _node->setupDummyPersistence(); // Set up the 3 links - auto manager = std::make_unique<BucketManager>(config::ConfigUri(config.getConfigId()), _node->getComponentRegister()); + auto config_uri = config::ConfigUri(config.getConfigId()); + using vespa::config::content::core::StorServerConfig; + auto manager = std::make_unique<BucketManager>(*config_from<StorServerConfig>(config_uri), _node->getComponentRegister()); _manager = manager.get(); _top->push_back(std::move(manager)); if (fakePersistenceLayer) { @@ -156,7 +158,8 @@ void BucketManagerTest::setupTestEnvironment(bool fakePersistenceLayer, bool noD _bottom = bottom.get(); _top->push_back(std::move(bottom)); } else { - auto bottom = std::make_unique<FileStorManager>(config::ConfigUri(config.getConfigId()), + using vespa::config::content::StorFilestorConfig; + auto bottom = std::make_unique<FileStorManager>(*config_from<StorFilestorConfig>(config_uri), _node->getPersistenceProvider(), _node->getComponentRegister(), *_node, _node->get_host_info()); _top->push_back(std::move(bottom)); diff --git a/storage/src/tests/common/testhelper.h b/storage/src/tests/common/testhelper.h index bfc5c7679e1..1f83e938409 100644 --- a/storage/src/tests/common/testhelper.h +++ b/storage/src/tests/common/testhelper.h @@ -1,5 +1,6 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include <vespa/config/helper/configgetter.h> #include <vespa/messagebus/testlib/slobrok.h> #include <vespa/vdstestlib/config/dirconfig.h> #include <fstream> @@ -21,6 +22,11 @@ std::string getRootFolder(vdstestlib::DirConfig & dc); void addSlobrokConfig(vdstestlib::DirConfig& dc, const mbus::Slobrok& slobrok); +template <typename ConfigT> +std::unique_ptr<ConfigT> config_from(const ::config::ConfigUri& cfg_uri) { + return ::config::ConfigGetter<ConfigT>::getConfig(cfg_uri.getConfigId(), cfg_uri.getContext()); +} + // Class used to print start and end of test. Enable debug when you want to see // which test creates what output or where we get stuck struct TestName { diff --git a/storage/src/tests/persistence/common/filestortestfixture.cpp b/storage/src/tests/persistence/common/filestortestfixture.cpp index c989acd5228..74d34fb0f50 100644 --- a/storage/src/tests/persistence/common/filestortestfixture.cpp +++ b/storage/src/tests/persistence/common/filestortestfixture.cpp @@ -1,15 +1,17 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/storage/persistence/messages.h> -#include <vespa/storage/persistence/filestorage/filestormanager.h> -#include <vespa/storageapi/message/bucket.h> -#include <vespa/persistence/dummyimpl/dummypersistence.h> +#include <tests/common/testhelper.h> #include <tests/persistence/common/filestortestfixture.h> -#include <vespa/document/repo/documenttyperepo.h> +#include <vespa/config/helper/configgetter.hpp> #include <vespa/document/fieldset/fieldsets.h> +#include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/test/make_document_bucket.h> -#include <vespa/vdslib/state/clusterstate.h> +#include <vespa/persistence/dummyimpl/dummypersistence.h> #include <vespa/persistence/spi/test.h> +#include <vespa/storage/persistence/filestorage/filestormanager.h> +#include <vespa/storage/persistence/messages.h> +#include <vespa/storageapi/message/bucket.h> +#include <vespa/vdslib/state/clusterstate.h> #include <sstream> using storage::spi::test::makeSpiBucket; @@ -73,7 +75,9 @@ FileStorTestFixture::TestFileStorComponents::TestFileStorComponents( manager(nullptr) { injector.inject(top); - auto fsm = std::make_unique<FileStorManager>(config::ConfigUri(fixture._config->getConfigId()), fixture._node->getPersistenceProvider(), + using vespa::config::content::StorFilestorConfig; + auto config = config_from<StorFilestorConfig>(config::ConfigUri(fixture._config->getConfigId())); + auto fsm = std::make_unique<FileStorManager>(*config, fixture._node->getPersistenceProvider(), fixture._node->getComponentRegister(), *fixture._node, fixture._node->get_host_info()); manager = fsm.get(); top.push_back(std::move(fsm)); diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index b09febce408..586863251c9 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -5,6 +5,7 @@ #include <tests/common/teststorageapp.h> #include <tests/persistence/filestorage/forwardingmessagesender.h> #include <vespa/config/common/exceptions.h> +#include <vespa/config/helper/configgetter.hpp> #include <vespa/document/fieldset/fieldsets.h> #include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/select/parser.h> @@ -223,8 +224,10 @@ struct TestFileStorComponents { explicit TestFileStorComponents(FileStorTestBase& test, bool use_small_config = false) : manager(nullptr) { - auto fsm = std::make_unique<FileStorManager>(config::ConfigUri((use_small_config ? test.smallConfig : test.config)->getConfigId()), - test._node->getPersistenceProvider(), + using vespa::config::content::StorFilestorConfig; + auto config_uri = config::ConfigUri((use_small_config ? test.smallConfig : test.config)->getConfigId()); + auto config = config_from<StorFilestorConfig>(config_uri); + auto fsm = std::make_unique<FileStorManager>(*config, test._node->getPersistenceProvider(), test._node->getComponentRegister(), *test._node, test._node->get_host_info()); manager = fsm.get(); top.push_back(std::move(fsm)); diff --git a/storage/src/tests/persistence/filestorage/filestormodifiedbucketstest.cpp b/storage/src/tests/persistence/filestorage/filestormodifiedbucketstest.cpp index 966382de39b..710da80972f 100644 --- a/storage/src/tests/persistence/filestorage/filestormodifiedbucketstest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormodifiedbucketstest.cpp @@ -1,11 +1,12 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/storageapi/message/bucket.h> -#include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h> -#include <vespa/persistence/spi/test.h> +#include <tests/persistence/common/filestortestfixture.h> #include <tests/persistence/common/persistenceproviderwrapper.h> +#include <vespa/config/helper/configgetter.hpp> #include <vespa/persistence/dummyimpl/dummypersistence.h> -#include <tests/persistence/common/filestortestfixture.h> +#include <vespa/persistence/spi/test.h> +#include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h> +#include <vespa/storageapi/message/bucket.h> using storage::spi::test::makeSpiBucket; using namespace ::testing; @@ -36,10 +37,10 @@ struct BucketCheckerInjector : FileStorTestFixture::StorageLinkInjector _fixture(fixture) {} void inject(DummyStorageLink& link) const override { + using vespa::config::content::core::StorServerConfig; + auto cfg = config_from<StorServerConfig>(config::ConfigUri(_fixture._config->getConfigId())); link.push_back(std::make_unique<ModifiedBucketChecker>( - _node.getComponentRegister(), - _node.getPersistenceProvider(), - config::ConfigUri(_fixture._config->getConfigId()))); + _node.getComponentRegister(), _node.getPersistenceProvider(), *cfg)); } }; diff --git a/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp b/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp index 8acaa9a78d3..f96ff9c012e 100644 --- a/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp +++ b/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp @@ -1,11 +1,12 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <tests/common/testhelper.h> #include <tests/common/dummystoragelink.h> +#include <tests/common/testhelper.h> #include <tests/common/teststorageapp.h> +#include <vespa/config/common/exceptions.h> +#include <vespa/config/helper/configgetter.hpp> #include <vespa/persistence/dummyimpl/dummypersistence.h> #include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h> -#include <vespa/config/common/exceptions.h> #include <vespa/vespalib/gtest/gtest.h> using namespace ::testing; @@ -45,9 +46,11 @@ ModifiedBucketCheckerTest::SetUp() _node->setupDummyPersistence(); _top.reset(new DummyStorageLink); + using vespa::config::content::core::StorServerConfig; + auto bootstrap_cfg = config_from<StorServerConfig>(config::ConfigUri(_config->getConfigId())); _handler = new ModifiedBucketChecker(_node->getComponentRegister(), _node->getPersistenceProvider(), - config::ConfigUri(_config->getConfigId())); + *bootstrap_cfg); _top->push_back(std::unique_ptr<StorageLink>(_handler)); _bottom = new DummyStorageLink; _handler->push_back(std::unique_ptr<StorageLink>(_bottom)); @@ -136,7 +139,7 @@ TEST_F(ModifiedBucketCheckerTest, recheck_requests_are_chunked) { _top->open(); cfgns::StorServerConfigBuilder cfgBuilder; cfgBuilder.bucketRecheckingChunkSize = 2; - _handler->configure(std::make_unique<cfgns::StorServerConfig>(cfgBuilder)); + _handler->on_configure(*std::make_unique<cfgns::StorServerConfig>(cfgBuilder)); modifyBuckets(5, 0); _handler->tick(); @@ -172,7 +175,7 @@ TEST_F(ModifiedBucketCheckerTest, invalid_chunk_size_config_is_rejected) { _top->open(); cfgns::StorServerConfigBuilder cfgBuilder; cfgBuilder.bucketRecheckingChunkSize = 0; - EXPECT_THROW(_handler->configure(std::make_unique<cfgns::StorServerConfig>(cfgBuilder)), + EXPECT_THROW(_handler->on_configure(*std::make_unique<cfgns::StorServerConfig>(cfgBuilder)), config::InvalidConfigException); } diff --git a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp index 639b3231028..50977b5ec8b 100644 --- a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp +++ b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp @@ -3,6 +3,7 @@ #include <tests/common/teststorageapp.h> #include <tests/common/testhelper.h> #include <tests/common/dummystoragelink.h> +#include <vespa/config/helper/configgetter.hpp> #include <vespa/document/base/testdocman.h> #include <vespa/storage/bucketdb/storbucketdb.h> #include <vespa/storage/persistence/messages.h> @@ -124,11 +125,12 @@ ChangedBucketOwnershipHandlerTest::insertBuckets(uint32_t numBuckets, void ChangedBucketOwnershipHandlerTest::SetUp() { + using vespa::config::content::PersistenceConfig; vdstestlib::DirConfig config(getStandardConfig(true)); _app.reset(new TestServiceLayerApp); _top.reset(new DummyStorageLink); - _handler = new ChangedBucketOwnershipHandler(config::ConfigUri(config.getConfigId()), + _handler = new ChangedBucketOwnershipHandler(*config_from<PersistenceConfig>(config::ConfigUri(config.getConfigId())), _app->getComponentRegister()); _top->push_back(std::unique_ptr<StorageLink>(_handler)); _bottom = new DummyStorageLink; @@ -139,7 +141,7 @@ ChangedBucketOwnershipHandlerTest::SetUp() auto pconfig = std::make_unique<vespa::config::content::PersistenceConfigBuilder>(); pconfig->abortOutdatedMutatingIdealStateOps = true; pconfig->abortOutdatedMutatingExternalLoadOps = true; - _handler->configure(std::move(pconfig)); + _handler->on_configure(*pconfig); } namespace { @@ -466,7 +468,7 @@ TEST_F(ChangedBucketOwnershipHandlerTest, abort_outdated_remove_location) { TEST_F(ChangedBucketOwnershipHandlerTest, ideal_state_aborts_are_configurable) { auto config = std::make_unique<vespa::config::content::PersistenceConfigBuilder>(); config->abortOutdatedMutatingIdealStateOps = false; - _handler->configure(std::move(config)); + _handler->on_configure(*config); // Should not abort operation, even when ownership has changed. expectChangeAbortsMessage<api::CreateBucketCommand>(false, getBucketToAbort()); } @@ -508,7 +510,7 @@ TEST_F(ChangedBucketOwnershipHandlerTest, external_load_op_abort_updates_metric) TEST_F(ChangedBucketOwnershipHandlerTest, external_load_op_aborts_are_configurable) { auto config = std::make_unique<vespa::config::content::PersistenceConfigBuilder>(); config->abortOutdatedMutatingExternalLoadOps = false; - _handler->configure(std::move(config)); + _handler->on_configure(*config); // Should not abort operation, even when ownership has changed. document::DocumentId docId("id:foo:testdoctype1::bar"); expectChangeAbortsMessage<api::RemoveCommand>( diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp index 70b8c40722c..04322562d08 100644 --- a/storage/src/tests/storageserver/communicationmanagertest.cpp +++ b/storage/src/tests/storageserver/communicationmanagertest.cpp @@ -29,14 +29,6 @@ vespalib::string _storage("storage"); using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig; -namespace { - -std::unique_ptr<CommunicationManagerConfig> config_from(const config::ConfigUri& cfg_uri) { - return config::ConfigGetter<CommunicationManagerConfig>::getConfig(cfg_uri.getConfigId(), cfg_uri.getContext()); -} - -} - struct CommunicationManagerTest : Test { static constexpr uint32_t MESSAGE_WAIT_TIME_SEC = 60; @@ -88,8 +80,10 @@ TEST_F(CommunicationManagerTest, simple) { auto dist_cfg_uri = config::ConfigUri(distConfig.getConfigId()); auto stor_cfg_uri = config::ConfigUri(storConfig.getConfigId()); - CommunicationManager distributor(distNode.getComponentRegister(), dist_cfg_uri, *config_from(dist_cfg_uri)); - CommunicationManager storage(storNode.getComponentRegister(), stor_cfg_uri, *config_from(stor_cfg_uri)); + CommunicationManager distributor(distNode.getComponentRegister(), dist_cfg_uri, + *config_from<CommunicationManagerConfig>(dist_cfg_uri)); + CommunicationManager storage(storNode.getComponentRegister(), stor_cfg_uri, + *config_from<CommunicationManagerConfig>(stor_cfg_uri)); auto* distributorLink = new DummyStorageLink(); auto* storageLink = new DummyStorageLink(); distributor.push_back(std::unique_ptr<StorageLink>(distributorLink)); @@ -146,7 +140,8 @@ CommunicationManagerTest::doTestConfigPropagation(bool isContentNode) } auto cfg_uri = config::ConfigUri(config.getConfigId()); - CommunicationManager commMgr(node->getComponentRegister(), cfg_uri, *config_from(cfg_uri)); + CommunicationManager commMgr(node->getComponentRegister(), cfg_uri, + *config_from<CommunicationManagerConfig>(cfg_uri)); auto* storageLink = new DummyStorageLink(); commMgr.push_back(std::unique_ptr<StorageLink>(storageLink)); commMgr.open(); @@ -191,7 +186,8 @@ TEST_F(CommunicationManagerTest, commands_are_dequeued_in_fifo_order) { TestServiceLayerApp storNode(storConfig.getConfigId()); auto cfg_uri = config::ConfigUri(storConfig.getConfigId()); - CommunicationManager storage(storNode.getComponentRegister(), cfg_uri, *config_from(cfg_uri)); + CommunicationManager storage(storNode.getComponentRegister(), cfg_uri, + *config_from<CommunicationManagerConfig>(cfg_uri)); auto* storageLink = new DummyStorageLink(); storage.push_back(std::unique_ptr<StorageLink>(storageLink)); storage.open(); @@ -224,7 +220,8 @@ TEST_F(CommunicationManagerTest, replies_are_dequeued_in_fifo_order) { TestServiceLayerApp storNode(storConfig.getConfigId()); auto cfg_uri = config::ConfigUri(storConfig.getConfigId()); - CommunicationManager storage(storNode.getComponentRegister(), cfg_uri, *config_from(cfg_uri)); + CommunicationManager storage(storNode.getComponentRegister(), cfg_uri, + *config_from<CommunicationManagerConfig>(cfg_uri)); auto* storageLink = new DummyStorageLink(); storage.push_back(std::unique_ptr<StorageLink>(storageLink)); storage.open(); @@ -265,7 +262,8 @@ struct CommunicationManagerFixture { node = std::make_unique<TestServiceLayerApp>(stor_config.getConfigId()); auto cfg_uri = config::ConfigUri(stor_config.getConfigId()); - comm_mgr = std::make_unique<CommunicationManager>(node->getComponentRegister(), cfg_uri, *config_from(cfg_uri)); + comm_mgr = std::make_unique<CommunicationManager>(node->getComponentRegister(), cfg_uri, + *config_from<CommunicationManagerConfig>(cfg_uri)); bottom_link = new DummyStorageLink(); comm_mgr->push_back(std::unique_ptr<StorageLink>(bottom_link)); comm_mgr->open(); diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp index 8e87e07eeff..7a7f2551c2d 100644 --- a/storage/src/tests/storageserver/mergethrottlertest.cpp +++ b/storage/src/tests/storageserver/mergethrottlertest.cpp @@ -2,6 +2,7 @@ #include <tests/common/testhelper.h> #include <tests/common/dummystoragelink.h> #include <tests/common/teststorageapp.h> +#include <vespa/config/helper/configgetter.hpp> #include <vespa/document/test/make_document_bucket.h> #include <vespa/messagebus/dynamicthrottlepolicy.h> #include <vespa/storage/storageserver/mergethrottler.h> @@ -28,7 +29,9 @@ namespace storage { namespace { -vespalib::string _Storage("storage"); +using StorServerConfig = vespa::config::content::core::StorServerConfig; + +vespalib::string _storage("storage"); struct MergeBuilder { document::BucketId _bucket; @@ -108,7 +111,7 @@ struct MergeBuilder { auto cmd = std::make_shared<MergeBucketCommand>( makeDocumentBucket(_bucket), n, _maxTimestamp, _clusterStateVersion, _chain); - cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, _nodes[0])); + cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, _nodes[0])); return cmd; } }; @@ -167,7 +170,9 @@ MergeThrottlerTest::~MergeThrottlerTest() = default; void MergeThrottlerTest::SetUp() { - vdstestlib::DirConfig config(getStandardConfig(true)); + vdstestlib::DirConfig dir_config(getStandardConfig(true)); + auto cfg_uri = ::config::ConfigUri(dir_config.getConfigId()); + auto config = config_from<StorServerConfig>(cfg_uri); for (int i = 0; i < _storageNodeCount; ++i) { auto server = std::make_unique<TestServiceLayerApp>(NodeIndex(i)); @@ -175,7 +180,7 @@ MergeThrottlerTest::SetUp() std::unique_ptr<DummyStorageLink> top; top = std::make_unique<DummyStorageLink>(); - MergeThrottler* throttler = new MergeThrottler(::config::ConfigUri(config.getConfigId()), server->getComponentRegister()); + MergeThrottler* throttler = new MergeThrottler(*config, server->getComponentRegister()); // MergeThrottler will be sandwiched in between two dummy links top->push_back(std::unique_ptr<StorageLink>(throttler)); DummyStorageLink* bottom = new DummyStorageLink; @@ -283,7 +288,7 @@ TEST_F(MergeThrottlerTest, chain) { auto cmd = std::make_shared<MergeBucketCommand>(bucket, nodes, UINT_MAX, 123); cmd->setPriority(7); cmd->setTimeout(54321ms); - cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 0)); + cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 0)); const uint16_t distributorIndex = 123; cmd->setSourceIndex(distributorIndex); // Dummy distributor index that must be forwarded @@ -423,7 +428,7 @@ TEST_F(MergeThrottlerTest, with_source_only_node) { std::vector<MergeBucketCommand::Node> nodes({{0}, {2}, {1, true}}); auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, UINT_MAX, 123); - cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 0)); + cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 0)); _topLinks[0]->sendDown(cmd); _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); @@ -468,7 +473,7 @@ TEST_F(MergeThrottlerTest, legacy_42_distributor_behavior) { auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234); // Send to node 1, which is not the lowest index - cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1)); + cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1)); _topLinks[1]->sendDown(cmd); _bottomLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); @@ -503,7 +508,7 @@ TEST_F(MergeThrottlerTest, legacy_42_distributor_behavior_does_not_take_ownershi auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234); // Send to node 1, which is not the lowest index - cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1)); + cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1)); _topLinks[1]->sendDown(cmd); _bottomLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); @@ -550,7 +555,7 @@ TEST_F(MergeThrottlerTest, end_of_chain_execution_does_not_take_ownership) { auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234, 1, chain); // Send to last node, which is not the lowest index - cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 3)); + cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 3)); _topLinks[2]->sendDown(cmd); _bottomLinks[2]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); @@ -595,7 +600,7 @@ TEST_F(MergeThrottlerTest, resend_handling) { std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}}); auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234); - cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1)); + cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1)); _topLinks[0]->sendDown(cmd); _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime); @@ -962,7 +967,7 @@ TEST_F(MergeThrottlerTest, unseen_merge_with_node_in_chain) { makeDocumentBucket(BucketId(32, 0xdeadbeef)), nodes, 1234, 1, chain); - cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 9)); + cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 9)); _topLinks[0]->sendDown(cmd); // First, test that we get rejected when processing merge immediately @@ -1145,7 +1150,7 @@ TEST_F(MergeThrottlerTest, unknown_merge_with_self_in_chain) { std::vector<uint16_t> chain({0}); auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234, 1, chain); - cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1)); + cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1)); _topLinks[0]->sendDown(cmd); _topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime); diff --git a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp index 84a98385962..edb13eea5af 100644 --- a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp +++ b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp @@ -1,11 +1,12 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <vespa/storage/storageserver/service_layer_error_listener.h> +#include <tests/common/testhelper.h> +#include <tests/common/teststorageapp.h> +#include <vespa/config/helper/configgetter.hpp> #include <vespa/storage/storageserver/mergethrottler.h> +#include <vespa/storage/storageserver/service_layer_error_listener.h> #include <vespa/storageframework/defaultimplementation/component/componentregisterimpl.h> #include <vespa/vdstestlib/config/dirconfig.h> -#include <tests/common/testhelper.h> -#include <tests/common/teststorageapp.h> #include <vespa/vespalib/gtest/gtest.h> using namespace ::testing; @@ -34,10 +35,12 @@ private: }; struct Fixture { + using StorServerConfig = vespa::config::content::core::StorServerConfig; + vdstestlib::DirConfig config{getStandardConfig(true)}; TestServiceLayerApp app; ServiceLayerComponent component{app.getComponentRegister(), "dummy"}; - MergeThrottler merge_throttler{config::ConfigUri(config.getConfigId()), app.getComponentRegister()}; + MergeThrottler merge_throttler{*config_from<StorServerConfig>(config::ConfigUri(config.getConfigId())), app.getComponentRegister()}; TestShutdownListener shutdown_listener; ServiceLayerErrorListener error_listener{component, merge_throttler}; diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp index 2b7039f36ea..4fb857f9e67 100644 --- a/storage/src/tests/visiting/visitormanagertest.cpp +++ b/storage/src/tests/visiting/visitormanagertest.cpp @@ -1,5 +1,6 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/config/helper/configgetter.hpp> #include <vespa/document/fieldvalue/intfieldvalue.h> #include <vespa/document/fieldvalue/stringfieldvalue.h> #include <vespa/storageapi/message/datagram.h> @@ -88,14 +89,18 @@ VisitorManagerTest::initializeTest(bool defer_manager_thread_start) _node->setupDummyPersistence(); _node->getStateUpdater().setClusterState(std::make_shared<lib::ClusterState>("storage:1 distributor:1")); _top = std::make_unique<DummyStorageLink>(); - auto vm = std::make_unique<VisitorManager>(config::ConfigUri(config.getConfigId()), + using vespa::config::content::core::StorVisitorConfig; + auto bootstrap_cfg = config_from<StorVisitorConfig>(config::ConfigUri(config.getConfigId())); + auto vm = std::make_unique<VisitorManager>(*bootstrap_cfg, _node->getComponentRegister(), *_messageSessionFactory, VisitorFactory::Map(), defer_manager_thread_start); _manager = vm.get(); _top->push_back(std::move(vm)); - _top->push_back(std::make_unique<FileStorManager>(config::ConfigUri(config.getConfigId()), _node->getPersistenceProvider(), + using vespa::config::content::StorFilestorConfig; + auto filestor_cfg = config_from<StorFilestorConfig>(config::ConfigUri(config.getConfigId())); + _top->push_back(std::make_unique<FileStorManager>(*filestor_cfg, _node->getPersistenceProvider(), _node->getComponentRegister(), *_node, _node->get_host_info())); _manager->setTimeBetweenTicks(10); _top->open(); diff --git a/storage/src/tests/visiting/visitortest.cpp b/storage/src/tests/visiting/visitortest.cpp index 49f1bc778fc..f83b6c99d64 100644 --- a/storage/src/tests/visiting/visitortest.cpp +++ b/storage/src/tests/visiting/visitortest.cpp @@ -1,6 +1,7 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include <vespa/config/common/exceptions.h> +#include <vespa/config/helper/configgetter.hpp> #include <vespa/document/fieldvalue/intfieldvalue.h> #include <vespa/document/fieldvalue/stringfieldvalue.h> #include <vespa/document/datatype/documenttype.h> @@ -168,9 +169,10 @@ VisitorTest::initializeTest(const TestParams& params) } _node = std::make_unique<TestServiceLayerApp>(config.getConfigId()); _top = std::make_unique<DummyStorageLink>(); + using vespa::config::content::core::StorVisitorConfig; + auto bootstrap_cfg = config_from<StorVisitorConfig>(config::ConfigUri(config.getConfigId())); _top->push_back(std::unique_ptr<StorageLink>(_manager - = new VisitorManager(config::ConfigUri(config.getConfigId()), - _node->getComponentRegister(), *_messageSessionFactory))); + = new VisitorManager(*bootstrap_cfg, _node->getComponentRegister(), *_messageSessionFactory))); _bottom = new DummyStorageLink(); _top->push_back(std::unique_ptr<StorageLink>(_bottom)); _manager->setTimeBetweenTicks(10); diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp index 8c24a7dfd4e..d12a9f72ac1 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp @@ -33,10 +33,9 @@ using namespace std::chrono_literals; namespace storage { -BucketManager::BucketManager(const config::ConfigUri & configUri, ServiceLayerComponentRegister& compReg) +BucketManager::BucketManager(const StorServerConfig& bootstrap_config, ServiceLayerComponentRegister& compReg) : StorageLink("Bucket manager"), framework::StatusReporter("bucketdb", "Bucket database"), - _configUri(configUri), _workerLock(), _workerCond(), _clusterStateLock(), @@ -60,8 +59,7 @@ BucketManager::BucketManager(const config::ConfigUri & configUri, ServiceLayerCo ns.setMinUsedBits(58); _component.getStateUpdater().setReportedNodeState(ns); - auto server_config = config::ConfigGetter<vespa::config::content::core::StorServerConfig>::getConfig(configUri.getConfigId(), configUri.getContext()); - _simulated_processing_delay = std::chrono::milliseconds(std::max(0, server_config->simulatedBucketRequestLatencyMsec)); + _simulated_processing_delay = std::chrono::milliseconds(std::max(0, bootstrap_config.simulatedBucketRequestLatencyMsec)); } BucketManager::~BucketManager() @@ -414,9 +412,7 @@ BucketManager::dump(std::ostream& out) const void BucketManager::onOpen() { - if (!_configUri.empty()) { - startWorkerThread(); - } + startWorkerThread(); } void BucketManager::startWorkerThread() diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.h b/storage/src/vespa/storage/bucketdb/bucketmanager.h index 76d9123a519..35ccab843a9 100644 --- a/storage/src/vespa/storage/bucketdb/bucketmanager.h +++ b/storage/src/vespa/storage/bucketdb/bucketmanager.h @@ -10,10 +10,10 @@ #include "bucketmanagermetrics.h" #include "storbucketdb.h" #include <vespa/config/subscription/configuri.h> -#include <vespa/storage/bucketdb/config-stor-bucketdb.h> #include <vespa/storage/common/servicelayercomponent.h> #include <vespa/storage/common/storagelinkqueued.h> #include <vespa/storage/common/nodestateupdater.h> +#include <vespa/storage/config/config-stor-server.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/storageframework/generic/metric/metricupdatehook.h> #include <vespa/storageframework/generic/status/statusreporter.h> @@ -34,6 +34,7 @@ class BucketManager : public StorageLink, private framework::MetricUpdateHook { public: + using StorServerConfig = vespa::config::content::core::StorServerConfig; /** Type used for message queues */ using BucketInfoRequestList = std::list<std::shared_ptr<api::RequestBucketInfoCommand>>; using BucketInfoRequestMap = std::unordered_map<document::BucketSpace, BucketInfoRequestList, document::BucketSpace::hash>; @@ -41,7 +42,6 @@ public: private: using ReplyQueue = std::vector<api::StorageReply::SP>; using ConflictingBuckets = std::unordered_set<document::BucketId, document::BucketId::hash>; - config::ConfigUri _configUri; BucketInfoRequestMap _bucketInfoRequests; /** @@ -82,7 +82,7 @@ private: }; public: - BucketManager(const config::ConfigUri&, ServiceLayerComponentRegister&); + BucketManager(const StorServerConfig& bootstrap_config, ServiceLayerComponentRegister&); BucketManager(const BucketManager&) = delete; BucketManager& operator=(const BucketManager&) = delete; ~BucketManager() override; diff --git a/storage/src/vespa/storage/common/visitorfactory.h b/storage/src/vespa/storage/common/visitorfactory.h index 393744c4f59..1f18ace3095 100644 --- a/storage/src/vespa/storage/common/visitorfactory.h +++ b/storage/src/vespa/storage/common/visitorfactory.h @@ -12,6 +12,7 @@ namespace storage { +class StorageComponent; class Visitor; class VisitorEnvironment { diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 97be1a510c4..6c49fcd0878 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -62,7 +62,7 @@ private: } FileStorManager:: -FileStorManager(const config::ConfigUri & configUri, spi::PersistenceProvider& provider, +FileStorManager(const StorFilestorConfig& bootstrap_config, spi::PersistenceProvider& provider, ServiceLayerComponentRegister& compReg, DoneInitializeHandler& init_handler, HostInfo& hostInfoReporterRegistrar) : StorageLinkQueued("File store manager", compReg), @@ -75,7 +75,6 @@ FileStorManager(const config::ConfigUri & configUri, spi::PersistenceProvider& p _persistenceHandlers(), _threads(), _bucketOwnershipNotifier(std::make_unique<BucketOwnershipNotifier>(_component, *this)), - _configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())), _use_async_message_handling_on_schedule(false), _metrics(std::make_unique<FileStorMetrics>()), _filestorHandler(), @@ -85,8 +84,7 @@ FileStorManager(const config::ConfigUri & configUri, spi::PersistenceProvider& p _host_info_reporter(_component.getStateUpdater()), _resource_usage_listener_registration(provider.register_resource_usage_listener(_host_info_reporter)) { - _configFetcher->subscribe(configUri.getConfigId(), this); - _configFetcher->start(); + on_configure(bootstrap_config); _component.registerMetric(*_metrics); _component.registerStatusPage(*this); _component.getStateUpdater().addStateListener(*this); @@ -207,19 +205,19 @@ FileStorManager::getThreadLocalHandler() { } void -FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config) +FileStorManager::on_configure(const StorFilestorConfig& config) { // If true, this is not the first configure. const bool liveUpdate = ! _threads.empty(); - _use_async_message_handling_on_schedule = config->useAsyncMessageHandlingOnSchedule; - _host_info_reporter.set_noise_level(config->resourceUsageReporterNoiseLevel); - const bool use_dynamic_throttling = ((config->asyncOperationThrottlerType == StorFilestorConfig::AsyncOperationThrottlerType::DYNAMIC) || - (config->asyncOperationThrottler.type == StorFilestorConfig::AsyncOperationThrottler::Type::DYNAMIC)); - const bool throttle_merge_feed_ops = config->asyncOperationThrottler.throttleIndividualMergeFeedOps; + _use_async_message_handling_on_schedule = config.useAsyncMessageHandlingOnSchedule; + _host_info_reporter.set_noise_level(config.resourceUsageReporterNoiseLevel); + const bool use_dynamic_throttling = ((config.asyncOperationThrottlerType == StorFilestorConfig::AsyncOperationThrottlerType::DYNAMIC) || + (config.asyncOperationThrottler.type == StorFilestorConfig::AsyncOperationThrottler::Type::DYNAMIC)); + const bool throttle_merge_feed_ops = config.asyncOperationThrottler.throttleIndividualMergeFeedOps; if (!liveUpdate) { - _config = std::move(config); + _config = std::make_unique<StorFilestorConfig>(config); uint32_t numThreads = std::max(1, _config->numThreads); uint32_t numStripes = std::max(1u, numThreads / 2); _metrics->initDiskMetrics(numStripes, computeAllPossibleHandlerThreads(*_config)); @@ -240,7 +238,7 @@ FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config) _bucketExecutorRegistration = _provider->register_executor(std::make_shared<BucketExecutorWrapper>(*this)); } else { assert(_filestorHandler); - auto updated_dyn_throttle_params = dynamic_throttle_params_from_config(*config, _threads.size()); + auto updated_dyn_throttle_params = dynamic_throttle_params_from_config(config, _threads.size()); _filestorHandler->reconfigure_dynamic_throttler(updated_dyn_throttle_params); } // TODO remove once desired dynamic throttling behavior is set in stone @@ -828,8 +826,6 @@ void FileStorManager::onClose() LOG(debug, "Start closing"); _bucketExecutorRegistration.reset(); _resource_usage_listener_registration.reset(); - // Avoid getting config during shutdown - _configFetcher->close(); LOG(debug, "Closed _configFetcher."); _filestorHandler->close(); LOG(debug, "Closed _filestorHandler."); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h index 68491ab1e38..96cff8dfeee 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h @@ -1,10 +1,4 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * @class storage::FileStorManager - * @ingroup filestorage - * - * @version $Id$ - */ #pragma once @@ -42,7 +36,6 @@ namespace spi { struct PersistenceProvider; } class ContentBucketSpace; struct FileStorManagerTest; -class ReadBucketList; class BucketOwnershipNotifier; class AbortBucketOperationsCommand; struct DoneInitializeHandler; @@ -54,10 +47,11 @@ class ProviderErrorWrapper; class FileStorManager : public StorageLinkQueued, public framework::HtmlStatusReporter, public StateListener, - private config::IFetcherCallback<vespa::config::content::StorFilestorConfig>, public MessageSender, public spi::BucketExecutor { + using StorFilestorConfig = vespa::config::content::StorFilestorConfig; + ServiceLayerComponentRegister & _compReg; ServiceLayerComponent _component; std::unique_ptr<spi::PersistenceProvider> _provider; @@ -69,7 +63,6 @@ class FileStorManager : public StorageLinkQueued, std::unique_ptr<BucketOwnershipNotifier> _bucketOwnershipNotifier; std::unique_ptr<vespa::config::content::StorFilestorConfig> _config; - std::unique_ptr<config::ConfigFetcher> _configFetcher; bool _use_async_message_handling_on_schedule; std::shared_ptr<FileStorMetrics> _metrics; std::unique_ptr<FileStorHandler> _filestorHandler; @@ -82,7 +75,7 @@ class FileStorManager : public StorageLinkQueued, std::unique_ptr<vespalib::IDestructorCallback> _resource_usage_listener_registration; public: - FileStorManager(const config::ConfigUri &, spi::PersistenceProvider&, + FileStorManager(const StorFilestorConfig&, spi::PersistenceProvider&, ServiceLayerComponentRegister&, DoneInitializeHandler&, HostInfo&); FileStorManager(const FileStorManager &) = delete; FileStorManager& operator=(const FileStorManager &) = delete; @@ -118,8 +111,9 @@ public: const FileStorMetrics& get_metrics() const { return *_metrics; } + void on_configure(const vespa::config::content::StorFilestorConfig& config); + private: - void configure(std::unique_ptr<vespa::config::content::StorFilestorConfig> config) override; PersistenceHandler & createRegisteredHandler(const ServiceLayerComponent & component); VESPA_DLL_LOCAL PersistenceHandler & getThreadLocalHandler(); diff --git a/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.cpp b/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.cpp index e1c6465b332..9558429bf13 100644 --- a/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.cpp @@ -46,12 +46,11 @@ ModifiedBucketChecker::BucketIdListResult::reset(document::BucketSpace bucketSpa ModifiedBucketChecker::ModifiedBucketChecker( ServiceLayerComponentRegister& compReg, spi::PersistenceProvider& provider, - const config::ConfigUri& configUri) + const StorServerConfig& bootstrap_config) : StorageLink("Modified bucket checker"), _provider(provider), _component(), _thread(), - _configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())), _monitor(), _stateLock(), _bucketSpaces(), @@ -60,8 +59,7 @@ ModifiedBucketChecker::ModifiedBucketChecker( _maxPendingChunkSize(100), _singleThreadMode(false) { - _configFetcher->subscribe<vespa::config::content::core::StorServerConfig>(configUri.getConfigId(), this); - _configFetcher->start(); + on_configure(bootstrap_config); std::ostringstream threadName; threadName << "Modified bucket checker " << static_cast<void*>(this); @@ -75,15 +73,14 @@ ModifiedBucketChecker::~ModifiedBucketChecker() } void -ModifiedBucketChecker::configure( - std::unique_ptr<vespa::config::content::core::StorServerConfig> newConfig) +ModifiedBucketChecker::on_configure(const vespa::config::content::core::StorServerConfig& newConfig) { std::lock_guard lock(_stateLock); - if (newConfig->bucketRecheckingChunkSize < 1) { + if (newConfig.bucketRecheckingChunkSize < 1) { throw config::InvalidConfigException( "Cannot have bucket rechecking chunk size of less than 1"); } - _maxPendingChunkSize = newConfig->bucketRecheckingChunkSize; + _maxPendingChunkSize = newConfig.bucketRecheckingChunkSize; } diff --git a/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.h b/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.h index 18f03da7469..9f0111b32f9 100644 --- a/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.h +++ b/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.h @@ -23,19 +23,18 @@ namespace spi { struct PersistenceProvider; } class ModifiedBucketChecker : public StorageLink, public framework::Runnable, - public Types, - private config::IFetcherCallback< - vespa::config::content::core::StorServerConfig> + public Types { public: + using StorServerConfig = vespa::config::content::core::StorServerConfig; using UP = std::unique_ptr<ModifiedBucketChecker>; ModifiedBucketChecker(ServiceLayerComponentRegister& compReg, spi::PersistenceProvider& provide, - const config::ConfigUri& configUri); + const StorServerConfig& bootstrap_config); ~ModifiedBucketChecker() override; - void configure(std::unique_ptr<vespa::config::content::core::StorServerConfig>) override; + void on_configure(const vespa::config::content::core::StorServerConfig&); void run(framework::ThreadHandle& thread) override; bool tick(); @@ -88,7 +87,6 @@ private: spi::PersistenceProvider & _provider; ServiceLayerComponent::UP _component; std::unique_ptr<framework::Thread> _thread; - std::unique_ptr<config::ConfigFetcher> _configFetcher; std::mutex _monitor; std::condition_variable _cond; std::mutex _stateLock; diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp index 34040bb12c0..25829f3d391 100644 --- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp +++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp @@ -22,12 +22,11 @@ LOG_SETUP(".bucketownershiphandler"); namespace storage { ChangedBucketOwnershipHandler::ChangedBucketOwnershipHandler( - const config::ConfigUri& configUri, + const PersistenceConfig& bootstrap_config, ServiceLayerComponentRegister& compReg) : StorageLink("Changed bucket ownership handler"), _component(compReg, "changedbucketownershiphandler"), _metrics(), - _configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())), _state_sync_executor(1), // single thread for sequential task execution _stateLock(), _currentState(), // Not set yet, so ownership will not be valid @@ -37,25 +36,23 @@ ChangedBucketOwnershipHandler::ChangedBucketOwnershipHandler( _abortMutatingIdealStateOps(false), _abortMutatingExternalLoadOps(false) { - _configFetcher->subscribe<vespa::config::content::PersistenceConfig>(configUri.getConfigId(), this); - _configFetcher->start(); + on_configure(bootstrap_config); _component.registerMetric(_metrics); } ChangedBucketOwnershipHandler::~ChangedBucketOwnershipHandler() = default; void -ChangedBucketOwnershipHandler::configure( - std::unique_ptr<vespa::config::content::PersistenceConfig> config) +ChangedBucketOwnershipHandler::on_configure(const vespa::config::content::PersistenceConfig& config) { _abortQueuedAndPendingOnStateChange.store( - config->abortOperationsWithChangedBucketOwnership, + config.abortOperationsWithChangedBucketOwnership, std::memory_order_relaxed); _abortMutatingIdealStateOps.store( - config->abortOutdatedMutatingIdealStateOps, + config.abortOutdatedMutatingIdealStateOps, std::memory_order_relaxed); _abortMutatingExternalLoadOps.store( - config->abortOutdatedMutatingExternalLoadOps, + config.abortOutdatedMutatingExternalLoadOps, std::memory_order_relaxed); } diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h index 3e20eb507f6..801534385f7 100644 --- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h +++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h @@ -56,10 +56,7 @@ namespace lib { * - RemoveCommand * - RevertCommand */ -class ChangedBucketOwnershipHandler - : public StorageLink, - private config::IFetcherCallback<vespa::config::content::PersistenceConfig> -{ +class ChangedBucketOwnershipHandler : public StorageLink { public: class Metrics : public metrics::MetricSet { public: @@ -115,12 +112,11 @@ public: private: class ClusterStateSyncAndApplyTask; - using ConfigFetcherUP = std::unique_ptr<config::ConfigFetcher>; + using PersistenceConfig = vespa::config::content::PersistenceConfig; using ClusterStateBundleCSP = std::shared_ptr<const lib::ClusterStateBundle>; ServiceLayerComponent _component; Metrics _metrics; - ConfigFetcherUP _configFetcher; vespalib::ThreadStackExecutor _state_sync_executor; mutable std::mutex _stateLock; ClusterStateBundleCSP _currentState; @@ -185,7 +181,7 @@ private: bool enabledExternalLoadAborting() const; public: - ChangedBucketOwnershipHandler(const config::ConfigUri& configUri, + ChangedBucketOwnershipHandler(const PersistenceConfig& bootstrap_config, ServiceLayerComponentRegister& compReg); ~ChangedBucketOwnershipHandler() override; @@ -194,7 +190,7 @@ public: bool onInternalReply(const std::shared_ptr<api::InternalReply>& reply) override; void onClose() override; - void configure(std::unique_ptr<vespa::config::content::PersistenceConfig>) override; + void on_configure(const PersistenceConfig&); /** * We want to ensure distribution config changes are thread safe wrt. our diff --git a/storage/src/vespa/storage/storageserver/distributornode.cpp b/storage/src/vespa/storage/storageserver/distributornode.cpp index ef7579f8085..10ee8023621 100644 --- a/storage/src/vespa/storage/storageserver/distributornode.cpp +++ b/storage/src/vespa/storage/storageserver/distributornode.cpp @@ -84,8 +84,6 @@ void DistributorNode::createChain(IStorageChainBuilder &builder) { DistributorComponentRegister& dcr(_context.getComponentRegister()); - // TODO: All components in this chain should use a common thread instead of - // each having its own configfetcher. StorageLink::UP chain; if (_retrievedCommunicationManager) { builder.add(std::move(_retrievedCommunicationManager)); diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp index 6e2f2a77d20..4cc2a7a89ab 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp +++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp @@ -179,7 +179,7 @@ MergeThrottler::MergeNodeSequence::chain_contains_this_node() const noexcept } MergeThrottler::MergeThrottler( - const config::ConfigUri & configUri, + const StorServerConfig& bootstrap_config, StorageComponentRegister& compReg) : StorageLink("Merge Throttler"), framework::HtmlStatusReporter("merges", "Merge Throttler"), @@ -190,7 +190,6 @@ MergeThrottler::MergeThrottler( _queueSequence(0), _messageLock(), _stateLock(), - _configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())), _metrics(std::make_unique<Metrics>()), _component(compReg, "mergethrottler"), _thread(), @@ -203,34 +202,33 @@ MergeThrottler::MergeThrottler( { _throttlePolicy->setMinWindowSize(20); _throttlePolicy->setMaxWindowSize(20); - _configFetcher->subscribe<StorServerConfig>(configUri.getConfigId(), this); - _configFetcher->start(); + on_configure(bootstrap_config); _component.registerStatusPage(*this); _component.registerMetric(*_metrics); } void -MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServerConfig> newConfig) +MergeThrottler::on_configure(const StorServerConfig& new_config) { std::lock_guard lock(_stateLock); - _use_dynamic_throttling = (newConfig->mergeThrottlingPolicy.type + _use_dynamic_throttling = (new_config.mergeThrottlingPolicy.type == StorServerConfig::MergeThrottlingPolicy::Type::DYNAMIC); - if (newConfig->maxMergesPerNode < 1) { + if (new_config.maxMergesPerNode < 1) { throw config::InvalidConfigException("Cannot have a max merge count of less than 1"); } - if (newConfig->maxMergeQueueSize < 0) { + if (new_config.maxMergeQueueSize < 0) { throw config::InvalidConfigException("Max merge queue size cannot be less than 0"); } - if (newConfig->resourceExhaustionMergeBackPressureDurationSecs < 0.0) { + if (new_config.resourceExhaustionMergeBackPressureDurationSecs < 0.0) { throw config::InvalidConfigException("Merge back-pressure duration cannot be less than 0"); } if (_use_dynamic_throttling) { - auto min_win_sz = std::max(newConfig->mergeThrottlingPolicy.minWindowSize, 1); - auto max_win_sz = std::max(newConfig->mergeThrottlingPolicy.maxWindowSize, 1); + auto min_win_sz = std::max(new_config.mergeThrottlingPolicy.minWindowSize, 1); + auto max_win_sz = std::max(new_config.mergeThrottlingPolicy.maxWindowSize, 1); if (min_win_sz > max_win_sz) { min_win_sz = max_win_sz; } - auto win_sz_increment = std::max(1.0, newConfig->mergeThrottlingPolicy.windowSizeIncrement); + auto win_sz_increment = std::max(1.0, new_config.mergeThrottlingPolicy.windowSizeIncrement); _throttlePolicy->setMinWindowSize(min_win_sz); _throttlePolicy->setMaxWindowSize(max_win_sz); _throttlePolicy->setWindowSizeIncrement(win_sz_increment); @@ -238,15 +236,14 @@ MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServ min_win_sz, max_win_sz, win_sz_increment); } else { // Use legacy config values when static throttling is enabled. - _throttlePolicy->setMinWindowSize(newConfig->maxMergesPerNode); - _throttlePolicy->setMaxWindowSize(newConfig->maxMergesPerNode); + _throttlePolicy->setMinWindowSize(new_config.maxMergesPerNode); + _throttlePolicy->setMaxWindowSize(new_config.maxMergesPerNode); } - LOG(debug, "Setting new max queue size to %d", - newConfig->maxMergeQueueSize); - _maxQueueSize = newConfig->maxMergeQueueSize; + LOG(debug, "Setting new max queue size to %d", new_config.maxMergeQueueSize); + _maxQueueSize = new_config.maxMergeQueueSize; _backpressure_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>( - std::chrono::duration<double>(newConfig->resourceExhaustionMergeBackPressureDurationSecs)); - _disable_queue_limits_for_chained_merges = newConfig->disableQueueLimitsForChainedMerges; + std::chrono::duration<double>(new_config.resourceExhaustionMergeBackPressureDurationSecs)); + _disable_queue_limits_for_chained_merges = new_config.disableQueueLimitsForChainedMerges; } MergeThrottler::~MergeThrottler() @@ -275,8 +272,6 @@ MergeThrottler::onOpen() void MergeThrottler::onClose() { - // Avoid getting config on shutdown - _configFetcher->close(); { std::lock_guard guard(_messageLock); // Note: used to prevent taking locks in different order if onFlush diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h index 840f1df1177..5362c2f6df8 100644 --- a/storage/src/vespa/storage/storageserver/mergethrottler.h +++ b/storage/src/vespa/storage/storageserver/mergethrottler.h @@ -35,10 +35,11 @@ class AbortBucketOperationsCommand; class MergeThrottler : public framework::Runnable, public StorageLink, - public framework::HtmlStatusReporter, - private config::IFetcherCallback<vespa::config::content::core::StorServerConfig> + public framework::HtmlStatusReporter { public: + using StorServerConfig = vespa::config::content::core::StorServerConfig; + class MergeFailureMetrics : public metrics::MetricSet { public: metrics::SumMetric<metrics::LongCountMetric> sum; @@ -172,7 +173,6 @@ private: mutable std::mutex _messageLock; std::condition_variable _messageCond; mutable std::mutex _stateLock; - std::unique_ptr<config::ConfigFetcher> _configFetcher; // Messages pending to be processed by the worker thread std::vector<api::StorageMessage::SP> _messagesDown; std::vector<api::StorageMessage::SP> _messagesUp; @@ -190,7 +190,7 @@ public: * windowSizeIncrement used for allowing unit tests to start out with more * than 1 as their window size. */ - MergeThrottler(const config::ConfigUri & configUri, StorageComponentRegister&); + MergeThrottler(const StorServerConfig& bootstrap_config, StorageComponentRegister&); ~MergeThrottler() override; /** Implements document::Runnable::run */ @@ -204,6 +204,8 @@ public: bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& stateCmd) override; + void on_configure(const StorServerConfig& new_config); + /* * When invoked, merges to the node will be BUSY-bounced by the throttler * for a configurable period of time instead of being processed. @@ -282,11 +284,6 @@ private: [[nodiscard]] bool isChainCompleted() const noexcept; }; - /** - * Callback method for config system (IFetcherCallback) - */ - void configure(std::unique_ptr<vespa::config::content::core::StorServerConfig> newConfig) override; - // NOTE: unless explicitly specified, all the below functions require // _sync lock to be held upon call (usually implicitly via MessageGuard) diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp index 3faa8a3ddec..0cce2c27e95 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp +++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp @@ -24,19 +24,33 @@ LOG_SETUP(".node.servicelayer"); namespace storage { +ServiceLayerNode::ServiceLayerBootstrapConfigs::ServiceLayerBootstrapConfigs() = default; +ServiceLayerNode::ServiceLayerBootstrapConfigs::~ServiceLayerBootstrapConfigs() = default; +ServiceLayerNode::ServiceLayerBootstrapConfigs::ServiceLayerBootstrapConfigs(ServiceLayerBootstrapConfigs&&) noexcept = default; +ServiceLayerNode::ServiceLayerBootstrapConfigs& +ServiceLayerNode::ServiceLayerBootstrapConfigs::operator=(ServiceLayerBootstrapConfigs&&) noexcept = default; + ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri, ServiceLayerNodeContext& context, - BootstrapConfigs bootstrap_configs, + ServiceLayerBootstrapConfigs bootstrap_configs, ApplicationGenerationFetcher& generationFetcher, spi::PersistenceProvider& persistenceProvider, const VisitorFactory::Map& externalVisitors) - : StorageNode(configUri, context, std::move(bootstrap_configs), generationFetcher, std::make_unique<HostInfo>()), + : StorageNode(configUri, context, std::move(bootstrap_configs.storage_bootstrap_configs), + generationFetcher, std::make_unique<HostInfo>()), _context(context), _persistenceProvider(persistenceProvider), _externalVisitors(externalVisitors), + _persistence_bootstrap_config(std::move(bootstrap_configs.persistence_cfg)), + _visitor_bootstrap_config(std::move(bootstrap_configs.visitor_cfg)), + _filestor_bootstrap_config(std::move(bootstrap_configs.filestor_cfg)), _bouncer(nullptr), _bucket_manager(nullptr), + _changed_bucket_ownership_handler(nullptr), _fileStorManager(nullptr), + _merge_throttler(nullptr), + _visitor_manager(nullptr), + _modified_bucket_checker(nullptr), _init_has_been_called(false) { } @@ -158,18 +172,25 @@ ServiceLayerNode::createChain(IStorageChainBuilder &builder) auto bouncer = std::make_unique<Bouncer>(compReg, bouncer_config()); _bouncer = bouncer.get(); builder.add(std::move(bouncer)); - auto merge_throttler_up = std::make_unique<MergeThrottler>(_configUri, compReg); - auto merge_throttler = merge_throttler_up.get(); + auto merge_throttler_up = std::make_unique<MergeThrottler>(server_config(), compReg); + _merge_throttler = merge_throttler_up.get(); builder.add(std::move(merge_throttler_up)); - builder.add(std::make_unique<ChangedBucketOwnershipHandler>(_configUri, compReg)); - auto bucket_manager = std::make_unique<BucketManager>(_configUri, _context.getComponentRegister()); + auto bucket_ownership_handler = std::make_unique<ChangedBucketOwnershipHandler>(*_persistence_bootstrap_config, compReg); + _changed_bucket_ownership_handler = bucket_ownership_handler.get(); + builder.add(std::move(bucket_ownership_handler)); + auto bucket_manager = std::make_unique<BucketManager>(server_config(), _context.getComponentRegister()); _bucket_manager = bucket_manager.get(); builder.add(std::move(bucket_manager)); - builder.add(std::make_unique<VisitorManager>(_configUri, _context.getComponentRegister(), - static_cast<VisitorMessageSessionFactory &>(*this), _externalVisitors)); - builder.add(std::make_unique<ModifiedBucketChecker>(_context.getComponentRegister(), _persistenceProvider, _configUri)); + auto visitor_manager = std::make_unique<VisitorManager>(*_visitor_bootstrap_config, _context.getComponentRegister(), + static_cast<VisitorMessageSessionFactory &>(*this), _externalVisitors); + _visitor_manager = visitor_manager.get(); + builder.add(std::move(visitor_manager)); + auto bucket_checker = std::make_unique<ModifiedBucketChecker>(_context.getComponentRegister(), _persistenceProvider, server_config()); + _modified_bucket_checker = bucket_checker.get(); + builder.add(std::move(bucket_checker)); auto state_manager = releaseStateManager(); - auto filstor_manager = std::make_unique<FileStorManager>(_configUri, _persistenceProvider, _context.getComponentRegister(), + auto filstor_manager = std::make_unique<FileStorManager>(*_filestor_bootstrap_config, _persistenceProvider, + _context.getComponentRegister(), getDoneInitializeHandler(), state_manager->getHostInfo()); _fileStorManager = filstor_manager.get(); builder.add(std::move(filstor_manager)); @@ -178,8 +199,43 @@ ServiceLayerNode::createChain(IStorageChainBuilder &builder) // Lifetimes of all referenced components shall outlive the last call going // through the SPI, as queues are flushed and worker threads joined when // the storage link chain is closed prior to destruction. - auto error_listener = std::make_shared<ServiceLayerErrorListener>(*_component, *merge_throttler); + auto error_listener = std::make_shared<ServiceLayerErrorListener>(*_component, *_merge_throttler); _fileStorManager->error_wrapper().register_error_listener(std::move(error_listener)); + + // Purge config no longer needed + _persistence_bootstrap_config.reset(); + _visitor_bootstrap_config.reset(); + _filestor_bootstrap_config.reset(); +} + +void +ServiceLayerNode::on_configure(const StorServerConfig& config) +{ + assert(_merge_throttler); + _merge_throttler->on_configure(config); + assert(_modified_bucket_checker); + _modified_bucket_checker->on_configure(config); +} + +void +ServiceLayerNode::on_configure(const PersistenceConfig& config) +{ + assert(_changed_bucket_ownership_handler); + _changed_bucket_ownership_handler->on_configure(config); +} + +void +ServiceLayerNode::on_configure(const StorVisitorConfig& config) +{ + assert(_visitor_manager); + _visitor_manager->on_configure(config); +} + +void +ServiceLayerNode::on_configure(const StorFilestorConfig& config) +{ + assert(_fileStorManager); + _fileStorManager->on_configure(config); } ResumeGuard diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.h b/storage/src/vespa/storage/storageserver/servicelayernode.h index dabc1c979a5..ae39bb0805e 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.h +++ b/storage/src/vespa/storage/storageserver/servicelayernode.h @@ -1,10 +1,4 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * \class storage::ServiceLayerNode - * \ingroup storageserver - * - * \brief Class for setting up a service layer node. - */ #pragma once @@ -12,9 +6,12 @@ #include "servicelayernodecontext.h" #include "storagenode.h" #include "vespa/vespalib/util/jsonstream.h" -#include <vespa/storage/visiting/visitormessagesessionfactory.h> -#include <vespa/storage/common/visitorfactory.h> +#include <vespa/config-persistence.h> +#include <vespa/config-stor-filestor.h> #include <vespa/storage/common/nodestateupdater.h> +#include <vespa/storage/common/visitorfactory.h> +#include <vespa/storage/visiting/config-stor-visitor.h> +#include <vespa/storage/visiting/visitormessagesessionfactory.h> namespace storage { @@ -22,7 +19,11 @@ namespace spi { struct PersistenceProvider; } class Bouncer; class BucketManager; +class ChangedBucketOwnershipHandler; class FileStorManager; +class MergeThrottler; +class ModifiedBucketChecker; +class VisitorManager; class ServiceLayerNode : public StorageNode, @@ -30,21 +31,44 @@ class ServiceLayerNode private NodeStateReporter { - ServiceLayerNodeContext& _context; - spi::PersistenceProvider& _persistenceProvider; - VisitorFactory::Map _externalVisitors; - - Bouncer* _bouncer; - BucketManager* _bucket_manager; - FileStorManager* _fileStorManager; - bool _init_has_been_called; +public: + using PersistenceConfig = vespa::config::content::PersistenceConfig; + using StorVisitorConfig = vespa::config::content::core::StorVisitorConfig; + using StorFilestorConfig = vespa::config::content::StorFilestorConfig; +private: + ServiceLayerNodeContext& _context; + spi::PersistenceProvider& _persistenceProvider; + VisitorFactory::Map _externalVisitors; + std::unique_ptr<PersistenceConfig> _persistence_bootstrap_config; + std::unique_ptr<StorVisitorConfig> _visitor_bootstrap_config; + std::unique_ptr<StorFilestorConfig> _filestor_bootstrap_config; + Bouncer* _bouncer; + BucketManager* _bucket_manager; + ChangedBucketOwnershipHandler* _changed_bucket_ownership_handler; + FileStorManager* _fileStorManager; + MergeThrottler* _merge_throttler; + VisitorManager* _visitor_manager; + ModifiedBucketChecker* _modified_bucket_checker; + bool _init_has_been_called; public: using UP = std::unique_ptr<ServiceLayerNode>; + struct ServiceLayerBootstrapConfigs { + BootstrapConfigs storage_bootstrap_configs; + std::unique_ptr<PersistenceConfig> persistence_cfg; + std::unique_ptr<StorVisitorConfig> visitor_cfg; + std::unique_ptr<StorFilestorConfig> filestor_cfg; + + ServiceLayerBootstrapConfigs(); + ~ServiceLayerBootstrapConfigs(); + ServiceLayerBootstrapConfigs(ServiceLayerBootstrapConfigs&&) noexcept; + ServiceLayerBootstrapConfigs& operator=(ServiceLayerBootstrapConfigs&&) noexcept; + }; + ServiceLayerNode(const config::ConfigUri & configUri, ServiceLayerNodeContext& context, - BootstrapConfigs bootstrap_configs, + ServiceLayerBootstrapConfigs bootstrap_configs, ApplicationGenerationFetcher& generationFetcher, spi::PersistenceProvider& persistenceProvider, const VisitorFactory::Map& externalVisitors); @@ -54,6 +78,11 @@ public: */ void init(); + void on_configure(const StorServerConfig& config); + void on_configure(const PersistenceConfig& config); + void on_configure(const StorVisitorConfig& config); + void on_configure(const StorFilestorConfig& config); + const lib::NodeType& getNodeType() const override { return lib::NodeType::STORAGE; } ResumeGuard pause() override; diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp index 1c26923b15f..dc1635bc4b1 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.cpp +++ b/storage/src/vespa/storage/visiting/visitormanager.cpp @@ -21,7 +21,7 @@ LOG_SETUP(".visitor.manager"); namespace storage { -VisitorManager::VisitorManager(const config::ConfigUri & configUri, +VisitorManager::VisitorManager(const StorVisitorConfig& bootstrap_config, StorageComponentRegister& componentRegister, VisitorMessageSessionFactory& messageSF, VisitorFactory::Map externalFactories, @@ -35,7 +35,6 @@ VisitorManager::VisitorManager(const config::ConfigUri & configUri, _visitorLock(), _visitorCond(), _visitorCounter(0), - _configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())), _metrics(std::make_shared<VisitorMetrics>()), _maxFixedConcurrentVisitors(1), _maxVariableConcurrentVisitors(0), @@ -51,8 +50,7 @@ VisitorManager::VisitorManager(const config::ConfigUri & configUri, _enforceQueueUse(false), _visitorFactories(std::move(externalFactories)) { - _configFetcher->subscribe<vespa::config::content::core::StorVisitorConfig>(configUri.getConfigId(), this); - _configFetcher->start(); + on_configure(bootstrap_config); _component.registerMetric(*_metrics); if (!defer_manager_thread_start) { create_and_start_manager_thread(); @@ -94,8 +92,6 @@ VisitorManager::updateMetrics(const MetricLockGuard &) void VisitorManager::onClose() { - // Avoid getting config during shutdown - _configFetcher->close(); { std::lock_guard sync(_visitorLock); for (auto& enqueued : _visitorQueue) { @@ -118,25 +114,25 @@ VisitorManager::print(std::ostream& out, bool verbose, const std::string& indent } void -VisitorManager::configure(std::unique_ptr<vespa::config::content::core::StorVisitorConfig> config) +VisitorManager::on_configure(const vespa::config::content::core::StorVisitorConfig& config) { std::lock_guard sync(_visitorLock); - if (config->defaultdocblocksize % 512 != 0) { + if (config.defaultdocblocksize % 512 != 0) { throw config::InvalidConfigException( - "The default docblock size needs to be a multiplum of the " + "The default docblock size needs to be a multiple of the " "disk block size. (512b)"); } // Do some sanity checking of input. Cannot haphazardly mix and match // old and new max concurrency config values - if (config->maxconcurrentvisitors == 0 - && config->maxconcurrentvisitorsFixed == 0) + if (config.maxconcurrentvisitors == 0 + && config.maxconcurrentvisitorsFixed == 0) { throw config::InvalidConfigException( "Maximum concurrent visitor count cannot be 0."); } - else if (config->maxconcurrentvisitorsFixed == 0 - && config->maxconcurrentvisitorsVariable != 0) + else if (config.maxconcurrentvisitorsFixed == 0 + && config.maxconcurrentvisitorsVariable != 0) { throw config::InvalidConfigException( "Cannot specify 'variable' parameter for max concurrent " @@ -147,21 +143,21 @@ VisitorManager::configure(std::unique_ptr<vespa::config::content::core::StorVisi uint32_t maxConcurrentVisitorsVariable; // Concurrency parameter fixed takes precedence over legacy maxconcurrent - if (config->maxconcurrentvisitorsFixed > 0) { - maxConcurrentVisitorsFixed = config->maxconcurrentvisitorsFixed; - maxConcurrentVisitorsVariable = config->maxconcurrentvisitorsVariable; + if (config.maxconcurrentvisitorsFixed > 0) { + maxConcurrentVisitorsFixed = config.maxconcurrentvisitorsFixed; + maxConcurrentVisitorsVariable = config.maxconcurrentvisitorsVariable; } else { - maxConcurrentVisitorsFixed = config->maxconcurrentvisitors; + maxConcurrentVisitorsFixed = config.maxconcurrentvisitors; maxConcurrentVisitorsVariable = 0; } bool liveUpdate = !_visitorThread.empty(); if (liveUpdate) { - if (_visitorThread.size() != static_cast<uint32_t>(config->visitorthreads)) { + if (_visitorThread.size() != static_cast<uint32_t>(config.visitorthreads)) { LOG(warning, "Ignoring config change requesting %u visitor " "threads, still running %u. Restart storage to apply " "change.", - config->visitorthreads, + config.visitorthreads, (uint32_t) _visitorThread.size()); } @@ -174,18 +170,18 @@ VisitorManager::configure(std::unique_ptr<vespa::config::content::core::StorVisi maxConcurrentVisitorsFixed, maxConcurrentVisitorsVariable); } - if (_maxVisitorQueueSize != static_cast<uint32_t>(config->maxvisitorqueuesize)) { + if (_maxVisitorQueueSize != static_cast<uint32_t>(config.maxvisitorqueuesize)) { LOG(info, "Altered max visitor queue size setting from %u to %u.", - _maxVisitorQueueSize, config->maxvisitorqueuesize); + _maxVisitorQueueSize, config.maxvisitorqueuesize); } } else { - if (config->visitorthreads == 0) { + if (config.visitorthreads == 0) { throw config::InvalidConfigException( "No visitor threads configured. If you don't want visitors " "to run, don't use visitormanager.", VESPA_STRLOC); } - _metrics->initThreads(config->visitorthreads); - for (int32_t i=0; i<config->visitorthreads; ++i) { + _metrics->initThreads(config.visitorthreads); + for (int32_t i=0; i<config.visitorthreads; ++i) { _visitorThread.emplace_back( // Naked new due to a lot of private inheritance in VisitorThread and VisitorManager std::shared_ptr<VisitorThread>(new VisitorThread(i, _componentRegister, _messageSessionFactory, @@ -195,9 +191,9 @@ VisitorManager::configure(std::unique_ptr<vespa::config::content::core::StorVisi } _maxFixedConcurrentVisitors = maxConcurrentVisitorsFixed; _maxVariableConcurrentVisitors = maxConcurrentVisitorsVariable; - _maxVisitorQueueSize = config->maxvisitorqueuesize; + _maxVisitorQueueSize = config.maxvisitorqueuesize; - auto cmd = std::make_shared<PropagateVisitorConfig>(*config); + auto cmd = std::make_shared<PropagateVisitorConfig>(config); for (auto& thread : _visitorThread) { thread.first->processMessage(0, cmd); } diff --git a/storage/src/vespa/storage/visiting/visitormanager.h b/storage/src/vespa/storage/visiting/visitormanager.h index 9fe906d4465..fefa2c218ab 100644 --- a/storage/src/vespa/storage/visiting/visitormanager.h +++ b/storage/src/vespa/storage/visiting/visitormanager.h @@ -44,10 +44,11 @@ class VisitorManager : public framework::Runnable, public StorageLink, public framework::HtmlStatusReporter, private VisitorMessageHandler, - private config::IFetcherCallback<vespa::config::content::core::StorVisitorConfig>, private framework::MetricUpdateHook { private: + using StorVisitorConfig = vespa::config::content::core::StorVisitorConfig; + StorageComponentRegister& _componentRegister; VisitorMessageSessionFactory& _messageSessionFactory; std::vector<std::pair<std::shared_ptr<VisitorThread>, @@ -64,7 +65,6 @@ private: mutable std::mutex _visitorLock; std::condition_variable _visitorCond; uint64_t _visitorCounter; - std::unique_ptr<config::ConfigFetcher> _configFetcher; std::shared_ptr<VisitorMetrics> _metrics; uint32_t _maxFixedConcurrentVisitors; uint32_t _maxVariableConcurrentVisitors; @@ -82,7 +82,7 @@ private: bool _enforceQueueUse; VisitorFactory::Map _visitorFactories; public: - VisitorManager(const config::ConfigUri & configUri, + VisitorManager(const StorVisitorConfig& bootstrap_config, StorageComponentRegister&, VisitorMessageSessionFactory&, VisitorFactory::Map external = VisitorFactory::Map(), @@ -94,6 +94,8 @@ public: uint32_t getActiveVisitorCount() const; void setTimeBetweenTicks(uint32_t time); + void on_configure(const vespa::config::content::core::StorVisitorConfig&); + void setMaxConcurrentVisitors(uint32_t count) { // Used in unit testing _maxFixedConcurrentVisitors = count; _maxVariableConcurrentVisitors = 0; @@ -122,7 +124,6 @@ public: private: using MonitorGuard = std::unique_lock<std::mutex>; - void configure(std::unique_ptr<vespa::config::content::core::StorVisitorConfig>) override; void run(framework::ThreadHandle&) override; /** diff --git a/storageserver/src/vespa/storageserver/app/process.h b/storageserver/src/vespa/storageserver/app/process.h index a30611b78c8..72b399ac870 100644 --- a/storageserver/src/vespa/storageserver/app/process.h +++ b/storageserver/src/vespa/storageserver/app/process.h @@ -8,7 +8,7 @@ * contains the process as a library such that it can be tested and used in * other pieces of code. * - * Specializations of this class will exist to add the funcionality needed for + * Specializations of this class will exist to add the functionality needed for * the various process types. */ diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp index 6b713b8e3f4..bb284bfc108 100644 --- a/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp +++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp @@ -34,6 +34,9 @@ bucket_db_options_from_config(const config::ConfigUri& config_uri) { ServiceLayerProcess::ServiceLayerProcess(const config::ConfigUri& configUri) : Process(configUri), _externalVisitors(), + _persistence_cfg_handle(), + _visitor_cfg_handle(), + _filestor_cfg_handle(), _node(), _storage_chain_builder(), _context(std::make_unique<framework::defaultimplementation::RealClock>(), @@ -53,6 +56,10 @@ ServiceLayerProcess::shutdown() void ServiceLayerProcess::setupConfig(vespalib::duration subscribe_timeout) { + _persistence_cfg_handle = _configSubscriber.subscribe<PersistenceConfig>(_configUri.getConfigId(), subscribe_timeout); + _visitor_cfg_handle = _configSubscriber.subscribe<StorVisitorConfig>(_configUri.getConfigId(), subscribe_timeout); + _filestor_cfg_handle = _configSubscriber.subscribe<StorFilestorConfig>(_configUri.getConfigId(), subscribe_timeout); + // We reuse the StorServerConfig subscription from the parent Process Process::setupConfig(subscribe_timeout); } @@ -60,6 +67,18 @@ void ServiceLayerProcess::updateConfig() { Process::updateConfig(); + if (_server_cfg_handle->isChanged()) { + _node->on_configure(*_server_cfg_handle->getConfig()); + } + if (_persistence_cfg_handle->isChanged()) { + _node->on_configure(*_persistence_cfg_handle->getConfig()); + } + if (_visitor_cfg_handle->isChanged()) { + _node->on_configure(*_visitor_cfg_handle->getConfig()); + } + if (_filestor_cfg_handle->isChanged()) { + _node->on_configure(*_filestor_cfg_handle->getConfig()); + } } bool @@ -73,15 +92,21 @@ ServiceLayerProcess::createNode() { add_external_visitors(); setupProvider(); - // TODO dedupe, consolidate + StorageNode::BootstrapConfigs bc; bc.bucket_spaces_cfg = _bucket_spaces_cfg_handle->getConfig(); - bc.bouncer_cfg = _bouncer_cfg_handle->getConfig(); - bc.comm_mgr_cfg = _comm_mgr_cfg_handle->getConfig(); - bc.distribution_cfg = _distribution_cfg_handle->getConfig(); - bc.server_cfg = _server_cfg_handle->getConfig(); - - _node = std::make_unique<ServiceLayerNode>(_configUri, _context, std::move(bc), *this, getProvider(), _externalVisitors); + bc.bouncer_cfg = _bouncer_cfg_handle->getConfig(); + bc.comm_mgr_cfg = _comm_mgr_cfg_handle->getConfig(); + bc.distribution_cfg = _distribution_cfg_handle->getConfig(); + bc.server_cfg = _server_cfg_handle->getConfig(); + + ServiceLayerNode::ServiceLayerBootstrapConfigs sbc; + sbc.storage_bootstrap_configs = std::move(bc); + sbc.persistence_cfg = _persistence_cfg_handle->getConfig(); + sbc.visitor_cfg = _visitor_cfg_handle->getConfig(); + sbc.filestor_cfg = _filestor_cfg_handle->getConfig(); + + _node = std::make_unique<ServiceLayerNode>(_configUri, _context, std::move(sbc), *this, getProvider(), _externalVisitors); if (_storage_chain_builder) { _node->set_storage_chain_builder(std::move(_storage_chain_builder)); } diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.h b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h index 7e981748d1a..dcc56f373c4 100644 --- a/storageserver/src/vespa/storageserver/app/servicelayerprocess.h +++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h @@ -1,24 +1,12 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -/** - * \class storage::ServiceLayerProcess - * - * \brief A process running a service layer. - */ -/** - * \class storage::MemFileServiceLayerProcess - * - * \brief A process running a service layer with memfile persistence provider. - */ -/** - * \class storage::RpcServiceLayerProcess - * - * \brief A process running a service layer with RPC persistence provider. - */ #pragma once #include "process.h" -#include <vespa/storage/storageserver/servicelayernodecontext.h> +#include <vespa/config-persistence.h> +#include <vespa/config-stor-filestor.h> #include <vespa/storage/common/visitorfactory.h> +#include <vespa/storage/storageserver/servicelayernodecontext.h> +#include <vespa/storage/visiting/config-stor-visitor.h> namespace config { class ConfigUri; } @@ -33,6 +21,14 @@ class ServiceLayerProcess : public Process { protected: VisitorFactory::Map _externalVisitors; private: + using PersistenceConfig = vespa::config::content::PersistenceConfig; + using StorVisitorConfig = vespa::config::content::core::StorVisitorConfig; + using StorFilestorConfig = vespa::config::content::StorFilestorConfig; + + std::unique_ptr<config::ConfigHandle<PersistenceConfig>> _persistence_cfg_handle; + std::unique_ptr<config::ConfigHandle<StorVisitorConfig>> _visitor_cfg_handle; + std::unique_ptr<config::ConfigHandle<StorFilestorConfig>> _filestor_cfg_handle; + std::unique_ptr<ServiceLayerNode> _node; std::unique_ptr<IStorageChainBuilder> _storage_chain_builder; |