summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Egge <Tor.Egge@broadpark.no>2020-09-25 15:55:27 +0200
committerTor Egge <Tor.Egge@broadpark.no>2020-09-25 16:05:32 +0200
commit8cc41e7cbdb125698d7c05baf064dc40541a2892 (patch)
tree09554b8c883ce55fa186b80a003d3ec0c0984253
parentd1fa3efdc591999ef354b0c43f6ac5c5a7e27247 (diff)
Add storage chain builder.
-rw-r--r--storage/src/vespa/storage/common/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/common/i_storage_chain_builder.h22
-rw-r--r--storage/src/vespa/storage/common/storage_chain_builder.cpp31
-rw-r--r--storage/src/vespa/storage/common/storage_chain_builder.h23
-rw-r--r--storage/src/vespa/storage/storageserver/distributornode.cpp30
-rw-r--r--storage/src/vespa/storage/storageserver/distributornode.h2
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.cpp42
-rw-r--r--storage/src/vespa/storage/storageserver/servicelayernode.h2
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.cpp14
-rw-r--r--storage/src/vespa/storage/storageserver/storagenode.h8
10 files changed, 135 insertions, 40 deletions
diff --git a/storage/src/vespa/storage/common/CMakeLists.txt b/storage/src/vespa/storage/common/CMakeLists.txt
index 20b73987bf3..81c6486eaeb 100644
--- a/storage/src/vespa/storage/common/CMakeLists.txt
+++ b/storage/src/vespa/storage/common/CMakeLists.txt
@@ -15,5 +15,6 @@ vespa_add_library(storage_common OBJECT
storagecomponent.cpp
storagelink.cpp
storagelinkqueued.cpp
+ storage_chain_builder.cpp
DEPENDS
)
diff --git a/storage/src/vespa/storage/common/i_storage_chain_builder.h b/storage/src/vespa/storage/common/i_storage_chain_builder.h
new file mode 100644
index 00000000000..f50cc7572e9
--- /dev/null
+++ b/storage/src/vespa/storage/common/i_storage_chain_builder.h
@@ -0,0 +1,22 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include <memory>
+
+namespace storage {
+
+class StorageLink;
+
+/*
+ * Interface class for building a storage chain.
+ */
+class IStorageChainBuilder
+{
+public:
+ virtual ~IStorageChainBuilder() = default;
+ virtual void add(std::unique_ptr<StorageLink> child) = 0;
+ virtual std::unique_ptr<StorageLink> build() && = 0;
+};
+
+}
diff --git a/storage/src/vespa/storage/common/storage_chain_builder.cpp b/storage/src/vespa/storage/common/storage_chain_builder.cpp
new file mode 100644
index 00000000000..45878f452cb
--- /dev/null
+++ b/storage/src/vespa/storage/common/storage_chain_builder.cpp
@@ -0,0 +1,31 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#include "storage_chain_builder.h"
+#include "storagelink.h"
+
+namespace storage {
+
+StorageChainBuilder::StorageChainBuilder()
+ : _top()
+{
+}
+
+StorageChainBuilder::~StorageChainBuilder() = default;
+
+void
+StorageChainBuilder::add(std::unique_ptr<StorageLink> link)
+{
+ if (_top) {
+ _top->push_back(std::move(link));
+ } else {
+ _top = std::move(link);
+ }
+};
+
+std::unique_ptr<StorageLink>
+StorageChainBuilder::build() &&
+{
+ return std::move(_top);
+}
+
+}
diff --git a/storage/src/vespa/storage/common/storage_chain_builder.h b/storage/src/vespa/storage/common/storage_chain_builder.h
new file mode 100644
index 00000000000..fc9ca7ec1c9
--- /dev/null
+++ b/storage/src/vespa/storage/common/storage_chain_builder.h
@@ -0,0 +1,23 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "i_storage_chain_builder.h"
+
+namespace storage {
+
+/**
+ * Class for building a storage chain.
+ */
+class StorageChainBuilder : public IStorageChainBuilder
+{
+protected:
+ std::unique_ptr<StorageLink> _top;
+public:
+ StorageChainBuilder();
+ ~StorageChainBuilder() override;
+ void add(std::unique_ptr<StorageLink> child) override;
+ std::unique_ptr<StorageLink> build() && override;
+};
+
+}
diff --git a/storage/src/vespa/storage/storageserver/distributornode.cpp b/storage/src/vespa/storage/storageserver/distributornode.cpp
index 1cd1477e769..3d1f9bbaf2e 100644
--- a/storage/src/vespa/storage/storageserver/distributornode.cpp
+++ b/storage/src/vespa/storage/storageserver/distributornode.cpp
@@ -5,6 +5,7 @@
#include "communicationmanager.h"
#include "opslogger.h"
#include "statemanager.h"
+#include <vespa/storage/common/i_storage_chain_builder.h>
#include <vespa/storage/distributor/distributor.h>
#include <vespa/storage/common/hostreporter/hostinfo.h>
#include <vespa/vespalib/util/exceptions.h>
@@ -84,34 +85,33 @@ DistributorNode::handleConfigChange(vespa::config::content::core::StorVisitordis
_context.getComponentRegister().setVisitorConfig(c);
}
-StorageLink::UP
-DistributorNode::createChain()
+void
+DistributorNode::createChain(IStorageChainBuilder &builder)
{
DistributorComponentRegister& dcr(_context.getComponentRegister());
// TODO: All components in this chain should use a common thread instead of
// each having its own configfetcher.
StorageLink::UP chain;
if (_retrievedCommunicationManager.get()) {
- chain = std::move(_retrievedCommunicationManager);
+ builder.add(std::move(_retrievedCommunicationManager));
} else {
- chain.reset(_communicationManager
- = new CommunicationManager(dcr, _configUri));
+ auto communication_manager = std::make_unique<CommunicationManager>(dcr, _configUri);
+ _communicationManager = communication_manager.get();
+ builder.add(std::move(communication_manager));
}
std::unique_ptr<StateManager> stateManager(releaseStateManager());
- chain->push_back(StorageLink::UP(new Bouncer(dcr, _configUri)));
- chain->push_back(StorageLink::UP(new OpsLogger(dcr, _configUri)));
+ builder.add(std::make_unique<Bouncer>(dcr, _configUri));
+ builder.add(std::make_unique<OpsLogger>(dcr, _configUri));
// Distributor instance registers a host info reporter with the state
// manager, which is safe since the lifetime of said state manager
// extends to the end of the process.
- chain->push_back(StorageLink::UP(
- new storage::distributor::Distributor(
- dcr, *_threadPool, getDoneInitializeHandler(),
- _manageActiveBucketCopies,
- stateManager->getHostInfo())));
-
- chain->push_back(StorageLink::UP(stateManager.release()));
- return chain;
+ builder.add(std::make_unique<storage::distributor::Distributor>
+ (dcr, *_threadPool, getDoneInitializeHandler(),
+ _manageActiveBucketCopies,
+ stateManager->getHostInfo()));
+
+ builder.add(std::move(stateManager));
}
api::Timestamp
diff --git a/storage/src/vespa/storage/storageserver/distributornode.h b/storage/src/vespa/storage/storageserver/distributornode.h
index 4db8876dc24..39614674bb5 100644
--- a/storage/src/vespa/storage/storageserver/distributornode.h
+++ b/storage/src/vespa/storage/storageserver/distributornode.h
@@ -49,7 +49,7 @@ public:
private:
void initializeNodeSpecific() override;
- std::unique_ptr<StorageLink> createChain() override;
+ void createChain(IStorageChainBuilder &builder) override;
api::Timestamp getUniqueTimestamp() override;
/**
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.cpp b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
index b64f6db60fc..4f1db0e1b30 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.cpp
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.cpp
@@ -9,6 +9,7 @@
#include "statemanager.h"
#include "priorityconverter.h"
#include "service_layer_error_listener.h"
+#include <vespa/storage/common/i_storage_chain_builder.h>
#include <vespa/storage/visiting/messagebusvisitormessagesession.h>
#include <vespa/storage/visiting/visitormanager.h>
#include <vespa/storage/bucketdb/bucketmanager.h>
@@ -220,43 +221,44 @@ ServiceLayerNode::toDocumentPriority(uint8_t storagePriority) const
return _communicationManager->getPriorityConverter().toDocumentPriority(storagePriority);
}
-StorageLink::UP
-ServiceLayerNode::createChain()
+void
+ServiceLayerNode::createChain(IStorageChainBuilder &builder)
{
ServiceLayerComponentRegister& compReg(_context.getComponentRegister());
- StorageLink::UP chain;
- chain.reset(_communicationManager = new CommunicationManager(compReg, _configUri));
- chain->push_back(std::make_unique<Bouncer>(compReg, _configUri));
+ auto communication_manager = std::make_unique<CommunicationManager>(compReg, _configUri);
+ _communicationManager = communication_manager.get();
+ builder.add(std::move(communication_manager));
+ builder.add(std::make_unique<Bouncer>(compReg, _configUri));
if (_noUsablePartitionMode) {
/*
* No usable partitions. Use minimal chain. Still needs to be
* able to report state back to cluster controller.
*/
- chain->push_back(releaseStateManager());
- return chain;
+ builder.add(releaseStateManager());
+ return;
}
- chain->push_back(std::make_unique<OpsLogger>(compReg, _configUri));
- auto* merge_throttler = new MergeThrottler(_configUri, compReg);
- chain->push_back(StorageLink::UP(merge_throttler));
- chain->push_back(std::make_unique<ChangedBucketOwnershipHandler>(_configUri, compReg));
- chain->push_back(std::make_unique<StorageBucketDBInitializer>(
+ builder.add(std::make_unique<OpsLogger>(compReg, _configUri));
+ auto merge_throttler_up = std::make_unique<MergeThrottler>(_configUri, compReg);
+ auto merge_throttler = merge_throttler_up.get();
+ builder.add(std::move(merge_throttler_up));
+ builder.add(std::make_unique<ChangedBucketOwnershipHandler>(_configUri, compReg));
+ builder.add(std::make_unique<StorageBucketDBInitializer>(
_configUri, _partitions, getDoneInitializeHandler(), compReg));
- chain->push_back(std::make_unique<BucketManager>(_configUri, _context.getComponentRegister()));
- chain->push_back(StorageLink::UP(new VisitorManager(
- _configUri, _context.getComponentRegister(), *this, _externalVisitors)));
- chain->push_back(std::make_unique<ModifiedBucketChecker>(
+ builder.add(std::make_unique<BucketManager>(_configUri, _context.getComponentRegister()));
+ builder.add(std::make_unique<VisitorManager>(_configUri, _context.getComponentRegister(), static_cast<VisitorMessageSessionFactory &>(*this), _externalVisitors));
+ builder.add(std::make_unique<ModifiedBucketChecker>(
_context.getComponentRegister(), _persistenceProvider, _configUri));
- chain->push_back(StorageLink::UP(_fileStorManager = new FileStorManager(
- _configUri, _partitions, _persistenceProvider, _context.getComponentRegister())));
- chain->push_back(releaseStateManager());
+ auto filstor_manager = std::make_unique<FileStorManager>(_configUri, _partitions, _persistenceProvider, _context.getComponentRegister());
+ _fileStorManager = filstor_manager.get();
+ builder.add(std::move(filstor_manager));
+ builder.add(releaseStateManager());
// Lifetimes of all referenced components shall outlive the last call going
// through the SPI, as queues are flushed and worker threads joined when
// the storage link chain is closed prior to destruction.
auto error_listener = std::make_shared<ServiceLayerErrorListener>(*_component, *merge_throttler);
_fileStorManager->error_wrapper().register_error_listener(std::move(error_listener));
- return chain;
}
ResumeGuard
diff --git a/storage/src/vespa/storage/storageserver/servicelayernode.h b/storage/src/vespa/storage/storageserver/servicelayernode.h
index 7c0d6cd8ee3..ad570202f5b 100644
--- a/storage/src/vespa/storage/storageserver/servicelayernode.h
+++ b/storage/src/vespa/storage/storageserver/servicelayernode.h
@@ -62,7 +62,7 @@ private:
void handleLiveConfigUpdate(const InitialGuard & initGuard) override;
VisitorMessageSession::UP createSession(Visitor&, VisitorThread&) override;
documentapi::Priority::Value toDocumentPriority(uint8_t storagePriority) const override;
- std::unique_ptr<StorageLink> createChain() override;
+ void createChain(IStorageChainBuilder &builder) override;
void removeConfigSubscriptions() override;
};
diff --git a/storage/src/vespa/storage/storageserver/storagenode.cpp b/storage/src/vespa/storage/storageserver/storagenode.cpp
index e962ee4b1b6..aa50391c037 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.cpp
+++ b/storage/src/vespa/storage/storageserver/storagenode.cpp
@@ -12,6 +12,7 @@
#include <vespa/storage/frameworkimpl/status/statuswebserver.h>
#include <vespa/storage/frameworkimpl/thread/deadlockdetector.h>
#include <vespa/storage/common/statusmetricconsumer.h>
+#include <vespa/storage/common/storage_chain_builder.h>
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/vespalib/util/exceptions.h>
#include <vespa/vespalib/util/time.h>
@@ -111,7 +112,8 @@ StorageNode::StorageNode(
_newBucketSpacesConfig(),
_component(),
_configUri(configUri),
- _communicationManager(nullptr)
+ _communicationManager(nullptr),
+ _chain_builder(std::make_unique<StorageChainBuilder>())
{
}
@@ -203,7 +205,9 @@ StorageNode::initialize()
_deadLockDetector->setWaitSlack(framework::MilliSecTime(
static_cast<uint32_t>(_serverConfig->deadLockDetectorTimeoutSlack * 1000)));
- _chain.reset(createChain().release());
+ createChain(*_chain_builder);
+ _chain = std::move(*_chain_builder).build();
+ _chain_builder.reset();
assert(_communicationManager != nullptr);
_communicationManager->updateBucketSpacesConfig(*_bucketSpacesConfig);
@@ -622,4 +626,10 @@ StorageNode::releaseStateManager() {
return std::move(_stateManager);
}
+void
+StorageNode::set_storage_chain_builder(std::unique_ptr<IStorageChainBuilder> builder)
+{
+ _chain_builder = std::move(builder);
+}
+
} // storage
diff --git a/storage/src/vespa/storage/storageserver/storagenode.h b/storage/src/vespa/storage/storageserver/storagenode.h
index ea1bf1027d4..91a2bae3190 100644
--- a/storage/src/vespa/storage/storageserver/storagenode.h
+++ b/storage/src/vespa/storage/storageserver/storagenode.h
@@ -44,6 +44,7 @@ struct DeadLockDetector;
struct StorageMetricSet;
struct StorageNodeContext;
class ApplicationGenerationFetcher;
+class IStorageChainBuilder;
class StorageComponent;
namespace lib { class NodeType; }
@@ -164,6 +165,9 @@ protected:
std::unique_ptr<StorageComponent> _component;
config::ConfigUri _configUri;
CommunicationManager* _communicationManager;
+private:
+ std::unique_ptr<IStorageChainBuilder> _chain_builder;
+protected:
/**
* Node subclasses currently need to explicitly acquire ownership of state
@@ -177,10 +181,12 @@ protected:
void initialize();
virtual void subscribeToConfigs();
virtual void initializeNodeSpecific() = 0;
- virtual std::unique_ptr<StorageLink> createChain() = 0;
+ virtual void createChain(IStorageChainBuilder &builder) = 0;
virtual void handleLiveConfigUpdate(const InitialGuard & initGuard);
void shutdown();
virtual void removeConfigSubscriptions();
+public:
+ void set_storage_chain_builder(std::unique_ptr<IStorageChainBuilder> builder);
};
} // storage