aboutsummaryrefslogtreecommitdiffstats
path: root/configd
diff options
context:
space:
mode:
authorArne Juul <arnej@verizonmedia.com>2021-06-11 12:57:24 +0000
committerArne Juul <arnej@verizonmedia.com>2021-06-11 12:57:24 +0000
commit6f8792b0acfa3e417d124b519b6383065a6bfd7a (patch)
tree1aa4920a4dca29c97fc63dfb4cb86c1b61aed2f7 /configd
parente607968dff6eee82f22f8aacc35e31d4afa2d807 (diff)
add separate config subscriber for model
Diffstat (limited to 'configd')
-rw-r--r--configd/src/apps/sentinel/CMakeLists.txt1
-rw-r--r--configd/src/apps/sentinel/config-owner.cpp43
-rw-r--r--configd/src/apps/sentinel/config-owner.h9
-rw-r--r--configd/src/apps/sentinel/connectivity.cpp8
-rw-r--r--configd/src/apps/sentinel/connectivity.h3
-rw-r--r--configd/src/apps/sentinel/env.cpp11
-rw-r--r--configd/src/apps/sentinel/env.h3
-rw-r--r--configd/src/apps/sentinel/model-subscriber.cpp59
-rw-r--r--configd/src/apps/sentinel/model-subscriber.h30
-rw-r--r--configd/src/apps/sentinel/report-connectivity.cpp45
-rw-r--r--configd/src/apps/sentinel/report-connectivity.h8
-rw-r--r--configd/src/apps/sentinel/rpchooks.cpp7
-rw-r--r--configd/src/apps/sentinel/rpchooks.h4
-rw-r--r--configd/src/apps/sentinel/rpcserver.cpp4
-rw-r--r--configd/src/apps/sentinel/rpcserver.h3
15 files changed, 166 insertions, 72 deletions
diff --git a/configd/src/apps/sentinel/CMakeLists.txt b/configd/src/apps/sentinel/CMakeLists.txt
index 0999314c8ce..3818f334ff2 100644
--- a/configd/src/apps/sentinel/CMakeLists.txt
+++ b/configd/src/apps/sentinel/CMakeLists.txt
@@ -9,6 +9,7 @@ vespa_add_executable(configd_config-sentinel_app
line-splitter.cpp
manager.cpp
metrics.cpp
+ model-subscriber.cpp
output-connection.cpp
outward-check.cpp
peer-check.cpp
diff --git a/configd/src/apps/sentinel/config-owner.cpp b/configd/src/apps/sentinel/config-owner.cpp
index 840c5b1add1..074f7187537 100644
--- a/configd/src/apps/sentinel/config-owner.cpp
+++ b/configd/src/apps/sentinel/config-owner.cpp
@@ -10,13 +10,22 @@ LOG_SETUP(".config-owner");
namespace config::sentinel {
-ConfigOwner::ConfigOwner() : _subscriber() {}
+ConfigOwner::ConfigOwner() = default;
ConfigOwner::~ConfigOwner() = default;
void
ConfigOwner::subscribe(const std::string & configId, std::chrono::milliseconds timeout) {
_sentinelHandle = _subscriber.subscribe<SentinelConfig>(configId, timeout);
+ try {
+ _modelHandle =_modelSubscriber.subscribe<ModelConfig>("admin/model", timeout);
+ } catch (ConfigTimeoutException & ex) {
+ LOG(warning, "Timeout getting model config: %s [skipping connectivity checks]", ex.getMessage().c_str());
+ } catch (InvalidConfigException& ex) {
+ LOG(warning, "Invalid model config: %s [skipping connectivity checks]", ex.getMessage().c_str());
+ } catch (ConfigRuntimeException& ex) {
+ LOG(warning, "Runtime exception getting model config: %s [skipping connectivity checks]", ex.getMessage().c_str());
+ }
}
void
@@ -29,6 +38,7 @@ ConfigOwner::doConfigure()
const auto & app = config.application;
LOG(config, "Sentinel got %zd service elements [tenant(%s), application(%s), instance(%s)] for config generation %" PRId64,
config.service.size(), app.tenant.c_str(), app.name.c_str(), app.instance.c_str(), _currGeneration);
+ getModelConfig();
}
@@ -42,29 +52,20 @@ ConfigOwner::checkForConfigUpdate() {
return false;
}
-std::unique_ptr<ModelConfig>
-ConfigOwner::fetchModelConfig(std::chrono::milliseconds timeout)
-{
- std::unique_ptr<ModelConfig> modelConfig;
- ConfigSubscriber tempSubscriber;
- try {
- ConfigHandle<ModelConfig>::UP modelHandle =
- tempSubscriber.subscribe<ModelConfig>("admin/model", timeout);
- if (tempSubscriber.nextGenerationNow()) {
- modelConfig = modelHandle->getConfig();
+std::optional<ModelConfig>
+ConfigOwner::getModelConfig() {
+ if (_modelHandle && _modelSubscriber.nextGenerationNow()) {
+ if (auto newModel = _modelHandle->getConfig()) {
LOG(config, "Sentinel got model info [version %s] for %zd hosts [config generation %" PRId64 "]",
- modelConfig->vespaVersion.c_str(), modelConfig->hosts.size(),
- tempSubscriber.getGeneration());
+ newModel->vespaVersion.c_str(), newModel->hosts.size(), _modelSubscriber.getGeneration());
+ _modelConfig = std::move(newModel);
}
- } catch (ConfigTimeoutException & ex) {
- LOG(warning, "Timeout getting model config: %s [skipping connectivity checks]", ex.getMessage().c_str());
- } catch (InvalidConfigException& ex) {
- LOG(warning, "Invalid model config: %s [skipping connectivity checks]", ex.getMessage().c_str());
- } catch (ConfigRuntimeException& ex) {
- LOG(warning, "Runtime exception getting model config: %s [skipping connectivity checks]", ex.getMessage().c_str());
-
}
- return modelConfig;
+ if (_modelConfig) {
+ return ModelConfig(*_modelConfig);
+ } else {
+ return {};
+ }
}
}
diff --git a/configd/src/apps/sentinel/config-owner.h b/configd/src/apps/sentinel/config-owner.h
index 2850e6b3904..7b79f65b1de 100644
--- a/configd/src/apps/sentinel/config-owner.h
+++ b/configd/src/apps/sentinel/config-owner.h
@@ -5,6 +5,7 @@
#include <vespa/config-sentinel.h>
#include <vespa/config-model.h>
#include <vespa/config/config.h>
+#include <optional>
using cloud::config::SentinelConfig;
using cloud::config::ModelConfig;
@@ -21,10 +22,14 @@ class ConfigOwner {
private:
ConfigSubscriber _subscriber;
ConfigHandle<SentinelConfig>::UP _sentinelHandle;
-
+
int64_t _currGeneration = -1;
std::unique_ptr<SentinelConfig> _currConfig;
+ ConfigSubscriber _modelSubscriber;
+ ConfigHandle<ModelConfig>::UP _modelHandle;
+ std::unique_ptr<ModelConfig> _modelConfig;
+
ConfigOwner(const ConfigOwner&) = delete;
ConfigOwner& operator =(const ConfigOwner&) = delete;
@@ -37,7 +42,7 @@ public:
bool hasConfig() const { return _currConfig.get() != nullptr; }
const SentinelConfig& getConfig() const { return *_currConfig; }
int64_t getGeneration() const { return _currGeneration; }
- static std::unique_ptr<ModelConfig> fetchModelConfig(std::chrono::milliseconds timeout);
+ std::optional<ModelConfig> getModelConfig();
};
}
diff --git a/configd/src/apps/sentinel/connectivity.cpp b/configd/src/apps/sentinel/connectivity.cpp
index 12b645dd589..7ea548ea73d 100644
--- a/configd/src/apps/sentinel/connectivity.cpp
+++ b/configd/src/apps/sentinel/connectivity.cpp
@@ -118,13 +118,13 @@ SpecMap Connectivity::specsFrom(const ModelConfig &model) {
return checkSpecs;
}
-void Connectivity::configure(const SentinelConfig::Connectivity &config) {
+void Connectivity::configure(const SentinelConfig::Connectivity &config,
+ const ModelConfig &model)
+{
_config = config;
LOG(config, "connectivity.maxBadReverseCount = %d", _config.maxBadReverseCount);
LOG(config, "connectivity.maxBadOutPercent = %d", _config.maxBadOutPercent);
- if (auto up = ConfigOwner::fetchModelConfig(MODEL_TIMEOUT_MS)) {
- _checkSpecs = specsFrom(*up);
- }
+ _checkSpecs = specsFrom(model);
}
bool
diff --git a/configd/src/apps/sentinel/connectivity.h b/configd/src/apps/sentinel/connectivity.h
index 440d7df84c0..2ba17f5c07c 100644
--- a/configd/src/apps/sentinel/connectivity.h
+++ b/configd/src/apps/sentinel/connectivity.h
@@ -24,7 +24,8 @@ public:
Connectivity();
~Connectivity();
- void configure(const SentinelConfig::Connectivity &config);
+ void configure(const SentinelConfig::Connectivity &config,
+ const ModelConfig &model);
bool checkConnectivity(RpcServer &rpcServer);
static SpecMap specsFrom(const ModelConfig &model);
private:
diff --git a/configd/src/apps/sentinel/env.cpp b/configd/src/apps/sentinel/env.cpp
index 5bbbfd8f0bd..58b917ca16e 100644
--- a/configd/src/apps/sentinel/env.cpp
+++ b/configd/src/apps/sentinel/env.cpp
@@ -36,6 +36,7 @@ constexpr int maxConnectivityRetries = 100;
Env::Env()
: _cfgOwner(),
+ _modelSubscriber("admin/model"),
_rpcCommandQueue(),
_rpcServer(),
_stateApi(),
@@ -52,6 +53,7 @@ Env::~Env() = default;
void Env::boot(const std::string &configId) {
LOG(debug, "Reading configuration for ID: %s", configId.c_str());
_cfgOwner.subscribe(configId, CONFIG_TIMEOUT_MS);
+ _modelSubscriber.start(CONFIG_TIMEOUT_MS);
// subscribe() should throw if something is not OK
Connectivity checker;
for (int retry = 0; retry < maxConnectivityRetries; ++retry) {
@@ -64,7 +66,10 @@ void Env::boot(const std::string &configId) {
configId.c_str(), cfg.port.telnet, cfg.port.rpc);
rpcPort(cfg.port.rpc);
statePort(cfg.port.telnet);
- checker.configure(cfg.connectivity);
+ auto model = _modelSubscriber.getModelConfig();
+ if (model.has_value()) {
+ checker.configure(cfg.connectivity, model.value());
+ }
}
if (checker.checkConnectivity(*_rpcServer)) {
_stateApi.myHealth.setOk();
@@ -94,7 +99,7 @@ void Env::rpcPort(int port) {
if (_rpcServer && port == _rpcServer->getPort()) {
return; // ok already
}
- _rpcServer = std::make_unique<RpcServer>(port, _rpcCommandQueue);
+ _rpcServer = std::make_unique<RpcServer>(port, _rpcCommandQueue, _modelSubscriber);
}
void Env::statePort(int port) {
@@ -116,7 +121,7 @@ void Env::statePort(int port) {
void Env::notifyConfigUpdated() {
vespalib::ComponentConfigProducer::Config current("sentinel", _cfgOwner.getGeneration(), "ok");
_stateApi.myComponents.addConfig(current);
-
+ _modelSubscriber.checkForUpdates();
}
void Env::respondAsEmpty() {
diff --git a/configd/src/apps/sentinel/env.h b/configd/src/apps/sentinel/env.h
index f71fb537068..9319caa4bd2 100644
--- a/configd/src/apps/sentinel/env.h
+++ b/configd/src/apps/sentinel/env.h
@@ -5,6 +5,7 @@
#include "cmdq.h"
#include "config-owner.h"
#include "metrics.h"
+#include "model-subscriber.h"
#include "rpcserver.h"
#include "state-api.h"
#include <vespa/vespalib/net/state_server.h>
@@ -22,6 +23,7 @@ public:
~Env();
ConfigOwner &configOwner() { return _cfgOwner; }
+ ModelSubscriber &modelSubscriber() { return _modelSubscriber; }
CommandQueue &commandQueue() { return _rpcCommandQueue; }
StartMetrics &metrics() { return _startMetrics; }
@@ -33,6 +35,7 @@ public:
private:
void respondAsEmpty();
ConfigOwner _cfgOwner;
+ ModelSubscriber _modelSubscriber;
CommandQueue _rpcCommandQueue;
std::unique_ptr<RpcServer> _rpcServer;
StateApi _stateApi;
diff --git a/configd/src/apps/sentinel/model-subscriber.cpp b/configd/src/apps/sentinel/model-subscriber.cpp
new file mode 100644
index 00000000000..f603b5fcb1b
--- /dev/null
+++ b/configd/src/apps/sentinel/model-subscriber.cpp
@@ -0,0 +1,59 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "model-subscriber.h"
+#include <vespa/vespalib/util/exceptions.h>
+#include <vespa/config/common/exceptions.h>
+#include <string>
+#include <chrono>
+#include <vespa/log/log.h>
+
+LOG_SETUP(".sentinel.model-subscriber");
+
+using namespace std::chrono_literals;
+
+namespace config::sentinel {
+
+std::optional<ModelConfig> ModelSubscriber::getModelConfig() {
+ checkForUpdates();
+ if (_modelConfig) {
+ return ModelConfig(*_modelConfig);
+ } else {
+ return {};
+ }
+}
+
+
+ModelSubscriber::ModelSubscriber(const std::string &configId)
+ : _configId(configId)
+{}
+
+ModelSubscriber::~ModelSubscriber() = default;
+
+void
+ModelSubscriber::start(std::chrono::milliseconds timeout) {
+ try {
+ _modelHandle =_subscriber.subscribe<ModelConfig>(_configId, timeout);
+ } catch (ConfigTimeoutException & ex) {
+ LOG(warning, "Timeout getting model config: %s [skipping connectivity checks]", ex.getMessage().c_str());
+ } catch (InvalidConfigException& ex) {
+ LOG(warning, "Invalid model config: %s [skipping connectivity checks]", ex.getMessage().c_str());
+ } catch (ConfigRuntimeException& ex) {
+ LOG(warning, "Runtime exception getting model config: %s [skipping connectivity checks]", ex.getMessage().c_str());
+ }
+}
+
+void
+ModelSubscriber::checkForUpdates() {
+ if (! _modelHandle) {
+ start(5ms);
+ }
+ if (_modelHandle && _subscriber.nextGenerationNow()) {
+ if (auto newModel = _modelHandle->getConfig()) {
+ LOG(config, "Sentinel got model info [version %s] for %zd hosts [config generation %" PRId64 "]",
+ newModel->vespaVersion.c_str(), newModel->hosts.size(), _subscriber.getGeneration());
+ _modelConfig = std::move(newModel);
+ }
+ }
+}
+
+}
diff --git a/configd/src/apps/sentinel/model-subscriber.h b/configd/src/apps/sentinel/model-subscriber.h
new file mode 100644
index 00000000000..1777e287d4f
--- /dev/null
+++ b/configd/src/apps/sentinel/model-subscriber.h
@@ -0,0 +1,30 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/config-model.h>
+#include <vespa/config/config.h>
+#include <optional>
+
+using cloud::config::ModelConfig;
+
+namespace config::sentinel {
+
+/**
+ * Handles config subscription and has a snapshot of current config.
+ **/
+class ModelSubscriber {
+private:
+ std::string _configId;
+ config::ConfigSubscriber _subscriber;
+ config::ConfigHandle<ModelConfig>::UP _modelHandle;
+ std::unique_ptr<ModelConfig> _modelConfig;
+public:
+ ModelSubscriber(const std::string &configId);
+ virtual ~ModelSubscriber();
+ void start(std::chrono::milliseconds timeout);
+ void checkForUpdates();
+ std::optional<ModelConfig> getModelConfig();
+};
+
+}
diff --git a/configd/src/apps/sentinel/report-connectivity.cpp b/configd/src/apps/sentinel/report-connectivity.cpp
index 3e5d462060f..e23db85f4e2 100644
--- a/configd/src/apps/sentinel/report-connectivity.cpp
+++ b/configd/src/apps/sentinel/report-connectivity.cpp
@@ -25,25 +25,26 @@ void SinglePing::startCheck(FRT_Supervisor &orb) {
check = std::make_unique<PeerCheck>(*this, peerName, peerPort, orb, 2500);
}
-ReportConnectivity::ReportConnectivity(FRT_RPCRequest *req, FRT_Supervisor &orb)
+ReportConnectivity::ReportConnectivity(FRT_RPCRequest *req, FRT_Supervisor &orb, ModelSubscriber &modelSubscriber)
: _parentRequest(req),
_orb(orb),
- _result(),
- _configFetcher()
+ _result()
{
- try {
- _configFetcher.subscribe<ModelConfig>("admin/model", this, 2000ms);
- _configFetcher.start();
- return;
- } catch (ConfigTimeoutException & ex) {
- LOG(warning, "Timeout getting model config: %s [no connectivity report]", ex.getMessage().c_str());
- } catch (InvalidConfigException& ex) {
- LOG(warning, "Invalid model config: %s [no connectivity report]", ex.getMessage().c_str());
- } catch (ConfigRuntimeException& ex) {
- LOG(warning, "Runtime exception getting model config: %s [no connectivity report]", ex.getMessage().c_str());
+ auto cfg = modelSubscriber.getModelConfig();
+ if (cfg.has_value()) {
+ auto map = Connectivity::specsFrom(cfg.value());
+ LOG(debug, "making connectivity report for %zd peers", _result.size());
+ _remaining = _result.size();
+ for (const auto & [ hostname, port ] : map) {
+ _result.emplace_back(*this, hostname, port);
+ }
+ for (auto & peer : _result) {
+ peer.startCheck(_orb);
+ }
+ } else {
+ _parentRequest->SetError(FRTE_RPC_METHOD_FAILED, "failed getting model config");
+ _parentRequest->Return();
}
- _parentRequest->SetError(FRTE_RPC_METHOD_FAILED, "failed getting model config");
- _parentRequest->Return();
}
ReportConnectivity::~ReportConnectivity() = default;
@@ -58,20 +59,6 @@ void ReportConnectivity::requestDone() {
finish();
}
-void ReportConnectivity::configure(std::unique_ptr<ModelConfig> config) {
- _configFetcher.close();
- auto map = Connectivity::specsFrom(*config);
- for (const auto & [ hostname, port ] : map) {
- _result.emplace_back(*this, hostname, port);
- }
- LOG(debug, "making connectivity report for %zd peers", _result.size());
- _remaining = _result.size();
- for (auto & peer : _result) {
- peer.startCheck(_orb);
- }
-}
-
-
void ReportConnectivity::finish() const {
FRT_Values *dst = _parentRequest->GetReturn();
FRT_StringValue *pt_hn = dst->AddStringArray(_result.size());
diff --git a/configd/src/apps/sentinel/report-connectivity.h b/configd/src/apps/sentinel/report-connectivity.h
index d28368eca6c..ed012e2e07e 100644
--- a/configd/src/apps/sentinel/report-connectivity.h
+++ b/configd/src/apps/sentinel/report-connectivity.h
@@ -6,6 +6,7 @@
#include <vespa/fnet/frt/supervisor.h>
#include <vespa/config-model.h>
#include <vespa/config/helper/configfetcher.h>
+#include "model-subscriber.h"
#include "peer-check.h"
#include "status-callback.h"
@@ -42,20 +43,17 @@ struct SinglePing : StatusCallback {
};
-class ReportConnectivity : public config::IFetcherCallback<cloud::config::ModelConfig>
+class ReportConnectivity
{
public:
- ReportConnectivity(FRT_RPCRequest *req, FRT_Supervisor &orb);
+ ReportConnectivity(FRT_RPCRequest *req, FRT_Supervisor &orb, ModelSubscriber &modelSubscriber);
~ReportConnectivity();
void requestDone();
- /** from IFetcherCallback */
- void configure(std::unique_ptr<cloud::config::ModelConfig> config) override;
private:
void finish() const;
FRT_RPCRequest *_parentRequest;
FRT_Supervisor &_orb;
std::vector<SinglePing> _result;
- config::ConfigFetcher _configFetcher;
std::mutex _lock;
size_t _remaining;
};
diff --git a/configd/src/apps/sentinel/rpchooks.cpp b/configd/src/apps/sentinel/rpchooks.cpp
index 98386beac3c..a8d0240e6e8 100644
--- a/configd/src/apps/sentinel/rpchooks.cpp
+++ b/configd/src/apps/sentinel/rpchooks.cpp
@@ -13,9 +13,10 @@ LOG_SETUP(".rpchooks");
namespace config::sentinel {
-RPCHooks::RPCHooks(CommandQueue &commands, FRT_Supervisor &supervisor)
+RPCHooks::RPCHooks(CommandQueue &commands, FRT_Supervisor &supervisor, ModelSubscriber &modelSubscriber)
: _commands(commands),
- _orb(supervisor)
+ _orb(supervisor),
+ _modelSubscriber(modelSubscriber)
{
initRPC(&_orb);
}
@@ -118,7 +119,7 @@ RPCHooks::rpc_reportConnectivity(FRT_RPCRequest *req)
{
LOG(debug, "got reportConnectivity");
req->Detach();
- req->getStash().create<ReportConnectivity>(req, _orb);
+ req->getStash().create<ReportConnectivity>(req, _orb, _modelSubscriber);
}
} // namespace slobrok
diff --git a/configd/src/apps/sentinel/rpchooks.h b/configd/src/apps/sentinel/rpchooks.h
index 5b6cf878f26..badfd560034 100644
--- a/configd/src/apps/sentinel/rpchooks.h
+++ b/configd/src/apps/sentinel/rpchooks.h
@@ -2,6 +2,7 @@
#pragma once
+#include "model-subscriber.h"
#include <vespa/fnet/frt/invokable.h>
#include <memory>
@@ -25,8 +26,9 @@ class RPCHooks : public FRT_Invokable
private:
CommandQueue &_commands;
FRT_Supervisor &_orb;
+ ModelSubscriber &_modelSubscriber;
public:
- RPCHooks(CommandQueue &commands, FRT_Supervisor &supervisor);
+ RPCHooks(CommandQueue &commands, FRT_Supervisor &supervisor, ModelSubscriber &modelSubscriber);
~RPCHooks() override;
private:
void initRPC(FRT_Supervisor *supervisor);
diff --git a/configd/src/apps/sentinel/rpcserver.cpp b/configd/src/apps/sentinel/rpcserver.cpp
index 80c3c81c826..18b0dfc3630 100644
--- a/configd/src/apps/sentinel/rpcserver.cpp
+++ b/configd/src/apps/sentinel/rpcserver.cpp
@@ -7,9 +7,9 @@ LOG_SETUP(".rpcserver");
namespace config::sentinel {
-RpcServer::RpcServer(int portNumber, CommandQueue &cmdQ)
+RpcServer::RpcServer(int portNumber, CommandQueue &cmdQ, ModelSubscriber &modelSubscriber)
: _server(),
- _rpcHooks(cmdQ, _server.supervisor()),
+ _rpcHooks(cmdQ, _server.supervisor(), modelSubscriber),
_port(portNumber)
{
if (_server.supervisor().Listen(portNumber)) {
diff --git a/configd/src/apps/sentinel/rpcserver.h b/configd/src/apps/sentinel/rpcserver.h
index 4c6dea00ddf..e2c7caaacb1 100644
--- a/configd/src/apps/sentinel/rpcserver.h
+++ b/configd/src/apps/sentinel/rpcserver.h
@@ -5,6 +5,7 @@
#include <memory>
#include "cmdq.h"
+#include "model-subscriber.h"
#include "rpchooks.h"
#include <vespa/fnet/frt/supervisor.h>
@@ -18,7 +19,7 @@ private:
int _port;
public:
- RpcServer(int port, CommandQueue &cmdQ);
+ RpcServer(int port, CommandQueue &cmdQ, ModelSubscriber &modelSubscriber);
~RpcServer();
int getPort() const { return _port; }