diff options
14 files changed, 71 insertions, 34 deletions
diff --git a/searchcore/src/apps/tests/persistenceconformance_test.cpp b/searchcore/src/apps/tests/persistenceconformance_test.cpp index de0acf643fc..e95f572f7e3 100644 --- a/searchcore/src/apps/tests/persistenceconformance_test.cpp +++ b/searchcore/src/apps/tests/persistenceconformance_test.cpp @@ -186,6 +186,7 @@ public: _queryLimiter, _clock, docType, + document::BucketSpace::placeHolder(), *b->getProtonConfigSP(), const_cast<DocumentDBFactory &>(*this), _summaryExecutor, @@ -316,7 +317,7 @@ public: LOG(info, "putHandler(%s)", itr->first.toString().c_str()); IPersistenceHandler::SP proxy( new PersistenceHandlerProxy(itr->second)); - putHandler(itr->first, proxy); + putHandler(document::BucketSpace::placeHolder(), itr->first, proxy); } } @@ -328,7 +329,7 @@ public: const DocumentDBMap &docDbs = _docDbRepo->getDocDbs(); for (DocumentDBMap::const_iterator itr = docDbs.begin(); itr != docDbs.end(); ++itr) { - IPersistenceHandler::SP proxy(removeHandler(itr->first)); + IPersistenceHandler::SP proxy(removeHandler(itr->second->getBucketSpace(), itr->first)); } } diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp index da85bb3e8c0..8787882b5ae 100644 --- a/searchcore/src/tests/proton/docsummary/docsummary.cpp +++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp @@ -209,7 +209,8 @@ public: _configMgr.nextGeneration(0); if (! FastOS_File::MakeDirectory((std::string("tmpdb/") + docTypeName).c_str())) { abort(); } _ddb.reset(new DocumentDB("tmpdb", _configMgr.getConfig(), "tcp/localhost:9013", _queryLimiter, _clock, - DocTypeName(docTypeName), *b->getProtonConfigSP(), *this, _summaryExecutor, _summaryExecutor, + DocTypeName(docTypeName), document::BucketSpace::placeHolder(), + *b->getProtonConfigSP(), *this, _summaryExecutor, _summaryExecutor, _tls, _dummy, _fileHeaderContext, ConfigStore::UP(new MemoryConfigStore), std::make_shared<vespalib::ThreadStackExecutor>(16, 128 * 1024), _hwInfo)), _ddb->start(); diff --git a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp index 96b98dd4274..f737b2ed92c 100644 --- a/searchcore/src/tests/proton/documentdb/documentdb_test.cpp +++ b/searchcore/src/tests/proton/documentdb/documentdb_test.cpp @@ -114,6 +114,7 @@ Fixture::Fixture() mgr.forwardConfig(b); mgr.nextGeneration(0); _db.reset(new DocumentDB(".", mgr.getConfig(), "tcp/localhost:9014", _queryLimiter, _clock, DocTypeName("typea"), + document::BucketSpace::placeHolder(), *b->getProtonConfigSP(), _myDBOwner, _summaryExecutor, _summaryExecutor, _tls, _dummy, _fileHeaderContext, ConfigStore::UP(new MemoryConfigStore), std::make_shared<vespalib::ThreadStackExecutor>(16, 128 * 1024), _hwInfo)); diff --git a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp index 97ff517821c..4422c67c5bd 100644 --- a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp +++ b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp @@ -14,6 +14,7 @@ #include <set> using document::BucketId; +using document::BucketSpace; using document::Document; using document::DocumentId; using document::DocumentType; @@ -432,8 +433,8 @@ struct SimpleFixture { engine(_owner, _writeFilter, -1, false), hset() { - engine.putHandler(DocTypeName(doc1->getType()), hset.phandler1); - engine.putHandler(DocTypeName(doc2->getType()), hset.phandler2); + engine.putHandler(BucketSpace::placeHolder(), DocTypeName(doc1->getType()), hset.phandler1); + engine.putHandler(BucketSpace::placeHolder(), DocTypeName(doc2->getType()), hset.phandler2); } }; diff --git a/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp b/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp index 08aa4430fde..a99eaa0daf6 100644 --- a/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp +++ b/searchcore/src/tests/proton/proton_configurer/proton_configurer_test.cpp @@ -230,11 +230,13 @@ struct MyProtonConfigurerOwner : public IProtonConfigurerOwner virtual ~MyProtonConfigurerOwner() { } virtual IDocumentDBConfigOwner *addDocumentDB(const DocTypeName &docTypeName, + document::BucketSpace bucketSpace, const vespalib::string &configId, const std::shared_ptr<BootstrapConfig> &bootstrapConfig, const std::shared_ptr<DocumentDBConfig> &documentDBConfig, InitializeThreads initializeThreads) override { + (void) bucketSpace; (void) configId; (void) bootstrapConfig; (void) initializeThreads; diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index eaf163161b3..b5a4872604f 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -233,9 +233,11 @@ PersistenceEngine::~PersistenceEngine() IPersistenceHandler::SP -PersistenceEngine::putHandler(const DocTypeName &docType, +PersistenceEngine::putHandler(document::BucketSpace bucketSpace, + const DocTypeName &docType, const IPersistenceHandler::SP &handler) { + (void) bucketSpace; LockGuard guard(_lock); return _handlers.putHandler(docType, handler); } @@ -250,8 +252,10 @@ PersistenceEngine::getHandler(const DocTypeName &docType) const IPersistenceHandler::SP -PersistenceEngine::removeHandler(const DocTypeName &docType) +PersistenceEngine::removeHandler(document::BucketSpace bucketSpace, + const DocTypeName &docType) { + (void) bucketSpace; // TODO: Grab bucket list and treat them as modified LockGuard guard(_lock); return _handlers.removeHandler(docType); diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h index 1204aab2fcf..758b39f4417 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h @@ -3,6 +3,7 @@ #include "document_iterator.h" #include "i_resource_write_filter.h" +#include <vespa/document/bucket/bucketspace.h> #include <vespa/persistence/spi/abstractpersistenceprovider.h> #include <vespa/searchcore/proton/common/handlermap.hpp> #include <vespa/searchcore/proton/persistenceengine/ipersistencehandler.h> @@ -102,9 +103,12 @@ public: ssize_t defaultSerializedSize, bool ignoreMaxBytes); ~PersistenceEngine(); - IPersistenceHandler::SP putHandler(const DocTypeName &docType, const IPersistenceHandler::SP &handler); + IPersistenceHandler::SP putHandler(document::BucketSpace bucketSpace, + const DocTypeName &docType, + const IPersistenceHandler::SP &handler); IPersistenceHandler::SP getHandler(const DocTypeName &docType) const; - IPersistenceHandler::SP removeHandler(const DocTypeName &docType); + IPersistenceHandler::SP removeHandler(document::BucketSpace bucketSpace, + const DocTypeName &docType); // Implements PersistenceProvider virtual Result initialize() override; diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp index 5f9a018aed5..e02a6eb7ee3 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.cpp @@ -80,16 +80,17 @@ DocumentDB::masterExecute(FunctionType &&function) { } DocumentDB::DocumentDB(const vespalib::string &baseDir, - const DocumentDBConfig::SP & configSnapshot, + const DocumentDBConfig::SP &configSnapshot, const vespalib::string &tlsSpec, - matching::QueryLimiter & queryLimiter, + matching::QueryLimiter &queryLimiter, const vespalib::Clock &clock, const DocTypeName &docTypeName, - const ProtonConfig & protonCfg, - IDocumentDBOwner & owner, - vespalib::ThreadExecutor & warmupExecutor, - vespalib::ThreadStackExecutorBase & summaryExecutor, - search::transactionlog::Writer & tlsDirectWriter, + document::BucketSpace bucketSpace, + const ProtonConfig &protonCfg, + IDocumentDBOwner &owner, + vespalib::ThreadExecutor &warmupExecutor, + vespalib::ThreadStackExecutorBase &summaryExecutor, + search::transactionlog::Writer &tlsDirectWriter, MetricsWireService &metricsWireService, const FileHeaderContext &fileHeaderContext, ConfigStore::UP config_store, @@ -102,6 +103,7 @@ DocumentDB::DocumentDB(const vespalib::string &baseDir, IClusterStateChangedHandler(), search::transactionlog::SyncProxy(), _docTypeName(docTypeName), + _bucketSpace(bucketSpace), _baseDir(baseDir + "/" + _docTypeName.toString()), // Only one thread per executor, or performDropFeedView() will fail. _defaultExecutorTaskLimit(protonCfg.indexing.tasklimit), diff --git a/searchcore/src/vespa/searchcore/proton/server/documentdb.h b/searchcore/src/vespa/searchcore/proton/server/documentdb.h index 7fff00adc72..b97b5e310ea 100644 --- a/searchcore/src/vespa/searchcore/proton/server/documentdb.h +++ b/searchcore/src/vespa/searchcore/proton/server/documentdb.h @@ -78,6 +78,7 @@ private: using ProtonConfig = vespa::config::search::core::ProtonConfig; DocTypeName _docTypeName; + document::BucketSpace _bucketSpace; vespalib::string _baseDir; uint32_t _defaultExecutorTaskLimit; uint32_t _semiUnboundExecutorTaskLimit; @@ -233,16 +234,17 @@ public: * @param config_store Access to read and write configs. */ DocumentDB(const vespalib::string &baseDir, - const DocumentDBConfig::SP & currentSnapshot, + const DocumentDBConfig::SP ¤tSnapshot, const vespalib::string &tlsSpec, - matching::QueryLimiter & queryLimiter, + matching::QueryLimiter &queryLimiter, const vespalib::Clock &clock, const DocTypeName &docTypeName, + document::BucketSpace bucketSpace, const ProtonConfig &protonCfg, - IDocumentDBOwner & owner, - vespalib::ThreadExecutor & warmupExecutor, - vespalib::ThreadStackExecutorBase & summaryExecutor, - search::transactionlog::Writer & tlsDirectWriter, + IDocumentDBOwner &owner, + vespalib::ThreadExecutor &warmupExecutor, + vespalib::ThreadStackExecutorBase &summaryExecutor, + search::transactionlog::Writer &tlsDirectWriter, MetricsWireService &metricsWireService, const search::common::FileHeaderContext &fileHeaderContext, ConfigStore::UP config_store, @@ -392,6 +394,7 @@ public: bool getDelayedConfig() const { return _state.getDelayedConfig(); } void replayConfig(SerialNum serialNum) override; const DocTypeName & getDocTypeName() const { return _docTypeName; } + document::BucketSpace getBucketSpace() const { return _bucketSpace; } void newConfigSnapshot(DocumentDBConfig::SP snapshot); void reconfigure(const DocumentDBConfig::SP & snapshot) override; int64_t getActiveGeneration() const; diff --git a/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h b/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h index e5c018958fe..242e99fffbf 100644 --- a/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h +++ b/searchcore/src/vespa/searchcore/proton/server/i_proton_configurer_owner.h @@ -2,6 +2,7 @@ #pragma once +#include <vespa/document/bucket/bucketspace.h> #include <vespa/vespalib/stllike/string.h> #include <memory> @@ -21,6 +22,7 @@ class IProtonConfigurerOwner public: virtual ~IProtonConfigurerOwner() { } virtual IDocumentDBConfigOwner *addDocumentDB(const DocTypeName &docTypeName, + document::BucketSpace bucketSpace, const vespalib::string &configId, const std::shared_ptr<BootstrapConfig> &bootstrapConfig, const std::shared_ptr<DocumentDBConfig> &documentDBConfig, diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.cpp b/searchcore/src/vespa/searchcore/proton/server/proton.cpp index 3689edc55a4..82db32adb7b 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton.cpp @@ -356,9 +356,10 @@ Proton::applyConfig(const BootstrapConfig::SP & configSnapshot) } IDocumentDBConfigOwner * -Proton::addDocumentDB(const DocTypeName & docTypeName, - const vespalib::string & configId, - const BootstrapConfig::SP & bootstrapConfig, +Proton::addDocumentDB(const DocTypeName &docTypeName, + document::BucketSpace bucketSpace, + const vespalib::string &configId, + const BootstrapConfig::SP &bootstrapConfig, const DocumentDBConfig::SP &documentDBConfig, InitializeThreads initializeThreads) { @@ -368,7 +369,7 @@ Proton::addDocumentDB(const DocTypeName & docTypeName, if (docType != NULL) { LOG(info, "Add document database: doctypename(%s), configid(%s)", docTypeName.toString().c_str(), configId.c_str()); - return addDocumentDB(*docType, bootstrapConfig, documentDBConfig, initializeThreads).get(); + return addDocumentDB(*docType, bucketSpace, bootstrapConfig, documentDBConfig, initializeThreads).get(); } else { LOG(warning, @@ -514,6 +515,7 @@ Proton::getDocumentDB(const document::DocumentType &docType) DocumentDB::SP Proton::addDocumentDB(const document::DocumentType &docType, + document::BucketSpace bucketSpace, const BootstrapConfig::SP &bootstrapConfig, const DocumentDBConfig::SP &documentDBConfig, InitializeThreads initializeThreads) @@ -547,6 +549,7 @@ Proton::addDocumentDB(const document::DocumentType &docType, _queryLimiter, _clock, docTypeName, + bucketSpace, config, *this, *_warmupExecutor, @@ -585,7 +588,7 @@ Proton::addDocumentDB(const document::DocumentType &docType, _persistenceEngine->populateInitialBucketDB(*persistenceHandler); } // TODO: Fix race with new cluster state setting. - _persistenceEngine->putHandler(docTypeName, persistenceHandler); + _persistenceEngine->putHandler(bucketSpace, docTypeName, persistenceHandler); } SearchHandlerProxy::SP searchHandler(new SearchHandlerProxy(ret)); _summaryEngine->putSearchHandler(docTypeName, searchHandler); @@ -604,8 +607,9 @@ Proton::removeDocumentDB(const DocTypeName &docTypeName) { std::lock_guard<std::shared_timed_mutex> guard(_mutex); DocumentDBMap::iterator it = _documentDBMap.find(docTypeName); - if (it == _documentDBMap.end()) + if (it == _documentDBMap.end()) { return; + } old = it->second; _documentDBMap.erase(it); } @@ -616,7 +620,7 @@ Proton::removeDocumentDB(const DocTypeName &docTypeName) // Not allowed to get to service layer to call pause(). std::unique_lock<std::shared_timed_mutex> persistenceWguard(_persistenceEngine->getWLock()); IPersistenceHandler::SP oldHandler; - oldHandler = _persistenceEngine->removeHandler(docTypeName); + oldHandler = _persistenceEngine->removeHandler(old->getBucketSpace(), docTypeName); if (_initComplete && oldHandler) { // TODO: Fix race with bucket db modifying ops. _persistenceEngine->grabExtraModifiedBuckets(*oldHandler); diff --git a/searchcore/src/vespa/searchcore/proton/server/proton.h b/searchcore/src/vespa/searchcore/proton/server/proton.h index 0d6c136b244..67f3662549d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton.h @@ -138,6 +138,7 @@ private: std::shared_ptr<IDocumentDBReferenceRegistry> _documentDBReferenceRegistry; virtual IDocumentDBConfigOwner *addDocumentDB(const DocTypeName & docTypeName, + document::BucketSpace bucketSpace, const vespalib::string & configid, const BootstrapConfig::SP & bootstrapConfig, const std::shared_ptr<DocumentDBConfig> &documentDBConfig, @@ -190,6 +191,7 @@ public: DocumentDB::SP addDocumentDB(const document::DocumentType &docType, + document::BucketSpace bucketSpace, const BootstrapConfig::SP &configSnapshot, const std::shared_ptr<DocumentDBConfig> &documentDBConfig, InitializeThreads initializeThreads); diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp index fe3314e7976..436cf3865e5 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.cpp @@ -114,7 +114,9 @@ ProtonConfigurer::applyConfig(std::shared_ptr<ProtonConfigSnapshot> configSnapsh _owner.applyConfig(bootstrapConfig); for (const auto &ddbConfig : protonConfig.documentdb) { DocTypeName docTypeName(ddbConfig.inputdoctypename); - configureDocumentDB(*configSnapshot, docTypeName, ddbConfig.configid, initializeThreads); + // TODO: set bucket space based on config when available + document::BucketSpace bucketSpace = document::BucketSpace::placeHolder(); + configureDocumentDB(*configSnapshot, docTypeName, bucketSpace, ddbConfig.configid, initializeThreads); } pruneDocumentDBs(*configSnapshot); size_t gen = bootstrapConfig->getGeneration(); @@ -124,7 +126,11 @@ ProtonConfigurer::applyConfig(std::shared_ptr<ProtonConfigSnapshot> configSnapsh } void -ProtonConfigurer::configureDocumentDB(const ProtonConfigSnapshot &configSnapshot, const DocTypeName &docTypeName, const vespalib::string &configId, const InitializeThreads &initializeThreads) +ProtonConfigurer::configureDocumentDB(const ProtonConfigSnapshot &configSnapshot, + const DocTypeName &docTypeName, + document::BucketSpace bucketSpace, + const vespalib::string &configId, + const InitializeThreads &initializeThreads) { // called by proton executor thread const auto &bootstrapConfig = configSnapshot.getBootstrapConfig(); @@ -134,7 +140,7 @@ ProtonConfigurer::configureDocumentDB(const ProtonConfigSnapshot &configSnapshot const auto &documentDBConfig = cfgitr->second; auto dbitr(_documentDBs.find(docTypeName)); if (dbitr == _documentDBs.end()) { - auto *newdb = _owner.addDocumentDB(docTypeName, configId, bootstrapConfig, documentDBConfig, initializeThreads); + auto *newdb = _owner.addDocumentDB(docTypeName, bucketSpace, configId, bootstrapConfig, documentDBConfig, initializeThreads); if (newdb != nullptr) { auto insres = _documentDBs.insert(std::make_pair(docTypeName, newdb)); assert(insres.second); diff --git a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h index a7f3fd61d0b..149be3a9e62 100644 --- a/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h +++ b/searchcore/src/vespa/searchcore/proton/server/proton_configurer.h @@ -2,12 +2,13 @@ #pragma once +#include "executor_thread_service.h" #include "i_proton_configurer.h" +#include <vespa/document/bucket/bucketspace.h> #include <vespa/searchcore/proton/common/doctypename.h> #include <vespa/vespalib/net/simple_component_config_producer.h> #include <map> #include <mutex> -#include "executor_thread_service.h" namespace proton { @@ -37,8 +38,11 @@ class ProtonConfigurer : public IProtonConfigurer bool skipConfig(const ProtonConfigSnapshot *configSnapshot, bool initialConfig); void applyConfig(std::shared_ptr<ProtonConfigSnapshot> configSnapshot, InitializeThreads initializeThreads, bool initialConfig); - void configureDocumentDB(const ProtonConfigSnapshot &configSnapshot, const DocTypeName &docTypeName, const vespalib::string &configId, const InitializeThreads &initializeThreads); + void configureDocumentDB(const ProtonConfigSnapshot &configSnapshot, + const DocTypeName &docTypeName, document::BucketSpace bucketSpace, + const vespalib::string &configId, const InitializeThreads &initializeThreads); void pruneDocumentDBs(const ProtonConfigSnapshot &configSnapshot); + public: ProtonConfigurer(vespalib::ThreadStackExecutorBase &executor, IProtonConfigurerOwner &owner); |