From 6f8792b0acfa3e417d124b519b6383065a6bfd7a Mon Sep 17 00:00:00 2001 From: Arne Juul Date: Fri, 11 Jun 2021 12:57:24 +0000 Subject: add separate config subscriber for model --- configd/src/apps/sentinel/CMakeLists.txt | 1 + configd/src/apps/sentinel/config-owner.cpp | 43 +++++++++-------- configd/src/apps/sentinel/config-owner.h | 9 +++- configd/src/apps/sentinel/connectivity.cpp | 8 +-- configd/src/apps/sentinel/connectivity.h | 3 +- configd/src/apps/sentinel/env.cpp | 11 +++-- configd/src/apps/sentinel/env.h | 3 ++ configd/src/apps/sentinel/model-subscriber.cpp | 59 +++++++++++++++++++++++ configd/src/apps/sentinel/model-subscriber.h | 30 ++++++++++++ configd/src/apps/sentinel/report-connectivity.cpp | 45 ++++++----------- configd/src/apps/sentinel/report-connectivity.h | 8 ++- configd/src/apps/sentinel/rpchooks.cpp | 7 +-- configd/src/apps/sentinel/rpchooks.h | 4 +- configd/src/apps/sentinel/rpcserver.cpp | 4 +- configd/src/apps/sentinel/rpcserver.h | 3 +- 15 files changed, 166 insertions(+), 72 deletions(-) create mode 100644 configd/src/apps/sentinel/model-subscriber.cpp create mode 100644 configd/src/apps/sentinel/model-subscriber.h (limited to 'configd') diff --git a/configd/src/apps/sentinel/CMakeLists.txt b/configd/src/apps/sentinel/CMakeLists.txt index 0999314c8ce..3818f334ff2 100644 --- a/configd/src/apps/sentinel/CMakeLists.txt +++ b/configd/src/apps/sentinel/CMakeLists.txt @@ -9,6 +9,7 @@ vespa_add_executable(configd_config-sentinel_app line-splitter.cpp manager.cpp metrics.cpp + model-subscriber.cpp output-connection.cpp outward-check.cpp peer-check.cpp diff --git a/configd/src/apps/sentinel/config-owner.cpp b/configd/src/apps/sentinel/config-owner.cpp index 840c5b1add1..074f7187537 100644 --- a/configd/src/apps/sentinel/config-owner.cpp +++ b/configd/src/apps/sentinel/config-owner.cpp @@ -10,13 +10,22 @@ LOG_SETUP(".config-owner"); namespace config::sentinel { -ConfigOwner::ConfigOwner() : _subscriber() {} +ConfigOwner::ConfigOwner() = default; ConfigOwner::~ConfigOwner() = default; void ConfigOwner::subscribe(const std::string & configId, std::chrono::milliseconds timeout) { _sentinelHandle = _subscriber.subscribe(configId, timeout); + try { + _modelHandle =_modelSubscriber.subscribe("admin/model", timeout); + } catch (ConfigTimeoutException & ex) { + LOG(warning, "Timeout getting model config: %s [skipping connectivity checks]", ex.getMessage().c_str()); + } catch (InvalidConfigException& ex) { + LOG(warning, "Invalid model config: %s [skipping connectivity checks]", ex.getMessage().c_str()); + } catch (ConfigRuntimeException& ex) { + LOG(warning, "Runtime exception getting model config: %s [skipping connectivity checks]", ex.getMessage().c_str()); + } } void @@ -29,6 +38,7 @@ ConfigOwner::doConfigure() const auto & app = config.application; LOG(config, "Sentinel got %zd service elements [tenant(%s), application(%s), instance(%s)] for config generation %" PRId64, config.service.size(), app.tenant.c_str(), app.name.c_str(), app.instance.c_str(), _currGeneration); + getModelConfig(); } @@ -42,29 +52,20 @@ ConfigOwner::checkForConfigUpdate() { return false; } -std::unique_ptr -ConfigOwner::fetchModelConfig(std::chrono::milliseconds timeout) -{ - std::unique_ptr modelConfig; - ConfigSubscriber tempSubscriber; - try { - ConfigHandle::UP modelHandle = - tempSubscriber.subscribe("admin/model", timeout); - if (tempSubscriber.nextGenerationNow()) { - modelConfig = modelHandle->getConfig(); +std::optional +ConfigOwner::getModelConfig() { + if (_modelHandle && _modelSubscriber.nextGenerationNow()) { + if (auto newModel = _modelHandle->getConfig()) { LOG(config, "Sentinel got model info [version %s] for %zd hosts [config generation %" PRId64 "]", - modelConfig->vespaVersion.c_str(), modelConfig->hosts.size(), - tempSubscriber.getGeneration()); + newModel->vespaVersion.c_str(), newModel->hosts.size(), _modelSubscriber.getGeneration()); + _modelConfig = std::move(newModel); } - } catch (ConfigTimeoutException & ex) { - LOG(warning, "Timeout getting model config: %s [skipping connectivity checks]", ex.getMessage().c_str()); - } catch (InvalidConfigException& ex) { - LOG(warning, "Invalid model config: %s [skipping connectivity checks]", ex.getMessage().c_str()); - } catch (ConfigRuntimeException& ex) { - LOG(warning, "Runtime exception getting model config: %s [skipping connectivity checks]", ex.getMessage().c_str()); - } - return modelConfig; + if (_modelConfig) { + return ModelConfig(*_modelConfig); + } else { + return {}; + } } } diff --git a/configd/src/apps/sentinel/config-owner.h b/configd/src/apps/sentinel/config-owner.h index 2850e6b3904..7b79f65b1de 100644 --- a/configd/src/apps/sentinel/config-owner.h +++ b/configd/src/apps/sentinel/config-owner.h @@ -5,6 +5,7 @@ #include #include #include +#include using cloud::config::SentinelConfig; using cloud::config::ModelConfig; @@ -21,10 +22,14 @@ class ConfigOwner { private: ConfigSubscriber _subscriber; ConfigHandle::UP _sentinelHandle; - + int64_t _currGeneration = -1; std::unique_ptr _currConfig; + ConfigSubscriber _modelSubscriber; + ConfigHandle::UP _modelHandle; + std::unique_ptr _modelConfig; + ConfigOwner(const ConfigOwner&) = delete; ConfigOwner& operator =(const ConfigOwner&) = delete; @@ -37,7 +42,7 @@ public: bool hasConfig() const { return _currConfig.get() != nullptr; } const SentinelConfig& getConfig() const { return *_currConfig; } int64_t getGeneration() const { return _currGeneration; } - static std::unique_ptr fetchModelConfig(std::chrono::milliseconds timeout); + std::optional getModelConfig(); }; } diff --git a/configd/src/apps/sentinel/connectivity.cpp b/configd/src/apps/sentinel/connectivity.cpp index 12b645dd589..7ea548ea73d 100644 --- a/configd/src/apps/sentinel/connectivity.cpp +++ b/configd/src/apps/sentinel/connectivity.cpp @@ -118,13 +118,13 @@ SpecMap Connectivity::specsFrom(const ModelConfig &model) { return checkSpecs; } -void Connectivity::configure(const SentinelConfig::Connectivity &config) { +void Connectivity::configure(const SentinelConfig::Connectivity &config, + const ModelConfig &model) +{ _config = config; LOG(config, "connectivity.maxBadReverseCount = %d", _config.maxBadReverseCount); LOG(config, "connectivity.maxBadOutPercent = %d", _config.maxBadOutPercent); - if (auto up = ConfigOwner::fetchModelConfig(MODEL_TIMEOUT_MS)) { - _checkSpecs = specsFrom(*up); - } + _checkSpecs = specsFrom(model); } bool diff --git a/configd/src/apps/sentinel/connectivity.h b/configd/src/apps/sentinel/connectivity.h index 440d7df84c0..2ba17f5c07c 100644 --- a/configd/src/apps/sentinel/connectivity.h +++ b/configd/src/apps/sentinel/connectivity.h @@ -24,7 +24,8 @@ public: Connectivity(); ~Connectivity(); - void configure(const SentinelConfig::Connectivity &config); + void configure(const SentinelConfig::Connectivity &config, + const ModelConfig &model); bool checkConnectivity(RpcServer &rpcServer); static SpecMap specsFrom(const ModelConfig &model); private: diff --git a/configd/src/apps/sentinel/env.cpp b/configd/src/apps/sentinel/env.cpp index 5bbbfd8f0bd..58b917ca16e 100644 --- a/configd/src/apps/sentinel/env.cpp +++ b/configd/src/apps/sentinel/env.cpp @@ -36,6 +36,7 @@ constexpr int maxConnectivityRetries = 100; Env::Env() : _cfgOwner(), + _modelSubscriber("admin/model"), _rpcCommandQueue(), _rpcServer(), _stateApi(), @@ -52,6 +53,7 @@ Env::~Env() = default; void Env::boot(const std::string &configId) { LOG(debug, "Reading configuration for ID: %s", configId.c_str()); _cfgOwner.subscribe(configId, CONFIG_TIMEOUT_MS); + _modelSubscriber.start(CONFIG_TIMEOUT_MS); // subscribe() should throw if something is not OK Connectivity checker; for (int retry = 0; retry < maxConnectivityRetries; ++retry) { @@ -64,7 +66,10 @@ void Env::boot(const std::string &configId) { configId.c_str(), cfg.port.telnet, cfg.port.rpc); rpcPort(cfg.port.rpc); statePort(cfg.port.telnet); - checker.configure(cfg.connectivity); + auto model = _modelSubscriber.getModelConfig(); + if (model.has_value()) { + checker.configure(cfg.connectivity, model.value()); + } } if (checker.checkConnectivity(*_rpcServer)) { _stateApi.myHealth.setOk(); @@ -94,7 +99,7 @@ void Env::rpcPort(int port) { if (_rpcServer && port == _rpcServer->getPort()) { return; // ok already } - _rpcServer = std::make_unique(port, _rpcCommandQueue); + _rpcServer = std::make_unique(port, _rpcCommandQueue, _modelSubscriber); } void Env::statePort(int port) { @@ -116,7 +121,7 @@ void Env::statePort(int port) { void Env::notifyConfigUpdated() { vespalib::ComponentConfigProducer::Config current("sentinel", _cfgOwner.getGeneration(), "ok"); _stateApi.myComponents.addConfig(current); - + _modelSubscriber.checkForUpdates(); } void Env::respondAsEmpty() { diff --git a/configd/src/apps/sentinel/env.h b/configd/src/apps/sentinel/env.h index f71fb537068..9319caa4bd2 100644 --- a/configd/src/apps/sentinel/env.h +++ b/configd/src/apps/sentinel/env.h @@ -5,6 +5,7 @@ #include "cmdq.h" #include "config-owner.h" #include "metrics.h" +#include "model-subscriber.h" #include "rpcserver.h" #include "state-api.h" #include @@ -22,6 +23,7 @@ public: ~Env(); ConfigOwner &configOwner() { return _cfgOwner; } + ModelSubscriber &modelSubscriber() { return _modelSubscriber; } CommandQueue &commandQueue() { return _rpcCommandQueue; } StartMetrics &metrics() { return _startMetrics; } @@ -33,6 +35,7 @@ public: private: void respondAsEmpty(); ConfigOwner _cfgOwner; + ModelSubscriber _modelSubscriber; CommandQueue _rpcCommandQueue; std::unique_ptr _rpcServer; StateApi _stateApi; diff --git a/configd/src/apps/sentinel/model-subscriber.cpp b/configd/src/apps/sentinel/model-subscriber.cpp new file mode 100644 index 00000000000..f603b5fcb1b --- /dev/null +++ b/configd/src/apps/sentinel/model-subscriber.cpp @@ -0,0 +1,59 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "model-subscriber.h" +#include +#include +#include +#include +#include + +LOG_SETUP(".sentinel.model-subscriber"); + +using namespace std::chrono_literals; + +namespace config::sentinel { + +std::optional ModelSubscriber::getModelConfig() { + checkForUpdates(); + if (_modelConfig) { + return ModelConfig(*_modelConfig); + } else { + return {}; + } +} + + +ModelSubscriber::ModelSubscriber(const std::string &configId) + : _configId(configId) +{} + +ModelSubscriber::~ModelSubscriber() = default; + +void +ModelSubscriber::start(std::chrono::milliseconds timeout) { + try { + _modelHandle =_subscriber.subscribe(_configId, timeout); + } catch (ConfigTimeoutException & ex) { + LOG(warning, "Timeout getting model config: %s [skipping connectivity checks]", ex.getMessage().c_str()); + } catch (InvalidConfigException& ex) { + LOG(warning, "Invalid model config: %s [skipping connectivity checks]", ex.getMessage().c_str()); + } catch (ConfigRuntimeException& ex) { + LOG(warning, "Runtime exception getting model config: %s [skipping connectivity checks]", ex.getMessage().c_str()); + } +} + +void +ModelSubscriber::checkForUpdates() { + if (! _modelHandle) { + start(5ms); + } + if (_modelHandle && _subscriber.nextGenerationNow()) { + if (auto newModel = _modelHandle->getConfig()) { + LOG(config, "Sentinel got model info [version %s] for %zd hosts [config generation %" PRId64 "]", + newModel->vespaVersion.c_str(), newModel->hosts.size(), _subscriber.getGeneration()); + _modelConfig = std::move(newModel); + } + } +} + +} diff --git a/configd/src/apps/sentinel/model-subscriber.h b/configd/src/apps/sentinel/model-subscriber.h new file mode 100644 index 00000000000..1777e287d4f --- /dev/null +++ b/configd/src/apps/sentinel/model-subscriber.h @@ -0,0 +1,30 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include +#include +#include + +using cloud::config::ModelConfig; + +namespace config::sentinel { + +/** + * Handles config subscription and has a snapshot of current config. + **/ +class ModelSubscriber { +private: + std::string _configId; + config::ConfigSubscriber _subscriber; + config::ConfigHandle::UP _modelHandle; + std::unique_ptr _modelConfig; +public: + ModelSubscriber(const std::string &configId); + virtual ~ModelSubscriber(); + void start(std::chrono::milliseconds timeout); + void checkForUpdates(); + std::optional getModelConfig(); +}; + +} diff --git a/configd/src/apps/sentinel/report-connectivity.cpp b/configd/src/apps/sentinel/report-connectivity.cpp index 3e5d462060f..e23db85f4e2 100644 --- a/configd/src/apps/sentinel/report-connectivity.cpp +++ b/configd/src/apps/sentinel/report-connectivity.cpp @@ -25,25 +25,26 @@ void SinglePing::startCheck(FRT_Supervisor &orb) { check = std::make_unique(*this, peerName, peerPort, orb, 2500); } -ReportConnectivity::ReportConnectivity(FRT_RPCRequest *req, FRT_Supervisor &orb) +ReportConnectivity::ReportConnectivity(FRT_RPCRequest *req, FRT_Supervisor &orb, ModelSubscriber &modelSubscriber) : _parentRequest(req), _orb(orb), - _result(), - _configFetcher() + _result() { - try { - _configFetcher.subscribe("admin/model", this, 2000ms); - _configFetcher.start(); - return; - } catch (ConfigTimeoutException & ex) { - LOG(warning, "Timeout getting model config: %s [no connectivity report]", ex.getMessage().c_str()); - } catch (InvalidConfigException& ex) { - LOG(warning, "Invalid model config: %s [no connectivity report]", ex.getMessage().c_str()); - } catch (ConfigRuntimeException& ex) { - LOG(warning, "Runtime exception getting model config: %s [no connectivity report]", ex.getMessage().c_str()); + auto cfg = modelSubscriber.getModelConfig(); + if (cfg.has_value()) { + auto map = Connectivity::specsFrom(cfg.value()); + LOG(debug, "making connectivity report for %zd peers", _result.size()); + _remaining = _result.size(); + for (const auto & [ hostname, port ] : map) { + _result.emplace_back(*this, hostname, port); + } + for (auto & peer : _result) { + peer.startCheck(_orb); + } + } else { + _parentRequest->SetError(FRTE_RPC_METHOD_FAILED, "failed getting model config"); + _parentRequest->Return(); } - _parentRequest->SetError(FRTE_RPC_METHOD_FAILED, "failed getting model config"); - _parentRequest->Return(); } ReportConnectivity::~ReportConnectivity() = default; @@ -58,20 +59,6 @@ void ReportConnectivity::requestDone() { finish(); } -void ReportConnectivity::configure(std::unique_ptr config) { - _configFetcher.close(); - auto map = Connectivity::specsFrom(*config); - for (const auto & [ hostname, port ] : map) { - _result.emplace_back(*this, hostname, port); - } - LOG(debug, "making connectivity report for %zd peers", _result.size()); - _remaining = _result.size(); - for (auto & peer : _result) { - peer.startCheck(_orb); - } -} - - void ReportConnectivity::finish() const { FRT_Values *dst = _parentRequest->GetReturn(); FRT_StringValue *pt_hn = dst->AddStringArray(_result.size()); diff --git a/configd/src/apps/sentinel/report-connectivity.h b/configd/src/apps/sentinel/report-connectivity.h index d28368eca6c..ed012e2e07e 100644 --- a/configd/src/apps/sentinel/report-connectivity.h +++ b/configd/src/apps/sentinel/report-connectivity.h @@ -6,6 +6,7 @@ #include #include #include +#include "model-subscriber.h" #include "peer-check.h" #include "status-callback.h" @@ -42,20 +43,17 @@ struct SinglePing : StatusCallback { }; -class ReportConnectivity : public config::IFetcherCallback +class ReportConnectivity { public: - ReportConnectivity(FRT_RPCRequest *req, FRT_Supervisor &orb); + ReportConnectivity(FRT_RPCRequest *req, FRT_Supervisor &orb, ModelSubscriber &modelSubscriber); ~ReportConnectivity(); void requestDone(); - /** from IFetcherCallback */ - void configure(std::unique_ptr config) override; private: void finish() const; FRT_RPCRequest *_parentRequest; FRT_Supervisor &_orb; std::vector _result; - config::ConfigFetcher _configFetcher; std::mutex _lock; size_t _remaining; }; diff --git a/configd/src/apps/sentinel/rpchooks.cpp b/configd/src/apps/sentinel/rpchooks.cpp index 98386beac3c..a8d0240e6e8 100644 --- a/configd/src/apps/sentinel/rpchooks.cpp +++ b/configd/src/apps/sentinel/rpchooks.cpp @@ -13,9 +13,10 @@ LOG_SETUP(".rpchooks"); namespace config::sentinel { -RPCHooks::RPCHooks(CommandQueue &commands, FRT_Supervisor &supervisor) +RPCHooks::RPCHooks(CommandQueue &commands, FRT_Supervisor &supervisor, ModelSubscriber &modelSubscriber) : _commands(commands), - _orb(supervisor) + _orb(supervisor), + _modelSubscriber(modelSubscriber) { initRPC(&_orb); } @@ -118,7 +119,7 @@ RPCHooks::rpc_reportConnectivity(FRT_RPCRequest *req) { LOG(debug, "got reportConnectivity"); req->Detach(); - req->getStash().create(req, _orb); + req->getStash().create(req, _orb, _modelSubscriber); } } // namespace slobrok diff --git a/configd/src/apps/sentinel/rpchooks.h b/configd/src/apps/sentinel/rpchooks.h index 5b6cf878f26..badfd560034 100644 --- a/configd/src/apps/sentinel/rpchooks.h +++ b/configd/src/apps/sentinel/rpchooks.h @@ -2,6 +2,7 @@ #pragma once +#include "model-subscriber.h" #include #include @@ -25,8 +26,9 @@ class RPCHooks : public FRT_Invokable private: CommandQueue &_commands; FRT_Supervisor &_orb; + ModelSubscriber &_modelSubscriber; public: - RPCHooks(CommandQueue &commands, FRT_Supervisor &supervisor); + RPCHooks(CommandQueue &commands, FRT_Supervisor &supervisor, ModelSubscriber &modelSubscriber); ~RPCHooks() override; private: void initRPC(FRT_Supervisor *supervisor); diff --git a/configd/src/apps/sentinel/rpcserver.cpp b/configd/src/apps/sentinel/rpcserver.cpp index 80c3c81c826..18b0dfc3630 100644 --- a/configd/src/apps/sentinel/rpcserver.cpp +++ b/configd/src/apps/sentinel/rpcserver.cpp @@ -7,9 +7,9 @@ LOG_SETUP(".rpcserver"); namespace config::sentinel { -RpcServer::RpcServer(int portNumber, CommandQueue &cmdQ) +RpcServer::RpcServer(int portNumber, CommandQueue &cmdQ, ModelSubscriber &modelSubscriber) : _server(), - _rpcHooks(cmdQ, _server.supervisor()), + _rpcHooks(cmdQ, _server.supervisor(), modelSubscriber), _port(portNumber) { if (_server.supervisor().Listen(portNumber)) { diff --git a/configd/src/apps/sentinel/rpcserver.h b/configd/src/apps/sentinel/rpcserver.h index 4c6dea00ddf..e2c7caaacb1 100644 --- a/configd/src/apps/sentinel/rpcserver.h +++ b/configd/src/apps/sentinel/rpcserver.h @@ -5,6 +5,7 @@ #include #include "cmdq.h" +#include "model-subscriber.h" #include "rpchooks.h" #include @@ -18,7 +19,7 @@ private: int _port; public: - RpcServer(int port, CommandQueue &cmdQ); + RpcServer(int port, CommandQueue &cmdQ, ModelSubscriber &modelSubscriber); ~RpcServer(); int getPort() const { return _port; } -- cgit v1.2.3