diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2022-02-23 15:49:59 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2022-02-23 16:28:19 +0000 |
commit | 04596050bc494e338696cae4cc3f68771424c275 (patch) | |
tree | 97926f768c5705b5e86cf5946a489ef3bd679674 /documentapi/src/vespa | |
parent | 09039a14a0f27e7d3dadfc22cb72a8d0d10f9d65 (diff) |
Ensure that internal state of ExternSlobrokPolicy and ExternPolicy is consistent with respect to transport, supervisor, and mirror.
Diffstat (limited to 'documentapi/src/vespa')
6 files changed, 62 insertions, 88 deletions
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt index 1500403d463..ed4f44b991a 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt +++ b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt @@ -2,18 +2,19 @@ vespa_add_library(documentapi_documentapipolicies OBJECT SOURCES andpolicy.cpp - externslobrokpolicy.cpp + asyncinitializationpolicy.cpp contentpolicy.cpp - messagetypepolicy.cpp + externslobrokpolicy.cpp documentrouteselectorpolicy.cpp errorpolicy.cpp externpolicy.cpp + loadbalancer.cpp + loadbalancerpolicy.cpp localservicepolicy.cpp + messagetypepolicy.cpp + mirror_with_all.cpp roundrobinpolicy.cpp subsetservicepolicy.cpp - loadbalancer.cpp - loadbalancerpolicy.cpp - asyncinitializationpolicy.cpp DEPENDS ) vespa_generate_config(documentapi_documentapipolicies ../../../../main/resources/configdefinitions/document-protocol-policies.def) diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp index 39e67408e76..5fded6a6266 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp @@ -63,10 +63,10 @@ string ContentPolicy::init() using storage::lib::Distribution; config::ConfigUri uri(_clusterConfigId); - if (!_configSources.empty()) { - _configFetcher.reset(new config::ConfigFetcher(config::ServerSpec(_configSources))); + if (!configSources().empty()) { + _configFetcher = std::make_unique<config::ConfigFetcher>(config::ServerSpec(configSources())); } else { - _configFetcher.reset(new config::ConfigFetcher(uri.getContext())); + _configFetcher = std::make_unique<config::ConfigFetcher>(uri.getContext()); } _callBack = std::make_unique<CallBack>(*this); _configFetcher->subscribe<vespa::config::content::StorDistributionConfig>(uri.getConfigId(), static_cast<CallBack *>(_callBack.get())); diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp index ef1d795bec4..3bb11604300 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "externpolicy.h" -#include <boost/tokenizer.hpp> +#include "mirror_with_all.h" +#include <vespa/vespalib/text/stringtokenizer.h> #include <vespa/documentapi/messagebus/documentprotocol.h> #include <vespa/vespalib/util/stringfmt.h> #include <vespa/slobrok/sbmirror.h> @@ -14,17 +15,11 @@ LOG_SETUP(".externpolicy"); using slobrok::api::IMirrorAPI; using slobrok::api::MirrorAPI; -typedef boost::char_separator<char> Separator; -typedef boost::tokenizer<Separator> Tokenizer; - namespace documentapi { ExternPolicy::ExternPolicy(const string ¶m) : _lock(), - _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)), - _transport(std::make_unique<FNET_Transport>()), - _orb(std::make_unique<FRT_Supervisor>(_transport.get())), - _mirror(), + _mirrorWithAll(), _pattern(), _session(), _error("Not initialized."), @@ -46,11 +41,10 @@ ExternPolicy::ExternPolicy(const string ¶m) : // Activate supervisor and register mirror. MirrorAPI::StringList spec; - string lst = param.substr(0, pos); - std::string stdlst(lst); - Tokenizer tokens(stdlst, Separator(",")); - for (Tokenizer::iterator it = tokens.begin(); it != tokens.end(); ++it) { - spec.push_back(*it); + vespalib::string lst = param.substr(0, pos); + vespalib::StringTokenizer slobrokList(lst, ","); + for (uint32_t j = 0; j < slobrokList.size(); j++) { + spec.push_back(slobrokList[j]); } if (spec.empty()) { @@ -59,14 +53,9 @@ ExternPolicy::ExternPolicy(const string ¶m) : } slobrok::ConfiguratorFactory config(spec); - _mirror = std::make_unique<MirrorAPI>(*_orb, config); - _started = _transport->Start(_threadPool.get()); - if (!_started) { - _error = "Failed to start FNET supervisor."; - return; - } else { - LOG(debug, "Connecting to extern slobrok mirror '%s'..", lst.c_str()); - } + _mirrorWithAll = std::make_unique<MirrorAndStuff>(config); + LOG(debug, "Connecting to extern slobrok mirror '%s'..", lst.c_str()); + // Parse query pattern. _pattern = param.substr(pos + 1); @@ -81,12 +70,11 @@ ExternPolicy::ExternPolicy(const string ¶m) : _error.clear(); } -ExternPolicy::~ExternPolicy() -{ - _mirror.reset(); - if (_started) { - _transport->ShutDown(true); - } +ExternPolicy::~ExternPolicy() = default; + +const IMirrorAPI * +ExternPolicy::getMirror() const { + return _mirrorWithAll ? _mirrorWithAll->mirror() : nullptr; } void @@ -94,7 +82,7 @@ ExternPolicy::select(mbus::RoutingContext &ctx) { if (!_error.empty()) { ctx.setError(DocumentProtocol::ERROR_POLICY_FAILURE, _error); - } else if (_mirror->ready()) { + } else if (_mirrorWithAll->mirror()->ready()) { mbus::Hop hop = getRecipient(); if (hop.hasDirectives()) { mbus::Route route = ctx.getRoute(); @@ -130,15 +118,14 @@ ExternPolicy::getRecipient() void ExternPolicy::update() { - uint32_t upd = _mirror->updates(); + uint32_t upd = _mirrorWithAll->mirror()->updates(); if (_gen != upd) { _gen = upd; _recipients.clear(); - IMirrorAPI::SpecList entries = _mirror->lookup(_pattern); + IMirrorAPI::SpecList entries = _mirrorWithAll->mirror()->lookup(_pattern); if (!entries.empty()) { - for (const auto & spec : entries) - { + for (const auto & spec : entries) { _recipients.push_back(mbus::Hop::parse(spec.second + _session)); } } diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.h index 3bc229c4087..e20e34d2f77 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.h +++ b/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.h @@ -7,12 +7,10 @@ #include <vespa/documentapi/common.h> #include <mutex> -class FRT_Supervisor; -class FNET_Transport; -class FastOS_ThreadPool; - namespace documentapi { +class MirrorAndStuff; + /** * This policy implements the necessary logic to communicate with an external Vespa application and resolve its list of * recipients using that other application's slobrok servers. @@ -20,11 +18,8 @@ namespace documentapi { class ExternPolicy : public mbus::IRoutingPolicy { private: using IMirrorAPI = slobrok::api::IMirrorAPI; - std::mutex _lock; - std::unique_ptr<FastOS_ThreadPool> _threadPool; - std::unique_ptr<FNET_Transport> _transport; - std::unique_ptr<FRT_Supervisor> _orb; - std::unique_ptr<IMirrorAPI> _mirror; + std::mutex _lock; + std::unique_ptr<MirrorAndStuff> _mirrorWithAll; string _pattern; string _session; string _error; @@ -70,7 +65,7 @@ public: * * @return The mirror pointer. */ - slobrok::api::IMirrorAPI &getMirror() { return *_mirror; } + const slobrok::api::IMirrorAPI *getMirror() const; void select(mbus::RoutingContext &ctx) override; void merge(mbus::RoutingContext &ctx) override; }; diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp index 298df88f75c..34d2b6d3369 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp +++ b/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp @@ -1,9 +1,11 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "externslobrokpolicy.h" +#include "mirror_with_all.h" #include <vespa/messagebus/routing/routingcontext.h> #include <vespa/config/common/configcontext.h> #include <vespa/vespalib/text/stringtokenizer.h> +#include <vespa/vespalib/util/size_literals.h> #include <vespa/slobrok/sbmirror.h> #include <vespa/fnet/frt/supervisor.h> #include <vespa/fnet/transport.h> @@ -12,15 +14,14 @@ using slobrok::api::IMirrorAPI; using slobrok::api::MirrorAPI; +using slobrok::ConfiguratorFactory; namespace documentapi { ExternSlobrokPolicy::ExternSlobrokPolicy(const std::map<string, string>& param) : AsyncInitializationPolicy(param), _firstTry(true), - _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)), - _transport(std::make_unique<FNET_Transport>()), - _orb(std::make_unique<FRT_Supervisor>(_transport.get())), + _mirrorWithAll(), _slobrokConfigId("client") { if (param.find("config") != param.end()) { @@ -46,29 +47,24 @@ ExternSlobrokPolicy::ExternSlobrokPolicy(const std::map<string, string>& param) } } -ExternSlobrokPolicy::~ExternSlobrokPolicy() -{ - bool started = (bool)_mirror; - _mirror.reset(); - if (started) { - _transport->ShutDown(true); - } +const IMirrorAPI* +ExternSlobrokPolicy::getMirror() const { + return _mirrorWithAll ? _mirrorWithAll->mirror() : nullptr; } +ExternSlobrokPolicy::~ExternSlobrokPolicy() = default; + string ExternSlobrokPolicy::init() { + std::lock_guard guard(_lock); if (_slobroks.size() != 0) { - slobrok::ConfiguratorFactory config(_slobroks); - _mirror = std::make_unique<MirrorAPI>(*_orb, config); + ConfiguratorFactory config(_slobroks); + _mirrorWithAll = std::make_unique<MirrorAndStuff>(config); } else if (_configSources.size() != 0) { - slobrok::ConfiguratorFactory config( - config::ConfigUri(_slobrokConfigId, - std::make_shared<config::ConfigContext>(config::ServerSpec(_configSources)))); - _mirror = std::make_unique<MirrorAPI>(*_orb, config); - } - - if (_mirror.get()) { - _transport->Start(_threadPool.get()); + ConfiguratorFactory config( + config::ConfigUri(_slobrokConfigId, + std::make_shared<config::ConfigContext>(config::ServerSpec(_configSources)))); + _mirrorWithAll = std::make_unique<MirrorAndStuff>(config); } return ""; @@ -78,7 +74,8 @@ IMirrorAPI::SpecList ExternSlobrokPolicy::lookup(mbus::RoutingContext& context, const string& pattern) { std::lock_guard guard(_lock); - const IMirrorAPI& mirror(_mirror.get()? *_mirror : context.getMirror()); + const IMirrorAPI * myMirror = getMirror(); + const IMirrorAPI& mirror(myMirror ? *myMirror : context.getMirror()); IMirrorAPI::SpecList entries = mirror.lookup(pattern); diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.h index aaf009e5eda..5e0332c4a40 100644 --- a/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.h +++ b/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.h @@ -2,16 +2,12 @@ #pragma once #include "asyncinitializationpolicy.h" -#include <vespa/config-slobroks.h> +#include "mirror_with_all.h" #include <vespa/vdslib/distribution/distribution.h> #include <vespa/slobrok/imirrorapi.h> #include <vespa/documentapi/common.h> #include <vespa/config/subscription/sourcespec.h> -class FRT_Supervisor; -class FNET_Transport; -class FastOS_ThreadPool; - namespace documentapi { /** @@ -20,17 +16,6 @@ namespace documentapi { */ class ExternSlobrokPolicy : public AsyncInitializationPolicy { -protected: - bool _firstTry; - config::ServerSpec::HostSpecList _configSources; - std::mutex _lock; - std::unique_ptr<FastOS_ThreadPool> _threadPool; - std::unique_ptr<FNET_Transport> _transport; - std::unique_ptr<FRT_Supervisor> _orb; - std::unique_ptr<slobrok::api::IMirrorAPI> _mirror; - std::vector<std::string> _slobroks; - string _slobrokConfigId; - public: ExternSlobrokPolicy(const std::map<string, string>& params); ~ExternSlobrokPolicy() override; @@ -39,9 +24,18 @@ public: * @return a pointer to the slobrok mirror owned by this policy, if any. * If the policy uses the default mirror API, NULL is returned. */ - const slobrok::api::IMirrorAPI* getMirror() const { return _mirror.get(); } + const slobrok::api::IMirrorAPI* getMirror() const; slobrok::api::IMirrorAPI::SpecList lookup(mbus::RoutingContext &context, const string& pattern); string init() override; + const config::ServerSpec::HostSpecList & configSources() const { return _configSources; } + +private: + bool _firstTry; + config::ServerSpec::HostSpecList _configSources; + std::mutex _lock; + std::unique_ptr<MirrorAndStuff> _mirrorWithAll; + std::vector<std::string> _slobroks; + string _slobrokConfigId; }; } |