summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--searchcore/CMakeLists.txt1
-rw-r--r--searchcore/src/tests/proton/proton_configurer/CMakeLists.txt9
-rw-r--r--searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp411
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt2
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h33
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton_config_snapshot.cpp18
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton_config_snapshot.h32
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp176
-rw-r--r--searchcore/src/vespa/searchcore/proton/server/proton_configurer.h59
9 files changed, 741 insertions, 0 deletions
diff --git a/searchcore/CMakeLists.txt b/searchcore/CMakeLists.txt
index d9aeeada520..273ae940770 100644
--- a/searchcore/CMakeLists.txt
+++ b/searchcore/CMakeLists.txt
@@ -120,6 +120,7 @@ vespa_define_module(
src/tests/proton/persistenceconformance
src/tests/proton/persistenceengine
src/tests/proton/proton
+ src/tests/proton/proton_configurer
src/tests/proton/reference/gid_to_lid_change_handler
src/tests/proton/reference/gid_to_lid_change_listener
src/tests/proton/reference/gid_to_lid_change_registrator
diff --git a/searchcore/src/tests/proton/proton_configurer/CMakeLists.txt b/searchcore/src/tests/proton/proton_configurer/CMakeLists.txt
new file mode 100644
index 00000000000..3aea88986f5
--- /dev/null
+++ b/searchcore/src/tests/proton/proton_configurer/CMakeLists.txt
@@ -0,0 +1,9 @@
+# Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+vespa_add_executable(searchcore_proton_configurer_test_app TEST
+ SOURCES
+ proton_configurer_test.cpp
+ DEPENDS
+ searchcore_server
+ searchcore_fconfig
+)
+vespa_add_test(NAME searchcore_proton_configurer_test_app COMMAND searchcore_proton_configurer_test_app)
diff --git a/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp b/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp
new file mode 100644
index 00000000000..2d58d94947f
--- /dev/null
+++ b/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp
@@ -0,0 +1,411 @@
+// Copyright 2016 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include <map>
+#include <vespa/config-attributes.h>
+#include <vespa/config-imported-fields.h>
+#include <vespa/config-indexschema.h>
+#include <vespa/config-rank-profiles.h>
+#include <vespa/config-summary.h>
+#include <vespa/config-summarymap.h>
+#include <vespa/fileacquirer/config-filedistributorrpc.h>
+#include <vespa/searchcore/proton/server/bootstrapconfig.h>
+#include <vespa/searchcore/proton/server/bootstrapconfigmanager.h>
+#include <vespa/searchcore/proton/server/documentdbconfigmanager.h>
+#include <vespa/searchcore/proton/server/proton_config_fetcher.h>
+#include <vespa/searchcore/proton/server/proton_config_snapshot.h>
+#include <vespa/searchcore/proton/server/proton_configurer.h>
+#include <vespa/searchcore/proton/server/i_proton_configurer_owner.h>
+#include <vespa/searchsummary/config/config-juniperrc.h>
+#include <vespa/vespalib/testkit/testapp.h>
+#include <vespa/searchcommon/common/schemaconfigurer.h>
+#include <vespa/vespalib/util/threadstackexecutor.h>
+#include <vespa/vespalib/test/insertion_operators.h>
+
+using namespace config;
+using namespace proton;
+using namespace vespa::config::search::core;
+using namespace vespa::config::search::summary;
+using namespace vespa::config::search;
+using namespace cloud::config::filedistribution;
+
+using InitializeThreads = std::shared_ptr<vespalib::ThreadStackExecutorBase>;
+using config::ConfigUri;
+using document::DocumentTypeRepo;
+using document::DocumenttypesConfig;
+using document::DocumenttypesConfigBuilder;
+using search::TuneFileDocumentDB;
+using std::map;
+using search::index::Schema;
+using search::index::SchemaBuilder;
+using proton::matching::RankingConstants;
+
+struct DBConfigFixture {
+ using UP = std::unique_ptr<DBConfigFixture>;
+ AttributesConfigBuilder attributesBuilder;
+ RankProfilesConfigBuilder rankProfilesBuilder;
+ RankingConstantsConfigBuilder rankingConstantsBuilder;
+ IndexschemaConfigBuilder indexschemaBuilder;
+ SummaryConfigBuilder summaryBuilder;
+ SummarymapConfigBuilder summarymapBuilder;
+ JuniperrcConfigBuilder juniperrcBuilder;
+ ImportedFieldsConfigBuilder importedFieldsBuilder;
+
+ Schema::SP buildSchema()
+ {
+ Schema::SP schema(std::make_shared<Schema>());
+ SchemaBuilder::build(attributesBuilder, *schema);
+ SchemaBuilder::build(summaryBuilder, *schema);
+ SchemaBuilder::build(indexschemaBuilder, *schema);
+ SchemaBuilder::build(importedFieldsBuilder, *schema);
+ return schema;
+ }
+
+ RankingConstants::SP buildRankingConstants()
+ {
+ return std::make_shared<RankingConstants>();
+ }
+
+ DocumentDBConfig::SP getConfig(int64_t generation,
+ std::shared_ptr<DocumenttypesConfig> documentTypes,
+ DocumentTypeRepo::SP repo,
+ const vespalib::string &configId,
+ const vespalib::string &docTypeName)
+ {
+ return std::make_shared<DocumentDBConfig>
+ (generation,
+ std::make_shared<RankProfilesConfig>(rankProfilesBuilder),
+ buildRankingConstants(),
+ std::make_shared<IndexschemaConfig>(indexschemaBuilder),
+ std::make_shared<AttributesConfig>(attributesBuilder),
+ std::make_shared<SummaryConfig>(summaryBuilder),
+ std::make_shared<SummarymapConfig>(summarymapBuilder),
+ std::make_shared<JuniperrcConfig>(juniperrcBuilder),
+ documentTypes,
+ repo,
+ std::make_shared<ImportedFieldsConfig>(importedFieldsBuilder),
+ std::make_shared<TuneFileDocumentDB>(),
+ buildSchema(),
+ std::make_shared<DocumentDBMaintenanceConfig>(),
+ configId,
+ docTypeName,
+ config::ConfigSnapshot());
+ }
+};
+
+struct ConfigFixture {
+ const std::string configId;
+ ProtonConfigBuilder protonBuilder;
+ DocumenttypesConfigBuilder documenttypesBuilder;
+ FiledistributorrpcConfigBuilder filedistBuilder;
+ map<std::string, DBConfigFixture::UP> dbConfig;
+ int idcounter;
+ int64_t _generation;
+ std::shared_ptr<ProtonConfigSnapshot> _cachedConfigSnapshot;
+
+ ConfigFixture(const std::string & id)
+ : configId(id),
+ protonBuilder(),
+ documenttypesBuilder(),
+ filedistBuilder(),
+ dbConfig(),
+ idcounter(-1),
+ _generation(1),
+ _cachedConfigSnapshot()
+ {
+ addDocType("_alwaysthere_");
+ }
+
+ ~ConfigFixture() { }
+
+ DBConfigFixture *addDocType(const std::string & name) {
+ DocumenttypesConfigBuilder::Documenttype dt;
+ dt.bodystruct = -1270491200;
+ dt.headerstruct = 306916075;
+ dt.id = idcounter--;
+ dt.name = name;
+ dt.version = 0;
+ documenttypesBuilder.documenttype.push_back(dt);
+
+ ProtonConfigBuilder::Documentdb db;
+ db.inputdoctypename = name;
+ db.configid = configId + "/" + name;
+ protonBuilder.documentdb.push_back(db);
+
+ DBConfigFixture::UP fixture = std::make_unique<DBConfigFixture>();
+ return dbConfig.emplace(std::make_pair(name, std::move(fixture))).first->second.get();
+ }
+
+ void removeDocType(const std::string & name)
+ {
+ for (DocumenttypesConfigBuilder::DocumenttypeVector::iterator it(documenttypesBuilder.documenttype.begin()),
+ mt(documenttypesBuilder.documenttype.end());
+ it != mt;
+ it++) {
+ if ((*it).name.compare(name) == 0) {
+ documenttypesBuilder.documenttype.erase(it);
+ break;
+ }
+ }
+
+ for (ProtonConfigBuilder::DocumentdbVector::iterator it(protonBuilder.documentdb.begin()),
+ mt(protonBuilder.documentdb.end());
+ it != mt;
+ it++) {
+ if ((*it).inputdoctypename.compare(name) == 0) {
+ protonBuilder.documentdb.erase(it);
+ break;
+ }
+ }
+ }
+
+ BootstrapConfig::SP getBootstrapConfig(int64_t generation) const {
+ return BootstrapConfig::SP(new BootstrapConfig(generation,
+ BootstrapConfig::DocumenttypesConfigSP(new DocumenttypesConfig(documenttypesBuilder)),
+ DocumentTypeRepo::SP(new DocumentTypeRepo(documenttypesBuilder)),
+ BootstrapConfig::ProtonConfigSP(new ProtonConfig(protonBuilder)),
+ std::make_shared<FiledistributorrpcConfig>(),
+ std::make_shared<TuneFileDocumentDB>()));
+ }
+
+ std::shared_ptr<ProtonConfigSnapshot> getConfigSnapshot()
+ {
+ if (_cachedConfigSnapshot) {
+ return _cachedConfigSnapshot;
+ }
+ int64_t generation = ++_generation;
+ auto bootstrap = getBootstrapConfig(generation);
+ std::map<DocTypeName, DocumentDBConfig::SP> dbconfigs;
+ auto doctypes = bootstrap->getDocumenttypesConfigSP();
+ auto repo = bootstrap->getDocumentTypeRepoSP();
+ for (auto &db : dbConfig) {
+ DocTypeName name(db.first);
+ dbconfigs.insert(std::make_pair(name,
+ db.second->getConfig(generation,
+ doctypes,
+ repo,
+ configId + "/" + db.first,
+ db.first)));
+ }
+ _cachedConfigSnapshot = std::make_shared<ProtonConfigSnapshot>(bootstrap, dbconfigs);
+ return _cachedConfigSnapshot;
+ }
+ void newConfig() { _cachedConfigSnapshot.reset(); }
+
+};
+
+struct MyProtonConfigurerOwner;
+
+struct MyDocumentDBConfigOwner : public IDocumentDBConfigOwner
+{
+ vespalib::string _name;
+ MyProtonConfigurerOwner &_owner;
+ MyDocumentDBConfigOwner(const vespalib::string &name,
+ MyProtonConfigurerOwner &owner)
+ : IDocumentDBConfigOwner(),
+ _name(name),
+ _owner(owner)
+ {
+ }
+ virtual ~MyDocumentDBConfigOwner() { }
+
+ virtual void reconfigure(const DocumentDBConfig::SP & config) override;
+};
+
+struct MyProtonConfigurerOwner : public IProtonConfigurerOwner
+{
+ using InitializeThreads = std::shared_ptr<vespalib::ThreadStackExecutorBase>;
+ vespalib::ThreadStackExecutor _executor;
+ std::map<DocTypeName, std::shared_ptr<MyDocumentDBConfigOwner>> _dbs;
+ std::vector<vespalib::string> _log;
+
+ MyProtonConfigurerOwner()
+ : IProtonConfigurerOwner(),
+ _executor(1, 128 * 1024),
+ _dbs(),
+ _log()
+ {
+ }
+ virtual ~MyProtonConfigurerOwner() { }
+
+ virtual IDocumentDBConfigOwner *addDocumentDB(const DocTypeName &docTypeName,
+ const vespalib::string &configId,
+ const std::shared_ptr<BootstrapConfig> &bootstrapConfig,
+ const std::shared_ptr<DocumentDBConfig> &documentDBConfig,
+ InitializeThreads initializeThreads) override
+ {
+ (void) configId;
+ (void) bootstrapConfig;
+ (void) initializeThreads;
+ assert(_dbs.find(docTypeName) == _dbs.end());
+ auto db = std::make_shared<MyDocumentDBConfigOwner>(docTypeName.getName(), *this);
+ _dbs.insert(std::make_pair(docTypeName, db));
+ std::ostringstream os;
+ os << "add db " << docTypeName.getName() << " " << documentDBConfig->getGeneration();
+ _log.push_back(os.str());
+ return db.get();
+ }
+ virtual void removeDocumentDB(const DocTypeName &docTypeName) override {
+ assert(_dbs.find(docTypeName) != _dbs.end());
+ _dbs.erase(docTypeName);
+ std::ostringstream os;
+ os << "remove db " << docTypeName.getName();
+ _log.push_back(os.str());
+ }
+ virtual void applyConfig(const std::shared_ptr<BootstrapConfig> &bootstrapConfig) override {
+ std::ostringstream os;
+ os << "reconf bootstrap " << bootstrapConfig->getGeneration();
+ _log.push_back(os.str());
+ }
+ void reconfigureDocumentDB(const vespalib::string &name, const DocumentDBConfig::SP &config)
+ {
+ std::ostringstream os;
+ os << "reconf db " << name << " " << config->getGeneration();
+ _log.push_back(os.str());
+ }
+ void sync() { _executor.sync(); }
+};
+
+void
+MyDocumentDBConfigOwner::reconfigure(const DocumentDBConfig::SP & config)
+{
+ _owner.reconfigureDocumentDB(_name, config);
+}
+
+struct Fixture
+{
+ MyProtonConfigurerOwner _owner;
+ ConfigFixture _config;
+ ProtonConfigurer _configurer;
+
+ Fixture()
+ : _owner(),
+ _config("test"),
+ _configurer(_owner._executor, _owner)
+ {
+ }
+ ~Fixture() { }
+
+ void assertLog(const std::vector<vespalib::string> &expLog) {
+ EXPECT_EQUAL(expLog, _owner._log);
+ }
+ void sync() { _owner.sync(); }
+ void addDocType(const vespalib::string &name) { _config.addDocType(name); }
+ void removeDocType(const vespalib::string &name) { _config.removeDocType(name); }
+ void applyConfig() {
+ _configurer.reconfigure(_config.getConfigSnapshot());
+ sync();
+ }
+
+ void applyInitialConfig() {
+ applyConfig(); // sets initial pending config
+ _configurer.applyInitialConfig(_config.getConfigSnapshot(), InitializeThreads());
+ }
+ void reconfigure() {
+ _config.newConfig();
+ applyConfig();
+ }
+
+ void allowReconfig() {
+ _configurer.setAllowReconfig(true);
+ sync();
+ }
+ void disableReconfig() {
+ _configurer.setAllowReconfig(false);
+ }
+};
+
+TEST_F("require that nothing is applied before initial config", Fixture())
+{
+ f.applyConfig();
+ TEST_DO(f1.assertLog({}));
+}
+
+TEST_F("require that initial config is applied", Fixture())
+{
+ f.applyInitialConfig();
+ TEST_DO(f1.assertLog({"reconf bootstrap 2", "add db _alwaysthere_ 2"}));
+}
+
+TEST_F("require that new config is blocked", Fixture())
+{
+ f.applyInitialConfig();
+ f.reconfigure();
+ TEST_DO(f1.assertLog({"reconf bootstrap 2", "add db _alwaysthere_ 2"}));
+}
+
+TEST_F("require that new config can ba unblocked", Fixture())
+{
+ f.applyInitialConfig();
+ f.reconfigure();
+ f.allowReconfig();
+ TEST_DO(f1.assertLog({"reconf bootstrap 2", "add db _alwaysthere_ 2", "reconf bootstrap 3", "reconf db _alwaysthere_ 3"}));
+}
+
+TEST_F("require that initial config is not reapplied due to config unblock", Fixture())
+{
+ f.applyInitialConfig();
+ f.allowReconfig();
+ TEST_DO(f1.assertLog({"reconf bootstrap 2", "add db _alwaysthere_ 2"}));
+}
+
+TEST_F("require that initial config is not reapplied due to unblock", Fixture())
+{
+ f.applyInitialConfig();
+ f.allowReconfig();
+ TEST_DO(f1.assertLog({"reconf bootstrap 2", "add db _alwaysthere_ 2"}));
+}
+
+TEST_F("require that we can add document db", Fixture())
+{
+ f.applyInitialConfig();
+ f.allowReconfig();
+ f.addDocType("foobar");
+ f.reconfigure();
+ TEST_DO(f1.assertLog({"reconf bootstrap 2", "add db _alwaysthere_ 2", "reconf bootstrap 3","reconf db _alwaysthere_ 3", "add db foobar 3"}));
+}
+
+TEST_F("require that we can remove document db", Fixture())
+{
+ f.addDocType("foobar");
+ f.applyInitialConfig();
+ f.allowReconfig();
+ f.removeDocType("foobar");
+ f.reconfigure();
+ TEST_DO(f1.assertLog({"reconf bootstrap 2", "add db _alwaysthere_ 2", "add db foobar 2", "reconf bootstrap 3","reconf db _alwaysthere_ 3", "remove db foobar"}));
+}
+
+TEST_F("require that document db adds and reconfigs are intermingled", Fixture())
+{
+ f.addDocType("foobar");
+ f.applyInitialConfig();
+ f.allowReconfig();
+ f.addDocType("abar");
+ f.removeDocType("foobar");
+ f.addDocType("foobar");
+ f.addDocType("zbar");
+ f.reconfigure();
+ TEST_DO(f1.assertLog({"reconf bootstrap 2", "add db _alwaysthere_ 2", "add db foobar 2", "reconf bootstrap 3","reconf db _alwaysthere_ 3", "add db abar 3", "reconf db foobar 3", "add db zbar 3"}));
+}
+
+TEST_F("require that document db removes are applied at end", Fixture())
+{
+ f.addDocType("abar");
+ f.addDocType("foobar");
+ f.applyInitialConfig();
+ f.allowReconfig();
+ f.removeDocType("abar");
+ f.reconfigure();
+ TEST_DO(f1.assertLog({"reconf bootstrap 2", "add db _alwaysthere_ 2", "add db abar 2", "add db foobar 2", "reconf bootstrap 3","reconf db _alwaysthere_ 3", "reconf db foobar 3", "remove db abar"}));
+}
+
+TEST_F("require that new configs can be blocked again", Fixture())
+{
+ f.applyInitialConfig();
+ f.reconfigure();
+ f.allowReconfig();
+ f.disableReconfig();
+ f.reconfigure();
+ TEST_DO(f1.assertLog({"reconf bootstrap 2", "add db _alwaysthere_ 2", "reconf bootstrap 3", "reconf db _alwaysthere_ 3"}));
+}
+
+TEST_MAIN() { TEST_RUN_ALL(); }
diff --git a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
index 09a635034f5..8e9141d0b40 100644
--- a/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
+++ b/searchcore/src/vespa/searchcore/proton/server/CMakeLists.txt
@@ -71,6 +71,8 @@ vespa_add_library(searchcore_server STATIC
persistenceproviderproxy.cpp
proton.cpp
proton_config_fetcher.cpp
+ proton_config_snapshot.cpp
+ proton_configurer.cpp
prune_session_cache_job.cpp
pruneremoveddocumentsjob.cpp
putdonecontext.cpp
diff --git a/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h b/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h
new file mode 100644
index 00000000000..bdbdb9ff0f8
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h
@@ -0,0 +1,33 @@
+// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/vespalib/stllike/string.h>
+#include <memory>
+
+namespace vespalib { class ThreadStackExecutorBase; }
+
+namespace proton {
+
+class IDocumentDBConfigOwner;
+
+/*
+ * Interface class for owner of a proton configurer, with callback methods
+ * for adding/removing document dbs and applying bootstrap config.
+ */
+class IProtonConfigurerOwner
+{
+ using InitializeThreads = std::shared_ptr<vespalib::ThreadStackExecutorBase>;
+public:
+ virtual ~IProtonConfigurerOwner() { }
+ virtual IDocumentDBConfigOwner *addDocumentDB(const DocTypeName &docTypeName,
+ const vespalib::string &configId,
+ const std::shared_ptr<BootstrapConfig> &bootstrapConfig,
+ const std::shared_ptr<DocumentDBConfig> &documentDBConfig,
+ InitializeThreads initializeThreads) = 0;
+ virtual void removeDocumentDB(const DocTypeName &docTypeName) = 0;
+ virtual void applyConfig(const std::shared_ptr<BootstrapConfig> &bootstrapConfig) = 0;
+};
+
+
+} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_config_snapshot.cpp b/searchcore/src/vespa/searchcore/proton/server/proton_config_snapshot.cpp
new file mode 100644
index 00000000000..88d3e46eedc
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/proton_config_snapshot.cpp
@@ -0,0 +1,18 @@
+// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "proton_config_snapshot.h"
+
+namespace proton {
+
+ProtonConfigSnapshot::ProtonConfigSnapshot(std::shared_ptr<BootstrapConfig> bootstrapConfig,
+ DocumentDBConfigs documentDBConfigs)
+ : _bootstrapConfig(std::move(bootstrapConfig)),
+ _documentDBConfigs(std::move(documentDBConfigs))
+{
+}
+
+ProtonConfigSnapshot::~ProtonConfigSnapshot()
+{
+}
+
+} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_config_snapshot.h b/searchcore/src/vespa/searchcore/proton/server/proton_config_snapshot.h
new file mode 100644
index 00000000000..69a5b5801cc
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/proton_config_snapshot.h
@@ -0,0 +1,32 @@
+// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/searchcore/proton/common/doctypename.h>
+#include <map>
+#include <memory>
+
+namespace proton {
+
+class BootstrapConfig;
+class DocumentDBConfig;
+
+/*
+ * Class representing a config snapshot accross all document dbs as well as
+ * the matching bootstrap config.
+ */
+class ProtonConfigSnapshot
+{
+ using DocumentDBConfigs = std::map<DocTypeName, std::shared_ptr<DocumentDBConfig>>;
+ std::shared_ptr<BootstrapConfig> _bootstrapConfig;
+ DocumentDBConfigs _documentDBConfigs;
+
+public:
+ ProtonConfigSnapshot(std::shared_ptr<BootstrapConfig> bootstrapConfig,
+ DocumentDBConfigs documentDBConfigs);
+ ~ProtonConfigSnapshot();
+ const std::shared_ptr<BootstrapConfig> getBootstrapConfig() const { return _bootstrapConfig; }
+ const DocumentDBConfigs &getDocumentDBConfigs() const { return _documentDBConfigs; }
+};
+
+} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp
new file mode 100644
index 00000000000..41b5ec52d6b
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp
@@ -0,0 +1,176 @@
+// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "proton_configurer.h"
+#include "proton_config_snapshot.h"
+#include "bootstrapconfig.h"
+#include "i_proton_configurer_owner.h"
+#include "proton_config_fetcher.h"
+#include <vespa/searchlib/common/lambdatask.h>
+#include <vespa/vespalib/util/threadstackexecutorbase.h>
+#include <future>
+
+using search::makeLambdaTask;
+using vespa::config::search::core::ProtonConfig;
+
+namespace proton {
+
+ProtonConfigurer::ProtonConfigurer(vespalib::ThreadStackExecutorBase &executor,
+ IProtonConfigurerOwner &owner)
+ : _executor(executor),
+ _owner(owner),
+ _pendingConfigSnapshot(),
+ _activeConfigSnapshot(),
+ _mutex(),
+ _allowReconfig(false)
+{
+}
+
+ProtonConfigurer::~ProtonConfigurer()
+{
+}
+
+void
+ProtonConfigurer::setAllowReconfig(bool allowReconfig)
+{
+ // called by proton app main thread
+ assert(!_executor.isCurrentThread());
+ {
+ std::lock_guard<std::mutex> guard(_mutex);
+ _allowReconfig = allowReconfig;
+ if (allowReconfig) {
+ // Ensure that pending config is applied
+ _executor.execute(makeLambdaTask([=]() { performReconfigure(); }));
+ }
+ }
+ if (!allowReconfig) {
+ _executor.sync(); // drain queued performReconfigure tasks
+ }
+}
+
+std::shared_ptr<ProtonConfigSnapshot>
+ProtonConfigurer::getPendingConfigSnapshot()
+{
+ std::lock_guard<std::mutex> guard(_mutex);
+ return _pendingConfigSnapshot;
+}
+
+std::shared_ptr<ProtonConfigSnapshot>
+ProtonConfigurer::getActiveConfigSnapshot()
+{
+ std::lock_guard<std::mutex> guard(_mutex);
+ return _activeConfigSnapshot;
+}
+
+void
+ProtonConfigurer::reconfigure(std::shared_ptr<ProtonConfigSnapshot> configSnapshot)
+{
+ // called by proton config fetcher thread
+ assert(!_executor.isCurrentThread());
+ std::lock_guard<std::mutex> guard(_mutex);
+ _pendingConfigSnapshot = configSnapshot;
+ if (_allowReconfig) {
+ _executor.execute(makeLambdaTask([=]() { performReconfigure(); }));
+ }
+}
+
+void
+ProtonConfigurer::performReconfigure()
+{
+ // called by proton executor thread
+ assert(_executor.isCurrentThread());
+ auto configSnapshot(getPendingConfigSnapshot());
+ applyConfig(configSnapshot, InitializeThreads(), false);
+}
+
+bool
+ProtonConfigurer::skipConfig(const ProtonConfigSnapshot *configSnapshot, bool initialConfig)
+{
+ // called by proton executor thread
+ std::lock_guard<std::mutex> guard(_mutex);
+ assert((_activeConfigSnapshot.get() == nullptr) == initialConfig);
+ if (_activeConfigSnapshot.get() == configSnapshot) {
+ return true; // config snapshot already applied
+ }
+ if (!initialConfig && !_allowReconfig) {
+ return true; // reconfig not allowed
+ }
+ return false;
+}
+
+void
+ProtonConfigurer::applyConfig(std::shared_ptr<ProtonConfigSnapshot> configSnapshot,
+ InitializeThreads initializeThreads, bool initialConfig)
+{
+ // called by proton executor thread
+ assert(_executor.isCurrentThread());
+ if (skipConfig(configSnapshot.get(), initialConfig)) {
+ return; // config should be skipped
+ }
+ const auto &bootstrapConfig = configSnapshot->getBootstrapConfig();
+ const ProtonConfig &protonConfig = bootstrapConfig->getProtonConfig();
+ _owner.applyConfig(bootstrapConfig);
+ for (const auto &ddbConfig : protonConfig.documentdb) {
+ DocTypeName docTypeName(ddbConfig.inputdoctypename);
+ configureDocumentDB(*configSnapshot, docTypeName, ddbConfig.configid, initializeThreads);
+ }
+ pruneDocumentDBs(*configSnapshot);
+ std::lock_guard<std::mutex> guard(_mutex);
+ _activeConfigSnapshot = configSnapshot;
+}
+
+void
+ProtonConfigurer::configureDocumentDB(const ProtonConfigSnapshot &configSnapshot, const DocTypeName &docTypeName, const vespalib::string &configId, const InitializeThreads &initializeThreads)
+{
+ // called by proton executor thread
+ const auto &bootstrapConfig = configSnapshot.getBootstrapConfig();
+ const auto &documentDBConfigs = configSnapshot.getDocumentDBConfigs();
+ auto cfgitr = documentDBConfigs.find(docTypeName);
+ assert(cfgitr != documentDBConfigs.end());
+ const auto &documentDBConfig = cfgitr->second;
+ auto dbitr(_documentDBs.find(docTypeName));
+ if (dbitr == _documentDBs.end()) {
+ auto *newdb = _owner.addDocumentDB(docTypeName, configId, bootstrapConfig, documentDBConfig, initializeThreads);
+ auto insres = _documentDBs.insert(std::make_pair(docTypeName, newdb));
+ assert(insres.second);
+ } else {
+ dbitr->second->reconfigure(documentDBConfig);
+ }
+}
+
+void
+ProtonConfigurer::pruneDocumentDBs(const ProtonConfigSnapshot &configSnapshot)
+{
+ // called by proton executor thread
+ const auto &bootstrapConfig = configSnapshot.getBootstrapConfig();
+ const ProtonConfig &protonConfig = bootstrapConfig->getProtonConfig();
+ using DocTypeSet = std::set<DocTypeName>;
+ DocTypeSet newDocTypes;
+ for (const auto &ddbConfig : protonConfig.documentdb) {
+ DocTypeName docTypeName(ddbConfig.inputdoctypename);
+ newDocTypes.insert(docTypeName);
+ }
+ auto dbitr = _documentDBs.begin();
+ while (dbitr != _documentDBs.end()) {
+ DocTypeSet::const_iterator found(newDocTypes.find(dbitr->first));
+ if (found == newDocTypes.end()) {
+ _owner.removeDocumentDB(dbitr->first);
+ dbitr = _documentDBs.erase(dbitr);
+ } else {
+ ++dbitr;
+ }
+ }
+}
+
+void
+ProtonConfigurer::applyInitialConfig(std::shared_ptr<ProtonConfigSnapshot> configSnapshot,
+ InitializeThreads initializeThreads)
+{
+ // called by proton app main thread
+ assert(!_executor.isCurrentThread());
+ std::promise<bool> promise;
+ std::future<bool> future = promise.get_future();
+ _executor.execute(makeLambdaTask([this, configSnapshot, initializeThreads, &promise]() { applyConfig(configSnapshot, initializeThreads, true); promise.set_value(true); }));
+ (void) future.get();
+}
+
+} // namespace proton
diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h
new file mode 100644
index 00000000000..ba5acde4473
--- /dev/null
+++ b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h
@@ -0,0 +1,59 @@
+// Copyright 2017 Yahoo Inc. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <vespa/searchcore/proton/common/doctypename.h>
+#include <map>
+#include <memory>
+#include <mutex>
+#include "executor_thread_service.h"
+
+namespace proton {
+
+class IDocumentDBConfigOwner;
+class IProtonConfigurerOwner;
+class BootstrapConfig;
+class ProtonConfigSnapshot;
+
+/*
+ * Class to handle config changes to proton using config snapshots spanning
+ * all document types.
+ */
+class ProtonConfigurer
+{
+ using DocumentDBs = std::map<DocTypeName, IDocumentDBConfigOwner *>;
+ using InitializeThreads = std::shared_ptr<vespalib::ThreadStackExecutorBase>;
+
+ ExecutorThreadService _executor;
+ IProtonConfigurerOwner &_owner;
+ DocumentDBs _documentDBs;
+ std::shared_ptr<ProtonConfigSnapshot> _pendingConfigSnapshot;
+ std::shared_ptr<ProtonConfigSnapshot> _activeConfigSnapshot;
+ std::mutex _mutex;
+ bool _allowReconfig;
+
+ void performReconfigure();
+ bool skipConfig(const ProtonConfigSnapshot *configSnapshot, bool initialConfig);
+ void applyConfig(std::shared_ptr<ProtonConfigSnapshot> configSnapshot,
+ InitializeThreads initializeThreads, bool initialConfig);
+ void configureDocumentDB(const ProtonConfigSnapshot &configSnapshot, const DocTypeName &docTypeName, const vespalib::string &configId, const InitializeThreads &initializeThreads);
+ void pruneDocumentDBs(const ProtonConfigSnapshot &configSnapshot);
+public:
+ ProtonConfigurer(vespalib::ThreadStackExecutorBase &executor,
+ IProtonConfigurerOwner &owner);
+
+ ~ProtonConfigurer();
+
+ void setAllowReconfig(bool allowReconfig);
+
+ std::shared_ptr<ProtonConfigSnapshot> getPendingConfigSnapshot();
+
+ std::shared_ptr<ProtonConfigSnapshot> getActiveConfigSnapshot();
+
+ void reconfigure(std::shared_ptr<ProtonConfigSnapshot> configSnapshot);
+
+ void applyInitialConfig(std::shared_ptr<ProtonConfigSnapshot> configSnapshot,
+ InitializeThreads initializeThreads);
+};
+
+} // namespace proton