diff options
author | Tor Egge <Tor.Egge@online.no> | 2023-05-10 10:44:47 +0200 |
---|---|---|
committer | Tor Egge <Tor.Egge@online.no> | 2023-05-10 10:44:47 +0200 |
commit | ca7f494815f3149439263526d316333b06cc7720 (patch) | |
tree | 0ebc1f024a505a432b38e31e12615e582a2c9a0a | |
parent | 32cc36e3af0b5e24fdcb27ece4e5920042a8c483 (diff) |
Pass transport and file distributor connection spec to SearchEnvironment
in preparation for using RankingAssetsBuilder when handling config
in streaming search.
9 files changed, 65 insertions, 29 deletions
diff --git a/messagebus/src/vespa/messagebus/result.h b/messagebus/src/vespa/messagebus/result.h index 8be1515b21f..4b8fbe3fa26 100644 --- a/messagebus/src/vespa/messagebus/result.h +++ b/messagebus/src/vespa/messagebus/result.h @@ -3,12 +3,11 @@ #pragma once #include "error.h" +#include "message.h" #include <memory> namespace mbus { -class Message; - /** * A Result object is used as return value when trying to send a * Message on a SourceSession. It says whether messagebus has accepted diff --git a/searchcore/src/apps/proton/proton.cpp b/searchcore/src/apps/proton/proton.cpp index 281d7585274..10e49ac0f8e 100644 --- a/searchcore/src/apps/proton/proton.cpp +++ b/searchcore/src/apps/proton/proton.cpp @@ -3,6 +3,7 @@ #include <vespa/searchcore/proton/server/proton.h> #include <vespa/storage/storageserver/storagenode.h> #include <vespa/metrics/metricmanager.h> +#include <vespa/searchvisitor/searchvisitor.h> #include <vespa/vespalib/util/signalhandler.h> #include <vespa/vespalib/util/programoptions.h> #include <vespa/vespalib/util/size_literals.h> @@ -100,12 +101,15 @@ using storage::spi::PersistenceProvider; #include <vespa/storageserver/app/servicelayerprocess.h> class ProtonServiceLayerProcess : public storage::ServiceLayerProcess { - proton::Proton & _proton; + proton::Proton& _proton; + FNET_Transport& _transport; + vespalib::string _file_distributor_connection_spec; metrics::MetricManager* _metricManager; public: ProtonServiceLayerProcess(const config::ConfigUri & configUri, - proton::Proton & proton); + proton::Proton & proton, FNET_Transport& transport, + const vespalib::string& file_distributor_connection_spec); ~ProtonServiceLayerProcess() override { shutdown(); } void shutdown() override; @@ -121,12 +125,16 @@ public: _metricManager = &mm; } int64_t getGeneration() const override; + void add_external_visitors() override; }; ProtonServiceLayerProcess::ProtonServiceLayerProcess(const config::ConfigUri & configUri, - proton::Proton & proton) + proton::Proton & proton, FNET_Transport& transport, + const vespalib::string& file_distributor_connection_spec) : ServiceLayerProcess(configUri), _proton(proton), + _transport(transport), + _file_distributor_connection_spec(file_distributor_connection_spec), _metricManager(nullptr) { setMetricManager(_proton.getMetricManager()); @@ -160,6 +168,12 @@ ProtonServiceLayerProcess::getGeneration() const return std::min(slGen, protonGen); } +void +ProtonServiceLayerProcess::add_external_visitors() +{ + _externalVisitors["searchvisitor"] = std::make_shared<streaming::SearchVisitorFactory>(_configUri, _transport, _file_distributor_connection_spec); +} + namespace { class ExitOnSignal { @@ -244,11 +258,13 @@ App::startAndRun(FNET_Transport & transport, int argc, char **argv) { ExitOnSignal exit_on_signal; proton.init(configSnapshot); } + vespalib::string file_distributor_connection_spec = configSnapshot->getFiledistributorrpcConfig().connectionspec; configSnapshot.reset(); std::unique_ptr<ProtonServiceLayerProcess> spiProton; if ( ! params.serviceidentity.empty()) { - spiProton = std::make_unique<ProtonServiceLayerProcess>(identityUri.createWithNewId(params.serviceidentity), proton); + spiProton = std::make_unique<ProtonServiceLayerProcess>(identityUri.createWithNewId(params.serviceidentity), proton, transport, + file_distributor_connection_spec); spiProton->setupConfig(subscribeTimeout); spiProton->createNode(); EV_STARTED("servicelayer"); diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp index ab962fc78b2..69f83c6e5f0 100644 --- a/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp +++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp @@ -7,7 +7,6 @@ #include <vespa/storage/config/config-stor-server.h> #include <vespa/storage/storageserver/servicelayernode.h> #include <vespa/storageframework/defaultimplementation/clock/realclock.h> -#include <vespa/searchvisitor/searchvisitor.h> #include <vespa/log/log.h> LOG_SETUP(".storageserver.service_layer_process"); @@ -52,7 +51,7 @@ ServiceLayerProcess::shutdown() void ServiceLayerProcess::createNode() { - _externalVisitors["searchvisitor"] = std::make_shared<streaming::SearchVisitorFactory>(_configUri); + add_external_visitors(); setupProvider(); _node = std::make_unique<ServiceLayerNode>(_configUri, _context, *this, getProvider(), _externalVisitors); if (_storage_chain_builder) { @@ -82,4 +81,9 @@ ServiceLayerProcess::set_storage_chain_builder(std::unique_ptr<IStorageChainBuil _storage_chain_builder = std::move(builder); } +void +ServiceLayerProcess::add_external_visitors() +{ +} + } // storage diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.h b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h index f62db4c2fcf..1df7b173890 100644 --- a/storageserver/src/vespa/storageserver/app/servicelayerprocess.h +++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h @@ -30,7 +30,9 @@ class ServiceLayerNode; class IStorageChainBuilder; class ServiceLayerProcess : public Process { +protected: VisitorFactory::Map _externalVisitors; +private: std::unique_ptr<ServiceLayerNode> _node; std::unique_ptr<IStorageChainBuilder> _storage_chain_builder; @@ -51,6 +53,7 @@ public: StorageNodeContext& getContext() override; std::string getComponentName() const override; void set_storage_chain_builder(std::unique_ptr<IStorageChainBuilder> builder); + virtual void add_external_visitors(); }; } // storage diff --git a/streamingvisitors/src/tests/searchvisitor/searchvisitor_test.cpp b/streamingvisitors/src/tests/searchvisitor/searchvisitor_test.cpp index 43309e9fe70..9b99df32d03 100644 --- a/streamingvisitors/src/tests/searchvisitor/searchvisitor_test.cpp +++ b/streamingvisitors/src/tests/searchvisitor/searchvisitor_test.cpp @@ -4,6 +4,7 @@ #include <vespa/document/datatype/documenttype.h> #include <vespa/document/fieldvalue/document.h> #include <vespa/document/repo/documenttyperepo.h> +#include <vespa/fnet/transport.h> #include <vespa/persistence/spi/docentry.h> #include <vespa/searchlib/query/tree/querybuilder.h> #include <vespa/searchlib/query/tree/simplequery.h> @@ -125,6 +126,7 @@ public: framework::defaultimplementation::FakeClock _clock; StorageComponentRegisterImpl _componentRegister; std::unique_ptr<StorageComponent> _component; + FNET_Transport _transport; SearchEnvironment _env; SearchVisitorFactory _factory; std::shared_ptr<DocumentTypeRepo> _repo; @@ -159,8 +161,9 @@ public: SearchVisitorTest::SearchVisitorTest() : _componentRegister(), - _env(::config::ConfigUri("dir:cfg")), - _factory(::config::ConfigUri("dir:cfg")), + _transport(), + _env(::config::ConfigUri("dir:cfg"), _transport, ""), + _factory(::config::ConfigUri("dir:cfg"), _transport, ""), _repo(std::make_shared<DocumentTypeRepo>(readDocumenttypesConfig("cfg/documenttypes.cfg"))), _doc_type(_repo->getDocumentType("test")) { diff --git a/streamingvisitors/src/vespa/searchvisitor/searchenvironment.cpp b/streamingvisitors/src/vespa/searchvisitor/searchenvironment.cpp index 5f4db0fe2d9..9f33c9b2ea1 100644 --- a/streamingvisitors/src/vespa/searchvisitor/searchenvironment.cpp +++ b/streamingvisitors/src/vespa/searchvisitor/searchenvironment.cpp @@ -15,14 +15,16 @@ namespace streaming { __thread SearchEnvironment::EnvMap * SearchEnvironment::_localEnvMap = nullptr; -SearchEnvironment::Env::Env(const config::ConfigUri& configUri, const Fast_NormalizeWordFolder& wf) +SearchEnvironment::Env::Env(const config::ConfigUri& configUri, const Fast_NormalizeWordFolder& wf, FNET_Transport& transport, const vespalib::string& file_distributor_connection_spec) : _configId(configUri.getConfigId()), _configurer(std::make_unique<config::SimpleConfigRetriever>(createKeySet(configUri.getConfigId()), configUri.getContext()), this), _vsmAdapter(std::make_unique<VSMAdapter>(_configId, wf)), _rankManager(std::make_unique<RankManager>(_vsmAdapter.get())), - _snapshot() + _snapshot(), + _lock(), + _transport(transport), + _file_distributor_connection_spec(file_distributor_connection_spec) { - _configurer.start(); } @@ -61,10 +63,12 @@ SearchEnvironment::Env::~Env() _configurer.close(); } -SearchEnvironment::SearchEnvironment(const config::ConfigUri & configUri) : - VisitorEnvironment(), - _envMap(), - _configUri(configUri) +SearchEnvironment::SearchEnvironment(const config::ConfigUri & configUri, FNET_Transport& transport, const vespalib::string& file_distributor_connection_spec) + : VisitorEnvironment(), + _envMap(), + _configUri(configUri), + _transport(transport), + _file_distributor_connection_spec(file_distributor_connection_spec) { } @@ -91,7 +95,7 @@ SearchEnvironment::getEnv(const vespalib::string & searchCluster) EnvMap::iterator found = _envMap.find(searchCluster); if (found == _envMap.end()) { LOG(debug, "Init VSMAdapter with config id = '%s'", searchCluster.c_str()); - Env::SP env = std::make_shared<Env>(searchClusterUri, _wordFolder); + Env::SP env = std::make_shared<Env>(searchClusterUri, _wordFolder, _transport, _file_distributor_connection_spec); _envMap[searchCluster] = std::move(env); found = _envMap.find(searchCluster); } diff --git a/streamingvisitors/src/vespa/searchvisitor/searchenvironment.h b/streamingvisitors/src/vespa/searchvisitor/searchenvironment.h index 2fdd11d6a77..7b9e878217b 100644 --- a/streamingvisitors/src/vespa/searchvisitor/searchenvironment.h +++ b/streamingvisitors/src/vespa/searchvisitor/searchenvironment.h @@ -11,6 +11,8 @@ #include <vespa/fastlib/text/normwordfolder.h> #include <mutex> +class FNET_Transport; + namespace streaming { class SearchEnvironmentSnapshot; @@ -21,19 +23,21 @@ private: class Env : public config::SimpleConfigurable { public: using SP = std::shared_ptr<Env>; - Env(const config::ConfigUri& configUri, const Fast_NormalizeWordFolder& wf); + Env(const config::ConfigUri& configUri, const Fast_NormalizeWordFolder& wf, FNET_Transport& transport, const vespalib::string& file_distributor_connection_spec); ~Env() override; void configure(const config::ConfigSnapshot & snapshot) override; static config::ConfigKeySet createKeySet(const vespalib::string & configId); std::shared_ptr<const SearchEnvironmentSnapshot> get_snapshot(); private: - const vespalib::string _configId; - config::SimpleConfigurer _configurer; - std::unique_ptr<vsm::VSMAdapter> _vsmAdapter; - std::unique_ptr<RankManager> _rankManager; + const vespalib::string _configId; + config::SimpleConfigurer _configurer; + std::unique_ptr<vsm::VSMAdapter> _vsmAdapter; + std::unique_ptr<RankManager> _rankManager; std::shared_ptr<const SearchEnvironmentSnapshot> _snapshot; - std::mutex _lock; + std::mutex _lock; + FNET_Transport& _transport; + const vespalib::string _file_distributor_connection_spec; }; using EnvMap = vespalib::hash_map<vespalib::string, Env::SP>; using EnvMapUP = std::unique_ptr<EnvMap>; @@ -45,11 +49,13 @@ private: std::mutex _lock; Fast_NormalizeWordFolder _wordFolder; config::ConfigUri _configUri; + FNET_Transport& _transport; + vespalib::string _file_distributor_connection_spec; Env & getEnv(const vespalib::string & searchcluster); public: - SearchEnvironment(const config::ConfigUri & configUri); + SearchEnvironment(const config::ConfigUri & configUri, FNET_Transport& transport, const vespalib::string& file_distributor_connection_spec); ~SearchEnvironment(); std::shared_ptr<const SearchEnvironmentSnapshot> get_snapshot(const vespalib::string& search_cluster); // Should only be used by unit tests to simulate that the calling thread is finished. diff --git a/streamingvisitors/src/vespa/searchvisitor/searchvisitor.cpp b/streamingvisitors/src/vespa/searchvisitor/searchvisitor.cpp index 94bc3da7df9..f9397c7b63f 100644 --- a/streamingvisitors/src/vespa/searchvisitor/searchvisitor.cpp +++ b/streamingvisitors/src/vespa/searchvisitor/searchvisitor.cpp @@ -459,11 +459,12 @@ void SearchVisitor::init(const Parameters & params) VISITOR_TRACE(6, "Completed lazy VSM adapter initialization"); } -SearchVisitorFactory::SearchVisitorFactory(const config::ConfigUri & configUri) +SearchVisitorFactory::SearchVisitorFactory(const config::ConfigUri & configUri, FNET_Transport& transport, const vespalib::string& file_distributor_connection_spec) : VisitorFactory(), _configUri(configUri), - _env(std::make_shared<SearchEnvironment>(_configUri)) -{} + _env(std::make_shared<SearchEnvironment>(_configUri, transport, file_distributor_connection_spec)) +{ +} SearchVisitorFactory::~SearchVisitorFactory() = default; diff --git a/streamingvisitors/src/vespa/searchvisitor/searchvisitor.h b/streamingvisitors/src/vespa/searchvisitor/searchvisitor.h index daeb4013ebd..e4fa756e28d 100644 --- a/streamingvisitors/src/vespa/searchvisitor/searchvisitor.h +++ b/streamingvisitors/src/vespa/searchvisitor/searchvisitor.h @@ -496,7 +496,7 @@ class SearchVisitorFactory : public storage::VisitorFactory { storage::Visitor* makeVisitor(storage::StorageComponent&, storage::VisitorEnvironment&env, const vdslib::Parameters& params) override; public: - explicit SearchVisitorFactory(const config::ConfigUri & configUri); + explicit SearchVisitorFactory(const config::ConfigUri & configUri, FNET_Transport& transport, const vespalib::string& file_distributor_connection_spec); ~SearchVisitorFactory() override; }; |