diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-01-17 22:38:52 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-01-17 22:38:52 +0000 |
commit | f9fd396dd0dc8ec336ee011bab1c3728a1c25c54 (patch) | |
tree | 320cf8944c4da888cfb35131f89c37f531f57033 /storage | |
parent | c7610b3a6585ad09a170da9658909d2a157ef44e (diff) |
Wire in skeleton for implementation of the BucketExecutor interface.
Diffstat (limited to 'storage')
5 files changed, 72 insertions, 45 deletions
diff --git a/storage/src/tests/persistence/common/filestortestfixture.cpp b/storage/src/tests/persistence/common/filestortestfixture.cpp index 57c63747ece..b695fb508af 100644 --- a/storage/src/tests/persistence/common/filestortestfixture.cpp +++ b/storage/src/tests/persistence/common/filestortestfixture.cpp @@ -37,8 +37,7 @@ void FileStorTestFixture::SetUp() { setupPersistenceThreads(1); - _node->setPersistenceProvider( - std::make_unique<spi::dummy::DummyPersistence>(_node->getTypeRepo())); + _node->setPersistenceProvider(std::make_unique<spi::dummy::DummyPersistence>(_node->getTypeRepo())); _node->getPersistenceProvider().initialize(); } @@ -64,8 +63,7 @@ FileStorTestFixture::createBucket(const document::BucketId& bid) bool FileStorTestFixture::bucketExistsInDb(const document::BucketId& bucket) const { - StorBucketDatabase::WrappedEntry entry( - _node->getStorageBucketDatabase().get(bucket, "bucketExistsInDb")); + StorBucketDatabase::WrappedEntry entry(_node->getStorageBucketDatabase().get(bucket, "bucketExistsInDb")); return entry.exist(); } @@ -73,13 +71,13 @@ FileStorTestFixture::TestFileStorComponents::TestFileStorComponents( FileStorTestFixture& fixture, const StorageLinkInjector& injector) : _fixture(fixture), - manager(new FileStorManager(fixture._config->getConfigId(), - fixture._node->getPersistenceProvider(), - fixture._node->getComponentRegister(), - *fixture._node)) + manager(nullptr) { injector.inject(top); - top.push_back(StorageLink::UP(manager)); + auto fsm = std::make_unique<FileStorManager>(fixture._config->getConfigId(), fixture._node->getPersistenceProvider(), + fixture._node->getComponentRegister(), *fixture._node); + manager = fsm.get(); + top.push_back(std::move(fsm)); top.open(); } @@ -91,8 +89,7 @@ FileStorTestFixture::makeSelfAddress() { } void -FileStorTestFixture::TestFileStorComponents::sendDummyGet( - const document::BucketId& bid) +FileStorTestFixture::TestFileStorComponents::sendDummyGet(const document::BucketId& bid) { std::ostringstream id; id << "id:foo:testdoctype1:n=" << bid.getId() << ":0"; @@ -103,14 +100,12 @@ FileStorTestFixture::TestFileStorComponents::sendDummyGet( } void -FileStorTestFixture::TestFileStorComponents::sendDummyGetDiff( - const document::BucketId& bid) +FileStorTestFixture::TestFileStorComponents::sendDummyGetDiff(const document::BucketId& bid) { std::vector<api::GetBucketDiffCommand::Node> nodes; nodes.push_back(0); nodes.push_back(1); - std::shared_ptr<api::GetBucketDiffCommand> cmd( - new api::GetBucketDiffCommand(makeDocumentBucket(bid), nodes, 12345)); + auto cmd = std::make_shared<api::GetBucketDiffCommand>(makeDocumentBucket(bid), nodes, 12345); cmd->setAddress(makeSelfAddress()); cmd->setPriority(255); top.sendDown(cmd); @@ -124,10 +119,8 @@ FileStorTestFixture::TestFileStorComponents::sendPut( { std::ostringstream id; id << "id:foo:testdoctype1:n=" << bid.getId() << ":" << docIdx; - document::Document::SP doc( - _fixture._node->getTestDocMan().createDocument("foobar", id.str())); - std::shared_ptr<api::PutCommand> cmd( - new api::PutCommand(makeDocumentBucket(bid), doc, timestamp)); + document::Document::SP doc(_fixture._node->getTestDocMan().createDocument("foobar", id.str())); + auto cmd = std::make_shared<api::PutCommand>(makeDocumentBucket(bid), doc, timestamp); cmd->setAddress(makeSelfAddress()); top.sendDown(cmd); } @@ -135,9 +128,7 @@ FileStorTestFixture::TestFileStorComponents::sendPut( void FileStorTestFixture::setClusterState(const std::string& state) { - _node->getStateUpdater().setClusterState( - lib::ClusterState::CSP(new lib::ClusterState(state))); + _node->getStateUpdater().setClusterState(std::make_shared<lib::ClusterState>(state)); } - } // ns storage diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 3bd92d31ca7..cae24c5c6ce 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -238,14 +238,14 @@ struct TestFileStorComponents { DummyStorageLink top; FileStorManager* manager; - explicit TestFileStorComponents(FileStorTestBase& test, - bool use_small_config = false) - : manager(new FileStorManager((use_small_config ? test.smallConfig : test.config)->getConfigId(), - test._node->getPersistenceProvider(), - test._node->getComponentRegister(), - *test._node)) + explicit TestFileStorComponents(FileStorTestBase& test, bool use_small_config = false) + : manager(nullptr) { - top.push_back(unique_ptr<StorageLink>(manager)); + auto fsm = std::make_unique<FileStorManager>((use_small_config ? test.smallConfig : test.config)->getConfigId(), + test._node->getPersistenceProvider(), + test._node->getComponentRegister(), *test._node); + manager = fsm.get(); + top.push_back(std::move(fsm)); top.open(); } }; diff --git a/storage/src/tests/visiting/visitormanagertest.cpp b/storage/src/tests/visiting/visitormanagertest.cpp index d5c558c5a0d..6ad876d0d77 100644 --- a/storage/src/tests/visiting/visitormanagertest.cpp +++ b/storage/src/tests/visiting/visitormanagertest.cpp @@ -83,21 +83,16 @@ VisitorManagerTest::initializeTest() vdstestlib::DirConfig config(getStandardConfig(true)); config.getConfig("stor-visitor").set("visitorthreads", "1"); - _messageSessionFactory.reset( - new TestVisitorMessageSessionFactory(config.getConfigId())); - _node.reset( - new TestServiceLayerApp(config.getConfigId())); + _messageSessionFactory = std::make_unique<TestVisitorMessageSessionFactory>(config.getConfigId()); + _node = std::make_unique<TestServiceLayerApp>(config.getConfigId()); _node->setupDummyPersistence(); - _node->getStateUpdater().setClusterState( - lib::ClusterState::CSP( - new lib::ClusterState("storage:1 distributor:1"))); - _top.reset(new DummyStorageLink()); - _top->push_back(std::unique_ptr<StorageLink>(_manager - = new VisitorManager( - config.getConfigId(), _node->getComponentRegister(), - *_messageSessionFactory))); - _top->push_back(std::unique_ptr<StorageLink>(new FileStorManager( - config.getConfigId(), _node->getPersistenceProvider(), _node->getComponentRegister(), *_node))); + _node->getStateUpdater().setClusterState(std::make_shared<lib::ClusterState>("storage:1 distributor:1")); + _top = std::make_unique<DummyStorageLink>(); + auto vm = std::make_unique<VisitorManager>(config.getConfigId(), _node->getComponentRegister(), *_messageSessionFactory); + _manager = vm.get(); + _top->push_back(std::move(vm)); + _top->push_back(std::make_unique<FileStorManager>(config.getConfigId(), _node->getPersistenceProvider(), + _node->getComponentRegister(), *_node)); _manager->setTimeBetweenTicks(10); _top->open(); diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index f378e750f62..9c7ae586def 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -20,6 +20,7 @@ #include <vespa/storageapi/message/stat.h> #include <vespa/vespalib/stllike/asciistream.h> #include <vespa/vespalib/util/stringfmt.h> +#include <vespa/vespalib/util/idestructorcallback.h> #include <vespa/vespalib/util/sequencedtaskexecutor.h> #include <thread> @@ -32,10 +33,30 @@ using vespalib::make_string_short::fmt; namespace { -VESPA_THREAD_STACK_TAG(response_executor) + VESPA_THREAD_STACK_TAG(response_executor) } + namespace storage { +namespace { + +class BucketExecutorWrapper : public spi::BucketExecutor { +public: + BucketExecutorWrapper(spi::BucketExecutor & executor) noexcept : _executor(executor) { } + + std::unique_ptr<spi::BucketTask> execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) override { + return _executor.execute(bucket, std::move(task)); + } + + void sync() override { + _executor.sync(); + } + +private: + spi::BucketExecutor & _executor; +}; + +} FileStorManager:: FileStorManager(const config::ConfigUri & configUri, spi::PersistenceProvider& provider, @@ -183,6 +204,7 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC _threads.push_back(std::make_unique<PersistenceThread>(createRegisteredHandler(_component), *_filestorHandler, i % numStripes, _component)); } + _bucketExecutorRegistration = _provider->register_executor(std::make_shared<BucketExecutorWrapper>(*this)); } } @@ -946,4 +968,14 @@ void FileStorManager::initialize_bucket_databases_from_provider() { _init_handler.notifyDoneInitializing(); } +std::unique_ptr<spi::BucketTask> +FileStorManager::execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) { + (void) bucket; + return task; +} + +void +FileStorManager::sync() { +} + } // storage diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h index b757227e27d..451059f7d3a 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.h @@ -12,6 +12,7 @@ #include <vespa/vespalib/util/document_runnable.h> #include <vespa/vespalib/util/isequencedtaskexecutor.h> #include <vespa/document/bucket/bucketid.h> +#include <vespa/persistence/spi/bucketexecutor.h> #include <vespa/storage/bucketdb/storbucketdb.h> #include <vespa/storage/common/messagesender.h> #include <vespa/storage/common/servicelayercomponent.h> @@ -27,6 +28,8 @@ #include <vespa/config/helper/ifetchercallback.h> #include <vespa/config/config.h> +namespace vespalib { class IDestructorCallback; } + namespace storage { namespace api { class ReturnCode; @@ -48,7 +51,8 @@ class FileStorManager : public StorageLinkQueued, public framework::HtmlStatusReporter, public StateListener, private config::IFetcherCallback<vespa::config::content::StorFilestorConfig>, - public MessageSender + public MessageSender, + public spi::BucketExecutor { ServiceLayerComponentRegister & _compReg; ServiceLayerComponent _component; @@ -66,8 +70,10 @@ class FileStorManager : public StorageLinkQueued, std::shared_ptr<FileStorMetrics> _metrics; std::unique_ptr<FileStorHandler> _filestorHandler; std::unique_ptr<vespalib::ISequencedTaskExecutor> _sequencedExecutor; + bool _closed; std::mutex _lock; + std::unique_ptr<vespalib::IDestructorCallback> _bucketExecutorRegistration; public: FileStorManager(const config::ConfigUri &, spi::PersistenceProvider&, @@ -163,6 +169,9 @@ private: void updateState(); void propagateClusterStates(); void update_reported_state_after_db_init(); + + std::unique_ptr<spi::BucketTask> execute(const spi::Bucket &bucket, std::unique_ptr<spi::BucketTask> task) override; + void sync() override; }; } // storage |