aboutsummaryrefslogtreecommitdiffstats
path: root/documentapi/src/vespa
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2022-02-23 15:49:59 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2022-02-23 16:28:19 +0000
commit04596050bc494e338696cae4cc3f68771424c275 (patch)
tree97926f768c5705b5e86cf5946a489ef3bd679674 /documentapi/src/vespa
parent09039a14a0f27e7d3dadfc22cb72a8d0d10f9d65 (diff)
Ensure that internal state of ExternSlobrokPolicy and ExternPolicy is consistent with respect to transport, supervisor, and mirror.
Diffstat (limited to 'documentapi/src/vespa')
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt11
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp6
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp51
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.h15
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp39
-rw-r--r--documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.h28
6 files changed, 62 insertions, 88 deletions
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt
index 1500403d463..ed4f44b991a 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/CMakeLists.txt
@@ -2,18 +2,19 @@
vespa_add_library(documentapi_documentapipolicies OBJECT
SOURCES
andpolicy.cpp
- externslobrokpolicy.cpp
+ asyncinitializationpolicy.cpp
contentpolicy.cpp
- messagetypepolicy.cpp
+ externslobrokpolicy.cpp
documentrouteselectorpolicy.cpp
errorpolicy.cpp
externpolicy.cpp
+ loadbalancer.cpp
+ loadbalancerpolicy.cpp
localservicepolicy.cpp
+ messagetypepolicy.cpp
+ mirror_with_all.cpp
roundrobinpolicy.cpp
subsetservicepolicy.cpp
- loadbalancer.cpp
- loadbalancerpolicy.cpp
- asyncinitializationpolicy.cpp
DEPENDS
)
vespa_generate_config(documentapi_documentapipolicies ../../../../main/resources/configdefinitions/document-protocol-policies.def)
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp
index 39e67408e76..5fded6a6266 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/contentpolicy.cpp
@@ -63,10 +63,10 @@ string ContentPolicy::init()
using storage::lib::Distribution;
config::ConfigUri uri(_clusterConfigId);
- if (!_configSources.empty()) {
- _configFetcher.reset(new config::ConfigFetcher(config::ServerSpec(_configSources)));
+ if (!configSources().empty()) {
+ _configFetcher = std::make_unique<config::ConfigFetcher>(config::ServerSpec(configSources()));
} else {
- _configFetcher.reset(new config::ConfigFetcher(uri.getContext()));
+ _configFetcher = std::make_unique<config::ConfigFetcher>(uri.getContext());
}
_callBack = std::make_unique<CallBack>(*this);
_configFetcher->subscribe<vespa::config::content::StorDistributionConfig>(uri.getConfigId(), static_cast<CallBack *>(_callBack.get()));
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp
index ef1d795bec4..3bb11604300 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.cpp
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "externpolicy.h"
-#include <boost/tokenizer.hpp>
+#include "mirror_with_all.h"
+#include <vespa/vespalib/text/stringtokenizer.h>
#include <vespa/documentapi/messagebus/documentprotocol.h>
#include <vespa/vespalib/util/stringfmt.h>
#include <vespa/slobrok/sbmirror.h>
@@ -14,17 +15,11 @@ LOG_SETUP(".externpolicy");
using slobrok::api::IMirrorAPI;
using slobrok::api::MirrorAPI;
-typedef boost::char_separator<char> Separator;
-typedef boost::tokenizer<Separator> Tokenizer;
-
namespace documentapi {
ExternPolicy::ExternPolicy(const string &param) :
_lock(),
- _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)),
- _transport(std::make_unique<FNET_Transport>()),
- _orb(std::make_unique<FRT_Supervisor>(_transport.get())),
- _mirror(),
+ _mirrorWithAll(),
_pattern(),
_session(),
_error("Not initialized."),
@@ -46,11 +41,10 @@ ExternPolicy::ExternPolicy(const string &param) :
// Activate supervisor and register mirror.
MirrorAPI::StringList spec;
- string lst = param.substr(0, pos);
- std::string stdlst(lst);
- Tokenizer tokens(stdlst, Separator(","));
- for (Tokenizer::iterator it = tokens.begin(); it != tokens.end(); ++it) {
- spec.push_back(*it);
+ vespalib::string lst = param.substr(0, pos);
+ vespalib::StringTokenizer slobrokList(lst, ",");
+ for (uint32_t j = 0; j < slobrokList.size(); j++) {
+ spec.push_back(slobrokList[j]);
}
if (spec.empty()) {
@@ -59,14 +53,9 @@ ExternPolicy::ExternPolicy(const string &param) :
}
slobrok::ConfiguratorFactory config(spec);
- _mirror = std::make_unique<MirrorAPI>(*_orb, config);
- _started = _transport->Start(_threadPool.get());
- if (!_started) {
- _error = "Failed to start FNET supervisor.";
- return;
- } else {
- LOG(debug, "Connecting to extern slobrok mirror '%s'..", lst.c_str());
- }
+ _mirrorWithAll = std::make_unique<MirrorAndStuff>(config);
+ LOG(debug, "Connecting to extern slobrok mirror '%s'..", lst.c_str());
+
// Parse query pattern.
_pattern = param.substr(pos + 1);
@@ -81,12 +70,11 @@ ExternPolicy::ExternPolicy(const string &param) :
_error.clear();
}
-ExternPolicy::~ExternPolicy()
-{
- _mirror.reset();
- if (_started) {
- _transport->ShutDown(true);
- }
+ExternPolicy::~ExternPolicy() = default;
+
+const IMirrorAPI *
+ExternPolicy::getMirror() const {
+ return _mirrorWithAll ? _mirrorWithAll->mirror() : nullptr;
}
void
@@ -94,7 +82,7 @@ ExternPolicy::select(mbus::RoutingContext &ctx)
{
if (!_error.empty()) {
ctx.setError(DocumentProtocol::ERROR_POLICY_FAILURE, _error);
- } else if (_mirror->ready()) {
+ } else if (_mirrorWithAll->mirror()->ready()) {
mbus::Hop hop = getRecipient();
if (hop.hasDirectives()) {
mbus::Route route = ctx.getRoute();
@@ -130,15 +118,14 @@ ExternPolicy::getRecipient()
void
ExternPolicy::update()
{
- uint32_t upd = _mirror->updates();
+ uint32_t upd = _mirrorWithAll->mirror()->updates();
if (_gen != upd) {
_gen = upd;
_recipients.clear();
- IMirrorAPI::SpecList entries = _mirror->lookup(_pattern);
+ IMirrorAPI::SpecList entries = _mirrorWithAll->mirror()->lookup(_pattern);
if (!entries.empty()) {
- for (const auto & spec : entries)
- {
+ for (const auto & spec : entries) {
_recipients.push_back(mbus::Hop::parse(spec.second + _session));
}
}
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.h
index 3bc229c4087..e20e34d2f77 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.h
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/externpolicy.h
@@ -7,12 +7,10 @@
#include <vespa/documentapi/common.h>
#include <mutex>
-class FRT_Supervisor;
-class FNET_Transport;
-class FastOS_ThreadPool;
-
namespace documentapi {
+class MirrorAndStuff;
+
/**
* This policy implements the necessary logic to communicate with an external Vespa application and resolve its list of
* recipients using that other application's slobrok servers.
@@ -20,11 +18,8 @@ namespace documentapi {
class ExternPolicy : public mbus::IRoutingPolicy {
private:
using IMirrorAPI = slobrok::api::IMirrorAPI;
- std::mutex _lock;
- std::unique_ptr<FastOS_ThreadPool> _threadPool;
- std::unique_ptr<FNET_Transport> _transport;
- std::unique_ptr<FRT_Supervisor> _orb;
- std::unique_ptr<IMirrorAPI> _mirror;
+ std::mutex _lock;
+ std::unique_ptr<MirrorAndStuff> _mirrorWithAll;
string _pattern;
string _session;
string _error;
@@ -70,7 +65,7 @@ public:
*
* @return The mirror pointer.
*/
- slobrok::api::IMirrorAPI &getMirror() { return *_mirror; }
+ const slobrok::api::IMirrorAPI *getMirror() const;
void select(mbus::RoutingContext &ctx) override;
void merge(mbus::RoutingContext &ctx) override;
};
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp b/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp
index 298df88f75c..34d2b6d3369 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.cpp
@@ -1,9 +1,11 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "externslobrokpolicy.h"
+#include "mirror_with_all.h"
#include <vespa/messagebus/routing/routingcontext.h>
#include <vespa/config/common/configcontext.h>
#include <vespa/vespalib/text/stringtokenizer.h>
+#include <vespa/vespalib/util/size_literals.h>
#include <vespa/slobrok/sbmirror.h>
#include <vespa/fnet/frt/supervisor.h>
#include <vespa/fnet/transport.h>
@@ -12,15 +14,14 @@
using slobrok::api::IMirrorAPI;
using slobrok::api::MirrorAPI;
+using slobrok::ConfiguratorFactory;
namespace documentapi {
ExternSlobrokPolicy::ExternSlobrokPolicy(const std::map<string, string>& param)
: AsyncInitializationPolicy(param),
_firstTry(true),
- _threadPool(std::make_unique<FastOS_ThreadPool>(1024*60)),
- _transport(std::make_unique<FNET_Transport>()),
- _orb(std::make_unique<FRT_Supervisor>(_transport.get())),
+ _mirrorWithAll(),
_slobrokConfigId("client")
{
if (param.find("config") != param.end()) {
@@ -46,29 +47,24 @@ ExternSlobrokPolicy::ExternSlobrokPolicy(const std::map<string, string>& param)
}
}
-ExternSlobrokPolicy::~ExternSlobrokPolicy()
-{
- bool started = (bool)_mirror;
- _mirror.reset();
- if (started) {
- _transport->ShutDown(true);
- }
+const IMirrorAPI*
+ExternSlobrokPolicy::getMirror() const {
+ return _mirrorWithAll ? _mirrorWithAll->mirror() : nullptr;
}
+ExternSlobrokPolicy::~ExternSlobrokPolicy() = default;
+
string
ExternSlobrokPolicy::init() {
+ std::lock_guard guard(_lock);
if (_slobroks.size() != 0) {
- slobrok::ConfiguratorFactory config(_slobroks);
- _mirror = std::make_unique<MirrorAPI>(*_orb, config);
+ ConfiguratorFactory config(_slobroks);
+ _mirrorWithAll = std::make_unique<MirrorAndStuff>(config);
} else if (_configSources.size() != 0) {
- slobrok::ConfiguratorFactory config(
- config::ConfigUri(_slobrokConfigId,
- std::make_shared<config::ConfigContext>(config::ServerSpec(_configSources))));
- _mirror = std::make_unique<MirrorAPI>(*_orb, config);
- }
-
- if (_mirror.get()) {
- _transport->Start(_threadPool.get());
+ ConfiguratorFactory config(
+ config::ConfigUri(_slobrokConfigId,
+ std::make_shared<config::ConfigContext>(config::ServerSpec(_configSources))));
+ _mirrorWithAll = std::make_unique<MirrorAndStuff>(config);
}
return "";
@@ -78,7 +74,8 @@ IMirrorAPI::SpecList
ExternSlobrokPolicy::lookup(mbus::RoutingContext& context, const string& pattern) {
std::lock_guard guard(_lock);
- const IMirrorAPI& mirror(_mirror.get()? *_mirror : context.getMirror());
+ const IMirrorAPI * myMirror = getMirror();
+ const IMirrorAPI& mirror(myMirror ? *myMirror : context.getMirror());
IMirrorAPI::SpecList entries = mirror.lookup(pattern);
diff --git a/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.h b/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.h
index aaf009e5eda..5e0332c4a40 100644
--- a/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.h
+++ b/documentapi/src/vespa/documentapi/messagebus/policies/externslobrokpolicy.h
@@ -2,16 +2,12 @@
#pragma once
#include "asyncinitializationpolicy.h"
-#include <vespa/config-slobroks.h>
+#include "mirror_with_all.h"
#include <vespa/vdslib/distribution/distribution.h>
#include <vespa/slobrok/imirrorapi.h>
#include <vespa/documentapi/common.h>
#include <vespa/config/subscription/sourcespec.h>
-class FRT_Supervisor;
-class FNET_Transport;
-class FastOS_ThreadPool;
-
namespace documentapi {
/**
@@ -20,17 +16,6 @@ namespace documentapi {
*/
class ExternSlobrokPolicy : public AsyncInitializationPolicy
{
-protected:
- bool _firstTry;
- config::ServerSpec::HostSpecList _configSources;
- std::mutex _lock;
- std::unique_ptr<FastOS_ThreadPool> _threadPool;
- std::unique_ptr<FNET_Transport> _transport;
- std::unique_ptr<FRT_Supervisor> _orb;
- std::unique_ptr<slobrok::api::IMirrorAPI> _mirror;
- std::vector<std::string> _slobroks;
- string _slobrokConfigId;
-
public:
ExternSlobrokPolicy(const std::map<string, string>& params);
~ExternSlobrokPolicy() override;
@@ -39,9 +24,18 @@ public:
* @return a pointer to the slobrok mirror owned by this policy, if any.
* If the policy uses the default mirror API, NULL is returned.
*/
- const slobrok::api::IMirrorAPI* getMirror() const { return _mirror.get(); }
+ const slobrok::api::IMirrorAPI* getMirror() const;
slobrok::api::IMirrorAPI::SpecList lookup(mbus::RoutingContext &context, const string& pattern);
string init() override;
+ const config::ServerSpec::HostSpecList & configSources() const { return _configSources; }
+
+private:
+ bool _firstTry;
+ config::ServerSpec::HostSpecList _configSources;
+ std::mutex _lock;
+ std::unique_ptr<MirrorAndStuff> _mirrorWithAll;
+ std::vector<std::string> _slobroks;
+ string _slobrokConfigId;
};
}