diff options
author | Tor Egge <Tor.Egge@broadpark.no> | 2020-09-25 15:55:27 +0200 |
---|---|---|
committer | Tor Egge <Tor.Egge@broadpark.no> | 2020-09-25 16:05:32 +0200 |
commit | 8cc41e7cbdb125698d7c05baf064dc40541a2892 (patch) | |
tree | 09554b8c883ce55fa186b80a003d3ec0c0984253 | |
parent | d1fa3efdc591999ef354b0c43f6ac5c5a7e27247 (diff) |
Add storage chain builder.
10 files changed, 135 insertions, 40 deletions
diff --git a/storage/src/vespa/storage/common/CMakeLists.txt b/storage/src/vespa/storage/common/CMakeLists.txt index 20b73987bf3..81c6486eaeb 100644 --- a/storage/src/vespa/storage/common/CMakeLists.txt +++ b/storage/src/vespa/storage/common/CMakeLists.txt @@ -15,5 +15,6 @@ vespa_add_library(storage_common OBJECT storagecomponent.cpp storagelink.cpp storagelinkqueued.cpp + storage_chain_builder.cpp DEPENDS ) diff --git a/storage/src/vespa/storage/common/i_storage_chain_builder.h b/storage/src/vespa/storage/common/i_storage_chain_builder.h new file mode 100644 index 00000000000..f50cc7572e9 --- /dev/null +++ b/storage/src/vespa/storage/common/i_storage_chain_builder.h @@ -0,0 +1,22 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include <memory> + +namespace storage { + +class StorageLink; + +/* + * Interface class for building a storage chain. + */ +class IStorageChainBuilder +{ +public: + virtual ~IStorageChainBuilder() = default; + virtual void add(std::unique_ptr<StorageLink> child) = 0; + virtual std::unique_ptr<StorageLink> build() && = 0; +}; + +} diff --git a/storage/src/vespa/storage/common/storage_chain_builder.cpp b/storage/src/vespa/storage/common/storage_chain_builder.cpp new file mode 100644 index 00000000000..45878f452cb --- /dev/null +++ b/storage/src/vespa/storage/common/storage_chain_builder.cpp @@ -0,0 +1,31 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#include "storage_chain_builder.h" +#include "storagelink.h" + +namespace storage { + +StorageChainBuilder::StorageChainBuilder() + : _top() +{ +} + +StorageChainBuilder::~StorageChainBuilder() = default; + +void +StorageChainBuilder::add(std::unique_ptr<StorageLink> link) +{ + if (_top) { + _top->push_back(std::move(link)); + } else { + _top = std::move(link); + } +}; + +std::unique_ptr<StorageLink> +StorageChainBuilder::build() && +{ + return std::move(_top); +} + +} diff --git a/storage/src/vespa/storage/common/storage_chain_builder.h b/storage/src/vespa/storage/common/storage_chain_builder.h new file mode 100644 index 00000000000..fc9ca7ec1c9 --- /dev/null +++ b/storage/src/vespa/storage/common/storage_chain_builder.h @@ -0,0 +1,23 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "i_storage_chain_builder.h" + +namespace storage { + +/** + * Class for building a storage chain. + */ +class StorageChainBuilder : public IStorageChainBuilder +{ +protected: + std::unique_ptr<StorageLink> _top; +public: + StorageChainBuilder(); + ~StorageChainBuilder() override; + void add(std::unique_ptr<StorageLink> child) override; + std::unique_ptr<StorageLink> build() && override; +}; + +} diff --git a/storage/src/vespa/storage/storageserver/distributornode.cpp b/storage/src/vespa/storage/storageserver/distributornode.cpp index 1cd1477e769..3d1f9bbaf2e 100644 --- a/storage/src/vespa/storage/storageserver/distributornode.cpp +++ b/storage/src/vespa/storage/storageserver/distributornode.cpp @@ -5,6 +5,7 @@ #include "communicationmanager.h" #include "opslogger.h" #include "statemanager.h" +#include <vespa/storage/common/i_storage_chain_builder.h> #include <vespa/storage/distributor/distributor.h> #include <vespa/storage/common/hostreporter/hostinfo.h> #include <vespa/vespalib/util/exceptions.h> @@ -84,34 +85,33 @@ DistributorNode::handleConfigChange(vespa::config::content::core::StorVisitordis _context.getComponentRegister().setVisitorConfig(c); } -StorageLink::UP -DistributorNode::createChain() +void +DistributorNode::createChain(IStorageChainBuilder &builder) { DistributorComponentRegister& dcr(_context.getComponentRegister()); // TODO: All components in this chain should use a common thread instead of // each having its own configfetcher. StorageLink::UP chain; if (_retrievedCommunicationManager.get()) { - chain = std::move(_retrievedCommunicationManager); + builder.add(std::move(_retrievedCommunicationManager)); } else { - chain.reset(_communicationManager - = new CommunicationManager(dcr, _configUri)); + auto communication_manager = std::make_unique<CommunicationManager>(dcr, _configUri); + _communicationManager = communication_manager.get(); + builder.add(std::move(communication_manager)); } std::unique_ptr<StateManager> stateManager(releaseStateManager()); - chain->push_back(StorageLink::UP(new Bouncer(dcr, _configUri))); - chain->push_back(StorageLink::UP(new OpsLogger(dcr, _configUri))); + builder.add(std::make_unique<Bouncer>(dcr, _configUri)); + builder.add(std::make_unique<OpsLogger>(dcr, _configUri)); // Distributor instance registers a host info reporter with the state // manager, which is safe since the lifetime of said state manager // extends to the end of the process. - chain->push_back(StorageLink::UP( - new storage::distributor::Distributor( - dcr, *_threadPool, getDoneInitializeHandler(), - _manageActiveBucketCopies, - stateManager->getHostInfo()))); - - chain->push_back(StorageLink::UP(stateManager.release())); - return chain; + builder.add(std::make_unique<storage::distributor::Distributor> + (dcr, *_threadPool, getDoneInitializeHandler(), + _manageActiveBucketCopies, + stateManager->getHostInfo())); + + builder.add(std::move(stateManager)); } api::Timestamp diff --git a/storage/src/vespa/storage/storageserver/distributornode.h b/storage/src/vespa/storage/storageserver/distributornode.h index 4db8876dc24..39614674bb5 100644 --- a/storage/src/vespa/storage/storageserver/distributornode.h +++ b/storage/src/vespa/storage/storageserver/distributornode.h @@ -49,7 +49,7 @@ public: private: void initializeNodeSpecific() override; - std::unique_ptr<StorageLink> createChain() override; + void createChain(IStorageChainBuilder &builder) override; api::Timestamp getUniqueTimestamp() override; /** diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp index b64f6db60fc..4f1db0e1b30 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp +++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp @@ -9,6 +9,7 @@ #include "statemanager.h" #include "priorityconverter.h" #include "service_layer_error_listener.h" +#include <vespa/storage/common/i_storage_chain_builder.h> #include <vespa/storage/visiting/messagebusvisitormessagesession.h> #include <vespa/storage/visiting/visitormanager.h> #include <vespa/storage/bucketdb/bucketmanager.h> @@ -220,43 +221,44 @@ ServiceLayerNode::toDocumentPriority(uint8_t storagePriority) const return _communicationManager->getPriorityConverter().toDocumentPriority(storagePriority); } -StorageLink::UP -ServiceLayerNode::createChain() +void +ServiceLayerNode::createChain(IStorageChainBuilder &builder) { ServiceLayerComponentRegister& compReg(_context.getComponentRegister()); - StorageLink::UP chain; - chain.reset(_communicationManager = new CommunicationManager(compReg, _configUri)); - chain->push_back(std::make_unique<Bouncer>(compReg, _configUri)); + auto communication_manager = std::make_unique<CommunicationManager>(compReg, _configUri); + _communicationManager = communication_manager.get(); + builder.add(std::move(communication_manager)); + builder.add(std::make_unique<Bouncer>(compReg, _configUri)); if (_noUsablePartitionMode) { /* * No usable partitions. Use minimal chain. Still needs to be * able to report state back to cluster controller. */ - chain->push_back(releaseStateManager()); - return chain; + builder.add(releaseStateManager()); + return; } - chain->push_back(std::make_unique<OpsLogger>(compReg, _configUri)); - auto* merge_throttler = new MergeThrottler(_configUri, compReg); - chain->push_back(StorageLink::UP(merge_throttler)); - chain->push_back(std::make_unique<ChangedBucketOwnershipHandler>(_configUri, compReg)); - chain->push_back(std::make_unique<StorageBucketDBInitializer>( + builder.add(std::make_unique<OpsLogger>(compReg, _configUri)); + auto merge_throttler_up = std::make_unique<MergeThrottler>(_configUri, compReg); + auto merge_throttler = merge_throttler_up.get(); + builder.add(std::move(merge_throttler_up)); + builder.add(std::make_unique<ChangedBucketOwnershipHandler>(_configUri, compReg)); + builder.add(std::make_unique<StorageBucketDBInitializer>( _configUri, _partitions, getDoneInitializeHandler(), compReg)); - chain->push_back(std::make_unique<BucketManager>(_configUri, _context.getComponentRegister())); - chain->push_back(StorageLink::UP(new VisitorManager( - _configUri, _context.getComponentRegister(), *this, _externalVisitors))); - chain->push_back(std::make_unique<ModifiedBucketChecker>( + builder.add(std::make_unique<BucketManager>(_configUri, _context.getComponentRegister())); + builder.add(std::make_unique<VisitorManager>(_configUri, _context.getComponentRegister(), static_cast<VisitorMessageSessionFactory &>(*this), _externalVisitors)); + builder.add(std::make_unique<ModifiedBucketChecker>( _context.getComponentRegister(), _persistenceProvider, _configUri)); - chain->push_back(StorageLink::UP(_fileStorManager = new FileStorManager( - _configUri, _partitions, _persistenceProvider, _context.getComponentRegister()))); - chain->push_back(releaseStateManager()); + auto filstor_manager = std::make_unique<FileStorManager>(_configUri, _partitions, _persistenceProvider, _context.getComponentRegister()); + _fileStorManager = filstor_manager.get(); + builder.add(std::move(filstor_manager)); + builder.add(releaseStateManager()); // Lifetimes of all referenced components shall outlive the last call going // through the SPI, as queues are flushed and worker threads joined when // the storage link chain is closed prior to destruction. auto error_listener = std::make_shared<ServiceLayerErrorListener>(*_component, *merge_throttler); _fileStorManager->error_wrapper().register_error_listener(std::move(error_listener)); - return chain; } ResumeGuard diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.h b/storage/src/vespa/storage/storageserver/servicelayernode.h index 7c0d6cd8ee3..ad570202f5b 100644 --- a/storage/src/vespa/storage/storageserver/servicelayernode.h +++ b/storage/src/vespa/storage/storageserver/servicelayernode.h @@ -62,7 +62,7 @@ private: void handleLiveConfigUpdate(const InitialGuard & initGuard) override; VisitorMessageSession::UP createSession(Visitor&, VisitorThread&) override; documentapi::Priority::Value toDocumentPriority(uint8_t storagePriority) const override; - std::unique_ptr<StorageLink> createChain() override; + void createChain(IStorageChainBuilder &builder) override; void removeConfigSubscriptions() override; }; diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp index e962ee4b1b6..aa50391c037 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.cpp +++ b/storage/src/vespa/storage/storageserver/storagenode.cpp @@ -12,6 +12,7 @@ #include <vespa/storage/frameworkimpl/status/statuswebserver.h> #include <vespa/storage/frameworkimpl/thread/deadlockdetector.h> #include <vespa/storage/common/statusmetricconsumer.h> +#include <vespa/storage/common/storage_chain_builder.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/util/exceptions.h> #include <vespa/vespalib/util/time.h> @@ -111,7 +112,8 @@ StorageNode::StorageNode( _newBucketSpacesConfig(), _component(), _configUri(configUri), - _communicationManager(nullptr) + _communicationManager(nullptr), + _chain_builder(std::make_unique<StorageChainBuilder>()) { } @@ -203,7 +205,9 @@ StorageNode::initialize() _deadLockDetector->setWaitSlack(framework::MilliSecTime( static_cast<uint32_t>(_serverConfig->deadLockDetectorTimeoutSlack * 1000))); - _chain.reset(createChain().release()); + createChain(*_chain_builder); + _chain = std::move(*_chain_builder).build(); + _chain_builder.reset(); assert(_communicationManager != nullptr); _communicationManager->updateBucketSpacesConfig(*_bucketSpacesConfig); @@ -622,4 +626,10 @@ StorageNode::releaseStateManager() { return std::move(_stateManager); } +void +StorageNode::set_storage_chain_builder(std::unique_ptr<IStorageChainBuilder> builder) +{ + _chain_builder = std::move(builder); +} + } // storage diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h index ea1bf1027d4..91a2bae3190 100644 --- a/storage/src/vespa/storage/storageserver/storagenode.h +++ b/storage/src/vespa/storage/storageserver/storagenode.h @@ -44,6 +44,7 @@ struct DeadLockDetector; struct StorageMetricSet; struct StorageNodeContext; class ApplicationGenerationFetcher; +class IStorageChainBuilder; class StorageComponent; namespace lib { class NodeType; } @@ -164,6 +165,9 @@ protected: std::unique_ptr<StorageComponent> _component; config::ConfigUri _configUri; CommunicationManager* _communicationManager; +private: + std::unique_ptr<IStorageChainBuilder> _chain_builder; +protected: /** * Node subclasses currently need to explicitly acquire ownership of state @@ -177,10 +181,12 @@ protected: void initialize(); virtual void subscribeToConfigs(); virtual void initializeNodeSpecific() = 0; - virtual std::unique_ptr<StorageLink> createChain() = 0; + virtual void createChain(IStorageChainBuilder &builder) = 0; virtual void handleLiveConfigUpdate(const InitialGuard & initGuard); void shutdown(); virtual void removeConfigSubscriptions(); +public: + void set_storage_chain_builder(std::unique_ptr<IStorageChainBuilder> builder); }; } // storage |