diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-10-18 13:03:43 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-18 13:03:43 +0200 |
commit | 3d89f85313a0a8da5d8ad0e435b1877a46e6ecc3 (patch) | |
tree | 9b24a3f06f26c0c30f390994335b3136c4af8d65 | |
parent | 26928636364b427ee20c6e3171bcf74255f40e40 (diff) | |
parent | 1ad100b4d84db5438c470de26f6687e231aec4bd (diff) |
Merge pull request #14931 from vespa-engine/geirst/file-stor-handler-interface
Change FileStorHandler into an interface that FileStorHandlerImpl imp…
10 files changed, 137 insertions, 282 deletions
diff --git a/storage/src/tests/persistence/common/filestortestfixture.h b/storage/src/tests/persistence/common/filestortestfixture.h index 85cec16a546..548dd4f3bfd 100644 --- a/storage/src/tests/persistence/common/filestortestfixture.h +++ b/storage/src/tests/persistence/common/filestortestfixture.h @@ -1,13 +1,13 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #pragma once +#include <tests/common/dummystoragelink.h> #include <tests/common/testhelper.h> +#include <tests/common/teststorageapp.h> #include <vespa/persistence/spi/persistenceprovider.h> +#include <vespa/storage/persistence/filestorage/filestorhandlerimpl.h> #include <vespa/storage/persistence/filestorage/filestormanager.h> #include <vespa/storageapi/message/persistence.h> -#include <tests/common/dummystoragelink.h> -#include <tests/common/teststorageapp.h> -#include <tests/common/testhelper.h> #include <vespa/vespalib/gtest/gtest.h> namespace storage { diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 46e865a1ddd..257e943e5ab 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -1,26 +1,27 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include <tests/common/testhelper.h> #include <tests/common/dummystoragelink.h> +#include <tests/common/testhelper.h> #include <tests/common/teststorageapp.h> #include <tests/persistence/filestorage/forwardingmessagesender.h> +#include <vespa/config/common/exceptions.h> +#include <vespa/document/fieldset/fieldsets.h> #include <vespa/document/repo/documenttyperepo.h> +#include <vespa/document/select/parser.h> #include <vespa/document/test/make_document_bucket.h> -#include <vespa/document/fieldset/fieldsets.h> -#include <vespa/storage/storageserver/statemanager.h> -#include <vespa/storage/bucketdb/bucketmanager.h> -#include <vespa/storage/persistence/persistencethread.h> -#include <vespa/storage/persistence/filestorage/filestormanager.h> -#include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h> #include <vespa/document/update/assignvalueupdate.h> #include <vespa/document/update/documentupdate.h> -#include <vespa/document/select/parser.h> -#include <vespa/vdslib/state/random.h> -#include <vespa/storageapi/message/bucketsplitting.h> +#include <vespa/fastos/file.h> #include <vespa/persistence/dummyimpl/dummypersistence.h> #include <vespa/persistence/spi/test.h> -#include <vespa/config/common/exceptions.h> -#include <vespa/fastos/file.h> +#include <vespa/storage/bucketdb/bucketmanager.h> +#include <vespa/storage/persistence/filestorage/filestorhandlerimpl.h> +#include <vespa/storage/persistence/filestorage/filestormanager.h> +#include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h> +#include <vespa/storage/persistence/persistencethread.h> +#include <vespa/storage/storageserver/statemanager.h> +#include <vespa/storageapi/message/bucketsplitting.h> +#include <vespa/vdslib/state/random.h> #include <vespa/vespalib/gtest/gtest.h> #include <atomic> #include <thread> @@ -396,7 +397,7 @@ TEST_F(FileStorManagerTest, handler_priority) { FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister()); + FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50ms); uint32_t stripeId = filestorHandler.getNextStripeId(); ASSERT_EQ(0u, stripeId); @@ -504,7 +505,7 @@ TEST_F(FileStorManagerTest, handler_paused_multi_thread) { FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister()); + FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50ms); std::string content("Here is some content which is in all documents"); @@ -550,7 +551,7 @@ TEST_F(FileStorManagerTest, handler_pause) { FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister()); + FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50ms); uint32_t stripeId = filestorHandler.getNextStripeId(); @@ -596,7 +597,7 @@ TEST_F(FileStorManagerTest, remap_split) { FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister()); + FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50ms); std::string content("Here is some content which is in all documents"); @@ -654,7 +655,7 @@ TEST_F(FileStorManagerTest, handler_timeout) { FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(),1, 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister()); + FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); filestorHandler.setGetNextMessageTimeout(50ms); uint32_t stripeId = filestorHandler.getNextStripeId(); @@ -714,7 +715,7 @@ TEST_F(FileStorManagerTest, priority) { FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(),1, 2); - FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister()); + FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), filestorHandler, *metrics.disks[0]->threads[0])); @@ -794,7 +795,7 @@ TEST_F(FileStorManagerTest, split1) { documentapi::LoadTypeSet loadTypes("raw:"); FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister()); + FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), filestorHandler, *metrics.disks[0]->threads[0])); @@ -934,7 +935,7 @@ TEST_F(FileStorManagerTest, split_single_group) { documentapi::LoadTypeSet loadTypes("raw:"); FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(),1, 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister()); + FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0)); for (uint32_t j=0; j<1; ++j) { // Test this twice, once where all the data ends up in file with @@ -1046,7 +1047,7 @@ TEST_F(FileStorManagerTest, split_empty_target_with_remapped_ops) { documentapi::LoadTypeSet loadTypes("raw:"); FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister()); + FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), filestorHandler, *metrics.disks[0]->threads[0])); @@ -1111,7 +1112,7 @@ TEST_F(FileStorManagerTest, notify_on_split_source_ownership_changed) { documentapi::LoadTypeSet loadTypes("raw:"); FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister()); + FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), filestorHandler, *metrics.disks[0]->threads[0])); @@ -1152,7 +1153,7 @@ TEST_F(FileStorManagerTest, join) { documentapi::LoadTypeSet loadTypes("raw:"); FileStorMetrics metrics(loadTypes.getMetricLoadTypes()); metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1); - FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister()); + FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister()); std::unique_ptr<DiskThread> thread(createThread( *config, *_node, _node->getPersistenceProvider(), filestorHandler, *metrics.disks[0]->threads[0])); diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp index 7e54b45f96a..4737809a926 100644 --- a/storage/src/tests/persistence/persistencequeuetest.cpp +++ b/storage/src/tests/persistence/persistencequeuetest.cpp @@ -49,8 +49,8 @@ PersistenceQueueTest::Fixture::Fixture(FileStorTestFixture& parent_) metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1); - filestorHandler = std::make_unique<FileStorHandler>(messageSender, metrics, - parent._node->getComponentRegister()); + filestorHandler = std::make_unique<FileStorHandlerImpl>(messageSender, metrics, + parent._node->getComponentRegister()); // getNextMessage will time out if no unlocked buckets are present. Choose a timeout // that is large enough to fail tests with high probability if this is not the case, // and small enough to not slow down testing too much. diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp index 6eb7fb7fd04..f6ba71940a8 100644 --- a/storage/src/tests/persistence/persistencetestutils.cpp +++ b/storage/src/tests/persistence/persistencetestutils.cpp @@ -2,13 +2,14 @@ #include "persistencetestutils.h" #include <vespa/document/datatype/documenttype.h> -#include <vespa/storageapi/message/persistence.h> #include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/test/make_document_bucket.h> -#include <vespa/persistence/dummyimpl/dummypersistence.h> -#include <vespa/persistence/spi/test.h> #include <vespa/document/update/assignvalueupdate.h> #include <vespa/document/update/documentupdate.h> +#include <vespa/persistence/dummyimpl/dummypersistence.h> +#include <vespa/persistence/spi/test.h> +#include <vespa/storage/persistence/filestorage/filestorhandlerimpl.h> +#include <vespa/storageapi/message/persistence.h> #include <vespa/vespalib/io/fileutil.h> #include <vespa/vespalib/objects/nbostream.h> #include <vespa/vespalib/util/exceptions.h> @@ -54,7 +55,7 @@ PersistenceTestEnvironment::PersistenceTestEnvironment(const std::string & rootO { _node.setupDummyPersistence(); _metrics.initDiskMetrics(1, _node.getLoadTypes()->getMetricLoadTypes(), 1, 1); - _handler = std::make_unique<FileStorHandler>(_messageKeeper, _metrics, _node.getComponentRegister()); + _handler = std::make_unique<FileStorHandlerImpl>(_messageKeeper, _metrics, _node.getComponentRegister()); _diskEnv = std::make_unique<PersistenceUtil>(_config.getConfigId(), _component, *_handler, *_metrics.disks[0]->threads[0], _node.getPersistenceProvider()); } diff --git a/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt b/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt index 182bf04b1cf..61cfcbcbfb8 100644 --- a/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt +++ b/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt @@ -1,7 +1,6 @@ # Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(storage_filestorpersistence OBJECT SOURCES - filestorhandler.cpp filestorhandlerimpl.cpp filestormanager.cpp filestormetrics.cpp diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp deleted file mode 100644 index af863f82d6f..00000000000 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp +++ /dev/null @@ -1,184 +0,0 @@ -// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. -#include "filestorhandler.h" -#include "filestorhandlerimpl.h" - -namespace storage { - -FileStorHandler::FileStorHandler(MessageSender& sender, FileStorMetrics& metrics, - ServiceLayerComponentRegister& compReg) - : _impl(std::make_unique<FileStorHandlerImpl>(1, 1, sender, metrics, compReg)) -{ -} - - -FileStorHandler::FileStorHandler(uint32_t numThreads, uint32_t numStripes, MessageSender& sender, FileStorMetrics& metrics, - ServiceLayerComponentRegister& compReg) - : _impl(std::make_unique<FileStorHandlerImpl>(numThreads, numStripes, sender, metrics, compReg)) -{ -} - -FileStorHandler::~FileStorHandler() = default; -void -FileStorHandler::flush(bool flushMerges) -{ - _impl->flush(flushMerges); -} - -void -FileStorHandler::setDiskState(DiskState state) -{ - _impl->setDiskState(state); -} - -FileStorHandler::DiskState -FileStorHandler::getDiskState() -{ - return _impl->getDiskState(); -} - -void -FileStorHandler::close() -{ - _impl->close(); -} - -ResumeGuard -FileStorHandler::pause() -{ - return _impl->pause(); -} - -bool -FileStorHandler::schedule(const api::StorageMessage::SP& msg) -{ - return _impl->schedule(msg); -} - -FileStorHandler::LockedMessage -FileStorHandler::getNextMessage(uint32_t stripeId) -{ - return _impl->getNextMessage(stripeId); -} - -FileStorHandler::BucketLockInterface::SP -FileStorHandler::lock(const document::Bucket& bucket, api::LockingRequirements lockReq) -{ - return _impl->lock(bucket, lockReq); -} - -void -FileStorHandler::remapQueueAfterDiskMove(const document::Bucket& bucket) -{ - RemapInfo target(bucket); - _impl->remapQueue(RemapInfo(bucket), target, FileStorHandlerImpl::MOVE); -} - -void -FileStorHandler::remapQueueAfterJoin(const RemapInfo& source,RemapInfo& target) -{ - _impl->remapQueue(source, target, FileStorHandlerImpl::JOIN); -} - -void -FileStorHandler::remapQueueAfterSplit(const RemapInfo& source,RemapInfo& target1, RemapInfo& target2) -{ - _impl->remapQueue(source, target1, target2, FileStorHandlerImpl::SPLIT); -} - -void -FileStorHandler::failOperations(const document::Bucket &bucket, const api::ReturnCode& err) -{ - _impl->failOperations(bucket, err); -} - -void -FileStorHandler::sendCommand(const api::StorageCommand::SP& msg) -{ - _impl->sendCommand(msg); -} - -void -FileStorHandler::sendReply(const api::StorageReply::SP& msg) -{ - _impl->sendReply(msg); -} - -void -FileStorHandler::sendReplyDirectly(const api::StorageReply::SP& msg) -{ - _impl->sendReplyDirectly(msg); -} - -void -FileStorHandler::getStatus(std::ostream& out, const framework::HttpUrlPath& path) const -{ - _impl->getStatus(out, path); -} - -uint32_t -FileStorHandler::getQueueSize() const -{ - return _impl->getQueueSize(); -} - -void -FileStorHandler::addMergeStatus(const document::Bucket& bucket, MergeStatus::SP ms) -{ - return _impl->addMergeStatus(bucket, ms); -} - -MergeStatus& -FileStorHandler::editMergeStatus(const document::Bucket& bucket) -{ - return _impl->editMergeStatus(bucket); -} - -bool -FileStorHandler::isMerging(const document::Bucket& bucket) const -{ - return _impl->isMerging(bucket); -} - -uint32_t -FileStorHandler::getNumActiveMerges() const -{ - return _impl->getNumActiveMerges(); -} - -uint32_t -FileStorHandler::getNextStripeId() { - return _impl->getNextStripeId(); -} - -void -FileStorHandler::clearMergeStatus(const document::Bucket& bucket, - const api::ReturnCode& code) -{ - return _impl->clearMergeStatus(bucket, &code); -} - -void -FileStorHandler::clearMergeStatus(const document::Bucket& bucket) -{ - return _impl->clearMergeStatus(bucket, 0); -} - -void -FileStorHandler::abortQueuedOperations(const AbortBucketOperationsCommand& cmd) -{ - _impl->abortQueuedOperations(cmd); -} - -void -FileStorHandler::setGetNextMessageTimeout(vespalib::duration timeout) -{ - _impl->setGetNextMessageTimeout(timeout); -} - -std::string -FileStorHandler::dumpQueue() const -{ - return _impl->dumpQueue(); -} - -} // storage diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h index f0421970d08..4427a2f45e8 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h @@ -49,7 +49,7 @@ public: class BucketLockInterface { public: - typedef std::shared_ptr<BucketLockInterface> SP; + using SP = std::shared_ptr<BucketLockInterface>; virtual const document::Bucket &getBucket() const = 0; virtual api::LockingRequirements lockingRequirements() const noexcept = 0; @@ -57,7 +57,7 @@ public: virtual ~BucketLockInterface() = default; }; - typedef std::pair<BucketLockInterface::SP, api::StorageMessage::SP> LockedMessage; + using LockedMessage = std::pair<BucketLockInterface::SP, api::StorageMessage::SP>; enum DiskState { AVAILABLE, @@ -65,12 +65,8 @@ public: CLOSED }; - FileStorHandler(uint32_t numThreads, uint32_t numStripes, MessageSender&, FileStorMetrics&, - ServiceLayerComponentRegister&); - FileStorHandler(MessageSender&, FileStorMetrics&, ServiceLayerComponentRegister&); - ~FileStorHandler(); + virtual ~FileStorHandler() = default; - // Commands used by file stor manager /** * Waits for the filestor queues to be empty. Providing no new load is @@ -79,10 +75,10 @@ public: * @param killPendingMerges If true, clear out all pending merges and reply * to them with failure. */ - void flush(bool killPendingMerges); + virtual void flush(bool killPendingMerges) = 0; - void setDiskState(DiskState state); - DiskState getDiskState(); + virtual void setDiskState(DiskState state) = 0; + virtual DiskState getDiskState() const = 0; /** Check whether it is enabled or not. */ bool enabled() { return (getDiskState() == AVAILABLE); } @@ -93,26 +89,26 @@ public: */ void disable() { setDiskState(DISABLED); } /** Closes all disk threads. */ - void close(); + virtual void close() = 0; /** * Makes sure no operations are active, then stops any new operations * from being performed, until the ResumeGuard is destroyed. */ - ResumeGuard pause(); + virtual ResumeGuard pause() = 0; /** * Schedule a storage message to be processed * @return True if we maanged to schedule operation. False if not */ - bool schedule(const std::shared_ptr<api::StorageMessage>&); + virtual bool schedule(const std::shared_ptr<api::StorageMessage>&) = 0; /** * Used by file stor threads to get their next message to process. * * @param stripe The stripe to get messages for */ - LockedMessage getNextMessage(uint32_t stripeId); + virtual LockedMessage getNextMessage(uint32_t stripeId) = 0; /** * Lock a bucket. By default, each file stor thread has the locks of all @@ -128,7 +124,7 @@ public: * * */ - BucketLockInterface::SP lock(const document::Bucket&, api::LockingRequirements lockReq); + virtual BucketLockInterface::SP lock(const document::Bucket&, api::LockingRequirements lockReq) = 0; /** * Called by FileStorThread::onBucketDiskMove() after moving file, in case @@ -140,7 +136,7 @@ public: * requeststatus - Ignore * readbucketinfo/bucketdiskmove/internalbucketjoin - Fail and log errors */ - void remapQueueAfterDiskMove(const document::Bucket &bucket); + virtual void remapQueueAfterDiskMove(const document::Bucket &bucket) = 0; /** * Called by FileStorThread::onJoin() after joining a bucket into another, @@ -157,7 +153,7 @@ public: * requeststatus/deletebucket - Ignore * readbucketinfo/internalbucketjoin - Fail and log errors */ - void remapQueueAfterJoin(const RemapInfo& source, RemapInfo& target); + virtual void remapQueueAfterJoin(const RemapInfo& source, RemapInfo& target) = 0; /** * Called by FileStorThread::onSplit() after splitting a bucket, @@ -177,9 +173,9 @@ public: * requeststatus/deletebucket - Ignore * readbucketinfo/internalbucketjoin - Fail and log errors */ - void remapQueueAfterSplit(const RemapInfo& source, - RemapInfo& target1, - RemapInfo& target2); + virtual void remapQueueAfterSplit(const RemapInfo& source, + RemapInfo& target1, + RemapInfo& target2) = 0; struct DeactivateCallback { virtual ~DeactivateCallback() {} @@ -190,12 +186,12 @@ public: * Fail all operations towards a single bucket currently queued to the * given thread with the given error code. */ - void failOperations(const document::Bucket&, const api::ReturnCode&); + virtual void failOperations(const document::Bucket&, const api::ReturnCode&) = 0; /** * Add a new merge state to the registry. */ - void addMergeStatus(const document::Bucket&, MergeStatus::SP); + virtual void addMergeStatus(const document::Bucket&, MergeStatus::SP) = 0; /** * Returns the reference to the current merge status for the given bucket. @@ -204,7 +200,7 @@ public: * * @param bucket The bucket to start merging. */ - MergeStatus& editMergeStatus(const document::Bucket& bucket); + virtual MergeStatus& editMergeStatus(const document::Bucket& bucket) = 0; /** * Returns true if the bucket is currently being merged on this node. @@ -212,41 +208,33 @@ public: * @param bucket The bucket to check merge status for * @return Returns true if the bucket is being merged. */ - bool isMerging(const document::Bucket& bucket) const; + virtual bool isMerging(const document::Bucket& bucket) const = 0; /** * @return Returns the number of active merges on the node. */ - uint32_t getNumActiveMerges() const; + virtual uint32_t getNumActiveMerges() const = 0; /// Provides the next stripe id for a certain disk. - uint32_t getNextStripeId(); + virtual uint32_t getNextStripeId() = 0; /** Removes the merge status for the given bucket. */ - void clearMergeStatus(const document::Bucket&); - void clearMergeStatus(const document::Bucket&, const api::ReturnCode&); + virtual void clearMergeStatus(const document::Bucket&) = 0; + virtual void clearMergeStatus(const document::Bucket&, const api::ReturnCode&) = 0; - void abortQueuedOperations(const AbortBucketOperationsCommand& cmd); - - /** Send the given command back out of the persistence layer. */ - void sendCommand(const api::StorageCommand::SP&) override; - /** Send the given reply back out of the persistence layer. */ - void sendReply(const api::StorageReply::SP&) override; - void sendReplyDirectly(const std::shared_ptr<api::StorageReply>&) override; + virtual void abortQueuedOperations(const AbortBucketOperationsCommand& cmd) = 0; /** Writes status page. */ - void getStatus(std::ostream& out, const framework::HttpUrlPath& path) const; + virtual void getStatus(std::ostream& out, const framework::HttpUrlPath& path) const = 0; /** Utility function to fetch total size of queue. */ - uint32_t getQueueSize() const; + virtual uint32_t getQueueSize() const = 0; // Commands used by testing - void setGetNextMessageTimeout(vespalib::duration timeout); + virtual void setGetNextMessageTimeout(vespalib::duration timeout) = 0; - std::string dumpQueue() const; + virtual std::string dumpQueue() const = 0; -private: - std::unique_ptr<FileStorHandlerImpl> _impl; }; } // storage diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp index 82541f5e9b4..31cf06dfda4 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp @@ -37,6 +37,12 @@ uint32_t per_stripe_merge_limit(uint32_t num_threads, uint32_t num_stripes) noex } +FileStorHandlerImpl::FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics, + ServiceLayerComponentRegister& compReg) + : FileStorHandlerImpl(1, 1, sender, metrics, compReg) +{ +} + FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender& sender, FileStorMetrics& metrics, ServiceLayerComponentRegister& compReg) @@ -106,6 +112,18 @@ FileStorHandlerImpl::getNumActiveMerges() const } void +FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket) +{ + clearMergeStatus(bucket, nullptr); +} + +void +FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket, const api::ReturnCode& code) +{ + clearMergeStatus(bucket, &code); +} + +void FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket, const api::ReturnCode* code) { std::lock_guard mlock(_mergeStatesLock); @@ -707,6 +725,25 @@ FileStorHandlerImpl::remapQueueNoLock(const RemapInfo& source, std::vector<Remap } void +FileStorHandlerImpl::remapQueueAfterDiskMove(const document::Bucket& bucket) +{ + RemapInfo target(bucket); + remapQueue(RemapInfo(bucket), target, FileStorHandlerImpl::MOVE); +} + +void +FileStorHandlerImpl::remapQueueAfterJoin(const RemapInfo& source, RemapInfo& target) +{ + remapQueue(source, target, FileStorHandlerImpl::JOIN); +} + +void +FileStorHandlerImpl::remapQueueAfterSplit(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2) +{ + remapQueue(source, target1, target2, FileStorHandlerImpl::SPLIT); +} + +void FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target, Operation op) { // Use a helper class to lock to solve issue that some buckets might be diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h index dfb830d72a5..b4fd18bd2e2 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h +++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h @@ -43,10 +43,8 @@ namespace bmi = boost::multi_index; class FileStorHandlerImpl : private framework::MetricUpdateHook, private ResumeGuard::Callback, - public MessageSender { + public FileStorHandler { public: - typedef FileStorHandler::DiskState DiskState; - typedef FileStorHandler::RemapInfo RemapInfo; struct MessageEntry { std::shared_ptr<api::StorageMessage> _command; @@ -172,54 +170,68 @@ public: api::LockingRequirements _lockReq; }; + + FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics, + ServiceLayerComponentRegister& compReg); FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender&, FileStorMetrics&, ServiceLayerComponentRegister&); ~FileStorHandlerImpl(); - void setGetNextMessageTimeout(vespalib::duration timeout) { _getNextMessageTimeout = timeout; } + void setGetNextMessageTimeout(vespalib::duration timeout) override { _getNextMessageTimeout = timeout; } + + void flush(bool killPendingMerges) override; + void setDiskState(DiskState state) override; + DiskState getDiskState() const override; + void close() override; + bool schedule(const std::shared_ptr<api::StorageMessage>&) override; - void flush(bool killPendingMerges); - void setDiskState(DiskState state); - DiskState getDiskState() const; - void close(); - bool schedule(const std::shared_ptr<api::StorageMessage>&); + FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId) override; - FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId); + void remapQueueAfterDiskMove(const document::Bucket& bucket) override; + void remapQueueAfterJoin(const RemapInfo& source, RemapInfo& target) override; + void remapQueueAfterSplit(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2) override; enum Operation { MOVE, SPLIT, JOIN }; void remapQueue(const RemapInfo& source, RemapInfo& target, Operation op); void remapQueue(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2, Operation op); - void failOperations(const document::Bucket & bucket, const api::ReturnCode & code) { + void failOperations(const document::Bucket & bucket, const api::ReturnCode & code) override { stripe(bucket).failOperations(bucket, code); } + + // Implements MessageSender void sendCommand(const std::shared_ptr<api::StorageCommand>&) override; void sendReply(const std::shared_ptr<api::StorageReply>&) override; void sendReplyDirectly(const api::StorageReply::SP& msg) override; - void getStatus(std::ostream& out, const framework::HttpUrlPath& path) const; + void getStatus(std::ostream& out, const framework::HttpUrlPath& path) const override; - uint32_t getQueueSize() const; - uint32_t getNextStripeId() { + uint32_t getQueueSize() const override; + uint32_t getNextStripeId() override { return (_nextStripeId++) % _stripes.size(); } std::shared_ptr<FileStorHandler::BucketLockInterface> - lock(const document::Bucket & bucket, api::LockingRequirements lockReq) { + lock(const document::Bucket & bucket, api::LockingRequirements lockReq) override { return stripe(bucket).lock(bucket, lockReq); } - void addMergeStatus(const document::Bucket&, MergeStatus::SP); - MergeStatus& editMergeStatus(const document::Bucket&); - bool isMerging(const document::Bucket&) const; - uint32_t getNumActiveMerges() const; + void addMergeStatus(const document::Bucket&, MergeStatus::SP) override; + MergeStatus& editMergeStatus(const document::Bucket&) override; + bool isMerging(const document::Bucket&) const override; + uint32_t getNumActiveMerges() const override; + void clearMergeStatus(const document::Bucket& bucket) override; + void clearMergeStatus(const document::Bucket& bucket, const api::ReturnCode& code) override; + void clearMergeStatus(const document::Bucket&, const api::ReturnCode*); - std::string dumpQueue() const; - ResumeGuard pause(); + std::string dumpQueue() const override; + ResumeGuard pause() override; + void abortQueuedOperations(const AbortBucketOperationsCommand& cmd) override; + + // Implements ResumeGuard::Callback void resume() override; - void abortQueuedOperations(const AbortBucketOperationsCommand& cmd); private: ServiceLayerComponent _component; @@ -269,7 +281,7 @@ private: static std::unique_ptr<api::StorageReply> makeQueueTimeoutReply(api::StorageMessage& msg); static bool messageMayBeAborted(const api::StorageMessage& msg); - // Update hook + // Implements framework::MetricUpdateHook void updateMetrics(const MetricLockGuard &) override; document::Bucket diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp index 97d7feac5e9..fdb52635292 100644 --- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp +++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp @@ -1,5 +1,6 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "filestorhandlerimpl.h" #include "filestormanager.h" #include <vespa/storage/bucketdb/lockablemap.hpp> @@ -126,7 +127,7 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC size_t numStripes = std::max(size_t(1u), numThreads / 2); _metrics->initDiskMetrics(1, _component.getLoadTypes()->getMetricLoadTypes(), numStripes, numThreads); - _filestorHandler = std::make_unique<FileStorHandler>(numThreads, numStripes, *this, *_metrics, _compReg); + _filestorHandler = std::make_unique<FileStorHandlerImpl>(numThreads, numStripes, *this, *_metrics, _compReg); uint32_t numResponseThreads = computeNumResponseThreads(_config->numResponseThreads); _sequencedExecutor = vespalib::SequencedTaskExecutor::create(numResponseThreads, 10000, selectSequencer(_config->responseSequencerType)); assert(_sequencedExecutor); |