aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2023-10-24 22:12:02 +0200
committerGitHub <noreply@github.com>2023-10-24 22:12:02 +0200
commit3858da05eaf7eb58b5536a59a01dd4154d174424 (patch)
tree17fc31e28da6460dd562ae44f4c78b484f11ef9f
parentbc3b03f9189b2b575f1c3b1bfe0c2a79bab555ce (diff)
parent7e2bb2cbcaae5bf5eadb2ee765bb7280a458f9e7 (diff)
Merge pull request #29086 from vespa-engine/vekterli/rewire-more-service-layer-component-configs
Rewire more service layer component configs
-rw-r--r--storage/src/tests/bucketdb/bucketmanagertest.cpp27
-rw-r--r--storage/src/tests/common/testhelper.h6
-rw-r--r--storage/src/tests/persistence/common/filestortestfixture.cpp18
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp7
-rw-r--r--storage/src/tests/persistence/filestorage/filestormodifiedbucketstest.cpp15
-rw-r--r--storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp13
-rw-r--r--storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp10
-rw-r--r--storage/src/tests/storageserver/communicationmanagertest.cpp26
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp29
-rw-r--r--storage/src/tests/storageserver/service_layer_error_listener_test.cpp11
-rw-r--r--storage/src/tests/visiting/visitormanagertest.cpp9
-rw-r--r--storage/src/tests/visiting/visitortest.cpp6
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketmanager.cpp10
-rw-r--r--storage/src/vespa/storage/bucketdb/bucketmanager.h6
-rw-r--r--storage/src/vespa/storage/common/visitorfactory.h1
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp24
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.h16
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.cpp13
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.h10
-rw-r--r--storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp15
-rw-r--r--storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h12
-rw-r--r--storage/src/vespa/storage/storageserver/distributornode.cpp2
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp37
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h15
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.cpp78
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.h63
-rw-r--r--storage/src/vespa/storage/visiting/visitormanager.cpp48
-rw-r--r--storage/src/vespa/storage/visiting/visitormanager.h9
-rw-r--r--storageserver/src/vespa/storageserver/app/process.h2
-rw-r--r--storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp39
-rw-r--r--storageserver/src/vespa/storageserver/app/servicelayerprocess.h28
31 files changed, 354 insertions, 251 deletions
diff --git a/storage/src/tests/bucketdb/bucketmanagertest.cpp b/storage/src/tests/bucketdb/bucketmanagertest.cpp
index f46ff867fcc..91e901c7254 100644
--- a/storage/src/tests/bucketdb/bucketmanagertest.cpp
+++ b/storage/src/tests/bucketdb/bucketmanagertest.cpp
@@ -1,29 +1,29 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <tests/common/dummystoragelink.h>
+#include <tests/common/testhelper.h>
+#include <tests/common/teststorageapp.h>
#include <vespa/config/helper/configgetter.hpp>
-#include <vespa/document/config/documenttypes_config_fwd.h>
#include <vespa/document/config/config-documenttypes.h>
+#include <vespa/document/config/documenttypes_config_fwd.h>
#include <vespa/document/datatype/documenttype.h>
#include <vespa/document/fieldvalue/document.h>
-#include <vespa/document/update/documentupdate.h>
#include <vespa/document/repo/documenttyperepo.h>
-#include <vespa/document/test/make_document_bucket.h>
#include <vespa/document/test/make_bucket_space.h>
+#include <vespa/document/test/make_document_bucket.h>
+#include <vespa/document/update/documentupdate.h>
+#include <vespa/metrics/updatehook.h>
#include <vespa/storage/bucketdb/bucketmanager.h>
#include <vespa/storage/common/global_bucket_space_distribution_converter.h>
#include <vespa/storage/persistence/filestorage/filestormanager.h>
+#include <vespa/storageapi/message/bucketsplitting.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/state.h>
-#include <vespa/storageapi/message/bucketsplitting.h>
-#include <vespa/metrics/updatehook.h>
-#include <tests/common/teststorageapp.h>
-#include <tests/common/dummystoragelink.h>
-#include <tests/common/testhelper.h>
-#include <vespa/vdslib/state/random.h>
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/vdslib/state/clusterstate.h>
-#include <vespa/vespalib/stllike/asciistream.h>
+#include <vespa/vdslib/state/random.h>
#include <vespa/vespalib/gtest/gtest.h>
+#include <vespa/vespalib/stllike/asciistream.h>
#include <future>
#include <vespa/log/log.h>
@@ -148,7 +148,9 @@ void BucketManagerTest::setupTestEnvironment(bool fakePersistenceLayer, bool noD
_node->setTypeRepo(repo);
_node->setupDummyPersistence();
// Set up the 3 links
- auto manager = std::make_unique<BucketManager>(config::ConfigUri(config.getConfigId()), _node->getComponentRegister());
+ auto config_uri = config::ConfigUri(config.getConfigId());
+ using vespa::config::content::core::StorServerConfig;
+ auto manager = std::make_unique<BucketManager>(*config_from<StorServerConfig>(config_uri), _node->getComponentRegister());
_manager = manager.get();
_top->push_back(std::move(manager));
if (fakePersistenceLayer) {
@@ -156,7 +158,8 @@ void BucketManagerTest::setupTestEnvironment(bool fakePersistenceLayer, bool noD
_bottom = bottom.get();
_top->push_back(std::move(bottom));
} else {
- auto bottom = std::make_unique<FileStorManager>(config::ConfigUri(config.getConfigId()),
+ using vespa::config::content::StorFilestorConfig;
+ auto bottom = std::make_unique<FileStorManager>(*config_from<StorFilestorConfig>(config_uri),
_node->getPersistenceProvider(), _node->getComponentRegister(),
*_node, _node->get_host_info());
_top->push_back(std::move(bottom));
diff --git a/storage/src/tests/common/testhelper.h b/storage/src/tests/common/testhelper.h
index bfc5c7679e1..1f83e938409 100644
--- a/storage/src/tests/common/testhelper.h
+++ b/storage/src/tests/common/testhelper.h
@@ -1,5 +1,6 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include <vespa/config/helper/configgetter.h>
#include <vespa/messagebus/testlib/slobrok.h>
#include <vespa/vdstestlib/config/dirconfig.h>
#include <fstream>
@@ -21,6 +22,11 @@ std::string getRootFolder(vdstestlib::DirConfig & dc);
void addSlobrokConfig(vdstestlib::DirConfig& dc,
const mbus::Slobrok& slobrok);
+template <typename ConfigT>
+std::unique_ptr<ConfigT> config_from(const ::config::ConfigUri& cfg_uri) {
+ return ::config::ConfigGetter<ConfigT>::getConfig(cfg_uri.getConfigId(), cfg_uri.getContext());
+}
+
// Class used to print start and end of test. Enable debug when you want to see
// which test creates what output or where we get stuck
struct TestName {
diff --git a/storage/src/tests/persistence/common/filestortestfixture.cpp b/storage/src/tests/persistence/common/filestortestfixture.cpp
index c989acd5228..74d34fb0f50 100644
--- a/storage/src/tests/persistence/common/filestortestfixture.cpp
+++ b/storage/src/tests/persistence/common/filestortestfixture.cpp
@@ -1,15 +1,17 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/storage/persistence/messages.h>
-#include <vespa/storage/persistence/filestorage/filestormanager.h>
-#include <vespa/storageapi/message/bucket.h>
-#include <vespa/persistence/dummyimpl/dummypersistence.h>
+#include <tests/common/testhelper.h>
#include <tests/persistence/common/filestortestfixture.h>
-#include <vespa/document/repo/documenttyperepo.h>
+#include <vespa/config/helper/configgetter.hpp>
#include <vespa/document/fieldset/fieldsets.h>
+#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/test/make_document_bucket.h>
-#include <vespa/vdslib/state/clusterstate.h>
+#include <vespa/persistence/dummyimpl/dummypersistence.h>
#include <vespa/persistence/spi/test.h>
+#include <vespa/storage/persistence/filestorage/filestormanager.h>
+#include <vespa/storage/persistence/messages.h>
+#include <vespa/storageapi/message/bucket.h>
+#include <vespa/vdslib/state/clusterstate.h>
#include <sstream>
using storage::spi::test::makeSpiBucket;
@@ -73,7 +75,9 @@ FileStorTestFixture::TestFileStorComponents::TestFileStorComponents(
manager(nullptr)
{
injector.inject(top);
- auto fsm = std::make_unique<FileStorManager>(config::ConfigUri(fixture._config->getConfigId()), fixture._node->getPersistenceProvider(),
+ using vespa::config::content::StorFilestorConfig;
+ auto config = config_from<StorFilestorConfig>(config::ConfigUri(fixture._config->getConfigId()));
+ auto fsm = std::make_unique<FileStorManager>(*config, fixture._node->getPersistenceProvider(),
fixture._node->getComponentRegister(), *fixture._node, fixture._node->get_host_info());
manager = fsm.get();
top.push_back(std::move(fsm));
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index b09febce408..586863251c9 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -5,6 +5,7 @@
#include <tests/common/teststorageapp.h>
#include <tests/persistence/filestorage/forwardingmessagesender.h>
#include <vespa/config/common/exceptions.h>
+#include <vespa/config/helper/configgetter.hpp>
#include <vespa/document/fieldset/fieldsets.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/select/parser.h>
@@ -223,8 +224,10 @@ struct TestFileStorComponents {
explicit TestFileStorComponents(FileStorTestBase& test, bool use_small_config = false)
: manager(nullptr)
{
- auto fsm = std::make_unique<FileStorManager>(config::ConfigUri((use_small_config ? test.smallConfig : test.config)->getConfigId()),
- test._node->getPersistenceProvider(),
+ using vespa::config::content::StorFilestorConfig;
+ auto config_uri = config::ConfigUri((use_small_config ? test.smallConfig : test.config)->getConfigId());
+ auto config = config_from<StorFilestorConfig>(config_uri);
+ auto fsm = std::make_unique<FileStorManager>(*config, test._node->getPersistenceProvider(),
test._node->getComponentRegister(), *test._node, test._node->get_host_info());
manager = fsm.get();
top.push_back(std::move(fsm));
diff --git a/storage/src/tests/persistence/filestorage/filestormodifiedbucketstest.cpp b/storage/src/tests/persistence/filestorage/filestormodifiedbucketstest.cpp
index 966382de39b..710da80972f 100644
--- a/storage/src/tests/persistence/filestorage/filestormodifiedbucketstest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormodifiedbucketstest.cpp
@@ -1,11 +1,12 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/storageapi/message/bucket.h>
-#include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h>
-#include <vespa/persistence/spi/test.h>
+#include <tests/persistence/common/filestortestfixture.h>
#include <tests/persistence/common/persistenceproviderwrapper.h>
+#include <vespa/config/helper/configgetter.hpp>
#include <vespa/persistence/dummyimpl/dummypersistence.h>
-#include <tests/persistence/common/filestortestfixture.h>
+#include <vespa/persistence/spi/test.h>
+#include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h>
+#include <vespa/storageapi/message/bucket.h>
using storage::spi::test::makeSpiBucket;
using namespace ::testing;
@@ -36,10 +37,10 @@ struct BucketCheckerInjector : FileStorTestFixture::StorageLinkInjector
_fixture(fixture)
{}
void inject(DummyStorageLink& link) const override {
+ using vespa::config::content::core::StorServerConfig;
+ auto cfg = config_from<StorServerConfig>(config::ConfigUri(_fixture._config->getConfigId()));
link.push_back(std::make_unique<ModifiedBucketChecker>(
- _node.getComponentRegister(),
- _node.getPersistenceProvider(),
- config::ConfigUri(_fixture._config->getConfigId())));
+ _node.getComponentRegister(), _node.getPersistenceProvider(), *cfg));
}
};
diff --git a/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp b/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp
index 8acaa9a78d3..f96ff9c012e 100644
--- a/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp
+++ b/storage/src/tests/persistence/filestorage/modifiedbucketcheckertest.cpp
@@ -1,11 +1,12 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <tests/common/testhelper.h>
#include <tests/common/dummystoragelink.h>
+#include <tests/common/testhelper.h>
#include <tests/common/teststorageapp.h>
+#include <vespa/config/common/exceptions.h>
+#include <vespa/config/helper/configgetter.hpp>
#include <vespa/persistence/dummyimpl/dummypersistence.h>
#include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h>
-#include <vespa/config/common/exceptions.h>
#include <vespa/vespalib/gtest/gtest.h>
using namespace ::testing;
@@ -45,9 +46,11 @@ ModifiedBucketCheckerTest::SetUp()
_node->setupDummyPersistence();
_top.reset(new DummyStorageLink);
+ using vespa::config::content::core::StorServerConfig;
+ auto bootstrap_cfg = config_from<StorServerConfig>(config::ConfigUri(_config->getConfigId()));
_handler = new ModifiedBucketChecker(_node->getComponentRegister(),
_node->getPersistenceProvider(),
- config::ConfigUri(_config->getConfigId()));
+ *bootstrap_cfg);
_top->push_back(std::unique_ptr<StorageLink>(_handler));
_bottom = new DummyStorageLink;
_handler->push_back(std::unique_ptr<StorageLink>(_bottom));
@@ -136,7 +139,7 @@ TEST_F(ModifiedBucketCheckerTest, recheck_requests_are_chunked) {
_top->open();
cfgns::StorServerConfigBuilder cfgBuilder;
cfgBuilder.bucketRecheckingChunkSize = 2;
- _handler->configure(std::make_unique<cfgns::StorServerConfig>(cfgBuilder));
+ _handler->on_configure(*std::make_unique<cfgns::StorServerConfig>(cfgBuilder));
modifyBuckets(5, 0);
_handler->tick();
@@ -172,7 +175,7 @@ TEST_F(ModifiedBucketCheckerTest, invalid_chunk_size_config_is_rejected) {
_top->open();
cfgns::StorServerConfigBuilder cfgBuilder;
cfgBuilder.bucketRecheckingChunkSize = 0;
- EXPECT_THROW(_handler->configure(std::make_unique<cfgns::StorServerConfig>(cfgBuilder)),
+ EXPECT_THROW(_handler->on_configure(*std::make_unique<cfgns::StorServerConfig>(cfgBuilder)),
config::InvalidConfigException);
}
diff --git a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
index 639b3231028..50977b5ec8b 100644
--- a/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
+++ b/storage/src/tests/storageserver/changedbucketownershiphandlertest.cpp
@@ -3,6 +3,7 @@
#include <tests/common/teststorageapp.h>
#include <tests/common/testhelper.h>
#include <tests/common/dummystoragelink.h>
+#include <vespa/config/helper/configgetter.hpp>
#include <vespa/document/base/testdocman.h>
#include <vespa/storage/bucketdb/storbucketdb.h>
#include <vespa/storage/persistence/messages.h>
@@ -124,11 +125,12 @@ ChangedBucketOwnershipHandlerTest::insertBuckets(uint32_t numBuckets,
void
ChangedBucketOwnershipHandlerTest::SetUp()
{
+ using vespa::config::content::PersistenceConfig;
vdstestlib::DirConfig config(getStandardConfig(true));
_app.reset(new TestServiceLayerApp);
_top.reset(new DummyStorageLink);
- _handler = new ChangedBucketOwnershipHandler(config::ConfigUri(config.getConfigId()),
+ _handler = new ChangedBucketOwnershipHandler(*config_from<PersistenceConfig>(config::ConfigUri(config.getConfigId())),
_app->getComponentRegister());
_top->push_back(std::unique_ptr<StorageLink>(_handler));
_bottom = new DummyStorageLink;
@@ -139,7 +141,7 @@ ChangedBucketOwnershipHandlerTest::SetUp()
auto pconfig = std::make_unique<vespa::config::content::PersistenceConfigBuilder>();
pconfig->abortOutdatedMutatingIdealStateOps = true;
pconfig->abortOutdatedMutatingExternalLoadOps = true;
- _handler->configure(std::move(pconfig));
+ _handler->on_configure(*pconfig);
}
namespace {
@@ -466,7 +468,7 @@ TEST_F(ChangedBucketOwnershipHandlerTest, abort_outdated_remove_location) {
TEST_F(ChangedBucketOwnershipHandlerTest, ideal_state_aborts_are_configurable) {
auto config = std::make_unique<vespa::config::content::PersistenceConfigBuilder>();
config->abortOutdatedMutatingIdealStateOps = false;
- _handler->configure(std::move(config));
+ _handler->on_configure(*config);
// Should not abort operation, even when ownership has changed.
expectChangeAbortsMessage<api::CreateBucketCommand>(false, getBucketToAbort());
}
@@ -508,7 +510,7 @@ TEST_F(ChangedBucketOwnershipHandlerTest, external_load_op_abort_updates_metric)
TEST_F(ChangedBucketOwnershipHandlerTest, external_load_op_aborts_are_configurable) {
auto config = std::make_unique<vespa::config::content::PersistenceConfigBuilder>();
config->abortOutdatedMutatingExternalLoadOps = false;
- _handler->configure(std::move(config));
+ _handler->on_configure(*config);
// Should not abort operation, even when ownership has changed.
document::DocumentId docId("id:foo:testdoctype1::bar");
expectChangeAbortsMessage<api::RemoveCommand>(
diff --git a/storage/src/tests/storageserver/communicationmanagertest.cpp b/storage/src/tests/storageserver/communicationmanagertest.cpp
index 70b8c40722c..04322562d08 100644
--- a/storage/src/tests/storageserver/communicationmanagertest.cpp
+++ b/storage/src/tests/storageserver/communicationmanagertest.cpp
@@ -29,14 +29,6 @@ vespalib::string _storage("storage");
using CommunicationManagerConfig = vespa::config::content::core::StorCommunicationmanagerConfig;
-namespace {
-
-std::unique_ptr<CommunicationManagerConfig> config_from(const config::ConfigUri& cfg_uri) {
- return config::ConfigGetter<CommunicationManagerConfig>::getConfig(cfg_uri.getConfigId(), cfg_uri.getContext());
-}
-
-}
-
struct CommunicationManagerTest : Test {
static constexpr uint32_t MESSAGE_WAIT_TIME_SEC = 60;
@@ -88,8 +80,10 @@ TEST_F(CommunicationManagerTest, simple) {
auto dist_cfg_uri = config::ConfigUri(distConfig.getConfigId());
auto stor_cfg_uri = config::ConfigUri(storConfig.getConfigId());
- CommunicationManager distributor(distNode.getComponentRegister(), dist_cfg_uri, *config_from(dist_cfg_uri));
- CommunicationManager storage(storNode.getComponentRegister(), stor_cfg_uri, *config_from(stor_cfg_uri));
+ CommunicationManager distributor(distNode.getComponentRegister(), dist_cfg_uri,
+ *config_from<CommunicationManagerConfig>(dist_cfg_uri));
+ CommunicationManager storage(storNode.getComponentRegister(), stor_cfg_uri,
+ *config_from<CommunicationManagerConfig>(stor_cfg_uri));
auto* distributorLink = new DummyStorageLink();
auto* storageLink = new DummyStorageLink();
distributor.push_back(std::unique_ptr<StorageLink>(distributorLink));
@@ -146,7 +140,8 @@ CommunicationManagerTest::doTestConfigPropagation(bool isContentNode)
}
auto cfg_uri = config::ConfigUri(config.getConfigId());
- CommunicationManager commMgr(node->getComponentRegister(), cfg_uri, *config_from(cfg_uri));
+ CommunicationManager commMgr(node->getComponentRegister(), cfg_uri,
+ *config_from<CommunicationManagerConfig>(cfg_uri));
auto* storageLink = new DummyStorageLink();
commMgr.push_back(std::unique_ptr<StorageLink>(storageLink));
commMgr.open();
@@ -191,7 +186,8 @@ TEST_F(CommunicationManagerTest, commands_are_dequeued_in_fifo_order) {
TestServiceLayerApp storNode(storConfig.getConfigId());
auto cfg_uri = config::ConfigUri(storConfig.getConfigId());
- CommunicationManager storage(storNode.getComponentRegister(), cfg_uri, *config_from(cfg_uri));
+ CommunicationManager storage(storNode.getComponentRegister(), cfg_uri,
+ *config_from<CommunicationManagerConfig>(cfg_uri));
auto* storageLink = new DummyStorageLink();
storage.push_back(std::unique_ptr<StorageLink>(storageLink));
storage.open();
@@ -224,7 +220,8 @@ TEST_F(CommunicationManagerTest, replies_are_dequeued_in_fifo_order) {
TestServiceLayerApp storNode(storConfig.getConfigId());
auto cfg_uri = config::ConfigUri(storConfig.getConfigId());
- CommunicationManager storage(storNode.getComponentRegister(), cfg_uri, *config_from(cfg_uri));
+ CommunicationManager storage(storNode.getComponentRegister(), cfg_uri,
+ *config_from<CommunicationManagerConfig>(cfg_uri));
auto* storageLink = new DummyStorageLink();
storage.push_back(std::unique_ptr<StorageLink>(storageLink));
storage.open();
@@ -265,7 +262,8 @@ struct CommunicationManagerFixture {
node = std::make_unique<TestServiceLayerApp>(stor_config.getConfigId());
auto cfg_uri = config::ConfigUri(stor_config.getConfigId());
- comm_mgr = std::make_unique<CommunicationManager>(node->getComponentRegister(), cfg_uri, *config_from(cfg_uri));
+ comm_mgr = std::make_unique<CommunicationManager>(node->getComponentRegister(), cfg_uri,
+ *config_from<CommunicationManagerConfig>(cfg_uri));
bottom_link = new DummyStorageLink();
comm_mgr->push_back(std::unique_ptr<StorageLink>(bottom_link));
comm_mgr->open();
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 8e87e07eeff..7a7f2551c2d 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -2,6 +2,7 @@
#include <tests/common/testhelper.h>
#include <tests/common/dummystoragelink.h>
#include <tests/common/teststorageapp.h>
+#include <vespa/config/helper/configgetter.hpp>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/messagebus/dynamicthrottlepolicy.h>
#include <vespa/storage/storageserver/mergethrottler.h>
@@ -28,7 +29,9 @@ namespace storage {
namespace {
-vespalib::string _Storage("storage");
+using StorServerConfig = vespa::config::content::core::StorServerConfig;
+
+vespalib::string _storage("storage");
struct MergeBuilder {
document::BucketId _bucket;
@@ -108,7 +111,7 @@ struct MergeBuilder {
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(_bucket), n, _maxTimestamp,
_clusterStateVersion, _chain);
- cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, _nodes[0]));
+ cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, _nodes[0]));
return cmd;
}
};
@@ -167,7 +170,9 @@ MergeThrottlerTest::~MergeThrottlerTest() = default;
void
MergeThrottlerTest::SetUp()
{
- vdstestlib::DirConfig config(getStandardConfig(true));
+ vdstestlib::DirConfig dir_config(getStandardConfig(true));
+ auto cfg_uri = ::config::ConfigUri(dir_config.getConfigId());
+ auto config = config_from<StorServerConfig>(cfg_uri);
for (int i = 0; i < _storageNodeCount; ++i) {
auto server = std::make_unique<TestServiceLayerApp>(NodeIndex(i));
@@ -175,7 +180,7 @@ MergeThrottlerTest::SetUp()
std::unique_ptr<DummyStorageLink> top;
top = std::make_unique<DummyStorageLink>();
- MergeThrottler* throttler = new MergeThrottler(::config::ConfigUri(config.getConfigId()), server->getComponentRegister());
+ MergeThrottler* throttler = new MergeThrottler(*config, server->getComponentRegister());
// MergeThrottler will be sandwiched in between two dummy links
top->push_back(std::unique_ptr<StorageLink>(throttler));
DummyStorageLink* bottom = new DummyStorageLink;
@@ -283,7 +288,7 @@ TEST_F(MergeThrottlerTest, chain) {
auto cmd = std::make_shared<MergeBucketCommand>(bucket, nodes, UINT_MAX, 123);
cmd->setPriority(7);
cmd->setTimeout(54321ms);
- cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 0));
+ cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 0));
const uint16_t distributorIndex = 123;
cmd->setSourceIndex(distributorIndex); // Dummy distributor index that must be forwarded
@@ -423,7 +428,7 @@ TEST_F(MergeThrottlerTest, with_source_only_node) {
std::vector<MergeBucketCommand::Node> nodes({{0}, {2}, {1, true}});
auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, UINT_MAX, 123);
- cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 0));
+ cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 0));
_topLinks[0]->sendDown(cmd);
_topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
@@ -468,7 +473,7 @@ TEST_F(MergeThrottlerTest, legacy_42_distributor_behavior) {
auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234);
// Send to node 1, which is not the lowest index
- cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1));
+ cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1));
_topLinks[1]->sendDown(cmd);
_bottomLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
@@ -503,7 +508,7 @@ TEST_F(MergeThrottlerTest, legacy_42_distributor_behavior_does_not_take_ownershi
auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234);
// Send to node 1, which is not the lowest index
- cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1));
+ cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1));
_topLinks[1]->sendDown(cmd);
_bottomLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
@@ -550,7 +555,7 @@ TEST_F(MergeThrottlerTest, end_of_chain_execution_does_not_take_ownership) {
auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234, 1, chain);
// Send to last node, which is not the lowest index
- cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 3));
+ cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 3));
_topLinks[2]->sendDown(cmd);
_bottomLinks[2]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
@@ -595,7 +600,7 @@ TEST_F(MergeThrottlerTest, resend_handling) {
std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234);
- cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1));
+ cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1));
_topLinks[0]->sendDown(cmd);
_topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
@@ -962,7 +967,7 @@ TEST_F(MergeThrottlerTest, unseen_merge_with_node_in_chain) {
makeDocumentBucket(BucketId(32, 0xdeadbeef)), nodes, 1234, 1, chain);
- cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 9));
+ cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 9));
_topLinks[0]->sendDown(cmd);
// First, test that we get rejected when processing merge immediately
@@ -1145,7 +1150,7 @@ TEST_F(MergeThrottlerTest, unknown_merge_with_self_in_chain) {
std::vector<uint16_t> chain({0});
auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234, 1, chain);
- cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1));
+ cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1));
_topLinks[0]->sendDown(cmd);
_topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime);
diff --git a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp
index 84a98385962..edb13eea5af 100644
--- a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp
+++ b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp
@@ -1,11 +1,12 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <vespa/storage/storageserver/service_layer_error_listener.h>
+#include <tests/common/testhelper.h>
+#include <tests/common/teststorageapp.h>
+#include <vespa/config/helper/configgetter.hpp>
#include <vespa/storage/storageserver/mergethrottler.h>
+#include <vespa/storage/storageserver/service_layer_error_listener.h>
#include <vespa/storageframework/defaultimplementation/component/componentregisterimpl.h>
#include <vespa/vdstestlib/config/dirconfig.h>
-#include <tests/common/testhelper.h>
-#include <tests/common/teststorageapp.h>
#include <vespa/vespalib/gtest/gtest.h>
using namespace ::testing;
@@ -34,10 +35,12 @@ private:
};
struct Fixture {
+ using StorServerConfig = vespa::config::content::core::StorServerConfig;
+
vdstestlib::DirConfig config{getStandardConfig(true)};
TestServiceLayerApp app;
ServiceLayerComponent component{app.getComponentRegister(), "dummy"};
- MergeThrottler merge_throttler{config::ConfigUri(config.getConfigId()), app.getComponentRegister()};
+ MergeThrottler merge_throttler{*config_from<StorServerConfig>(config::ConfigUri(config.getConfigId())), app.getComponentRegister()};
TestShutdownListener shutdown_listener;
ServiceLayerErrorListener error_listener{component, merge_throttler};
diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp
index 2b7039f36ea..4fb857f9e67 100644
--- a/storage/src/tests/visiting/visitormanagertest.cpp
+++ b/storage/src/tests/visiting/visitormanagertest.cpp
@@ -1,5 +1,6 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/config/helper/configgetter.hpp>
#include <vespa/document/fieldvalue/intfieldvalue.h>
#include <vespa/document/fieldvalue/stringfieldvalue.h>
#include <vespa/storageapi/message/datagram.h>
@@ -88,14 +89,18 @@ VisitorManagerTest::initializeTest(bool defer_manager_thread_start)
_node->setupDummyPersistence();
_node->getStateUpdater().setClusterState(std::make_shared<lib::ClusterState>("storage:1 distributor:1"));
_top = std::make_unique<DummyStorageLink>();
- auto vm = std::make_unique<VisitorManager>(config::ConfigUri(config.getConfigId()),
+ using vespa::config::content::core::StorVisitorConfig;
+ auto bootstrap_cfg = config_from<StorVisitorConfig>(config::ConfigUri(config.getConfigId()));
+ auto vm = std::make_unique<VisitorManager>(*bootstrap_cfg,
_node->getComponentRegister(),
*_messageSessionFactory,
VisitorFactory::Map(),
defer_manager_thread_start);
_manager = vm.get();
_top->push_back(std::move(vm));
- _top->push_back(std::make_unique<FileStorManager>(config::ConfigUri(config.getConfigId()), _node->getPersistenceProvider(),
+ using vespa::config::content::StorFilestorConfig;
+ auto filestor_cfg = config_from<StorFilestorConfig>(config::ConfigUri(config.getConfigId()));
+ _top->push_back(std::make_unique<FileStorManager>(*filestor_cfg, _node->getPersistenceProvider(),
_node->getComponentRegister(), *_node, _node->get_host_info()));
_manager->setTimeBetweenTicks(10);
_top->open();
diff --git a/storage/src/tests/visiting/visitortest.cpp b/storage/src/tests/visiting/visitortest.cpp
index 49f1bc778fc..f83b6c99d64 100644
--- a/storage/src/tests/visiting/visitortest.cpp
+++ b/storage/src/tests/visiting/visitortest.cpp
@@ -1,6 +1,7 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include <vespa/config/common/exceptions.h>
+#include <vespa/config/helper/configgetter.hpp>
#include <vespa/document/fieldvalue/intfieldvalue.h>
#include <vespa/document/fieldvalue/stringfieldvalue.h>
#include <vespa/document/datatype/documenttype.h>
@@ -168,9 +169,10 @@ VisitorTest::initializeTest(const TestParams& params)
}
_node = std::make_unique<TestServiceLayerApp>(config.getConfigId());
_top = std::make_unique<DummyStorageLink>();
+ using vespa::config::content::core::StorVisitorConfig;
+ auto bootstrap_cfg = config_from<StorVisitorConfig>(config::ConfigUri(config.getConfigId()));
_top->push_back(std::unique_ptr<StorageLink>(_manager
- = new VisitorManager(config::ConfigUri(config.getConfigId()),
- _node->getComponentRegister(), *_messageSessionFactory)));
+ = new VisitorManager(*bootstrap_cfg, _node->getComponentRegister(), *_messageSessionFactory)));
_bottom = new DummyStorageLink();
_top->push_back(std::unique_ptr<StorageLink>(_bottom));
_manager->setTimeBetweenTicks(10);
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
index 8c24a7dfd4e..d12a9f72ac1 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.cpp
@@ -33,10 +33,9 @@ using namespace std::chrono_literals;
namespace storage {
-BucketManager::BucketManager(const config::ConfigUri & configUri, ServiceLayerComponentRegister& compReg)
+BucketManager::BucketManager(const StorServerConfig& bootstrap_config, ServiceLayerComponentRegister& compReg)
: StorageLink("Bucket manager"),
framework::StatusReporter("bucketdb", "Bucket database"),
- _configUri(configUri),
_workerLock(),
_workerCond(),
_clusterStateLock(),
@@ -60,8 +59,7 @@ BucketManager::BucketManager(const config::ConfigUri & configUri, ServiceLayerCo
ns.setMinUsedBits(58);
_component.getStateUpdater().setReportedNodeState(ns);
- auto server_config = config::ConfigGetter<vespa::config::content::core::StorServerConfig>::getConfig(configUri.getConfigId(), configUri.getContext());
- _simulated_processing_delay = std::chrono::milliseconds(std::max(0, server_config->simulatedBucketRequestLatencyMsec));
+ _simulated_processing_delay = std::chrono::milliseconds(std::max(0, bootstrap_config.simulatedBucketRequestLatencyMsec));
}
BucketManager::~BucketManager()
@@ -414,9 +412,7 @@ BucketManager::dump(std::ostream& out) const
void BucketManager::onOpen()
{
- if (!_configUri.empty()) {
- startWorkerThread();
- }
+ startWorkerThread();
}
void BucketManager::startWorkerThread()
diff --git a/storage/src/vespa/storage/bucketdb/bucketmanager.h b/storage/src/vespa/storage/bucketdb/bucketmanager.h
index 76d9123a519..35ccab843a9 100644
--- a/storage/src/vespa/storage/bucketdb/bucketmanager.h
+++ b/storage/src/vespa/storage/bucketdb/bucketmanager.h
@@ -10,10 +10,10 @@
#include "bucketmanagermetrics.h"
#include "storbucketdb.h"
#include <vespa/config/subscription/configuri.h>
-#include <vespa/storage/bucketdb/config-stor-bucketdb.h>
#include <vespa/storage/common/servicelayercomponent.h>
#include <vespa/storage/common/storagelinkqueued.h>
#include <vespa/storage/common/nodestateupdater.h>
+#include <vespa/storage/config/config-stor-server.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageframework/generic/metric/metricupdatehook.h>
#include <vespa/storageframework/generic/status/statusreporter.h>
@@ -34,6 +34,7 @@ class BucketManager : public StorageLink,
private framework::MetricUpdateHook
{
public:
+ using StorServerConfig = vespa::config::content::core::StorServerConfig;
/** Type used for message queues */
using BucketInfoRequestList = std::list<std::shared_ptr<api::RequestBucketInfoCommand>>;
using BucketInfoRequestMap = std::unordered_map<document::BucketSpace, BucketInfoRequestList, document::BucketSpace::hash>;
@@ -41,7 +42,6 @@ public:
private:
using ReplyQueue = std::vector<api::StorageReply::SP>;
using ConflictingBuckets = std::unordered_set<document::BucketId, document::BucketId::hash>;
- config::ConfigUri _configUri;
BucketInfoRequestMap _bucketInfoRequests;
/**
@@ -82,7 +82,7 @@ private:
};
public:
- BucketManager(const config::ConfigUri&, ServiceLayerComponentRegister&);
+ BucketManager(const StorServerConfig& bootstrap_config, ServiceLayerComponentRegister&);
BucketManager(const BucketManager&) = delete;
BucketManager& operator=(const BucketManager&) = delete;
~BucketManager() override;
diff --git a/storage/src/vespa/storage/common/visitorfactory.h b/storage/src/vespa/storage/common/visitorfactory.h
index 393744c4f59..1f18ace3095 100644
--- a/storage/src/vespa/storage/common/visitorfactory.h
+++ b/storage/src/vespa/storage/common/visitorfactory.h
@@ -12,6 +12,7 @@
namespace storage {
+class StorageComponent;
class Visitor;
class VisitorEnvironment {
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 97be1a510c4..6c49fcd0878 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -62,7 +62,7 @@ private:
}
FileStorManager::
-FileStorManager(const config::ConfigUri & configUri, spi::PersistenceProvider& provider,
+FileStorManager(const StorFilestorConfig& bootstrap_config, spi::PersistenceProvider& provider,
ServiceLayerComponentRegister& compReg, DoneInitializeHandler& init_handler,
HostInfo& hostInfoReporterRegistrar)
: StorageLinkQueued("File store manager", compReg),
@@ -75,7 +75,6 @@ FileStorManager(const config::ConfigUri & configUri, spi::PersistenceProvider& p
_persistenceHandlers(),
_threads(),
_bucketOwnershipNotifier(std::make_unique<BucketOwnershipNotifier>(_component, *this)),
- _configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())),
_use_async_message_handling_on_schedule(false),
_metrics(std::make_unique<FileStorMetrics>()),
_filestorHandler(),
@@ -85,8 +84,7 @@ FileStorManager(const config::ConfigUri & configUri, spi::PersistenceProvider& p
_host_info_reporter(_component.getStateUpdater()),
_resource_usage_listener_registration(provider.register_resource_usage_listener(_host_info_reporter))
{
- _configFetcher->subscribe(configUri.getConfigId(), this);
- _configFetcher->start();
+ on_configure(bootstrap_config);
_component.registerMetric(*_metrics);
_component.registerStatusPage(*this);
_component.getStateUpdater().addStateListener(*this);
@@ -207,19 +205,19 @@ FileStorManager::getThreadLocalHandler() {
}
void
-FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config)
+FileStorManager::on_configure(const StorFilestorConfig& config)
{
// If true, this is not the first configure.
const bool liveUpdate = ! _threads.empty();
- _use_async_message_handling_on_schedule = config->useAsyncMessageHandlingOnSchedule;
- _host_info_reporter.set_noise_level(config->resourceUsageReporterNoiseLevel);
- const bool use_dynamic_throttling = ((config->asyncOperationThrottlerType == StorFilestorConfig::AsyncOperationThrottlerType::DYNAMIC) ||
- (config->asyncOperationThrottler.type == StorFilestorConfig::AsyncOperationThrottler::Type::DYNAMIC));
- const bool throttle_merge_feed_ops = config->asyncOperationThrottler.throttleIndividualMergeFeedOps;
+ _use_async_message_handling_on_schedule = config.useAsyncMessageHandlingOnSchedule;
+ _host_info_reporter.set_noise_level(config.resourceUsageReporterNoiseLevel);
+ const bool use_dynamic_throttling = ((config.asyncOperationThrottlerType == StorFilestorConfig::AsyncOperationThrottlerType::DYNAMIC) ||
+ (config.asyncOperationThrottler.type == StorFilestorConfig::AsyncOperationThrottler::Type::DYNAMIC));
+ const bool throttle_merge_feed_ops = config.asyncOperationThrottler.throttleIndividualMergeFeedOps;
if (!liveUpdate) {
- _config = std::move(config);
+ _config = std::make_unique<StorFilestorConfig>(config);
uint32_t numThreads = std::max(1, _config->numThreads);
uint32_t numStripes = std::max(1u, numThreads / 2);
_metrics->initDiskMetrics(numStripes, computeAllPossibleHandlerThreads(*_config));
@@ -240,7 +238,7 @@ FileStorManager::configure(std::unique_ptr<StorFilestorConfig> config)
_bucketExecutorRegistration = _provider->register_executor(std::make_shared<BucketExecutorWrapper>(*this));
} else {
assert(_filestorHandler);
- auto updated_dyn_throttle_params = dynamic_throttle_params_from_config(*config, _threads.size());
+ auto updated_dyn_throttle_params = dynamic_throttle_params_from_config(config, _threads.size());
_filestorHandler->reconfigure_dynamic_throttler(updated_dyn_throttle_params);
}
// TODO remove once desired dynamic throttling behavior is set in stone
@@ -828,8 +826,6 @@ void FileStorManager::onClose()
LOG(debug, "Start closing");
_bucketExecutorRegistration.reset();
_resource_usage_listener_registration.reset();
- // Avoid getting config during shutdown
- _configFetcher->close();
LOG(debug, "Closed _configFetcher.");
_filestorHandler->close();
LOG(debug, "Closed _filestorHandler.");
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
index 68491ab1e38..96cff8dfeee 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h
@@ -1,10 +1,4 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/**
- * @class storage::FileStorManager
- * @ingroup filestorage
- *
- * @version $Id$
- */
#pragma once
@@ -42,7 +36,6 @@ namespace spi { struct PersistenceProvider; }
class ContentBucketSpace;
struct FileStorManagerTest;
-class ReadBucketList;
class BucketOwnershipNotifier;
class AbortBucketOperationsCommand;
struct DoneInitializeHandler;
@@ -54,10 +47,11 @@ class ProviderErrorWrapper;
class FileStorManager : public StorageLinkQueued,
public framework::HtmlStatusReporter,
public StateListener,
- private config::IFetcherCallback<vespa::config::content::StorFilestorConfig>,
public MessageSender,
public spi::BucketExecutor
{
+ using StorFilestorConfig = vespa::config::content::StorFilestorConfig;
+
ServiceLayerComponentRegister & _compReg;
ServiceLayerComponent _component;
std::unique_ptr<spi::PersistenceProvider> _provider;
@@ -69,7 +63,6 @@ class FileStorManager : public StorageLinkQueued,
std::unique_ptr<BucketOwnershipNotifier> _bucketOwnershipNotifier;
std::unique_ptr<vespa::config::content::StorFilestorConfig> _config;
- std::unique_ptr<config::ConfigFetcher> _configFetcher;
bool _use_async_message_handling_on_schedule;
std::shared_ptr<FileStorMetrics> _metrics;
std::unique_ptr<FileStorHandler> _filestorHandler;
@@ -82,7 +75,7 @@ class FileStorManager : public StorageLinkQueued,
std::unique_ptr<vespalib::IDestructorCallback> _resource_usage_listener_registration;
public:
- FileStorManager(const config::ConfigUri &, spi::PersistenceProvider&,
+ FileStorManager(const StorFilestorConfig&, spi::PersistenceProvider&,
ServiceLayerComponentRegister&, DoneInitializeHandler&, HostInfo&);
FileStorManager(const FileStorManager &) = delete;
FileStorManager& operator=(const FileStorManager &) = delete;
@@ -118,8 +111,9 @@ public:
const FileStorMetrics& get_metrics() const { return *_metrics; }
+ void on_configure(const vespa::config::content::StorFilestorConfig& config);
+
private:
- void configure(std::unique_ptr<vespa::config::content::StorFilestorConfig> config) override;
PersistenceHandler & createRegisteredHandler(const ServiceLayerComponent & component);
VESPA_DLL_LOCAL PersistenceHandler & getThreadLocalHandler();
diff --git a/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.cpp b/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.cpp
index e1c6465b332..9558429bf13 100644
--- a/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.cpp
@@ -46,12 +46,11 @@ ModifiedBucketChecker::BucketIdListResult::reset(document::BucketSpace bucketSpa
ModifiedBucketChecker::ModifiedBucketChecker(
ServiceLayerComponentRegister& compReg,
spi::PersistenceProvider& provider,
- const config::ConfigUri& configUri)
+ const StorServerConfig& bootstrap_config)
: StorageLink("Modified bucket checker"),
_provider(provider),
_component(),
_thread(),
- _configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())),
_monitor(),
_stateLock(),
_bucketSpaces(),
@@ -60,8 +59,7 @@ ModifiedBucketChecker::ModifiedBucketChecker(
_maxPendingChunkSize(100),
_singleThreadMode(false)
{
- _configFetcher->subscribe<vespa::config::content::core::StorServerConfig>(configUri.getConfigId(), this);
- _configFetcher->start();
+ on_configure(bootstrap_config);
std::ostringstream threadName;
threadName << "Modified bucket checker " << static_cast<void*>(this);
@@ -75,15 +73,14 @@ ModifiedBucketChecker::~ModifiedBucketChecker()
}
void
-ModifiedBucketChecker::configure(
- std::unique_ptr<vespa::config::content::core::StorServerConfig> newConfig)
+ModifiedBucketChecker::on_configure(const vespa::config::content::core::StorServerConfig& newConfig)
{
std::lock_guard lock(_stateLock);
- if (newConfig->bucketRecheckingChunkSize < 1) {
+ if (newConfig.bucketRecheckingChunkSize < 1) {
throw config::InvalidConfigException(
"Cannot have bucket rechecking chunk size of less than 1");
}
- _maxPendingChunkSize = newConfig->bucketRecheckingChunkSize;
+ _maxPendingChunkSize = newConfig.bucketRecheckingChunkSize;
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.h b/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.h
index 18f03da7469..9f0111b32f9 100644
--- a/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.h
+++ b/storage/src/vespa/storage/persistence/filestorage/modifiedbucketchecker.h
@@ -23,19 +23,18 @@ namespace spi { struct PersistenceProvider; }
class ModifiedBucketChecker
: public StorageLink,
public framework::Runnable,
- public Types,
- private config::IFetcherCallback<
- vespa::config::content::core::StorServerConfig>
+ public Types
{
public:
+ using StorServerConfig = vespa::config::content::core::StorServerConfig;
using UP = std::unique_ptr<ModifiedBucketChecker>;
ModifiedBucketChecker(ServiceLayerComponentRegister& compReg,
spi::PersistenceProvider& provide,
- const config::ConfigUri& configUri);
+ const StorServerConfig& bootstrap_config);
~ModifiedBucketChecker() override;
- void configure(std::unique_ptr<vespa::config::content::core::StorServerConfig>) override;
+ void on_configure(const vespa::config::content::core::StorServerConfig&);
void run(framework::ThreadHandle& thread) override;
bool tick();
@@ -88,7 +87,6 @@ private:
spi::PersistenceProvider & _provider;
ServiceLayerComponent::UP _component;
std::unique_ptr<framework::Thread> _thread;
- std::unique_ptr<config::ConfigFetcher> _configFetcher;
std::mutex _monitor;
std::condition_variable _cond;
std::mutex _stateLock;
diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp
index 34040bb12c0..25829f3d391 100644
--- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp
+++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.cpp
@@ -22,12 +22,11 @@ LOG_SETUP(".bucketownershiphandler");
namespace storage {
ChangedBucketOwnershipHandler::ChangedBucketOwnershipHandler(
- const config::ConfigUri& configUri,
+ const PersistenceConfig& bootstrap_config,
ServiceLayerComponentRegister& compReg)
: StorageLink("Changed bucket ownership handler"),
_component(compReg, "changedbucketownershiphandler"),
_metrics(),
- _configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())),
_state_sync_executor(1), // single thread for sequential task execution
_stateLock(),
_currentState(), // Not set yet, so ownership will not be valid
@@ -37,25 +36,23 @@ ChangedBucketOwnershipHandler::ChangedBucketOwnershipHandler(
_abortMutatingIdealStateOps(false),
_abortMutatingExternalLoadOps(false)
{
- _configFetcher->subscribe<vespa::config::content::PersistenceConfig>(configUri.getConfigId(), this);
- _configFetcher->start();
+ on_configure(bootstrap_config);
_component.registerMetric(_metrics);
}
ChangedBucketOwnershipHandler::~ChangedBucketOwnershipHandler() = default;
void
-ChangedBucketOwnershipHandler::configure(
- std::unique_ptr<vespa::config::content::PersistenceConfig> config)
+ChangedBucketOwnershipHandler::on_configure(const vespa::config::content::PersistenceConfig& config)
{
_abortQueuedAndPendingOnStateChange.store(
- config->abortOperationsWithChangedBucketOwnership,
+ config.abortOperationsWithChangedBucketOwnership,
std::memory_order_relaxed);
_abortMutatingIdealStateOps.store(
- config->abortOutdatedMutatingIdealStateOps,
+ config.abortOutdatedMutatingIdealStateOps,
std::memory_order_relaxed);
_abortMutatingExternalLoadOps.store(
- config->abortOutdatedMutatingExternalLoadOps,
+ config.abortOutdatedMutatingExternalLoadOps,
std::memory_order_relaxed);
}
diff --git a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h
index 3e20eb507f6..801534385f7 100644
--- a/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h
+++ b/storage/src/vespa/storage/storageserver/changedbucketownershiphandler.h
@@ -56,10 +56,7 @@ namespace lib {
* - RemoveCommand
* - RevertCommand
*/
-class ChangedBucketOwnershipHandler
- : public StorageLink,
- private config::IFetcherCallback<vespa::config::content::PersistenceConfig>
-{
+class ChangedBucketOwnershipHandler : public StorageLink {
public:
class Metrics : public metrics::MetricSet {
public:
@@ -115,12 +112,11 @@ public:
private:
class ClusterStateSyncAndApplyTask;
- using ConfigFetcherUP = std::unique_ptr<config::ConfigFetcher>;
+ using PersistenceConfig = vespa::config::content::PersistenceConfig;
using ClusterStateBundleCSP = std::shared_ptr<const lib::ClusterStateBundle>;
ServiceLayerComponent _component;
Metrics _metrics;
- ConfigFetcherUP _configFetcher;
vespalib::ThreadStackExecutor _state_sync_executor;
mutable std::mutex _stateLock;
ClusterStateBundleCSP _currentState;
@@ -185,7 +181,7 @@ private:
bool enabledExternalLoadAborting() const;
public:
- ChangedBucketOwnershipHandler(const config::ConfigUri& configUri,
+ ChangedBucketOwnershipHandler(const PersistenceConfig& bootstrap_config,
ServiceLayerComponentRegister& compReg);
~ChangedBucketOwnershipHandler() override;
@@ -194,7 +190,7 @@ public:
bool onInternalReply(const std::shared_ptr<api::InternalReply>& reply) override;
void onClose() override;
- void configure(std::unique_ptr<vespa::config::content::PersistenceConfig>) override;
+ void on_configure(const PersistenceConfig&);
/**
* We want to ensure distribution config changes are thread safe wrt. our
diff --git a/storage/src/vespa/storage/storageserver/distributornode.cpp b/storage/src/vespa/storage/storageserver/distributornode.cpp
index ef7579f8085..10ee8023621 100644
--- a/storage/src/vespa/storage/storageserver/distributornode.cpp
+++ b/storage/src/vespa/storage/storageserver/distributornode.cpp
@@ -84,8 +84,6 @@ void
DistributorNode::createChain(IStorageChainBuilder &builder)
{
DistributorComponentRegister& dcr(_context.getComponentRegister());
- // TODO: All components in this chain should use a common thread instead of
- // each having its own configfetcher.
StorageLink::UP chain;
if (_retrievedCommunicationManager) {
builder.add(std::move(_retrievedCommunicationManager));
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index 6e2f2a77d20..4cc2a7a89ab 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -179,7 +179,7 @@ MergeThrottler::MergeNodeSequence::chain_contains_this_node() const noexcept
}
MergeThrottler::MergeThrottler(
- const config::ConfigUri & configUri,
+ const StorServerConfig& bootstrap_config,
StorageComponentRegister& compReg)
: StorageLink("Merge Throttler"),
framework::HtmlStatusReporter("merges", "Merge Throttler"),
@@ -190,7 +190,6 @@ MergeThrottler::MergeThrottler(
_queueSequence(0),
_messageLock(),
_stateLock(),
- _configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())),
_metrics(std::make_unique<Metrics>()),
_component(compReg, "mergethrottler"),
_thread(),
@@ -203,34 +202,33 @@ MergeThrottler::MergeThrottler(
{
_throttlePolicy->setMinWindowSize(20);
_throttlePolicy->setMaxWindowSize(20);
- _configFetcher->subscribe<StorServerConfig>(configUri.getConfigId(), this);
- _configFetcher->start();
+ on_configure(bootstrap_config);
_component.registerStatusPage(*this);
_component.registerMetric(*_metrics);
}
void
-MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServerConfig> newConfig)
+MergeThrottler::on_configure(const StorServerConfig& new_config)
{
std::lock_guard lock(_stateLock);
- _use_dynamic_throttling = (newConfig->mergeThrottlingPolicy.type
+ _use_dynamic_throttling = (new_config.mergeThrottlingPolicy.type
== StorServerConfig::MergeThrottlingPolicy::Type::DYNAMIC);
- if (newConfig->maxMergesPerNode < 1) {
+ if (new_config.maxMergesPerNode < 1) {
throw config::InvalidConfigException("Cannot have a max merge count of less than 1");
}
- if (newConfig->maxMergeQueueSize < 0) {
+ if (new_config.maxMergeQueueSize < 0) {
throw config::InvalidConfigException("Max merge queue size cannot be less than 0");
}
- if (newConfig->resourceExhaustionMergeBackPressureDurationSecs < 0.0) {
+ if (new_config.resourceExhaustionMergeBackPressureDurationSecs < 0.0) {
throw config::InvalidConfigException("Merge back-pressure duration cannot be less than 0");
}
if (_use_dynamic_throttling) {
- auto min_win_sz = std::max(newConfig->mergeThrottlingPolicy.minWindowSize, 1);
- auto max_win_sz = std::max(newConfig->mergeThrottlingPolicy.maxWindowSize, 1);
+ auto min_win_sz = std::max(new_config.mergeThrottlingPolicy.minWindowSize, 1);
+ auto max_win_sz = std::max(new_config.mergeThrottlingPolicy.maxWindowSize, 1);
if (min_win_sz > max_win_sz) {
min_win_sz = max_win_sz;
}
- auto win_sz_increment = std::max(1.0, newConfig->mergeThrottlingPolicy.windowSizeIncrement);
+ auto win_sz_increment = std::max(1.0, new_config.mergeThrottlingPolicy.windowSizeIncrement);
_throttlePolicy->setMinWindowSize(min_win_sz);
_throttlePolicy->setMaxWindowSize(max_win_sz);
_throttlePolicy->setWindowSizeIncrement(win_sz_increment);
@@ -238,15 +236,14 @@ MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServ
min_win_sz, max_win_sz, win_sz_increment);
} else {
// Use legacy config values when static throttling is enabled.
- _throttlePolicy->setMinWindowSize(newConfig->maxMergesPerNode);
- _throttlePolicy->setMaxWindowSize(newConfig->maxMergesPerNode);
+ _throttlePolicy->setMinWindowSize(new_config.maxMergesPerNode);
+ _throttlePolicy->setMaxWindowSize(new_config.maxMergesPerNode);
}
- LOG(debug, "Setting new max queue size to %d",
- newConfig->maxMergeQueueSize);
- _maxQueueSize = newConfig->maxMergeQueueSize;
+ LOG(debug, "Setting new max queue size to %d", new_config.maxMergeQueueSize);
+ _maxQueueSize = new_config.maxMergeQueueSize;
_backpressure_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>(
- std::chrono::duration<double>(newConfig->resourceExhaustionMergeBackPressureDurationSecs));
- _disable_queue_limits_for_chained_merges = newConfig->disableQueueLimitsForChainedMerges;
+ std::chrono::duration<double>(new_config.resourceExhaustionMergeBackPressureDurationSecs));
+ _disable_queue_limits_for_chained_merges = new_config.disableQueueLimitsForChainedMerges;
}
MergeThrottler::~MergeThrottler()
@@ -275,8 +272,6 @@ MergeThrottler::onOpen()
void
MergeThrottler::onClose()
{
- // Avoid getting config on shutdown
- _configFetcher->close();
{
std::lock_guard guard(_messageLock);
// Note: used to prevent taking locks in different order if onFlush
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index 840f1df1177..5362c2f6df8 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -35,10 +35,11 @@ class AbortBucketOperationsCommand;
class MergeThrottler : public framework::Runnable,
public StorageLink,
- public framework::HtmlStatusReporter,
- private config::IFetcherCallback<vespa::config::content::core::StorServerConfig>
+ public framework::HtmlStatusReporter
{
public:
+ using StorServerConfig = vespa::config::content::core::StorServerConfig;
+
class MergeFailureMetrics : public metrics::MetricSet {
public:
metrics::SumMetric<metrics::LongCountMetric> sum;
@@ -172,7 +173,6 @@ private:
mutable std::mutex _messageLock;
std::condition_variable _messageCond;
mutable std::mutex _stateLock;
- std::unique_ptr<config::ConfigFetcher> _configFetcher;
// Messages pending to be processed by the worker thread
std::vector<api::StorageMessage::SP> _messagesDown;
std::vector<api::StorageMessage::SP> _messagesUp;
@@ -190,7 +190,7 @@ public:
* windowSizeIncrement used for allowing unit tests to start out with more
* than 1 as their window size.
*/
- MergeThrottler(const config::ConfigUri & configUri, StorageComponentRegister&);
+ MergeThrottler(const StorServerConfig& bootstrap_config, StorageComponentRegister&);
~MergeThrottler() override;
/** Implements document::Runnable::run */
@@ -204,6 +204,8 @@ public:
bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& stateCmd) override;
+ void on_configure(const StorServerConfig& new_config);
+
/*
* When invoked, merges to the node will be BUSY-bounced by the throttler
* for a configurable period of time instead of being processed.
@@ -282,11 +284,6 @@ private:
[[nodiscard]] bool isChainCompleted() const noexcept;
};
- /**
- * Callback method for config system (IFetcherCallback)
- */
- void configure(std::unique_ptr<vespa::config::content::core::StorServerConfig> newConfig) override;
-
// NOTE: unless explicitly specified, all the below functions require
// _sync lock to be held upon call (usually implicitly via MessageGuard)
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
index 3faa8a3ddec..0cce2c27e95 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
@@ -24,19 +24,33 @@ LOG_SETUP(".node.servicelayer");
namespace storage {
+ServiceLayerNode::ServiceLayerBootstrapConfigs::ServiceLayerBootstrapConfigs() = default;
+ServiceLayerNode::ServiceLayerBootstrapConfigs::~ServiceLayerBootstrapConfigs() = default;
+ServiceLayerNode::ServiceLayerBootstrapConfigs::ServiceLayerBootstrapConfigs(ServiceLayerBootstrapConfigs&&) noexcept = default;
+ServiceLayerNode::ServiceLayerBootstrapConfigs&
+ServiceLayerNode::ServiceLayerBootstrapConfigs::operator=(ServiceLayerBootstrapConfigs&&) noexcept = default;
+
ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri,
ServiceLayerNodeContext& context,
- BootstrapConfigs bootstrap_configs,
+ ServiceLayerBootstrapConfigs bootstrap_configs,
ApplicationGenerationFetcher& generationFetcher,
spi::PersistenceProvider& persistenceProvider,
const VisitorFactory::Map& externalVisitors)
- : StorageNode(configUri, context, std::move(bootstrap_configs), generationFetcher, std::make_unique<HostInfo>()),
+ : StorageNode(configUri, context, std::move(bootstrap_configs.storage_bootstrap_configs),
+ generationFetcher, std::make_unique<HostInfo>()),
_context(context),
_persistenceProvider(persistenceProvider),
_externalVisitors(externalVisitors),
+ _persistence_bootstrap_config(std::move(bootstrap_configs.persistence_cfg)),
+ _visitor_bootstrap_config(std::move(bootstrap_configs.visitor_cfg)),
+ _filestor_bootstrap_config(std::move(bootstrap_configs.filestor_cfg)),
_bouncer(nullptr),
_bucket_manager(nullptr),
+ _changed_bucket_ownership_handler(nullptr),
_fileStorManager(nullptr),
+ _merge_throttler(nullptr),
+ _visitor_manager(nullptr),
+ _modified_bucket_checker(nullptr),
_init_has_been_called(false)
{
}
@@ -158,18 +172,25 @@ ServiceLayerNode::createChain(IStorageChainBuilder &builder)
auto bouncer = std::make_unique<Bouncer>(compReg, bouncer_config());
_bouncer = bouncer.get();
builder.add(std::move(bouncer));
- auto merge_throttler_up = std::make_unique<MergeThrottler>(_configUri, compReg);
- auto merge_throttler = merge_throttler_up.get();
+ auto merge_throttler_up = std::make_unique<MergeThrottler>(server_config(), compReg);
+ _merge_throttler = merge_throttler_up.get();
builder.add(std::move(merge_throttler_up));
- builder.add(std::make_unique<ChangedBucketOwnershipHandler>(_configUri, compReg));
- auto bucket_manager = std::make_unique<BucketManager>(_configUri, _context.getComponentRegister());
+ auto bucket_ownership_handler = std::make_unique<ChangedBucketOwnershipHandler>(*_persistence_bootstrap_config, compReg);
+ _changed_bucket_ownership_handler = bucket_ownership_handler.get();
+ builder.add(std::move(bucket_ownership_handler));
+ auto bucket_manager = std::make_unique<BucketManager>(server_config(), _context.getComponentRegister());
_bucket_manager = bucket_manager.get();
builder.add(std::move(bucket_manager));
- builder.add(std::make_unique<VisitorManager>(_configUri, _context.getComponentRegister(),
- static_cast<VisitorMessageSessionFactory &>(*this), _externalVisitors));
- builder.add(std::make_unique<ModifiedBucketChecker>(_context.getComponentRegister(), _persistenceProvider, _configUri));
+ auto visitor_manager = std::make_unique<VisitorManager>(*_visitor_bootstrap_config, _context.getComponentRegister(),
+ static_cast<VisitorMessageSessionFactory &>(*this), _externalVisitors);
+ _visitor_manager = visitor_manager.get();
+ builder.add(std::move(visitor_manager));
+ auto bucket_checker = std::make_unique<ModifiedBucketChecker>(_context.getComponentRegister(), _persistenceProvider, server_config());
+ _modified_bucket_checker = bucket_checker.get();
+ builder.add(std::move(bucket_checker));
auto state_manager = releaseStateManager();
- auto filstor_manager = std::make_unique<FileStorManager>(_configUri, _persistenceProvider, _context.getComponentRegister(),
+ auto filstor_manager = std::make_unique<FileStorManager>(*_filestor_bootstrap_config, _persistenceProvider,
+ _context.getComponentRegister(),
getDoneInitializeHandler(), state_manager->getHostInfo());
_fileStorManager = filstor_manager.get();
builder.add(std::move(filstor_manager));
@@ -178,8 +199,43 @@ ServiceLayerNode::createChain(IStorageChainBuilder &builder)
// Lifetimes of all referenced components shall outlive the last call going
// through the SPI, as queues are flushed and worker threads joined when
// the storage link chain is closed prior to destruction.
- auto error_listener = std::make_shared<ServiceLayerErrorListener>(*_component, *merge_throttler);
+ auto error_listener = std::make_shared<ServiceLayerErrorListener>(*_component, *_merge_throttler);
_fileStorManager->error_wrapper().register_error_listener(std::move(error_listener));
+
+ // Purge config no longer needed
+ _persistence_bootstrap_config.reset();
+ _visitor_bootstrap_config.reset();
+ _filestor_bootstrap_config.reset();
+}
+
+void
+ServiceLayerNode::on_configure(const StorServerConfig& config)
+{
+ assert(_merge_throttler);
+ _merge_throttler->on_configure(config);
+ assert(_modified_bucket_checker);
+ _modified_bucket_checker->on_configure(config);
+}
+
+void
+ServiceLayerNode::on_configure(const PersistenceConfig& config)
+{
+ assert(_changed_bucket_ownership_handler);
+ _changed_bucket_ownership_handler->on_configure(config);
+}
+
+void
+ServiceLayerNode::on_configure(const StorVisitorConfig& config)
+{
+ assert(_visitor_manager);
+ _visitor_manager->on_configure(config);
+}
+
+void
+ServiceLayerNode::on_configure(const StorFilestorConfig& config)
+{
+ assert(_fileStorManager);
+ _fileStorManager->on_configure(config);
}
ResumeGuard
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.h b/storage/src/vespa/storage/storageserver/servicelayernode.h
index dabc1c979a5..ae39bb0805e 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.h
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.h
@@ -1,10 +1,4 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/**
- * \class storage::ServiceLayerNode
- * \ingroup storageserver
- *
- * \brief Class for setting up a service layer node.
- */
#pragma once
@@ -12,9 +6,12 @@
#include "servicelayernodecontext.h"
#include "storagenode.h"
#include "vespa/vespalib/util/jsonstream.h"
-#include <vespa/storage/visiting/visitormessagesessionfactory.h>
-#include <vespa/storage/common/visitorfactory.h>
+#include <vespa/config-persistence.h>
+#include <vespa/config-stor-filestor.h>
#include <vespa/storage/common/nodestateupdater.h>
+#include <vespa/storage/common/visitorfactory.h>
+#include <vespa/storage/visiting/config-stor-visitor.h>
+#include <vespa/storage/visiting/visitormessagesessionfactory.h>
namespace storage {
@@ -22,7 +19,11 @@ namespace spi { struct PersistenceProvider; }
class Bouncer;
class BucketManager;
+class ChangedBucketOwnershipHandler;
class FileStorManager;
+class MergeThrottler;
+class ModifiedBucketChecker;
+class VisitorManager;
class ServiceLayerNode
: public StorageNode,
@@ -30,21 +31,44 @@ class ServiceLayerNode
private NodeStateReporter
{
- ServiceLayerNodeContext& _context;
- spi::PersistenceProvider& _persistenceProvider;
- VisitorFactory::Map _externalVisitors;
-
- Bouncer* _bouncer;
- BucketManager* _bucket_manager;
- FileStorManager* _fileStorManager;
- bool _init_has_been_called;
+public:
+ using PersistenceConfig = vespa::config::content::PersistenceConfig;
+ using StorVisitorConfig = vespa::config::content::core::StorVisitorConfig;
+ using StorFilestorConfig = vespa::config::content::StorFilestorConfig;
+private:
+ ServiceLayerNodeContext& _context;
+ spi::PersistenceProvider& _persistenceProvider;
+ VisitorFactory::Map _externalVisitors;
+ std::unique_ptr<PersistenceConfig> _persistence_bootstrap_config;
+ std::unique_ptr<StorVisitorConfig> _visitor_bootstrap_config;
+ std::unique_ptr<StorFilestorConfig> _filestor_bootstrap_config;
+ Bouncer* _bouncer;
+ BucketManager* _bucket_manager;
+ ChangedBucketOwnershipHandler* _changed_bucket_ownership_handler;
+ FileStorManager* _fileStorManager;
+ MergeThrottler* _merge_throttler;
+ VisitorManager* _visitor_manager;
+ ModifiedBucketChecker* _modified_bucket_checker;
+ bool _init_has_been_called;
public:
using UP = std::unique_ptr<ServiceLayerNode>;
+ struct ServiceLayerBootstrapConfigs {
+ BootstrapConfigs storage_bootstrap_configs;
+ std::unique_ptr<PersistenceConfig> persistence_cfg;
+ std::unique_ptr<StorVisitorConfig> visitor_cfg;
+ std::unique_ptr<StorFilestorConfig> filestor_cfg;
+
+ ServiceLayerBootstrapConfigs();
+ ~ServiceLayerBootstrapConfigs();
+ ServiceLayerBootstrapConfigs(ServiceLayerBootstrapConfigs&&) noexcept;
+ ServiceLayerBootstrapConfigs& operator=(ServiceLayerBootstrapConfigs&&) noexcept;
+ };
+
ServiceLayerNode(const config::ConfigUri & configUri,
ServiceLayerNodeContext& context,
- BootstrapConfigs bootstrap_configs,
+ ServiceLayerBootstrapConfigs bootstrap_configs,
ApplicationGenerationFetcher& generationFetcher,
spi::PersistenceProvider& persistenceProvider,
const VisitorFactory::Map& externalVisitors);
@@ -54,6 +78,11 @@ public:
*/
void init();
+ void on_configure(const StorServerConfig& config);
+ void on_configure(const PersistenceConfig& config);
+ void on_configure(const StorVisitorConfig& config);
+ void on_configure(const StorFilestorConfig& config);
+
const lib::NodeType& getNodeType() const override { return lib::NodeType::STORAGE; }
ResumeGuard pause() override;
diff --git a/storage/src/vespa/storage/visiting/visitormanager.cpp b/storage/src/vespa/storage/visiting/visitormanager.cpp
index 1c26923b15f..dc1635bc4b1 100644
--- a/storage/src/vespa/storage/visiting/visitormanager.cpp
+++ b/storage/src/vespa/storage/visiting/visitormanager.cpp
@@ -21,7 +21,7 @@ LOG_SETUP(".visitor.manager");
namespace storage {
-VisitorManager::VisitorManager(const config::ConfigUri & configUri,
+VisitorManager::VisitorManager(const StorVisitorConfig& bootstrap_config,
StorageComponentRegister& componentRegister,
VisitorMessageSessionFactory& messageSF,
VisitorFactory::Map externalFactories,
@@ -35,7 +35,6 @@ VisitorManager::VisitorManager(const config::ConfigUri & configUri,
_visitorLock(),
_visitorCond(),
_visitorCounter(0),
- _configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())),
_metrics(std::make_shared<VisitorMetrics>()),
_maxFixedConcurrentVisitors(1),
_maxVariableConcurrentVisitors(0),
@@ -51,8 +50,7 @@ VisitorManager::VisitorManager(const config::ConfigUri & configUri,
_enforceQueueUse(false),
_visitorFactories(std::move(externalFactories))
{
- _configFetcher->subscribe<vespa::config::content::core::StorVisitorConfig>(configUri.getConfigId(), this);
- _configFetcher->start();
+ on_configure(bootstrap_config);
_component.registerMetric(*_metrics);
if (!defer_manager_thread_start) {
create_and_start_manager_thread();
@@ -94,8 +92,6 @@ VisitorManager::updateMetrics(const MetricLockGuard &)
void
VisitorManager::onClose()
{
- // Avoid getting config during shutdown
- _configFetcher->close();
{
std::lock_guard sync(_visitorLock);
for (auto& enqueued : _visitorQueue) {
@@ -118,25 +114,25 @@ VisitorManager::print(std::ostream& out, bool verbose, const std::string& indent
}
void
-VisitorManager::configure(std::unique_ptr<vespa::config::content::core::StorVisitorConfig> config)
+VisitorManager::on_configure(const vespa::config::content::core::StorVisitorConfig& config)
{
std::lock_guard sync(_visitorLock);
- if (config->defaultdocblocksize % 512 != 0) {
+ if (config.defaultdocblocksize % 512 != 0) {
throw config::InvalidConfigException(
- "The default docblock size needs to be a multiplum of the "
+ "The default docblock size needs to be a multiple of the "
"disk block size. (512b)");
}
// Do some sanity checking of input. Cannot haphazardly mix and match
// old and new max concurrency config values
- if (config->maxconcurrentvisitors == 0
- && config->maxconcurrentvisitorsFixed == 0)
+ if (config.maxconcurrentvisitors == 0
+ && config.maxconcurrentvisitorsFixed == 0)
{
throw config::InvalidConfigException(
"Maximum concurrent visitor count cannot be 0.");
}
- else if (config->maxconcurrentvisitorsFixed == 0
- && config->maxconcurrentvisitorsVariable != 0)
+ else if (config.maxconcurrentvisitorsFixed == 0
+ && config.maxconcurrentvisitorsVariable != 0)
{
throw config::InvalidConfigException(
"Cannot specify 'variable' parameter for max concurrent "
@@ -147,21 +143,21 @@ VisitorManager::configure(std::unique_ptr<vespa::config::content::core::StorVisi
uint32_t maxConcurrentVisitorsVariable;
// Concurrency parameter fixed takes precedence over legacy maxconcurrent
- if (config->maxconcurrentvisitorsFixed > 0) {
- maxConcurrentVisitorsFixed = config->maxconcurrentvisitorsFixed;
- maxConcurrentVisitorsVariable = config->maxconcurrentvisitorsVariable;
+ if (config.maxconcurrentvisitorsFixed > 0) {
+ maxConcurrentVisitorsFixed = config.maxconcurrentvisitorsFixed;
+ maxConcurrentVisitorsVariable = config.maxconcurrentvisitorsVariable;
} else {
- maxConcurrentVisitorsFixed = config->maxconcurrentvisitors;
+ maxConcurrentVisitorsFixed = config.maxconcurrentvisitors;
maxConcurrentVisitorsVariable = 0;
}
bool liveUpdate = !_visitorThread.empty();
if (liveUpdate) {
- if (_visitorThread.size() != static_cast<uint32_t>(config->visitorthreads)) {
+ if (_visitorThread.size() != static_cast<uint32_t>(config.visitorthreads)) {
LOG(warning, "Ignoring config change requesting %u visitor "
"threads, still running %u. Restart storage to apply "
"change.",
- config->visitorthreads,
+ config.visitorthreads,
(uint32_t) _visitorThread.size());
}
@@ -174,18 +170,18 @@ VisitorManager::configure(std::unique_ptr<vespa::config::content::core::StorVisi
maxConcurrentVisitorsFixed, maxConcurrentVisitorsVariable);
}
- if (_maxVisitorQueueSize != static_cast<uint32_t>(config->maxvisitorqueuesize)) {
+ if (_maxVisitorQueueSize != static_cast<uint32_t>(config.maxvisitorqueuesize)) {
LOG(info, "Altered max visitor queue size setting from %u to %u.",
- _maxVisitorQueueSize, config->maxvisitorqueuesize);
+ _maxVisitorQueueSize, config.maxvisitorqueuesize);
}
} else {
- if (config->visitorthreads == 0) {
+ if (config.visitorthreads == 0) {
throw config::InvalidConfigException(
"No visitor threads configured. If you don't want visitors "
"to run, don't use visitormanager.", VESPA_STRLOC);
}
- _metrics->initThreads(config->visitorthreads);
- for (int32_t i=0; i<config->visitorthreads; ++i) {
+ _metrics->initThreads(config.visitorthreads);
+ for (int32_t i=0; i<config.visitorthreads; ++i) {
_visitorThread.emplace_back(
// Naked new due to a lot of private inheritance in VisitorThread and VisitorManager
std::shared_ptr<VisitorThread>(new VisitorThread(i, _componentRegister, _messageSessionFactory,
@@ -195,9 +191,9 @@ VisitorManager::configure(std::unique_ptr<vespa::config::content::core::StorVisi
}
_maxFixedConcurrentVisitors = maxConcurrentVisitorsFixed;
_maxVariableConcurrentVisitors = maxConcurrentVisitorsVariable;
- _maxVisitorQueueSize = config->maxvisitorqueuesize;
+ _maxVisitorQueueSize = config.maxvisitorqueuesize;
- auto cmd = std::make_shared<PropagateVisitorConfig>(*config);
+ auto cmd = std::make_shared<PropagateVisitorConfig>(config);
for (auto& thread : _visitorThread) {
thread.first->processMessage(0, cmd);
}
diff --git a/storage/src/vespa/storage/visiting/visitormanager.h b/storage/src/vespa/storage/visiting/visitormanager.h
index 9fe906d4465..fefa2c218ab 100644
--- a/storage/src/vespa/storage/visiting/visitormanager.h
+++ b/storage/src/vespa/storage/visiting/visitormanager.h
@@ -44,10 +44,11 @@ class VisitorManager : public framework::Runnable,
public StorageLink,
public framework::HtmlStatusReporter,
private VisitorMessageHandler,
- private config::IFetcherCallback<vespa::config::content::core::StorVisitorConfig>,
private framework::MetricUpdateHook
{
private:
+ using StorVisitorConfig = vespa::config::content::core::StorVisitorConfig;
+
StorageComponentRegister& _componentRegister;
VisitorMessageSessionFactory& _messageSessionFactory;
std::vector<std::pair<std::shared_ptr<VisitorThread>,
@@ -64,7 +65,6 @@ private:
mutable std::mutex _visitorLock;
std::condition_variable _visitorCond;
uint64_t _visitorCounter;
- std::unique_ptr<config::ConfigFetcher> _configFetcher;
std::shared_ptr<VisitorMetrics> _metrics;
uint32_t _maxFixedConcurrentVisitors;
uint32_t _maxVariableConcurrentVisitors;
@@ -82,7 +82,7 @@ private:
bool _enforceQueueUse;
VisitorFactory::Map _visitorFactories;
public:
- VisitorManager(const config::ConfigUri & configUri,
+ VisitorManager(const StorVisitorConfig& bootstrap_config,
StorageComponentRegister&,
VisitorMessageSessionFactory&,
VisitorFactory::Map external = VisitorFactory::Map(),
@@ -94,6 +94,8 @@ public:
uint32_t getActiveVisitorCount() const;
void setTimeBetweenTicks(uint32_t time);
+ void on_configure(const vespa::config::content::core::StorVisitorConfig&);
+
void setMaxConcurrentVisitors(uint32_t count) { // Used in unit testing
_maxFixedConcurrentVisitors = count;
_maxVariableConcurrentVisitors = 0;
@@ -122,7 +124,6 @@ public:
private:
using MonitorGuard = std::unique_lock<std::mutex>;
- void configure(std::unique_ptr<vespa::config::content::core::StorVisitorConfig>) override;
void run(framework::ThreadHandle&) override;
/**
diff --git a/storageserver/src/vespa/storageserver/app/process.h b/storageserver/src/vespa/storageserver/app/process.h
index a30611b78c8..72b399ac870 100644
--- a/storageserver/src/vespa/storageserver/app/process.h
+++ b/storageserver/src/vespa/storageserver/app/process.h
@@ -8,7 +8,7 @@
* contains the process as a library such that it can be tested and used in
* other pieces of code.
*
- * Specializations of this class will exist to add the funcionality needed for
+ * Specializations of this class will exist to add the functionality needed for
* the various process types.
*/
diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp
index 6b713b8e3f4..bb284bfc108 100644
--- a/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp
+++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp
@@ -34,6 +34,9 @@ bucket_db_options_from_config(const config::ConfigUri& config_uri) {
ServiceLayerProcess::ServiceLayerProcess(const config::ConfigUri& configUri)
: Process(configUri),
_externalVisitors(),
+ _persistence_cfg_handle(),
+ _visitor_cfg_handle(),
+ _filestor_cfg_handle(),
_node(),
_storage_chain_builder(),
_context(std::make_unique<framework::defaultimplementation::RealClock>(),
@@ -53,6 +56,10 @@ ServiceLayerProcess::shutdown()
void
ServiceLayerProcess::setupConfig(vespalib::duration subscribe_timeout)
{
+ _persistence_cfg_handle = _configSubscriber.subscribe<PersistenceConfig>(_configUri.getConfigId(), subscribe_timeout);
+ _visitor_cfg_handle = _configSubscriber.subscribe<StorVisitorConfig>(_configUri.getConfigId(), subscribe_timeout);
+ _filestor_cfg_handle = _configSubscriber.subscribe<StorFilestorConfig>(_configUri.getConfigId(), subscribe_timeout);
+ // We reuse the StorServerConfig subscription from the parent Process
Process::setupConfig(subscribe_timeout);
}
@@ -60,6 +67,18 @@ void
ServiceLayerProcess::updateConfig()
{
Process::updateConfig();
+ if (_server_cfg_handle->isChanged()) {
+ _node->on_configure(*_server_cfg_handle->getConfig());
+ }
+ if (_persistence_cfg_handle->isChanged()) {
+ _node->on_configure(*_persistence_cfg_handle->getConfig());
+ }
+ if (_visitor_cfg_handle->isChanged()) {
+ _node->on_configure(*_visitor_cfg_handle->getConfig());
+ }
+ if (_filestor_cfg_handle->isChanged()) {
+ _node->on_configure(*_filestor_cfg_handle->getConfig());
+ }
}
bool
@@ -73,15 +92,21 @@ ServiceLayerProcess::createNode()
{
add_external_visitors();
setupProvider();
- // TODO dedupe, consolidate
+
StorageNode::BootstrapConfigs bc;
bc.bucket_spaces_cfg = _bucket_spaces_cfg_handle->getConfig();
- bc.bouncer_cfg = _bouncer_cfg_handle->getConfig();
- bc.comm_mgr_cfg = _comm_mgr_cfg_handle->getConfig();
- bc.distribution_cfg = _distribution_cfg_handle->getConfig();
- bc.server_cfg = _server_cfg_handle->getConfig();
-
- _node = std::make_unique<ServiceLayerNode>(_configUri, _context, std::move(bc), *this, getProvider(), _externalVisitors);
+ bc.bouncer_cfg = _bouncer_cfg_handle->getConfig();
+ bc.comm_mgr_cfg = _comm_mgr_cfg_handle->getConfig();
+ bc.distribution_cfg = _distribution_cfg_handle->getConfig();
+ bc.server_cfg = _server_cfg_handle->getConfig();
+
+ ServiceLayerNode::ServiceLayerBootstrapConfigs sbc;
+ sbc.storage_bootstrap_configs = std::move(bc);
+ sbc.persistence_cfg = _persistence_cfg_handle->getConfig();
+ sbc.visitor_cfg = _visitor_cfg_handle->getConfig();
+ sbc.filestor_cfg = _filestor_cfg_handle->getConfig();
+
+ _node = std::make_unique<ServiceLayerNode>(_configUri, _context, std::move(sbc), *this, getProvider(), _externalVisitors);
if (_storage_chain_builder) {
_node->set_storage_chain_builder(std::move(_storage_chain_builder));
}
diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.h b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h
index 7e981748d1a..dcc56f373c4 100644
--- a/storageserver/src/vespa/storageserver/app/servicelayerprocess.h
+++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h
@@ -1,24 +1,12 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/**
- * \class storage::ServiceLayerProcess
- *
- * \brief A process running a service layer.
- */
-/**
- * \class storage::MemFileServiceLayerProcess
- *
- * \brief A process running a service layer with memfile persistence provider.
- */
-/**
- * \class storage::RpcServiceLayerProcess
- *
- * \brief A process running a service layer with RPC persistence provider.
- */
#pragma once
#include "process.h"
-#include <vespa/storage/storageserver/servicelayernodecontext.h>
+#include <vespa/config-persistence.h>
+#include <vespa/config-stor-filestor.h>
#include <vespa/storage/common/visitorfactory.h>
+#include <vespa/storage/storageserver/servicelayernodecontext.h>
+#include <vespa/storage/visiting/config-stor-visitor.h>
namespace config { class ConfigUri; }
@@ -33,6 +21,14 @@ class ServiceLayerProcess : public Process {
protected:
VisitorFactory::Map _externalVisitors;
private:
+ using PersistenceConfig = vespa::config::content::PersistenceConfig;
+ using StorVisitorConfig = vespa::config::content::core::StorVisitorConfig;
+ using StorFilestorConfig = vespa::config::content::StorFilestorConfig;
+
+ std::unique_ptr<config::ConfigHandle<PersistenceConfig>> _persistence_cfg_handle;
+ std::unique_ptr<config::ConfigHandle<StorVisitorConfig>> _visitor_cfg_handle;
+ std::unique_ptr<config::ConfigHandle<StorFilestorConfig>> _filestor_cfg_handle;
+
std::unique_ptr<ServiceLayerNode> _node;
std::unique_ptr<IStorageChainBuilder> _storage_chain_builder;