// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include LOG_SETUP("persistenceconformance_test"); using namespace config; using namespace proton; using namespace cloud::config::filedistribution; using namespace vespa::config::search::core; using namespace vespa::config::search::summary; using namespace vespa::config::search; using namespace std::chrono_literals; using vespa::config::content::core::BucketspacesConfig; using std::shared_ptr; using document::BucketSpace; using document::DocumentType; using document::DocumentTypeRepo; using document::TestDocMan; using document::test::makeBucketSpace; using search::TuneFileDocumentDB; using search::index::DummyFileHeaderContext; using search::index::Schema; using search::transactionlog::TransLogServer; using storage::spi::ConformanceTest; using storage::spi::PersistenceProvider; typedef ConformanceTest::PersistenceFactory PersistenceFactory; typedef DocumentDBConfig::DocumenttypesConfigSP DocumenttypesConfigSP; typedef std::map DocumentDBMap; typedef std::vector DocTypeVector; void storeDocType(DocTypeVector *types, const DocumentType &type) { types->push_back(DocTypeName(type.getName())); } struct SchemaConfigFactory { typedef DocumentDBConfig CS; typedef std::shared_ptr SP; virtual ~SchemaConfigFactory() = default; static SchemaConfigFactory::SP get() { return std::make_shared(); } virtual CS::IndexschemaConfigSP createIndexSchema(const DocumentType &docType) { (void) docType; return std::make_shared(); } virtual CS::AttributesConfigSP createAttributes(const DocumentType &docType) { (void) docType; return std::make_shared(); } virtual CS::SummaryConfigSP createSummary(const DocumentType &docType) { (void) docType; return std::make_shared(); } }; class ConfigFactory { private: std::shared_ptr _repo; DocumenttypesConfigSP _typeCfg; SchemaConfigFactory::SP _schemaFactory; public: ConfigFactory(std::shared_ptr repo, DocumenttypesConfigSP typeCfg, SchemaConfigFactory::SP schemaFactory); ~ConfigFactory(); std::shared_ptr getTypeRepo() const { return _repo; } DocumenttypesConfigSP getTypeCfg() const { return _typeCfg; } DocTypeVector getDocTypes() const { DocTypeVector types; _repo->forEachDocumentType([&types](const DocumentType &type) { types.push_back(DocTypeName(type.getName())); }); return types; } DocumentDBConfig::SP create(const DocTypeName &docTypeName) const { const DocumentType *docType = _repo->getDocumentType(docTypeName.getName()); if (docType == nullptr) { return DocumentDBConfig::SP(); } typedef DocumentDBConfig CS; CS::IndexschemaConfigSP indexschema = _schemaFactory->createIndexSchema(*docType); CS::AttributesConfigSP attributes = _schemaFactory->createAttributes(*docType); CS::SummaryConfigSP summary = _schemaFactory->createSummary(*docType); auto schema = DocumentDBConfig::build_schema(*attributes, *indexschema); return std::make_shared( 1, std::make_shared(), std::make_shared(), std::make_shared(), std::make_shared(), indexschema, attributes, summary, std::make_shared(), _typeCfg, _repo, std::make_shared(), std::make_shared(), schema, std::make_shared(), search::LogDocumentStore::Config(), ThreadingServiceConfig::make(), AllocConfig::makeDefault(), "client", docTypeName.getName()); } }; ConfigFactory::ConfigFactory(std::shared_ptr repo, DocumenttypesConfigSP typeCfg, SchemaConfigFactory::SP schemaFactory) : _repo(std::move(repo)), _typeCfg(std::move(typeCfg)), _schemaFactory(std::move(schemaFactory)) {} ConfigFactory::~ConfigFactory() = default; class DocumentDBFactory : public DummyDBOwner { private: vespalib::string _baseDir; DummyFileHeaderContext _fileHeaderContext; vespalib::string _tlsSpec; matching::QueryLimiter _queryLimiter; mutable DummyWireService _metricsWireService; mutable MemoryConfigStores _config_stores; vespalib::ThreadStackExecutor _summaryExecutor; MockSharedThreadingService _shared_service; TransLogServer _tls; static std::shared_ptr make_proton_config() { ProtonConfigBuilder proton_config; proton_config.indexing.optimize = ProtonConfigBuilder::Indexing::Optimize::LATENCY; return std::make_shared(proton_config); } public: DocumentDBFactory(const vespalib::string &baseDir, int tlsListenPort); ~DocumentDBFactory() override; DocumentDB::SP create(BucketSpace bucketSpace, const DocTypeName &docType, const ConfigFactory &factory) { DocumentDBConfig::SP snapshot = factory.create(docType); std::filesystem::create_directory(std::filesystem::path(_baseDir)); std::filesystem::create_directory(std::filesystem::path(_baseDir + "/" + docType.toString())); vespalib::string inputCfg = _baseDir + "/" + docType.toString() + "/baseconfig"; { FileConfigManager fileCfg(_shared_service.transport(), inputCfg, "", docType.getName()); fileCfg.saveConfig(*snapshot, 1); } config::DirSpec spec(inputCfg + "/config-1"); auto tuneFileDocDB = std::make_shared(); DocumentDBConfigHelper mgr(spec, docType.getName()); auto b = std::make_shared(1, factory.getTypeCfg(), factory.getTypeRepo(), make_proton_config(), std::make_shared(), std::make_shared(), tuneFileDocDB, HwInfo()); mgr.forwardConfig(b); mgr.nextGeneration(_shared_service.transport(), 0ms); return DocumentDB::create(_baseDir, mgr.getConfig(), _tlsSpec, _queryLimiter, docType, bucketSpace, *b->getProtonConfigSP(), const_cast(*this), _shared_service, _tls, _metricsWireService, _fileHeaderContext, std::make_shared(), _config_stores.getConfigStore(docType.toString()), std::make_shared(16, 128_Ki), HwInfo()); } }; DocumentDBFactory::DocumentDBFactory(const vespalib::string &baseDir, int tlsListenPort) : _baseDir(baseDir), _fileHeaderContext(), _tlsSpec(vespalib::make_string("tcp/localhost:%d", tlsListenPort)), _queryLimiter(), _metricsWireService(), _summaryExecutor(8, 128_Ki), _shared_service(_summaryExecutor, _summaryExecutor), _tls(_shared_service.transport(), "tls", tlsListenPort, baseDir, _fileHeaderContext) {} DocumentDBFactory::~DocumentDBFactory() = default; class DocumentDBRepo { private: DocumentDBMap _docDbs; public: typedef std::unique_ptr UP; DocumentDBRepo(const ConfigFactory &cfgFactory, DocumentDBFactory &docDbFactory) : _docDbs() { DocTypeVector types = cfgFactory.getDocTypes(); for (const auto & type : types) { BucketSpace bucketSpace(makeBucketSpace(type.getName())); DocumentDB::SP docDb = docDbFactory.create(bucketSpace, type, cfgFactory); docDb->start(); docDb->waitForOnlineState(); _docDbs[type] = docDb; } } void close() { for (auto & dbEntry : _docDbs) { dbEntry.second->close(); } } ~DocumentDBRepo() { close(); } const DocumentDBMap &getDocDbs() const { return _docDbs; } }; class DocDBRepoHolder { protected: DocumentDBRepo::UP _docDbRepo; explicit DocDBRepoHolder(DocumentDBRepo::UP docDbRepo) : _docDbRepo(std::move(docDbRepo)) { } virtual ~DocDBRepoHolder() = default; void close() { if (_docDbRepo) _docDbRepo->close(); } }; class MyPersistenceEngineOwner : public IPersistenceEngineOwner { void setClusterState(BucketSpace, const storage::spi::ClusterState &) override { } }; struct MyResourceWriteFilter : public IResourceWriteFilter { bool acceptWriteOperation() const override { return true; } State getAcceptState() const override { return IResourceWriteFilter::State(); } }; class MyPersistenceEngine : public DocDBRepoHolder, public PersistenceEngine { public: MyPersistenceEngine(MyPersistenceEngineOwner &owner, MyResourceWriteFilter &writeFilter, IDiskMemUsageNotifier& disk_mem_usage_notifier, DocumentDBRepo::UP docDbRepo, const vespalib::string &docType = "") : DocDBRepoHolder(std::move(docDbRepo)), PersistenceEngine(owner, writeFilter, disk_mem_usage_notifier, -1, false) { addHandlers(docType); } void addHandlers(const vespalib::string &docType) { if (!_docDbRepo) return; const DocumentDBMap &docDbs = _docDbRepo->getDocDbs(); for (const auto & dbEntry : docDbs) { if (!docType.empty() && docType != dbEntry.first.getName()) { continue; } LOG(info, "putHandler(%s)", dbEntry.first.toString().c_str()); auto proxy = std::make_shared(dbEntry.second); putHandler(getWLock(), dbEntry.second->getBucketSpace(), dbEntry.first, proxy); } } void removeHandlers() { if ( ! _docDbRepo) return; const DocumentDBMap &docDbs = _docDbRepo->getDocDbs(); for (const auto & dbEntry : docDbs) { IPersistenceHandler::SP proxy(removeHandler(getWLock(), dbEntry.second->getBucketSpace(), dbEntry.first)); (void) proxy; } } ~MyPersistenceEngine() override { destroyIterators(); removeHandlers(); // Block calls to document db from engine close(); // Block upcalls to engine from document db } }; class MyPersistenceFactory : public PersistenceFactory { private: vespalib::string _baseDir; DocumentDBFactory _docDbFactory; SchemaConfigFactory::SP _schemaFactory; DocumentDBRepo::UP _docDbRepo; vespalib::string _docType; MyPersistenceEngineOwner _engineOwner; MyResourceWriteFilter _writeFilter; test::DiskMemUsageNotifier _disk_mem_usage_notifier; public: MyPersistenceFactory(const vespalib::string &baseDir, int tlsListenPort, SchemaConfigFactory::SP schemaFactory, const vespalib::string & docType = "") : _baseDir(baseDir), _docDbFactory(baseDir, tlsListenPort), _schemaFactory(std::move(schemaFactory)), _docDbRepo(), _docType(docType), _engineOwner(), _writeFilter(), _disk_mem_usage_notifier(DiskMemUsageState({ 0.8, 0.5 }, { 0.8, 0.4 })) { clear(); } ~MyPersistenceFactory() override { clear(); } std::unique_ptr getPersistenceImplementation(const std::shared_ptr &repo, const DocumenttypesConfig &typesCfg) override { ConfigFactory cfgFactory(repo, std::make_shared(typesCfg), _schemaFactory); _docDbRepo = std::make_unique(cfgFactory, _docDbFactory); auto engine = std::make_unique(_engineOwner,_writeFilter, _disk_mem_usage_notifier, std::move(_docDbRepo), _docType); assert( ! _docDbRepo); // Repo should be handed over return engine; } void clear() override { std::filesystem::remove_all(std::filesystem::path(_baseDir)); } bool hasPersistence() const override { return true; } bool supportsActiveState() const override { return true; } bool supportsBucketSpaces() const override { return true; } }; std::unique_ptr makeMyPersistenceFactory(const std::string &docType) { return std::make_unique("testdb", 9017, SchemaConfigFactory::get(), docType); } int main(int argc, char* argv[]) { ::testing::InitGoogleTest(&argc, argv); DummyFileHeaderContext::setCreator("persistenceconformance_test"); ConformanceTest::_factoryFactory = &makeMyPersistenceFactory; return RUN_ALL_TESTS(); }