diff options
9 files changed, 741 insertions, 0 deletions
diff --git a/searchcore/CMakeLists.txt b/searchcore/CMakeLists.txt index d9aeeada520..273ae940770 100644 --- a/searchcore/CMakeLists.txt +++ b/searchcore/CMakeLists.txt @@ -120,6 +120,7 @@ vespa_define_module( src/tests/proton/persistenceconformance src/tests/proton/persistenceengine src/tests/proton/proton + src/tests/proton/proton_configurer src/tests/proton/reference/gid_to_lid_change_handler src/tests/proton/reference/gid_to_lid_change_listener src/tests/proton/reference/gid_to_lid_change_registrator diff --git a/searchcore/src/tests/proton/proton_configurer/CMakeLists.txt b/searchcore/src/tests/proton/proton_configurer/CMakeLists.txt new file mode 100644 index 00000000000..3aea88986f5 --- /dev/null +++ b/searchcore/src/tests/proton/proton_configurer/CMakeLists.txt @@ -0,0 +1,9 @@ +# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +vespa_add_executable(searchcore_proton_configurer_test_app TEST + SOURCES + proton_configurer_test.cpp + DEPENDS + searchcore_server + searchcore_fconfig +) +vespa_add_test(NAME searchcore_proton_configurer_test_app COMMAND searchcore_proton_configurer_test_app) diff --git a/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp b/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp new file mode 100644 index 00000000000..2d58d94947f --- /dev/null +++ b/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp @@ -0,0 +1,411 @@ +// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include <map> +#include <vespa/config-attributes.h> +#include <vespa/config-imported-fields.h> +#include <vespa/config-indexschema.h> +#include <vespa/config-rank-profiles.h> +#include <vespa/config-summary.h> +#include <vespa/config-summarymap.h> +#include <vespa/fileacquirer/config-filedistributorrpc.h> +#include <vespa/searchcore/proton/server/bootstrapconfig.h> +#include <vespa/searchcore/proton/server/bootstrapconfigmanager.h> +#include <vespa/searchcore/proton/server/documentdbconfigmanager.h> +#include <vespa/searchcore/proton/server/proton_config_fetcher.h> +#include <vespa/searchcore/proton/server/proton_config_snapshot.h> +#include <vespa/searchcore/proton/server/proton_configurer.h> +#include <vespa/searchcore/proton/server/i_proton_configurer_owner.h> +#include <vespa/searchsummary/config/config-juniperrc.h> +#include <vespa/vespalib/testkit/testapp.h> +#include <vespa/searchcommon/common/schemaconfigurer.h> +#include <vespa/vespalib/util/threadstackexecutor.h> +#include <vespa/vespalib/test/insertion_operators.h> + +using namespace config; +using namespace proton; +using namespace vespa::config::search::core; +using namespace vespa::config::search::summary; +using namespace vespa::config::search; +using namespace cloud::config::filedistribution; + +using InitializeThreads = std::shared_ptr<vespalib::ThreadStackExecutorBase>; +using config::ConfigUri; +using document::DocumentTypeRepo; +using document::DocumenttypesConfig; +using document::DocumenttypesConfigBuilder; +using search::TuneFileDocumentDB; +using std::map; +using search::index::Schema; +using search::index::SchemaBuilder; +using proton::matching::RankingConstants; + +struct DBConfigFixture { + using UP = std::unique_ptr<DBConfigFixture>; + AttributesConfigBuilder attributesBuilder; + RankProfilesConfigBuilder rankProfilesBuilder; + RankingConstantsConfigBuilder rankingConstantsBuilder; + IndexschemaConfigBuilder indexschemaBuilder; + SummaryConfigBuilder summaryBuilder; + SummarymapConfigBuilder summarymapBuilder; + JuniperrcConfigBuilder juniperrcBuilder; + ImportedFieldsConfigBuilder importedFieldsBuilder; + + Schema::SP buildSchema() + { + Schema::SP schema(std::make_shared<Schema>()); + SchemaBuilder::build(attributesBuilder, *schema); + SchemaBuilder::build(summaryBuilder, *schema); + SchemaBuilder::build(indexschemaBuilder, *schema); + SchemaBuilder::build(importedFieldsBuilder, *schema); + return schema; + } + + RankingConstants::SP buildRankingConstants() + { + return std::make_shared<RankingConstants>(); + } + + DocumentDBConfig::SP getConfig(int64_t generation, + std::shared_ptr<DocumenttypesConfig> documentTypes, + DocumentTypeRepo::SP repo, + const vespalib::string &configId, + const vespalib::string &docTypeName) + { + return std::make_shared<DocumentDBConfig> + (generation, + std::make_shared<RankProfilesConfig>(rankProfilesBuilder), + buildRankingConstants(), + std::make_shared<IndexschemaConfig>(indexschemaBuilder), + std::make_shared<AttributesConfig>(attributesBuilder), + std::make_shared<SummaryConfig>(summaryBuilder), + std::make_shared<SummarymapConfig>(summarymapBuilder), + std::make_shared<JuniperrcConfig>(juniperrcBuilder), + documentTypes, + repo, + std::make_shared<ImportedFieldsConfig>(importedFieldsBuilder), + std::make_shared<TuneFileDocumentDB>(), + buildSchema(), + std::make_shared<DocumentDBMaintenanceConfig>(), + configId, + docTypeName, + config::ConfigSnapshot()); + } +}; + +struct ConfigFixture { + const std::string configId; + ProtonConfigBuilder protonBuilder; + DocumenttypesConfigBuilder documenttypesBuilder; + FiledistributorrpcConfigBuilder filedistBuilder; + map<std::string, DBConfigFixture::UP> dbConfig; + int idcounter; + int64_t _generation; + std::shared_ptr<ProtonConfigSnapshot> _cachedConfigSnapshot; + + ConfigFixture(const std::string & id) + : configId(id), + protonBuilder(), + documenttypesBuilder(), + filedistBuilder(), + dbConfig(), + idcounter(-1), + _generation(1), + _cachedConfigSnapshot() + { + addDocType("_alwaysthere_"); + } + + ~ConfigFixture() { } + + DBConfigFixture *addDocType(const std::string & name) { + DocumenttypesConfigBuilder::Documenttype dt; + dt.bodystruct = -1270491200; + dt.headerstruct = 306916075; + dt.id = idcounter--; + dt.name = name; + dt.version = 0; + documenttypesBuilder.documenttype.push_back(dt); + + ProtonConfigBuilder::Documentdb db; + db.inputdoctypename = name; + db.configid = configId + "/" + name; + protonBuilder.documentdb.push_back(db); + + DBConfigFixture::UP fixture = std::make_unique<DBConfigFixture>(); + return dbConfig.emplace(std::make_pair(name, std::move(fixture))).first->second.get(); + } + + void removeDocType(const std::string & name) + { + for (DocumenttypesConfigBuilder::DocumenttypeVector::iterator it(documenttypesBuilder.documenttype.begin()), + mt(documenttypesBuilder.documenttype.end()); + it != mt; + it++) { + if ((*it).name.compare(name) == 0) { + documenttypesBuilder.documenttype.erase(it); + break; + } + } + + for (ProtonConfigBuilder::DocumentdbVector::iterator it(protonBuilder.documentdb.begin()), + mt(protonBuilder.documentdb.end()); + it != mt; + it++) { + if ((*it).inputdoctypename.compare(name) == 0) { + protonBuilder.documentdb.erase(it); + break; + } + } + } + + BootstrapConfig::SP getBootstrapConfig(int64_t generation) const { + return BootstrapConfig::SP(new BootstrapConfig(generation, + BootstrapConfig::DocumenttypesConfigSP(new DocumenttypesConfig(documenttypesBuilder)), + DocumentTypeRepo::SP(new DocumentTypeRepo(documenttypesBuilder)), + BootstrapConfig::ProtonConfigSP(new ProtonConfig(protonBuilder)), + std::make_shared<FiledistributorrpcConfig>(), + std::make_shared<TuneFileDocumentDB>())); + } + + std::shared_ptr<ProtonConfigSnapshot> getConfigSnapshot() + { + if (_cachedConfigSnapshot) { + return _cachedConfigSnapshot; + } + int64_t generation = ++_generation; + auto bootstrap = getBootstrapConfig(generation); + std::map<DocTypeName, DocumentDBConfig::SP> dbconfigs; + auto doctypes = bootstrap->getDocumenttypesConfigSP(); + auto repo = bootstrap->getDocumentTypeRepoSP(); + for (auto &db : dbConfig) { + DocTypeName name(db.first); + dbconfigs.insert(std::make_pair(name, + db.second->getConfig(generation, + doctypes, + repo, + configId + "/" + db.first, + db.first))); + } + _cachedConfigSnapshot = std::make_shared<ProtonConfigSnapshot>(bootstrap, dbconfigs); + return _cachedConfigSnapshot; + } + void newConfig() { _cachedConfigSnapshot.reset(); } + +}; + +struct MyProtonConfigurerOwner; + +struct MyDocumentDBConfigOwner : public IDocumentDBConfigOwner +{ + vespalib::string _name; + MyProtonConfigurerOwner &_owner; + MyDocumentDBConfigOwner(const vespalib::string &name, + MyProtonConfigurerOwner &owner) + : IDocumentDBConfigOwner(), + _name(name), + _owner(owner) + { + } + virtual ~MyDocumentDBConfigOwner() { } + + virtual void reconfigure(const DocumentDBConfig::SP & config) override; +}; + +struct MyProtonConfigurerOwner : public IProtonConfigurerOwner +{ + using InitializeThreads = std::shared_ptr<vespalib::ThreadStackExecutorBase>; + vespalib::ThreadStackExecutor _executor; + std::map<DocTypeName, std::shared_ptr<MyDocumentDBConfigOwner>> _dbs; + std::vector<vespalib::string> _log; + + MyProtonConfigurerOwner() + : IProtonConfigurerOwner(), + _executor(1, 128 * 1024), + _dbs(), + _log() + { + } + virtual ~MyProtonConfigurerOwner() { } + + virtual IDocumentDBConfigOwner *addDocumentDB(const DocTypeName &docTypeName, + const vespalib::string &configId, + const std::shared_ptr<BootstrapConfig> &bootstrapConfig, + const std::shared_ptr<DocumentDBConfig> &documentDBConfig, + InitializeThreads initializeThreads) override + { + (void) configId; + (void) bootstrapConfig; + (void) initializeThreads; + assert(_dbs.find(docTypeName) == _dbs.end()); + auto db = std::make_shared<MyDocumentDBConfigOwner>(docTypeName.getName(), *this); + _dbs.insert(std::make_pair(docTypeName, db)); + std::ostringstream os; + os << "add db " << docTypeName.getName() << " " << documentDBConfig->getGeneration(); + _log.push_back(os.str()); + return db.get(); + } + virtual void removeDocumentDB(const DocTypeName &docTypeName) override { + assert(_dbs.find(docTypeName) != _dbs.end()); + _dbs.erase(docTypeName); + std::ostringstream os; + os << "remove db " << docTypeName.getName(); + _log.push_back(os.str()); + } + virtual void applyConfig(const std::shared_ptr<BootstrapConfig> &bootstrapConfig) override { + std::ostringstream os; + os << "reconf bootstrap " << bootstrapConfig->getGeneration(); + _log.push_back(os.str()); + } + void reconfigureDocumentDB(const vespalib::string &name, const DocumentDBConfig::SP &config) + { + std::ostringstream os; + os << "reconf db " << name << " " << config->getGeneration(); + _log.push_back(os.str()); + } + void sync() { _executor.sync(); } +}; + +void +MyDocumentDBConfigOwner::reconfigure(const DocumentDBConfig::SP & config) +{ + _owner.reconfigureDocumentDB(_name, config); +} + +struct Fixture +{ + MyProtonConfigurerOwner _owner; + ConfigFixture _config; + ProtonConfigurer _configurer; + + Fixture() + : _owner(), + _config("test"), + _configurer(_owner._executor, _owner) + { + } + ~Fixture() { } + + void assertLog(const std::vector<vespalib::string> &expLog) { + EXPECT_EQUAL(expLog, _owner._log); + } + void sync() { _owner.sync(); } + void addDocType(const vespalib::string &name) { _config.addDocType(name); } + void removeDocType(const vespalib::string &name) { _config.removeDocType(name); } + void applyConfig() { + _configurer.reconfigure(_config.getConfigSnapshot()); + sync(); + } + + void applyInitialConfig() { + applyConfig(); // sets initial pending config + _configurer.applyInitialConfig(_config.getConfigSnapshot(), InitializeThreads()); + } + void reconfigure() { + _config.newConfig(); + applyConfig(); + } + + void allowReconfig() { + _configurer.setAllowReconfig(true); + sync(); + } + void disableReconfig() { + _configurer.setAllowReconfig(false); + } +}; + +TEST_F("require that nothing is applied before initial config", Fixture()) +{ + f.applyConfig(); + TEST_DO(f1.assertLog({})); +} + +TEST_F("require that initial config is applied", Fixture()) +{ + f.applyInitialConfig(); + TEST_DO(f1.assertLog({"reconf bootstrap 2", "add db _alwaysthere_ 2"})); +} + +TEST_F("require that new config is blocked", Fixture()) +{ + f.applyInitialConfig(); + f.reconfigure(); + TEST_DO(f1.assertLog({"reconf bootstrap 2", "add db _alwaysthere_ 2"})); +} + +TEST_F("require that new config can ba unblocked", Fixture()) +{ + f.applyInitialConfig(); + f.reconfigure(); + f.allowReconfig(); + TEST_DO(f1.assertLog({"reconf bootstrap 2", "add db _alwaysthere_ 2", "reconf bootstrap 3", "reconf db _alwaysthere_ 3"})); +} + +TEST_F("require that initial config is not reapplied due to config unblock", Fixture()) +{ + f.applyInitialConfig(); + f.allowReconfig(); + TEST_DO(f1.assertLog({"reconf bootstrap 2", "add db _alwaysthere_ 2"})); +} + +TEST_F("require that initial config is not reapplied due to unblock", Fixture()) +{ + f.applyInitialConfig(); + f.allowReconfig(); + TEST_DO(f1.assertLog({"reconf bootstrap 2", "add db _alwaysthere_ 2"})); +} + +TEST_F("require that we can add document db", Fixture()) +{ + f.applyInitialConfig(); + f.allowReconfig(); + f.addDocType("foobar"); + f.reconfigure(); + TEST_DO(f1.assertLog({"reconf bootstrap 2", "add db _alwaysthere_ 2", "reconf bootstrap 3","reconf db _alwaysthere_ 3", "add db foobar 3"})); +} + +TEST_F("require that we can remove document db", Fixture()) +{ + f.addDocType("foobar"); + f.applyInitialConfig(); + f.allowReconfig(); + f.removeDocType("foobar"); + f.reconfigure(); + TEST_DO(f1.assertLog({"reconf bootstrap 2", "add db _alwaysthere_ 2", "add db foobar 2", "reconf bootstrap 3","reconf db _alwaysthere_ 3", "remove db foobar"})); +} + +TEST_F("require that document db adds and reconfigs are intermingled", Fixture()) +{ + f.addDocType("foobar"); + f.applyInitialConfig(); + f.allowReconfig(); + f.addDocType("abar"); + f.removeDocType("foobar"); + f.addDocType("foobar"); + f.addDocType("zbar"); + f.reconfigure(); + TEST_DO(f1.assertLog({"reconf bootstrap 2", "add db _alwaysthere_ 2", "add db foobar 2", "reconf bootstrap 3","reconf db _alwaysthere_ 3", "add db abar 3", "reconf db foobar 3", "add db zbar 3"})); +} + +TEST_F("require that document db removes are applied at end", Fixture()) +{ + f.addDocType("abar"); + f.addDocType("foobar"); + f.applyInitialConfig(); + f.allowReconfig(); + f.removeDocType("abar"); + f.reconfigure(); + TEST_DO(f1.assertLog({"reconf bootstrap 2", "add db _alwaysthere_ 2", "add db abar 2", "add db foobar 2", "reconf bootstrap 3","reconf db _alwaysthere_ 3", "reconf db foobar 3", "remove db abar"})); +} + +TEST_F("require that new configs can be blocked again", Fixture()) +{ + f.applyInitialConfig(); + f.reconfigure(); + f.allowReconfig(); + f.disableReconfig(); + f.reconfigure(); + TEST_DO(f1.assertLog({"reconf bootstrap 2", "add db _alwaysthere_ 2", "reconf bootstrap 3", "reconf db _alwaysthere_ 3"})); +} + +TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt index 09a635034f5..8e9141d0b40 100644 --- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt +++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt @@ -71,6 +71,8 @@ vespa_add_library(searchcore_server STATIC persistenceproviderproxy.cpp proton.cpp proton_config_fetcher.cpp + proton_config_snapshot.cpp + proton_configurer.cpp prune_session_cache_job.cpp pruneremoveddocumentsjob.cpp putdonecontext.cpp diff --git a/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h b/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h new file mode 100644 index 00000000000..bdbdb9ff0f8 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h @@ -0,0 +1,33 @@ +// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/vespalib/stllike/string.h> +#include <memory> + +namespace vespalib { class ThreadStackExecutorBase; } + +namespace proton { + +class IDocumentDBConfigOwner; + +/* + * Interface class for owner of a proton configurer, with callback methods + * for adding/removing document dbs and applying bootstrap config. + */ +class IProtonConfigurerOwner +{ + using InitializeThreads = std::shared_ptr<vespalib::ThreadStackExecutorBase>; +public: + virtual ~IProtonConfigurerOwner() { } + virtual IDocumentDBConfigOwner *addDocumentDB(const DocTypeName &docTypeName, + const vespalib::string &configId, + const std::shared_ptr<BootstrapConfig> &bootstrapConfig, + const std::shared_ptr<DocumentDBConfig> &documentDBConfig, + InitializeThreads initializeThreads) = 0; + virtual void removeDocumentDB(const DocTypeName &docTypeName) = 0; + virtual void applyConfig(const std::shared_ptr<BootstrapConfig> &bootstrapConfig) = 0; +}; + + +} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_config_snapshot.cpp b/searchcore/src/vespa/searchcore/proton/server/proton_config_snapshot.cpp new file mode 100644 index 00000000000..88d3e46eedc --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/proton_config_snapshot.cpp @@ -0,0 +1,18 @@ +// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "proton_config_snapshot.h" + +namespace proton { + +ProtonConfigSnapshot::ProtonConfigSnapshot(std::shared_ptr<BootstrapConfig> bootstrapConfig, + DocumentDBConfigs documentDBConfigs) + : _bootstrapConfig(std::move(bootstrapConfig)), + _documentDBConfigs(std::move(documentDBConfigs)) +{ +} + +ProtonConfigSnapshot::~ProtonConfigSnapshot() +{ +} + +} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_config_snapshot.h b/searchcore/src/vespa/searchcore/proton/server/proton_config_snapshot.h new file mode 100644 index 00000000000..69a5b5801cc --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/proton_config_snapshot.h @@ -0,0 +1,32 @@ +// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/searchcore/proton/common/doctypename.h> +#include <map> +#include <memory> + +namespace proton { + +class BootstrapConfig; +class DocumentDBConfig; + +/* + * Class representing a config snapshot accross all document dbs as well as + * the matching bootstrap config. + */ +class ProtonConfigSnapshot +{ + using DocumentDBConfigs = std::map<DocTypeName, std::shared_ptr<DocumentDBConfig>>; + std::shared_ptr<BootstrapConfig> _bootstrapConfig; + DocumentDBConfigs _documentDBConfigs; + +public: + ProtonConfigSnapshot(std::shared_ptr<BootstrapConfig> bootstrapConfig, + DocumentDBConfigs documentDBConfigs); + ~ProtonConfigSnapshot(); + const std::shared_ptr<BootstrapConfig> getBootstrapConfig() const { return _bootstrapConfig; } + const DocumentDBConfigs &getDocumentDBConfigs() const { return _documentDBConfigs; } +}; + +} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp new file mode 100644 index 00000000000..41b5ec52d6b --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp @@ -0,0 +1,176 @@ +// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "proton_configurer.h" +#include "proton_config_snapshot.h" +#include "bootstrapconfig.h" +#include "i_proton_configurer_owner.h" +#include "proton_config_fetcher.h" +#include <vespa/searchlib/common/lambdatask.h> +#include <vespa/vespalib/util/threadstackexecutorbase.h> +#include <future> + +using search::makeLambdaTask; +using vespa::config::search::core::ProtonConfig; + +namespace proton { + +ProtonConfigurer::ProtonConfigurer(vespalib::ThreadStackExecutorBase &executor, + IProtonConfigurerOwner &owner) + : _executor(executor), + _owner(owner), + _pendingConfigSnapshot(), + _activeConfigSnapshot(), + _mutex(), + _allowReconfig(false) +{ +} + +ProtonConfigurer::~ProtonConfigurer() +{ +} + +void +ProtonConfigurer::setAllowReconfig(bool allowReconfig) +{ + // called by proton app main thread + assert(!_executor.isCurrentThread()); + { + std::lock_guard<std::mutex> guard(_mutex); + _allowReconfig = allowReconfig; + if (allowReconfig) { + // Ensure that pending config is applied + _executor.execute(makeLambdaTask([=]() { performReconfigure(); })); + } + } + if (!allowReconfig) { + _executor.sync(); // drain queued performReconfigure tasks + } +} + +std::shared_ptr<ProtonConfigSnapshot> +ProtonConfigurer::getPendingConfigSnapshot() +{ + std::lock_guard<std::mutex> guard(_mutex); + return _pendingConfigSnapshot; +} + +std::shared_ptr<ProtonConfigSnapshot> +ProtonConfigurer::getActiveConfigSnapshot() +{ + std::lock_guard<std::mutex> guard(_mutex); + return _activeConfigSnapshot; +} + +void +ProtonConfigurer::reconfigure(std::shared_ptr<ProtonConfigSnapshot> configSnapshot) +{ + // called by proton config fetcher thread + assert(!_executor.isCurrentThread()); + std::lock_guard<std::mutex> guard(_mutex); + _pendingConfigSnapshot = configSnapshot; + if (_allowReconfig) { + _executor.execute(makeLambdaTask([=]() { performReconfigure(); })); + } +} + +void +ProtonConfigurer::performReconfigure() +{ + // called by proton executor thread + assert(_executor.isCurrentThread()); + auto configSnapshot(getPendingConfigSnapshot()); + applyConfig(configSnapshot, InitializeThreads(), false); +} + +bool +ProtonConfigurer::skipConfig(const ProtonConfigSnapshot *configSnapshot, bool initialConfig) +{ + // called by proton executor thread + std::lock_guard<std::mutex> guard(_mutex); + assert((_activeConfigSnapshot.get() == nullptr) == initialConfig); + if (_activeConfigSnapshot.get() == configSnapshot) { + return true; // config snapshot already applied + } + if (!initialConfig && !_allowReconfig) { + return true; // reconfig not allowed + } + return false; +} + +void +ProtonConfigurer::applyConfig(std::shared_ptr<ProtonConfigSnapshot> configSnapshot, + InitializeThreads initializeThreads, bool initialConfig) +{ + // called by proton executor thread + assert(_executor.isCurrentThread()); + if (skipConfig(configSnapshot.get(), initialConfig)) { + return; // config should be skipped + } + const auto &bootstrapConfig = configSnapshot->getBootstrapConfig(); + const ProtonConfig &protonConfig = bootstrapConfig->getProtonConfig(); + _owner.applyConfig(bootstrapConfig); + for (const auto &ddbConfig : protonConfig.documentdb) { + DocTypeName docTypeName(ddbConfig.inputdoctypename); + configureDocumentDB(*configSnapshot, docTypeName, ddbConfig.configid, initializeThreads); + } + pruneDocumentDBs(*configSnapshot); + std::lock_guard<std::mutex> guard(_mutex); + _activeConfigSnapshot = configSnapshot; +} + +void +ProtonConfigurer::configureDocumentDB(const ProtonConfigSnapshot &configSnapshot, const DocTypeName &docTypeName, const vespalib::string &configId, const InitializeThreads &initializeThreads) +{ + // called by proton executor thread + const auto &bootstrapConfig = configSnapshot.getBootstrapConfig(); + const auto &documentDBConfigs = configSnapshot.getDocumentDBConfigs(); + auto cfgitr = documentDBConfigs.find(docTypeName); + assert(cfgitr != documentDBConfigs.end()); + const auto &documentDBConfig = cfgitr->second; + auto dbitr(_documentDBs.find(docTypeName)); + if (dbitr == _documentDBs.end()) { + auto *newdb = _owner.addDocumentDB(docTypeName, configId, bootstrapConfig, documentDBConfig, initializeThreads); + auto insres = _documentDBs.insert(std::make_pair(docTypeName, newdb)); + assert(insres.second); + } else { + dbitr->second->reconfigure(documentDBConfig); + } +} + +void +ProtonConfigurer::pruneDocumentDBs(const ProtonConfigSnapshot &configSnapshot) +{ + // called by proton executor thread + const auto &bootstrapConfig = configSnapshot.getBootstrapConfig(); + const ProtonConfig &protonConfig = bootstrapConfig->getProtonConfig(); + using DocTypeSet = std::set<DocTypeName>; + DocTypeSet newDocTypes; + for (const auto &ddbConfig : protonConfig.documentdb) { + DocTypeName docTypeName(ddbConfig.inputdoctypename); + newDocTypes.insert(docTypeName); + } + auto dbitr = _documentDBs.begin(); + while (dbitr != _documentDBs.end()) { + DocTypeSet::const_iterator found(newDocTypes.find(dbitr->first)); + if (found == newDocTypes.end()) { + _owner.removeDocumentDB(dbitr->first); + dbitr = _documentDBs.erase(dbitr); + } else { + ++dbitr; + } + } +} + +void +ProtonConfigurer::applyInitialConfig(std::shared_ptr<ProtonConfigSnapshot> configSnapshot, + InitializeThreads initializeThreads) +{ + // called by proton app main thread + assert(!_executor.isCurrentThread()); + std::promise<bool> promise; + std::future<bool> future = promise.get_future(); + _executor.execute(makeLambdaTask([this, configSnapshot, initializeThreads, &promise]() { applyConfig(configSnapshot, initializeThreads, true); promise.set_value(true); })); + (void) future.get(); +} + +} // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h new file mode 100644 index 00000000000..ba5acde4473 --- /dev/null +++ b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h @@ -0,0 +1,59 @@ +// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <vespa/searchcore/proton/common/doctypename.h> +#include <map> +#include <memory> +#include <mutex> +#include "executor_thread_service.h" + +namespace proton { + +class IDocumentDBConfigOwner; +class IProtonConfigurerOwner; +class BootstrapConfig; +class ProtonConfigSnapshot; + +/* + * Class to handle config changes to proton using config snapshots spanning + * all document types. + */ +class ProtonConfigurer +{ + using DocumentDBs = std::map<DocTypeName, IDocumentDBConfigOwner *>; + using InitializeThreads = std::shared_ptr<vespalib::ThreadStackExecutorBase>; + + ExecutorThreadService _executor; + IProtonConfigurerOwner &_owner; + DocumentDBs _documentDBs; + std::shared_ptr<ProtonConfigSnapshot> _pendingConfigSnapshot; + std::shared_ptr<ProtonConfigSnapshot> _activeConfigSnapshot; + std::mutex _mutex; + bool _allowReconfig; + + void performReconfigure(); + bool skipConfig(const ProtonConfigSnapshot *configSnapshot, bool initialConfig); + void applyConfig(std::shared_ptr<ProtonConfigSnapshot> configSnapshot, + InitializeThreads initializeThreads, bool initialConfig); + void configureDocumentDB(const ProtonConfigSnapshot &configSnapshot, const DocTypeName &docTypeName, const vespalib::string &configId, const InitializeThreads &initializeThreads); + void pruneDocumentDBs(const ProtonConfigSnapshot &configSnapshot); +public: + ProtonConfigurer(vespalib::ThreadStackExecutorBase &executor, + IProtonConfigurerOwner &owner); + + ~ProtonConfigurer(); + + void setAllowReconfig(bool allowReconfig); + + std::shared_ptr<ProtonConfigSnapshot> getPendingConfigSnapshot(); + + std::shared_ptr<ProtonConfigSnapshot> getActiveConfigSnapshot(); + + void reconfigure(std::shared_ptr<ProtonConfigSnapshot> configSnapshot); + + void applyInitialConfig(std::shared_ptr<ProtonConfigSnapshot> configSnapshot, + InitializeThreads initializeThreads); +}; + +} // namespace proton |