aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@vespa.ai>2023-10-23 14:00:59 +0000
committerTor Brede Vekterli <vekterli@vespa.ai>2023-10-24 09:05:31 +0000
commit8fc6725988f9cc475c502412fe59391e7f150674 (patch)
tree0c1164b5685b0d262f483b891435e2a7aff086af
parent094c30a06a3e4441ee4dbeaa61de76fe1b6f54e4 (diff)
Wire config to MergeThrottler in from the outside
-rw-r--r--storage/src/tests/storageserver/mergethrottlertest.cpp33
-rw-r--r--storage/src/tests/storageserver/service_layer_error_listener_test.cpp9
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.cpp37
-rw-r--r--storage/src/vespa/storage/storageserver/mergethrottler.h15
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.cpp14
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.h10
-rw-r--r--storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp4
7 files changed, 70 insertions, 52 deletions
diff --git a/storage/src/tests/storageserver/mergethrottlertest.cpp b/storage/src/tests/storageserver/mergethrottlertest.cpp
index 8e87e07eeff..990a552780b 100644
--- a/storage/src/tests/storageserver/mergethrottlertest.cpp
+++ b/storage/src/tests/storageserver/mergethrottlertest.cpp
@@ -2,6 +2,7 @@
#include <tests/common/testhelper.h>
#include <tests/common/dummystoragelink.h>
#include <tests/common/teststorageapp.h>
+#include <vespa/config/helper/configgetter.hpp>
#include <vespa/document/test/make_document_bucket.h>
#include <vespa/messagebus/dynamicthrottlepolicy.h>
#include <vespa/storage/storageserver/mergethrottler.h>
@@ -28,7 +29,9 @@ namespace storage {
namespace {
-vespalib::string _Storage("storage");
+using StorServerConfig = vespa::config::content::core::StorServerConfig;
+
+vespalib::string _storage("storage");
struct MergeBuilder {
document::BucketId _bucket;
@@ -108,7 +111,7 @@ struct MergeBuilder {
auto cmd = std::make_shared<MergeBucketCommand>(
makeDocumentBucket(_bucket), n, _maxTimestamp,
_clusterStateVersion, _chain);
- cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, _nodes[0]));
+ cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, _nodes[0]));
return cmd;
}
};
@@ -121,6 +124,10 @@ makeSystemStateCmd(const std::string& state)
return std::make_shared<api::SetSystemStateCommand>(lib::ClusterState(state));
}
+std::unique_ptr<StorServerConfig> config_from(const ::config::ConfigUri& cfg_uri) {
+ return ::config::ConfigGetter<StorServerConfig>::getConfig(cfg_uri.getConfigId(), cfg_uri.getContext());
+}
+
} // anon ns
struct MergeThrottlerTest : Test {
@@ -167,7 +174,9 @@ MergeThrottlerTest::~MergeThrottlerTest() = default;
void
MergeThrottlerTest::SetUp()
{
- vdstestlib::DirConfig config(getStandardConfig(true));
+ vdstestlib::DirConfig dir_config(getStandardConfig(true));
+ auto cfg_uri = ::config::ConfigUri(dir_config.getConfigId());
+ auto config = config_from(cfg_uri);
for (int i = 0; i < _storageNodeCount; ++i) {
auto server = std::make_unique<TestServiceLayerApp>(NodeIndex(i));
@@ -175,7 +184,7 @@ MergeThrottlerTest::SetUp()
std::unique_ptr<DummyStorageLink> top;
top = std::make_unique<DummyStorageLink>();
- MergeThrottler* throttler = new MergeThrottler(::config::ConfigUri(config.getConfigId()), server->getComponentRegister());
+ MergeThrottler* throttler = new MergeThrottler(*config, server->getComponentRegister());
// MergeThrottler will be sandwiched in between two dummy links
top->push_back(std::unique_ptr<StorageLink>(throttler));
DummyStorageLink* bottom = new DummyStorageLink;
@@ -283,7 +292,7 @@ TEST_F(MergeThrottlerTest, chain) {
auto cmd = std::make_shared<MergeBucketCommand>(bucket, nodes, UINT_MAX, 123);
cmd->setPriority(7);
cmd->setTimeout(54321ms);
- cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 0));
+ cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 0));
const uint16_t distributorIndex = 123;
cmd->setSourceIndex(distributorIndex); // Dummy distributor index that must be forwarded
@@ -423,7 +432,7 @@ TEST_F(MergeThrottlerTest, with_source_only_node) {
std::vector<MergeBucketCommand::Node> nodes({{0}, {2}, {1, true}});
auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, UINT_MAX, 123);
- cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 0));
+ cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 0));
_topLinks[0]->sendDown(cmd);
_topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
@@ -468,7 +477,7 @@ TEST_F(MergeThrottlerTest, legacy_42_distributor_behavior) {
auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234);
// Send to node 1, which is not the lowest index
- cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1));
+ cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1));
_topLinks[1]->sendDown(cmd);
_bottomLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
@@ -503,7 +512,7 @@ TEST_F(MergeThrottlerTest, legacy_42_distributor_behavior_does_not_take_ownershi
auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234);
// Send to node 1, which is not the lowest index
- cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1));
+ cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1));
_topLinks[1]->sendDown(cmd);
_bottomLinks[1]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
@@ -550,7 +559,7 @@ TEST_F(MergeThrottlerTest, end_of_chain_execution_does_not_take_ownership) {
auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234, 1, chain);
// Send to last node, which is not the lowest index
- cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 3));
+ cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 3));
_topLinks[2]->sendDown(cmd);
_bottomLinks[2]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
@@ -595,7 +604,7 @@ TEST_F(MergeThrottlerTest, resend_handling) {
std::vector<MergeBucketCommand::Node> nodes({{0}, {1}, {2}});
auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234);
- cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1));
+ cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1));
_topLinks[0]->sendDown(cmd);
_topLinks[0]->waitForMessage(MessageType::MERGEBUCKET, _messageWaitTime);
@@ -962,7 +971,7 @@ TEST_F(MergeThrottlerTest, unseen_merge_with_node_in_chain) {
makeDocumentBucket(BucketId(32, 0xdeadbeef)), nodes, 1234, 1, chain);
- cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 9));
+ cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 9));
_topLinks[0]->sendDown(cmd);
// First, test that we get rejected when processing merge immediately
@@ -1145,7 +1154,7 @@ TEST_F(MergeThrottlerTest, unknown_merge_with_self_in_chain) {
std::vector<uint16_t> chain({0});
auto cmd = std::make_shared<MergeBucketCommand>(makeDocumentBucket(bid), nodes, 1234, 1, chain);
- cmd->setAddress(StorageMessageAddress::create(&_Storage, lib::NodeType::STORAGE, 1));
+ cmd->setAddress(StorageMessageAddress::create(&_storage, lib::NodeType::STORAGE, 1));
_topLinks[0]->sendDown(cmd);
_topLinks[0]->waitForMessage(MessageType::MERGEBUCKET_REPLY, _messageWaitTime);
diff --git a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp
index 84a98385962..a30594f6076 100644
--- a/storage/src/tests/storageserver/service_layer_error_listener_test.cpp
+++ b/storage/src/tests/storageserver/service_layer_error_listener_test.cpp
@@ -6,6 +6,7 @@
#include <vespa/vdstestlib/config/dirconfig.h>
#include <tests/common/testhelper.h>
#include <tests/common/teststorageapp.h>
+#include <vespa/config/helper/configgetter.hpp>
#include <vespa/vespalib/gtest/gtest.h>
using namespace ::testing;
@@ -37,10 +38,16 @@ struct Fixture {
vdstestlib::DirConfig config{getStandardConfig(true)};
TestServiceLayerApp app;
ServiceLayerComponent component{app.getComponentRegister(), "dummy"};
- MergeThrottler merge_throttler{config::ConfigUri(config.getConfigId()), app.getComponentRegister()};
+ MergeThrottler merge_throttler{*config_from(config::ConfigUri(config.getConfigId())), app.getComponentRegister()};
TestShutdownListener shutdown_listener;
ServiceLayerErrorListener error_listener{component, merge_throttler};
+ using StorServerConfig = vespa::config::content::core::StorServerConfig;
+
+ static std::unique_ptr<StorServerConfig> config_from(const ::config::ConfigUri& cfg_uri) {
+ return ::config::ConfigGetter<StorServerConfig>::getConfig(cfg_uri.getConfigId(), cfg_uri.getContext());
+ }
+
Fixture();
~Fixture();
};
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.cpp b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
index 6e2f2a77d20..4cc2a7a89ab 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.cpp
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.cpp
@@ -179,7 +179,7 @@ MergeThrottler::MergeNodeSequence::chain_contains_this_node() const noexcept
}
MergeThrottler::MergeThrottler(
- const config::ConfigUri & configUri,
+ const StorServerConfig& bootstrap_config,
StorageComponentRegister& compReg)
: StorageLink("Merge Throttler"),
framework::HtmlStatusReporter("merges", "Merge Throttler"),
@@ -190,7 +190,6 @@ MergeThrottler::MergeThrottler(
_queueSequence(0),
_messageLock(),
_stateLock(),
- _configFetcher(std::make_unique<config::ConfigFetcher>(configUri.getContext())),
_metrics(std::make_unique<Metrics>()),
_component(compReg, "mergethrottler"),
_thread(),
@@ -203,34 +202,33 @@ MergeThrottler::MergeThrottler(
{
_throttlePolicy->setMinWindowSize(20);
_throttlePolicy->setMaxWindowSize(20);
- _configFetcher->subscribe<StorServerConfig>(configUri.getConfigId(), this);
- _configFetcher->start();
+ on_configure(bootstrap_config);
_component.registerStatusPage(*this);
_component.registerMetric(*_metrics);
}
void
-MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServerConfig> newConfig)
+MergeThrottler::on_configure(const StorServerConfig& new_config)
{
std::lock_guard lock(_stateLock);
- _use_dynamic_throttling = (newConfig->mergeThrottlingPolicy.type
+ _use_dynamic_throttling = (new_config.mergeThrottlingPolicy.type
== StorServerConfig::MergeThrottlingPolicy::Type::DYNAMIC);
- if (newConfig->maxMergesPerNode < 1) {
+ if (new_config.maxMergesPerNode < 1) {
throw config::InvalidConfigException("Cannot have a max merge count of less than 1");
}
- if (newConfig->maxMergeQueueSize < 0) {
+ if (new_config.maxMergeQueueSize < 0) {
throw config::InvalidConfigException("Max merge queue size cannot be less than 0");
}
- if (newConfig->resourceExhaustionMergeBackPressureDurationSecs < 0.0) {
+ if (new_config.resourceExhaustionMergeBackPressureDurationSecs < 0.0) {
throw config::InvalidConfigException("Merge back-pressure duration cannot be less than 0");
}
if (_use_dynamic_throttling) {
- auto min_win_sz = std::max(newConfig->mergeThrottlingPolicy.minWindowSize, 1);
- auto max_win_sz = std::max(newConfig->mergeThrottlingPolicy.maxWindowSize, 1);
+ auto min_win_sz = std::max(new_config.mergeThrottlingPolicy.minWindowSize, 1);
+ auto max_win_sz = std::max(new_config.mergeThrottlingPolicy.maxWindowSize, 1);
if (min_win_sz > max_win_sz) {
min_win_sz = max_win_sz;
}
- auto win_sz_increment = std::max(1.0, newConfig->mergeThrottlingPolicy.windowSizeIncrement);
+ auto win_sz_increment = std::max(1.0, new_config.mergeThrottlingPolicy.windowSizeIncrement);
_throttlePolicy->setMinWindowSize(min_win_sz);
_throttlePolicy->setMaxWindowSize(max_win_sz);
_throttlePolicy->setWindowSizeIncrement(win_sz_increment);
@@ -238,15 +236,14 @@ MergeThrottler::configure(std::unique_ptr<vespa::config::content::core::StorServ
min_win_sz, max_win_sz, win_sz_increment);
} else {
// Use legacy config values when static throttling is enabled.
- _throttlePolicy->setMinWindowSize(newConfig->maxMergesPerNode);
- _throttlePolicy->setMaxWindowSize(newConfig->maxMergesPerNode);
+ _throttlePolicy->setMinWindowSize(new_config.maxMergesPerNode);
+ _throttlePolicy->setMaxWindowSize(new_config.maxMergesPerNode);
}
- LOG(debug, "Setting new max queue size to %d",
- newConfig->maxMergeQueueSize);
- _maxQueueSize = newConfig->maxMergeQueueSize;
+ LOG(debug, "Setting new max queue size to %d", new_config.maxMergeQueueSize);
+ _maxQueueSize = new_config.maxMergeQueueSize;
_backpressure_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>(
- std::chrono::duration<double>(newConfig->resourceExhaustionMergeBackPressureDurationSecs));
- _disable_queue_limits_for_chained_merges = newConfig->disableQueueLimitsForChainedMerges;
+ std::chrono::duration<double>(new_config.resourceExhaustionMergeBackPressureDurationSecs));
+ _disable_queue_limits_for_chained_merges = new_config.disableQueueLimitsForChainedMerges;
}
MergeThrottler::~MergeThrottler()
@@ -275,8 +272,6 @@ MergeThrottler::onOpen()
void
MergeThrottler::onClose()
{
- // Avoid getting config on shutdown
- _configFetcher->close();
{
std::lock_guard guard(_messageLock);
// Note: used to prevent taking locks in different order if onFlush
diff --git a/storage/src/vespa/storage/storageserver/mergethrottler.h b/storage/src/vespa/storage/storageserver/mergethrottler.h
index 840f1df1177..5362c2f6df8 100644
--- a/storage/src/vespa/storage/storageserver/mergethrottler.h
+++ b/storage/src/vespa/storage/storageserver/mergethrottler.h
@@ -35,10 +35,11 @@ class AbortBucketOperationsCommand;
class MergeThrottler : public framework::Runnable,
public StorageLink,
- public framework::HtmlStatusReporter,
- private config::IFetcherCallback<vespa::config::content::core::StorServerConfig>
+ public framework::HtmlStatusReporter
{
public:
+ using StorServerConfig = vespa::config::content::core::StorServerConfig;
+
class MergeFailureMetrics : public metrics::MetricSet {
public:
metrics::SumMetric<metrics::LongCountMetric> sum;
@@ -172,7 +173,6 @@ private:
mutable std::mutex _messageLock;
std::condition_variable _messageCond;
mutable std::mutex _stateLock;
- std::unique_ptr<config::ConfigFetcher> _configFetcher;
// Messages pending to be processed by the worker thread
std::vector<api::StorageMessage::SP> _messagesDown;
std::vector<api::StorageMessage::SP> _messagesUp;
@@ -190,7 +190,7 @@ public:
* windowSizeIncrement used for allowing unit tests to start out with more
* than 1 as their window size.
*/
- MergeThrottler(const config::ConfigUri & configUri, StorageComponentRegister&);
+ MergeThrottler(const StorServerConfig& bootstrap_config, StorageComponentRegister&);
~MergeThrottler() override;
/** Implements document::Runnable::run */
@@ -204,6 +204,8 @@ public:
bool onSetSystemState(const std::shared_ptr<api::SetSystemStateCommand>& stateCmd) override;
+ void on_configure(const StorServerConfig& new_config);
+
/*
* When invoked, merges to the node will be BUSY-bounced by the throttler
* for a configurable period of time instead of being processed.
@@ -282,11 +284,6 @@ private:
[[nodiscard]] bool isChainCompleted() const noexcept;
};
- /**
- * Callback method for config system (IFetcherCallback)
- */
- void configure(std::unique_ptr<vespa::config::content::core::StorServerConfig> newConfig) override;
-
// NOTE: unless explicitly specified, all the below functions require
// _sync lock to be held upon call (usually implicitly via MessageGuard)
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
index 3faa8a3ddec..b09621335a7 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
@@ -37,6 +37,7 @@ ServiceLayerNode::ServiceLayerNode(const config::ConfigUri & configUri,
_bouncer(nullptr),
_bucket_manager(nullptr),
_fileStorManager(nullptr),
+ _merge_throttler(nullptr),
_init_has_been_called(false)
{
}
@@ -158,8 +159,8 @@ ServiceLayerNode::createChain(IStorageChainBuilder &builder)
auto bouncer = std::make_unique<Bouncer>(compReg, bouncer_config());
_bouncer = bouncer.get();
builder.add(std::move(bouncer));
- auto merge_throttler_up = std::make_unique<MergeThrottler>(_configUri, compReg);
- auto merge_throttler = merge_throttler_up.get();
+ auto merge_throttler_up = std::make_unique<MergeThrottler>(server_config(), compReg);
+ _merge_throttler = merge_throttler_up.get();
builder.add(std::move(merge_throttler_up));
builder.add(std::make_unique<ChangedBucketOwnershipHandler>(_configUri, compReg));
auto bucket_manager = std::make_unique<BucketManager>(_configUri, _context.getComponentRegister());
@@ -178,10 +179,17 @@ ServiceLayerNode::createChain(IStorageChainBuilder &builder)
// Lifetimes of all referenced components shall outlive the last call going
// through the SPI, as queues are flushed and worker threads joined when
// the storage link chain is closed prior to destruction.
- auto error_listener = std::make_shared<ServiceLayerErrorListener>(*_component, *merge_throttler);
+ auto error_listener = std::make_shared<ServiceLayerErrorListener>(*_component, *_merge_throttler);
_fileStorManager->error_wrapper().register_error_listener(std::move(error_listener));
}
+void
+ServiceLayerNode::on_configure(const StorServerConfig& config)
+{
+ assert(_merge_throttler);
+ _merge_throttler->on_configure(config);
+}
+
ResumeGuard
ServiceLayerNode::pause()
{
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.h b/storage/src/vespa/storage/storageserver/servicelayernode.h
index dabc1c979a5..3e95b9c6d3b 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.h
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.h
@@ -1,10 +1,4 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-/**
- * \class storage::ServiceLayerNode
- * \ingroup storageserver
- *
- * \brief Class for setting up a service layer node.
- */
#pragma once
@@ -23,6 +17,7 @@ namespace spi { struct PersistenceProvider; }
class Bouncer;
class BucketManager;
class FileStorManager;
+class MergeThrottler;
class ServiceLayerNode
: public StorageNode,
@@ -37,6 +32,7 @@ class ServiceLayerNode
Bouncer* _bouncer;
BucketManager* _bucket_manager;
FileStorManager* _fileStorManager;
+ MergeThrottler* _merge_throttler;
bool _init_has_been_called;
public:
@@ -54,6 +50,8 @@ public:
*/
void init();
+ void on_configure(const StorServerConfig& config);
+
const lib::NodeType& getNodeType() const override { return lib::NodeType::STORAGE; }
ResumeGuard pause() override;
diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp
index 6b713b8e3f4..9b835cee05e 100644
--- a/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp
+++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp
@@ -53,6 +53,7 @@ ServiceLayerProcess::shutdown()
void
ServiceLayerProcess::setupConfig(vespalib::duration subscribe_timeout)
{
+ // We reuse the StorServerConfig subscription from the parent Process
Process::setupConfig(subscribe_timeout);
}
@@ -60,6 +61,9 @@ void
ServiceLayerProcess::updateConfig()
{
Process::updateConfig();
+ if (_server_cfg_handle->isChanged()) {
+ _node->on_configure(*_server_cfg_handle->getConfig());
+ }
}
bool