summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@online.no>2023-05-10 10:44:47 +0200
committerTor Egge <Tor.Egge@online.no>2023-05-10 10:44:47 +0200
commitca7f494815f3149439263526d316333b06cc7720 (patch)
tree0ebc1f024a505a432b38e31e12615e582a2c9a0a
parent32cc36e3af0b5e24fdcb27ece4e5920042a8c483 (diff)
Pass transport and file distributor connection spec to SearchEnvironment
in preparation for using RankingAssetsBuilder when handling config in streaming search.
-rw-r--r--messagebus/src/vespa/messagebus/result.h3
-rw-r--r--searchcore/src/apps/proton/proton.cpp24
-rw-r--r--storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp8
-rw-r--r--storageserver/src/vespa/storageserver/app/servicelayerprocess.h3
-rw-r--r--streamingvisitors/src/tests/searchvisitor/searchvisitor_test.cpp7
-rw-r--r--streamingvisitors/src/vespa/searchvisitor/searchenvironment.cpp20
-rw-r--r--streamingvisitors/src/vespa/searchvisitor/searchenvironment.h20
-rw-r--r--streamingvisitors/src/vespa/searchvisitor/searchvisitor.cpp7
-rw-r--r--streamingvisitors/src/vespa/searchvisitor/searchvisitor.h2
9 files changed, 65 insertions, 29 deletions
diff --git a/messagebus/src/vespa/messagebus/result.h b/messagebus/src/vespa/messagebus/result.h
index 8be1515b21f..4b8fbe3fa26 100644
--- a/messagebus/src/vespa/messagebus/result.h
+++ b/messagebus/src/vespa/messagebus/result.h
@@ -3,12 +3,11 @@
#pragma once
#include "error.h"
+#include "message.h"
#include <memory>
namespace mbus {
-class Message;
-
/**
* A Result object is used as return value when trying to send a
* Message on a SourceSession. It says whether messagebus has accepted
diff --git a/searchcore/src/apps/proton/proton.cpp b/searchcore/src/apps/proton/proton.cpp
index 281d7585274..10e49ac0f8e 100644
--- a/searchcore/src/apps/proton/proton.cpp
+++ b/searchcore/src/apps/proton/proton.cpp
@@ -3,6 +3,7 @@
#include <vespa/searchcore/proton/server/proton.h>
#include <vespa/storage/storageserver/storagenode.h>
#include <vespa/metrics/metricmanager.h>
+#include <vespa/searchvisitor/searchvisitor.h>
#include <vespa/vespalib/util/signalhandler.h>
#include <vespa/vespalib/util/programoptions.h>
#include <vespa/vespalib/util/size_literals.h>
@@ -100,12 +101,15 @@ using storage::spi::PersistenceProvider;
#include <vespa/storageserver/app/servicelayerprocess.h>
class ProtonServiceLayerProcess : public storage::ServiceLayerProcess {
- proton::Proton & _proton;
+ proton::Proton& _proton;
+ FNET_Transport& _transport;
+ vespalib::string _file_distributor_connection_spec;
metrics::MetricManager* _metricManager;
public:
ProtonServiceLayerProcess(const config::ConfigUri & configUri,
- proton::Proton & proton);
+ proton::Proton & proton, FNET_Transport& transport,
+ const vespalib::string& file_distributor_connection_spec);
~ProtonServiceLayerProcess() override { shutdown(); }
void shutdown() override;
@@ -121,12 +125,16 @@ public:
_metricManager = &mm;
}
int64_t getGeneration() const override;
+ void add_external_visitors() override;
};
ProtonServiceLayerProcess::ProtonServiceLayerProcess(const config::ConfigUri & configUri,
- proton::Proton & proton)
+ proton::Proton & proton, FNET_Transport& transport,
+ const vespalib::string& file_distributor_connection_spec)
: ServiceLayerProcess(configUri),
_proton(proton),
+ _transport(transport),
+ _file_distributor_connection_spec(file_distributor_connection_spec),
_metricManager(nullptr)
{
setMetricManager(_proton.getMetricManager());
@@ -160,6 +168,12 @@ ProtonServiceLayerProcess::getGeneration() const
return std::min(slGen, protonGen);
}
+void
+ProtonServiceLayerProcess::add_external_visitors()
+{
+ _externalVisitors["searchvisitor"] = std::make_shared<streaming::SearchVisitorFactory>(_configUri, _transport, _file_distributor_connection_spec);
+}
+
namespace {
class ExitOnSignal {
@@ -244,11 +258,13 @@ App::startAndRun(FNET_Transport & transport, int argc, char **argv) {
ExitOnSignal exit_on_signal;
proton.init(configSnapshot);
}
+ vespalib::string file_distributor_connection_spec = configSnapshot->getFiledistributorrpcConfig().connectionspec;
configSnapshot.reset();
std::unique_ptr<ProtonServiceLayerProcess> spiProton;
if ( ! params.serviceidentity.empty()) {
- spiProton = std::make_unique<ProtonServiceLayerProcess>(identityUri.createWithNewId(params.serviceidentity), proton);
+ spiProton = std::make_unique<ProtonServiceLayerProcess>(identityUri.createWithNewId(params.serviceidentity), proton, transport,
+ file_distributor_connection_spec);
spiProton->setupConfig(subscribeTimeout);
spiProton->createNode();
EV_STARTED("servicelayer");
diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp
index ab962fc78b2..69f83c6e5f0 100644
--- a/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp
+++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.cpp
@@ -7,7 +7,6 @@
#include <vespa/storage/config/config-stor-server.h>
#include <vespa/storage/storageserver/servicelayernode.h>
#include <vespa/storageframework/defaultimplementation/clock/realclock.h>
-#include <vespa/searchvisitor/searchvisitor.h>
#include <vespa/log/log.h>
LOG_SETUP(".storageserver.service_layer_process");
@@ -52,7 +51,7 @@ ServiceLayerProcess::shutdown()
void
ServiceLayerProcess::createNode()
{
- _externalVisitors["searchvisitor"] = std::make_shared<streaming::SearchVisitorFactory>(_configUri);
+ add_external_visitors();
setupProvider();
_node = std::make_unique<ServiceLayerNode>(_configUri, _context, *this, getProvider(), _externalVisitors);
if (_storage_chain_builder) {
@@ -82,4 +81,9 @@ ServiceLayerProcess::set_storage_chain_builder(std::unique_ptr<IStorageChainBuil
_storage_chain_builder = std::move(builder);
}
+void
+ServiceLayerProcess::add_external_visitors()
+{
+}
+
} // storage
diff --git a/storageserver/src/vespa/storageserver/app/servicelayerprocess.h b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h
index f62db4c2fcf..1df7b173890 100644
--- a/storageserver/src/vespa/storageserver/app/servicelayerprocess.h
+++ b/storageserver/src/vespa/storageserver/app/servicelayerprocess.h
@@ -30,7 +30,9 @@ class ServiceLayerNode;
class IStorageChainBuilder;
class ServiceLayerProcess : public Process {
+protected:
VisitorFactory::Map _externalVisitors;
+private:
std::unique_ptr<ServiceLayerNode> _node;
std::unique_ptr<IStorageChainBuilder> _storage_chain_builder;
@@ -51,6 +53,7 @@ public:
StorageNodeContext& getContext() override;
std::string getComponentName() const override;
void set_storage_chain_builder(std::unique_ptr<IStorageChainBuilder> builder);
+ virtual void add_external_visitors();
};
} // storage
diff --git a/streamingvisitors/src/tests/searchvisitor/searchvisitor_test.cpp b/streamingvisitors/src/tests/searchvisitor/searchvisitor_test.cpp
index 43309e9fe70..9b99df32d03 100644
--- a/streamingvisitors/src/tests/searchvisitor/searchvisitor_test.cpp
+++ b/streamingvisitors/src/tests/searchvisitor/searchvisitor_test.cpp
@@ -4,6 +4,7 @@
#include <vespa/document/datatype/documenttype.h>
#include <vespa/document/fieldvalue/document.h>
#include <vespa/document/repo/documenttyperepo.h>
+#include <vespa/fnet/transport.h>
#include <vespa/persistence/spi/docentry.h>
#include <vespa/searchlib/query/tree/querybuilder.h>
#include <vespa/searchlib/query/tree/simplequery.h>
@@ -125,6 +126,7 @@ public:
framework::defaultimplementation::FakeClock _clock;
StorageComponentRegisterImpl _componentRegister;
std::unique_ptr<StorageComponent> _component;
+ FNET_Transport _transport;
SearchEnvironment _env;
SearchVisitorFactory _factory;
std::shared_ptr<DocumentTypeRepo> _repo;
@@ -159,8 +161,9 @@ public:
SearchVisitorTest::SearchVisitorTest() :
_componentRegister(),
- _env(::config::ConfigUri("dir:cfg")),
- _factory(::config::ConfigUri("dir:cfg")),
+ _transport(),
+ _env(::config::ConfigUri("dir:cfg"), _transport, ""),
+ _factory(::config::ConfigUri("dir:cfg"), _transport, ""),
_repo(std::make_shared<DocumentTypeRepo>(readDocumenttypesConfig("cfg/documenttypes.cfg"))),
_doc_type(_repo->getDocumentType("test"))
{
diff --git a/streamingvisitors/src/vespa/searchvisitor/searchenvironment.cpp b/streamingvisitors/src/vespa/searchvisitor/searchenvironment.cpp
index 5f4db0fe2d9..9f33c9b2ea1 100644
--- a/streamingvisitors/src/vespa/searchvisitor/searchenvironment.cpp
+++ b/streamingvisitors/src/vespa/searchvisitor/searchenvironment.cpp
@@ -15,14 +15,16 @@ namespace streaming {
__thread SearchEnvironment::EnvMap * SearchEnvironment::_localEnvMap = nullptr;
-SearchEnvironment::Env::Env(const config::ConfigUri& configUri, const Fast_NormalizeWordFolder& wf)
+SearchEnvironment::Env::Env(const config::ConfigUri& configUri, const Fast_NormalizeWordFolder& wf, FNET_Transport& transport, const vespalib::string& file_distributor_connection_spec)
: _configId(configUri.getConfigId()),
_configurer(std::make_unique<config::SimpleConfigRetriever>(createKeySet(configUri.getConfigId()), configUri.getContext()), this),
_vsmAdapter(std::make_unique<VSMAdapter>(_configId, wf)),
_rankManager(std::make_unique<RankManager>(_vsmAdapter.get())),
- _snapshot()
+ _snapshot(),
+ _lock(),
+ _transport(transport),
+ _file_distributor_connection_spec(file_distributor_connection_spec)
{
-
_configurer.start();
}
@@ -61,10 +63,12 @@ SearchEnvironment::Env::~Env()
_configurer.close();
}
-SearchEnvironment::SearchEnvironment(const config::ConfigUri & configUri) :
- VisitorEnvironment(),
- _envMap(),
- _configUri(configUri)
+SearchEnvironment::SearchEnvironment(const config::ConfigUri & configUri, FNET_Transport& transport, const vespalib::string& file_distributor_connection_spec)
+ : VisitorEnvironment(),
+ _envMap(),
+ _configUri(configUri),
+ _transport(transport),
+ _file_distributor_connection_spec(file_distributor_connection_spec)
{
}
@@ -91,7 +95,7 @@ SearchEnvironment::getEnv(const vespalib::string & searchCluster)
EnvMap::iterator found = _envMap.find(searchCluster);
if (found == _envMap.end()) {
LOG(debug, "Init VSMAdapter with config id = '%s'", searchCluster.c_str());
- Env::SP env = std::make_shared<Env>(searchClusterUri, _wordFolder);
+ Env::SP env = std::make_shared<Env>(searchClusterUri, _wordFolder, _transport, _file_distributor_connection_spec);
_envMap[searchCluster] = std::move(env);
found = _envMap.find(searchCluster);
}
diff --git a/streamingvisitors/src/vespa/searchvisitor/searchenvironment.h b/streamingvisitors/src/vespa/searchvisitor/searchenvironment.h
index 2fdd11d6a77..7b9e878217b 100644
--- a/streamingvisitors/src/vespa/searchvisitor/searchenvironment.h
+++ b/streamingvisitors/src/vespa/searchvisitor/searchenvironment.h
@@ -11,6 +11,8 @@
#include <vespa/fastlib/text/normwordfolder.h>
#include <mutex>
+class FNET_Transport;
+
namespace streaming {
class SearchEnvironmentSnapshot;
@@ -21,19 +23,21 @@ private:
class Env : public config::SimpleConfigurable {
public:
using SP = std::shared_ptr<Env>;
- Env(const config::ConfigUri& configUri, const Fast_NormalizeWordFolder& wf);
+ Env(const config::ConfigUri& configUri, const Fast_NormalizeWordFolder& wf, FNET_Transport& transport, const vespalib::string& file_distributor_connection_spec);
~Env() override;
void configure(const config::ConfigSnapshot & snapshot) override;
static config::ConfigKeySet createKeySet(const vespalib::string & configId);
std::shared_ptr<const SearchEnvironmentSnapshot> get_snapshot();
private:
- const vespalib::string _configId;
- config::SimpleConfigurer _configurer;
- std::unique_ptr<vsm::VSMAdapter> _vsmAdapter;
- std::unique_ptr<RankManager> _rankManager;
+ const vespalib::string _configId;
+ config::SimpleConfigurer _configurer;
+ std::unique_ptr<vsm::VSMAdapter> _vsmAdapter;
+ std::unique_ptr<RankManager> _rankManager;
std::shared_ptr<const SearchEnvironmentSnapshot> _snapshot;
- std::mutex _lock;
+ std::mutex _lock;
+ FNET_Transport& _transport;
+ const vespalib::string _file_distributor_connection_spec;
};
using EnvMap = vespalib::hash_map<vespalib::string, Env::SP>;
using EnvMapUP = std::unique_ptr<EnvMap>;
@@ -45,11 +49,13 @@ private:
std::mutex _lock;
Fast_NormalizeWordFolder _wordFolder;
config::ConfigUri _configUri;
+ FNET_Transport& _transport;
+ vespalib::string _file_distributor_connection_spec;
Env & getEnv(const vespalib::string & searchcluster);
public:
- SearchEnvironment(const config::ConfigUri & configUri);
+ SearchEnvironment(const config::ConfigUri & configUri, FNET_Transport& transport, const vespalib::string& file_distributor_connection_spec);
~SearchEnvironment();
std::shared_ptr<const SearchEnvironmentSnapshot> get_snapshot(const vespalib::string& search_cluster);
// Should only be used by unit tests to simulate that the calling thread is finished.
diff --git a/streamingvisitors/src/vespa/searchvisitor/searchvisitor.cpp b/streamingvisitors/src/vespa/searchvisitor/searchvisitor.cpp
index 94bc3da7df9..f9397c7b63f 100644
--- a/streamingvisitors/src/vespa/searchvisitor/searchvisitor.cpp
+++ b/streamingvisitors/src/vespa/searchvisitor/searchvisitor.cpp
@@ -459,11 +459,12 @@ void SearchVisitor::init(const Parameters & params)
VISITOR_TRACE(6, "Completed lazy VSM adapter initialization");
}
-SearchVisitorFactory::SearchVisitorFactory(const config::ConfigUri & configUri)
+SearchVisitorFactory::SearchVisitorFactory(const config::ConfigUri & configUri, FNET_Transport& transport, const vespalib::string& file_distributor_connection_spec)
: VisitorFactory(),
_configUri(configUri),
- _env(std::make_shared<SearchEnvironment>(_configUri))
-{}
+ _env(std::make_shared<SearchEnvironment>(_configUri, transport, file_distributor_connection_spec))
+{
+}
SearchVisitorFactory::~SearchVisitorFactory() = default;
diff --git a/streamingvisitors/src/vespa/searchvisitor/searchvisitor.h b/streamingvisitors/src/vespa/searchvisitor/searchvisitor.h
index daeb4013ebd..e4fa756e28d 100644
--- a/streamingvisitors/src/vespa/searchvisitor/searchvisitor.h
+++ b/streamingvisitors/src/vespa/searchvisitor/searchvisitor.h
@@ -496,7 +496,7 @@ class SearchVisitorFactory : public storage::VisitorFactory {
storage::Visitor* makeVisitor(storage::StorageComponent&, storage::VisitorEnvironment&env,
const vdslib::Parameters& params) override;
public:
- explicit SearchVisitorFactory(const config::ConfigUri & configUri);
+ explicit SearchVisitorFactory(const config::ConfigUri & configUri, FNET_Transport& transport, const vespalib::string& file_distributor_connection_spec);
~SearchVisitorFactory() override;
};