diff options
7 files changed, 54 insertions, 7 deletions
diff --git a/searchcore/src/apps/proton/proton.cpp b/searchcore/src/apps/proton/proton.cpp index 4c20c40b406..e967c012bbe 100644 --- a/searchcore/src/apps/proton/proton.cpp +++ b/searchcore/src/apps/proton/proton.cpp @@ -105,6 +105,7 @@ class ProtonServiceLayerProcess : public storage::ServiceLayerProcess { FNET_Transport& _transport; vespalib::string _file_distributor_connection_spec; metrics::MetricManager* _metricManager; + std::weak_ptr<streaming::SearchVisitorFactory> _search_visitor_factory; public: ProtonServiceLayerProcess(const config::ConfigUri & configUri, @@ -137,7 +138,8 @@ ProtonServiceLayerProcess::ProtonServiceLayerProcess(const config::ConfigUri & c _proton(proton), _transport(transport), _file_distributor_connection_spec(file_distributor_connection_spec), - _metricManager(nullptr) + _metricManager(nullptr), + _search_visitor_factory() { setMetricManager(_proton.getMetricManager()); } @@ -167,13 +169,23 @@ ProtonServiceLayerProcess::getGeneration() const { int64_t slGen = storage::ServiceLayerProcess::getGeneration(); int64_t protonGen = _proton.getConfigGeneration(); - return std::min(slGen, protonGen); + int64_t gen = std::min(slGen, protonGen); + auto factory = _search_visitor_factory.lock(); + if (factory) { + auto factory_gen = factory->get_oldest_config_generation(); + if (factory_gen.has_value()) { + gen = std::min(gen, factory_gen.value()); + } + } + return gen; } void ProtonServiceLayerProcess::add_external_visitors() { - _externalVisitors["searchvisitor"] = std::make_shared<streaming::SearchVisitorFactory>(_configUri, &_transport, _file_distributor_connection_spec); + auto factory = std::make_shared<streaming::SearchVisitorFactory>(_configUri, &_transport, _file_distributor_connection_spec); + _search_visitor_factory = factory; + _externalVisitors["searchvisitor"] = factory; } namespace { diff --git a/streamingvisitors/src/vespa/searchvisitor/search_environment_snapshot.cpp b/streamingvisitors/src/vespa/searchvisitor/search_environment_snapshot.cpp index 06cd6cc67ba..2e162b4e899 100644 --- a/streamingvisitors/src/vespa/searchvisitor/search_environment_snapshot.cpp +++ b/streamingvisitors/src/vespa/searchvisitor/search_environment_snapshot.cpp @@ -4,10 +4,11 @@ namespace streaming { -SearchEnvironmentSnapshot::SearchEnvironmentSnapshot(const RankManager& rank_manager, const vsm::VSMAdapter& vsm_adapter) +SearchEnvironmentSnapshot::SearchEnvironmentSnapshot(const RankManager& rank_manager, const vsm::VSMAdapter& vsm_adapter, int64_t generation) : _rank_manager_snapshot(rank_manager.getSnapshot()), _vsm_fields_cfg(vsm_adapter.getFieldsConfig()), - _docsum_tools(vsm_adapter.getDocsumTools()) + _docsum_tools(vsm_adapter.getDocsumTools()), + _generation(generation) { } diff --git a/streamingvisitors/src/vespa/searchvisitor/search_environment_snapshot.h b/streamingvisitors/src/vespa/searchvisitor/search_environment_snapshot.h index 56223686b3c..167a0df6671 100644 --- a/streamingvisitors/src/vespa/searchvisitor/search_environment_snapshot.h +++ b/streamingvisitors/src/vespa/searchvisitor/search_environment_snapshot.h @@ -16,13 +16,15 @@ class SearchEnvironmentSnapshot std::shared_ptr<const RankManager::Snapshot> _rank_manager_snapshot; std::shared_ptr<VsmfieldsConfig> _vsm_fields_cfg; std::shared_ptr<const vsm::DocsumTools> _docsum_tools; + int64_t _generation; public: - SearchEnvironmentSnapshot(const RankManager& rank_manager, const vsm::VSMAdapter& vsm_adapter); + SearchEnvironmentSnapshot(const RankManager& rank_manager, const vsm::VSMAdapter& vsm_adapter, int64_t generation); ~SearchEnvironmentSnapshot(); const std::shared_ptr<const RankManager::Snapshot>& get_rank_manager_snapshot() const noexcept { return _rank_manager_snapshot; } const std::shared_ptr<VsmfieldsConfig>& get_vsm_fields_config() const noexcept { return _vsm_fields_cfg; } const std::shared_ptr<const vsm::DocsumTools>& get_docsum_tools() const noexcept { return _docsum_tools; } + int64_t get_generation() const noexcept { return _generation; } }; } diff --git a/streamingvisitors/src/vespa/searchvisitor/searchenvironment.cpp b/streamingvisitors/src/vespa/searchvisitor/searchenvironment.cpp index 75e7f523d7f..e051f9206e1 100644 --- a/streamingvisitors/src/vespa/searchvisitor/searchenvironment.cpp +++ b/streamingvisitors/src/vespa/searchvisitor/searchenvironment.cpp @@ -76,7 +76,7 @@ SearchEnvironment::Env::configure(const config::ConfigSnapshot & snapshot) _generation = snapshot.getGeneration(); _vsmAdapter->configure(snap); _rankManager->configure(snap, _ranking_assets_repo); - auto se_snapshot = std::make_shared<const SearchEnvironmentSnapshot>(*_rankManager, *_vsmAdapter); + auto se_snapshot = std::make_shared<const SearchEnvironmentSnapshot>(*_rankManager, *_vsmAdapter, _generation); std::lock_guard guard(_lock); std::swap(se_snapshot, _snapshot); } @@ -161,4 +161,27 @@ SearchEnvironment::get_snapshot(const vespalib::string& search_cluster) return getEnv(search_cluster).get_snapshot(); } +std::optional<int64_t> +SearchEnvironment::get_oldest_config_generation() +{ + std::optional<int64_t> oldest; + std::vector<std::shared_ptr<Env>> envs; + { + std::lock_guard guard(_lock); + for (auto& env : _envMap) { + envs.emplace_back(env.second); + } + } + for (auto& env : envs) { + auto snapshot = env->get_snapshot(); + if (snapshot) { + auto gen = snapshot->get_generation(); + if (!oldest.has_value() || oldest.value() > gen) { + oldest = gen; + } + } + } + return oldest; +} + } diff --git a/streamingvisitors/src/vespa/searchvisitor/searchenvironment.h b/streamingvisitors/src/vespa/searchvisitor/searchenvironment.h index 1699409d154..9ea4867272e 100644 --- a/streamingvisitors/src/vespa/searchvisitor/searchenvironment.h +++ b/streamingvisitors/src/vespa/searchvisitor/searchenvironment.h @@ -81,6 +81,7 @@ public: SearchEnvironment(const config::ConfigUri & configUri, FNET_Transport* transport, const vespalib::string& file_distributor_connection_spec); ~SearchEnvironment(); std::shared_ptr<const SearchEnvironmentSnapshot> get_snapshot(const vespalib::string& search_cluster); + std::optional<int64_t> get_oldest_config_generation(); // Should only be used by unit tests to simulate that the calling thread is finished. void clear_thread_local_env_map(); }; diff --git a/streamingvisitors/src/vespa/searchvisitor/searchvisitor.cpp b/streamingvisitors/src/vespa/searchvisitor/searchvisitor.cpp index a1e8fddc3bf..8e3a206a3a1 100644 --- a/streamingvisitors/src/vespa/searchvisitor/searchvisitor.cpp +++ b/streamingvisitors/src/vespa/searchvisitor/searchvisitor.cpp @@ -556,6 +556,13 @@ SearchVisitorFactory::makeVisitor(StorageComponent& component, return new SearchVisitor(component, env, params); } +std::optional<int64_t> +SearchVisitorFactory::get_oldest_config_generation() const +{ + auto& env = dynamic_cast<SearchEnvironment&>(*_env); + return env.get_oldest_config_generation(); +} + void SearchVisitor::AttributeInserter::onPrimitive(uint32_t, const Content & c) { diff --git a/streamingvisitors/src/vespa/searchvisitor/searchvisitor.h b/streamingvisitors/src/vespa/searchvisitor/searchvisitor.h index 98d0747baec..24816b74d57 100644 --- a/streamingvisitors/src/vespa/searchvisitor/searchvisitor.h +++ b/streamingvisitors/src/vespa/searchvisitor/searchvisitor.h @@ -502,6 +502,7 @@ class SearchVisitorFactory : public storage::VisitorFactory { public: explicit SearchVisitorFactory(const config::ConfigUri & configUri, FNET_Transport* transport, const vespalib::string& file_distributor_connection_spec); ~SearchVisitorFactory() override; + std::optional<int64_t> get_oldest_config_generation() const; }; } |