summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-10-18 13:03:43 +0200
committerGitHub <noreply@github.com>2020-10-18 13:03:43 +0200
commit3d89f85313a0a8da5d8ad0e435b1877a46e6ecc3 (patch)
tree9b24a3f06f26c0c30f390994335b3136c4af8d65
parent26928636364b427ee20c6e3171bcf74255f40e40 (diff)
parent1ad100b4d84db5438c470de26f6687e231aec4bd (diff)
Merge pull request #14931 from vespa-engine/geirst/file-stor-handler-interface
Change FileStorHandler into an interface that FileStorHandlerImpl imp…
-rw-r--r--storage/src/tests/persistence/common/filestortestfixture.h6
-rw-r--r--storage/src/tests/persistence/filestorage/filestormanagertest.cpp47
-rw-r--r--storage/src/tests/persistence/persistencequeuetest.cpp4
-rw-r--r--storage/src/tests/persistence/persistencetestutils.cpp9
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt1
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp184
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandler.h70
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp37
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h58
-rw-r--r--storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp3
10 files changed, 137 insertions, 282 deletions
diff --git a/storage/src/tests/persistence/common/filestortestfixture.h b/storage/src/tests/persistence/common/filestortestfixture.h
index 85cec16a546..548dd4f3bfd 100644
--- a/storage/src/tests/persistence/common/filestortestfixture.h
+++ b/storage/src/tests/persistence/common/filestortestfixture.h
@@ -1,13 +1,13 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include <tests/common/dummystoragelink.h>
#include <tests/common/testhelper.h>
+#include <tests/common/teststorageapp.h>
#include <vespa/persistence/spi/persistenceprovider.h>
+#include <vespa/storage/persistence/filestorage/filestorhandlerimpl.h>
#include <vespa/storage/persistence/filestorage/filestormanager.h>
#include <vespa/storageapi/message/persistence.h>
-#include <tests/common/dummystoragelink.h>
-#include <tests/common/teststorageapp.h>
-#include <tests/common/testhelper.h>
#include <vespa/vespalib/gtest/gtest.h>
namespace storage {
diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
index 46e865a1ddd..257e943e5ab 100644
--- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
+++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp
@@ -1,26 +1,27 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include <tests/common/testhelper.h>
#include <tests/common/dummystoragelink.h>
+#include <tests/common/testhelper.h>
#include <tests/common/teststorageapp.h>
#include <tests/persistence/filestorage/forwardingmessagesender.h>
+#include <vespa/config/common/exceptions.h>
+#include <vespa/document/fieldset/fieldsets.h>
#include <vespa/document/repo/documenttyperepo.h>
+#include <vespa/document/select/parser.h>
#include <vespa/document/test/make_document_bucket.h>
-#include <vespa/document/fieldset/fieldsets.h>
-#include <vespa/storage/storageserver/statemanager.h>
-#include <vespa/storage/bucketdb/bucketmanager.h>
-#include <vespa/storage/persistence/persistencethread.h>
-#include <vespa/storage/persistence/filestorage/filestormanager.h>
-#include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h>
#include <vespa/document/update/assignvalueupdate.h>
#include <vespa/document/update/documentupdate.h>
-#include <vespa/document/select/parser.h>
-#include <vespa/vdslib/state/random.h>
-#include <vespa/storageapi/message/bucketsplitting.h>
+#include <vespa/fastos/file.h>
#include <vespa/persistence/dummyimpl/dummypersistence.h>
#include <vespa/persistence/spi/test.h>
-#include <vespa/config/common/exceptions.h>
-#include <vespa/fastos/file.h>
+#include <vespa/storage/bucketdb/bucketmanager.h>
+#include <vespa/storage/persistence/filestorage/filestorhandlerimpl.h>
+#include <vespa/storage/persistence/filestorage/filestormanager.h>
+#include <vespa/storage/persistence/filestorage/modifiedbucketchecker.h>
+#include <vespa/storage/persistence/persistencethread.h>
+#include <vespa/storage/storageserver/statemanager.h>
+#include <vespa/storageapi/message/bucketsplitting.h>
+#include <vespa/vdslib/state/random.h>
#include <vespa/vespalib/gtest/gtest.h>
#include <atomic>
#include <thread>
@@ -396,7 +397,7 @@ TEST_F(FileStorManagerTest, handler_priority) {
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister());
+ FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50ms);
uint32_t stripeId = filestorHandler.getNextStripeId();
ASSERT_EQ(0u, stripeId);
@@ -504,7 +505,7 @@ TEST_F(FileStorManagerTest, handler_paused_multi_thread) {
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister());
+ FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50ms);
std::string content("Here is some content which is in all documents");
@@ -550,7 +551,7 @@ TEST_F(FileStorManagerTest, handler_pause) {
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister());
+ FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50ms);
uint32_t stripeId = filestorHandler.getNextStripeId();
@@ -596,7 +597,7 @@ TEST_F(FileStorManagerTest, remap_split) {
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister());
+ FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50ms);
std::string content("Here is some content which is in all documents");
@@ -654,7 +655,7 @@ TEST_F(FileStorManagerTest, handler_timeout) {
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(),1, 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister());
+ FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
filestorHandler.setGetNextMessageTimeout(50ms);
uint32_t stripeId = filestorHandler.getNextStripeId();
@@ -714,7 +715,7 @@ TEST_F(FileStorManagerTest, priority) {
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(),1, 2);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister());
+ FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
filestorHandler, *metrics.disks[0]->threads[0]));
@@ -794,7 +795,7 @@ TEST_F(FileStorManagerTest, split1) {
documentapi::LoadTypeSet loadTypes("raw:");
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister());
+ FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
filestorHandler, *metrics.disks[0]->threads[0]));
@@ -934,7 +935,7 @@ TEST_F(FileStorManagerTest, split_single_group) {
documentapi::LoadTypeSet loadTypes("raw:");
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(),1, 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister());
+ FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
spi::Context context(defaultLoadType, spi::Priority(0), spi::Trace::TraceLevel(0));
for (uint32_t j=0; j<1; ++j) {
// Test this twice, once where all the data ends up in file with
@@ -1046,7 +1047,7 @@ TEST_F(FileStorManagerTest, split_empty_target_with_remapped_ops) {
documentapi::LoadTypeSet loadTypes("raw:");
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister());
+ FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
filestorHandler, *metrics.disks[0]->threads[0]));
@@ -1111,7 +1112,7 @@ TEST_F(FileStorManagerTest, notify_on_split_source_ownership_changed) {
documentapi::LoadTypeSet loadTypes("raw:");
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister());
+ FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
filestorHandler, *metrics.disks[0]->threads[0]));
@@ -1152,7 +1153,7 @@ TEST_F(FileStorManagerTest, join) {
documentapi::LoadTypeSet loadTypes("raw:");
FileStorMetrics metrics(loadTypes.getMetricLoadTypes());
metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1);
- FileStorHandler filestorHandler(messageSender, metrics, _node->getComponentRegister());
+ FileStorHandlerImpl filestorHandler(messageSender, metrics, _node->getComponentRegister());
std::unique_ptr<DiskThread> thread(createThread(
*config, *_node, _node->getPersistenceProvider(),
filestorHandler, *metrics.disks[0]->threads[0]));
diff --git a/storage/src/tests/persistence/persistencequeuetest.cpp b/storage/src/tests/persistence/persistencequeuetest.cpp
index 7e54b45f96a..4737809a926 100644
--- a/storage/src/tests/persistence/persistencequeuetest.cpp
+++ b/storage/src/tests/persistence/persistencequeuetest.cpp
@@ -49,8 +49,8 @@ PersistenceQueueTest::Fixture::Fixture(FileStorTestFixture& parent_)
metrics.initDiskMetrics(1u, loadTypes.getMetricLoadTypes(), 1, 1);
- filestorHandler = std::make_unique<FileStorHandler>(messageSender, metrics,
- parent._node->getComponentRegister());
+ filestorHandler = std::make_unique<FileStorHandlerImpl>(messageSender, metrics,
+ parent._node->getComponentRegister());
// getNextMessage will time out if no unlocked buckets are present. Choose a timeout
// that is large enough to fail tests with high probability if this is not the case,
// and small enough to not slow down testing too much.
diff --git a/storage/src/tests/persistence/persistencetestutils.cpp b/storage/src/tests/persistence/persistencetestutils.cpp
index 6eb7fb7fd04..f6ba71940a8 100644
--- a/storage/src/tests/persistence/persistencetestutils.cpp
+++ b/storage/src/tests/persistence/persistencetestutils.cpp
@@ -2,13 +2,14 @@
#include "persistencetestutils.h"
#include <vespa/document/datatype/documenttype.h>
-#include <vespa/storageapi/message/persistence.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/test/make_document_bucket.h>
-#include <vespa/persistence/dummyimpl/dummypersistence.h>
-#include <vespa/persistence/spi/test.h>
#include <vespa/document/update/assignvalueupdate.h>
#include <vespa/document/update/documentupdate.h>
+#include <vespa/persistence/dummyimpl/dummypersistence.h>
+#include <vespa/persistence/spi/test.h>
+#include <vespa/storage/persistence/filestorage/filestorhandlerimpl.h>
+#include <vespa/storageapi/message/persistence.h>
#include <vespa/vespalib/io/fileutil.h>
#include <vespa/vespalib/objects/nbostream.h>
#include <vespa/vespalib/util/exceptions.h>
@@ -54,7 +55,7 @@ PersistenceTestEnvironment::PersistenceTestEnvironment(const std::string & rootO
{
_node.setupDummyPersistence();
_metrics.initDiskMetrics(1, _node.getLoadTypes()->getMetricLoadTypes(), 1, 1);
- _handler = std::make_unique<FileStorHandler>(_messageKeeper, _metrics, _node.getComponentRegister());
+ _handler = std::make_unique<FileStorHandlerImpl>(_messageKeeper, _metrics, _node.getComponentRegister());
_diskEnv = std::make_unique<PersistenceUtil>(_config.getConfigId(), _component, *_handler,
*_metrics.disks[0]->threads[0], _node.getPersistenceProvider());
}
diff --git a/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt b/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt
index 182bf04b1cf..61cfcbcbfb8 100644
--- a/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt
+++ b/storage/src/vespa/storage/persistence/filestorage/CMakeLists.txt
@@ -1,7 +1,6 @@
# Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_add_library(storage_filestorpersistence OBJECT
SOURCES
- filestorhandler.cpp
filestorhandlerimpl.cpp
filestormanager.cpp
filestormetrics.cpp
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
deleted file mode 100644
index af863f82d6f..00000000000
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.cpp
+++ /dev/null
@@ -1,184 +0,0 @@
-// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
-#include "filestorhandler.h"
-#include "filestorhandlerimpl.h"
-
-namespace storage {
-
-FileStorHandler::FileStorHandler(MessageSender& sender, FileStorMetrics& metrics,
- ServiceLayerComponentRegister& compReg)
- : _impl(std::make_unique<FileStorHandlerImpl>(1, 1, sender, metrics, compReg))
-{
-}
-
-
-FileStorHandler::FileStorHandler(uint32_t numThreads, uint32_t numStripes, MessageSender& sender, FileStorMetrics& metrics,
- ServiceLayerComponentRegister& compReg)
- : _impl(std::make_unique<FileStorHandlerImpl>(numThreads, numStripes, sender, metrics, compReg))
-{
-}
-
-FileStorHandler::~FileStorHandler() = default;
-void
-FileStorHandler::flush(bool flushMerges)
-{
- _impl->flush(flushMerges);
-}
-
-void
-FileStorHandler::setDiskState(DiskState state)
-{
- _impl->setDiskState(state);
-}
-
-FileStorHandler::DiskState
-FileStorHandler::getDiskState()
-{
- return _impl->getDiskState();
-}
-
-void
-FileStorHandler::close()
-{
- _impl->close();
-}
-
-ResumeGuard
-FileStorHandler::pause()
-{
- return _impl->pause();
-}
-
-bool
-FileStorHandler::schedule(const api::StorageMessage::SP& msg)
-{
- return _impl->schedule(msg);
-}
-
-FileStorHandler::LockedMessage
-FileStorHandler::getNextMessage(uint32_t stripeId)
-{
- return _impl->getNextMessage(stripeId);
-}
-
-FileStorHandler::BucketLockInterface::SP
-FileStorHandler::lock(const document::Bucket& bucket, api::LockingRequirements lockReq)
-{
- return _impl->lock(bucket, lockReq);
-}
-
-void
-FileStorHandler::remapQueueAfterDiskMove(const document::Bucket& bucket)
-{
- RemapInfo target(bucket);
- _impl->remapQueue(RemapInfo(bucket), target, FileStorHandlerImpl::MOVE);
-}
-
-void
-FileStorHandler::remapQueueAfterJoin(const RemapInfo& source,RemapInfo& target)
-{
- _impl->remapQueue(source, target, FileStorHandlerImpl::JOIN);
-}
-
-void
-FileStorHandler::remapQueueAfterSplit(const RemapInfo& source,RemapInfo& target1, RemapInfo& target2)
-{
- _impl->remapQueue(source, target1, target2, FileStorHandlerImpl::SPLIT);
-}
-
-void
-FileStorHandler::failOperations(const document::Bucket &bucket, const api::ReturnCode& err)
-{
- _impl->failOperations(bucket, err);
-}
-
-void
-FileStorHandler::sendCommand(const api::StorageCommand::SP& msg)
-{
- _impl->sendCommand(msg);
-}
-
-void
-FileStorHandler::sendReply(const api::StorageReply::SP& msg)
-{
- _impl->sendReply(msg);
-}
-
-void
-FileStorHandler::sendReplyDirectly(const api::StorageReply::SP& msg)
-{
- _impl->sendReplyDirectly(msg);
-}
-
-void
-FileStorHandler::getStatus(std::ostream& out, const framework::HttpUrlPath& path) const
-{
- _impl->getStatus(out, path);
-}
-
-uint32_t
-FileStorHandler::getQueueSize() const
-{
- return _impl->getQueueSize();
-}
-
-void
-FileStorHandler::addMergeStatus(const document::Bucket& bucket, MergeStatus::SP ms)
-{
- return _impl->addMergeStatus(bucket, ms);
-}
-
-MergeStatus&
-FileStorHandler::editMergeStatus(const document::Bucket& bucket)
-{
- return _impl->editMergeStatus(bucket);
-}
-
-bool
-FileStorHandler::isMerging(const document::Bucket& bucket) const
-{
- return _impl->isMerging(bucket);
-}
-
-uint32_t
-FileStorHandler::getNumActiveMerges() const
-{
- return _impl->getNumActiveMerges();
-}
-
-uint32_t
-FileStorHandler::getNextStripeId() {
- return _impl->getNextStripeId();
-}
-
-void
-FileStorHandler::clearMergeStatus(const document::Bucket& bucket,
- const api::ReturnCode& code)
-{
- return _impl->clearMergeStatus(bucket, &code);
-}
-
-void
-FileStorHandler::clearMergeStatus(const document::Bucket& bucket)
-{
- return _impl->clearMergeStatus(bucket, 0);
-}
-
-void
-FileStorHandler::abortQueuedOperations(const AbortBucketOperationsCommand& cmd)
-{
- _impl->abortQueuedOperations(cmd);
-}
-
-void
-FileStorHandler::setGetNextMessageTimeout(vespalib::duration timeout)
-{
- _impl->setGetNextMessageTimeout(timeout);
-}
-
-std::string
-FileStorHandler::dumpQueue() const
-{
- return _impl->dumpQueue();
-}
-
-} // storage
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
index f0421970d08..4427a2f45e8 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandler.h
@@ -49,7 +49,7 @@ public:
class BucketLockInterface {
public:
- typedef std::shared_ptr<BucketLockInterface> SP;
+ using SP = std::shared_ptr<BucketLockInterface>;
virtual const document::Bucket &getBucket() const = 0;
virtual api::LockingRequirements lockingRequirements() const noexcept = 0;
@@ -57,7 +57,7 @@ public:
virtual ~BucketLockInterface() = default;
};
- typedef std::pair<BucketLockInterface::SP, api::StorageMessage::SP> LockedMessage;
+ using LockedMessage = std::pair<BucketLockInterface::SP, api::StorageMessage::SP>;
enum DiskState {
AVAILABLE,
@@ -65,12 +65,8 @@ public:
CLOSED
};
- FileStorHandler(uint32_t numThreads, uint32_t numStripes, MessageSender&, FileStorMetrics&,
- ServiceLayerComponentRegister&);
- FileStorHandler(MessageSender&, FileStorMetrics&, ServiceLayerComponentRegister&);
- ~FileStorHandler();
+ virtual ~FileStorHandler() = default;
- // Commands used by file stor manager
/**
* Waits for the filestor queues to be empty. Providing no new load is
@@ -79,10 +75,10 @@ public:
* @param killPendingMerges If true, clear out all pending merges and reply
* to them with failure.
*/
- void flush(bool killPendingMerges);
+ virtual void flush(bool killPendingMerges) = 0;
- void setDiskState(DiskState state);
- DiskState getDiskState();
+ virtual void setDiskState(DiskState state) = 0;
+ virtual DiskState getDiskState() const = 0;
/** Check whether it is enabled or not. */
bool enabled() { return (getDiskState() == AVAILABLE); }
@@ -93,26 +89,26 @@ public:
*/
void disable() { setDiskState(DISABLED); }
/** Closes all disk threads. */
- void close();
+ virtual void close() = 0;
/**
* Makes sure no operations are active, then stops any new operations
* from being performed, until the ResumeGuard is destroyed.
*/
- ResumeGuard pause();
+ virtual ResumeGuard pause() = 0;
/**
* Schedule a storage message to be processed
* @return True if we maanged to schedule operation. False if not
*/
- bool schedule(const std::shared_ptr<api::StorageMessage>&);
+ virtual bool schedule(const std::shared_ptr<api::StorageMessage>&) = 0;
/**
* Used by file stor threads to get their next message to process.
*
* @param stripe The stripe to get messages for
*/
- LockedMessage getNextMessage(uint32_t stripeId);
+ virtual LockedMessage getNextMessage(uint32_t stripeId) = 0;
/**
* Lock a bucket. By default, each file stor thread has the locks of all
@@ -128,7 +124,7 @@ public:
*
*
*/
- BucketLockInterface::SP lock(const document::Bucket&, api::LockingRequirements lockReq);
+ virtual BucketLockInterface::SP lock(const document::Bucket&, api::LockingRequirements lockReq) = 0;
/**
* Called by FileStorThread::onBucketDiskMove() after moving file, in case
@@ -140,7 +136,7 @@ public:
* requeststatus - Ignore
* readbucketinfo/bucketdiskmove/internalbucketjoin - Fail and log errors
*/
- void remapQueueAfterDiskMove(const document::Bucket &bucket);
+ virtual void remapQueueAfterDiskMove(const document::Bucket &bucket) = 0;
/**
* Called by FileStorThread::onJoin() after joining a bucket into another,
@@ -157,7 +153,7 @@ public:
* requeststatus/deletebucket - Ignore
* readbucketinfo/internalbucketjoin - Fail and log errors
*/
- void remapQueueAfterJoin(const RemapInfo& source, RemapInfo& target);
+ virtual void remapQueueAfterJoin(const RemapInfo& source, RemapInfo& target) = 0;
/**
* Called by FileStorThread::onSplit() after splitting a bucket,
@@ -177,9 +173,9 @@ public:
* requeststatus/deletebucket - Ignore
* readbucketinfo/internalbucketjoin - Fail and log errors
*/
- void remapQueueAfterSplit(const RemapInfo& source,
- RemapInfo& target1,
- RemapInfo& target2);
+ virtual void remapQueueAfterSplit(const RemapInfo& source,
+ RemapInfo& target1,
+ RemapInfo& target2) = 0;
struct DeactivateCallback {
virtual ~DeactivateCallback() {}
@@ -190,12 +186,12 @@ public:
* Fail all operations towards a single bucket currently queued to the
* given thread with the given error code.
*/
- void failOperations(const document::Bucket&, const api::ReturnCode&);
+ virtual void failOperations(const document::Bucket&, const api::ReturnCode&) = 0;
/**
* Add a new merge state to the registry.
*/
- void addMergeStatus(const document::Bucket&, MergeStatus::SP);
+ virtual void addMergeStatus(const document::Bucket&, MergeStatus::SP) = 0;
/**
* Returns the reference to the current merge status for the given bucket.
@@ -204,7 +200,7 @@ public:
*
* @param bucket The bucket to start merging.
*/
- MergeStatus& editMergeStatus(const document::Bucket& bucket);
+ virtual MergeStatus& editMergeStatus(const document::Bucket& bucket) = 0;
/**
* Returns true if the bucket is currently being merged on this node.
@@ -212,41 +208,33 @@ public:
* @param bucket The bucket to check merge status for
* @return Returns true if the bucket is being merged.
*/
- bool isMerging(const document::Bucket& bucket) const;
+ virtual bool isMerging(const document::Bucket& bucket) const = 0;
/**
* @return Returns the number of active merges on the node.
*/
- uint32_t getNumActiveMerges() const;
+ virtual uint32_t getNumActiveMerges() const = 0;
/// Provides the next stripe id for a certain disk.
- uint32_t getNextStripeId();
+ virtual uint32_t getNextStripeId() = 0;
/** Removes the merge status for the given bucket. */
- void clearMergeStatus(const document::Bucket&);
- void clearMergeStatus(const document::Bucket&, const api::ReturnCode&);
+ virtual void clearMergeStatus(const document::Bucket&) = 0;
+ virtual void clearMergeStatus(const document::Bucket&, const api::ReturnCode&) = 0;
- void abortQueuedOperations(const AbortBucketOperationsCommand& cmd);
-
- /** Send the given command back out of the persistence layer. */
- void sendCommand(const api::StorageCommand::SP&) override;
- /** Send the given reply back out of the persistence layer. */
- void sendReply(const api::StorageReply::SP&) override;
- void sendReplyDirectly(const std::shared_ptr<api::StorageReply>&) override;
+ virtual void abortQueuedOperations(const AbortBucketOperationsCommand& cmd) = 0;
/** Writes status page. */
- void getStatus(std::ostream& out, const framework::HttpUrlPath& path) const;
+ virtual void getStatus(std::ostream& out, const framework::HttpUrlPath& path) const = 0;
/** Utility function to fetch total size of queue. */
- uint32_t getQueueSize() const;
+ virtual uint32_t getQueueSize() const = 0;
// Commands used by testing
- void setGetNextMessageTimeout(vespalib::duration timeout);
+ virtual void setGetNextMessageTimeout(vespalib::duration timeout) = 0;
- std::string dumpQueue() const;
+ virtual std::string dumpQueue() const = 0;
-private:
- std::unique_ptr<FileStorHandlerImpl> _impl;
};
} // storage
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
index 82541f5e9b4..31cf06dfda4 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.cpp
@@ -37,6 +37,12 @@ uint32_t per_stripe_merge_limit(uint32_t num_threads, uint32_t num_stripes) noex
}
+FileStorHandlerImpl::FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics,
+ ServiceLayerComponentRegister& compReg)
+ : FileStorHandlerImpl(1, 1, sender, metrics, compReg)
+{
+}
+
FileStorHandlerImpl::FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender& sender,
FileStorMetrics& metrics,
ServiceLayerComponentRegister& compReg)
@@ -106,6 +112,18 @@ FileStorHandlerImpl::getNumActiveMerges() const
}
void
+FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket)
+{
+ clearMergeStatus(bucket, nullptr);
+}
+
+void
+FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket, const api::ReturnCode& code)
+{
+ clearMergeStatus(bucket, &code);
+}
+
+void
FileStorHandlerImpl::clearMergeStatus(const document::Bucket& bucket, const api::ReturnCode* code)
{
std::lock_guard mlock(_mergeStatesLock);
@@ -707,6 +725,25 @@ FileStorHandlerImpl::remapQueueNoLock(const RemapInfo& source, std::vector<Remap
}
void
+FileStorHandlerImpl::remapQueueAfterDiskMove(const document::Bucket& bucket)
+{
+ RemapInfo target(bucket);
+ remapQueue(RemapInfo(bucket), target, FileStorHandlerImpl::MOVE);
+}
+
+void
+FileStorHandlerImpl::remapQueueAfterJoin(const RemapInfo& source, RemapInfo& target)
+{
+ remapQueue(source, target, FileStorHandlerImpl::JOIN);
+}
+
+void
+FileStorHandlerImpl::remapQueueAfterSplit(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2)
+{
+ remapQueue(source, target1, target2, FileStorHandlerImpl::SPLIT);
+}
+
+void
FileStorHandlerImpl::remapQueue(const RemapInfo& source, RemapInfo& target, Operation op)
{
// Use a helper class to lock to solve issue that some buckets might be
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
index dfb830d72a5..b4fd18bd2e2 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
+++ b/storage/src/vespa/storage/persistence/filestorage/filestorhandlerimpl.h
@@ -43,10 +43,8 @@ namespace bmi = boost::multi_index;
class FileStorHandlerImpl : private framework::MetricUpdateHook,
private ResumeGuard::Callback,
- public MessageSender {
+ public FileStorHandler {
public:
- typedef FileStorHandler::DiskState DiskState;
- typedef FileStorHandler::RemapInfo RemapInfo;
struct MessageEntry {
std::shared_ptr<api::StorageMessage> _command;
@@ -172,54 +170,68 @@ public:
api::LockingRequirements _lockReq;
};
+
+ FileStorHandlerImpl(MessageSender& sender, FileStorMetrics& metrics,
+ ServiceLayerComponentRegister& compReg);
FileStorHandlerImpl(uint32_t numThreads, uint32_t numStripes, MessageSender&, FileStorMetrics&,
ServiceLayerComponentRegister&);
~FileStorHandlerImpl();
- void setGetNextMessageTimeout(vespalib::duration timeout) { _getNextMessageTimeout = timeout; }
+ void setGetNextMessageTimeout(vespalib::duration timeout) override { _getNextMessageTimeout = timeout; }
+
+ void flush(bool killPendingMerges) override;
+ void setDiskState(DiskState state) override;
+ DiskState getDiskState() const override;
+ void close() override;
+ bool schedule(const std::shared_ptr<api::StorageMessage>&) override;
- void flush(bool killPendingMerges);
- void setDiskState(DiskState state);
- DiskState getDiskState() const;
- void close();
- bool schedule(const std::shared_ptr<api::StorageMessage>&);
+ FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId) override;
- FileStorHandler::LockedMessage getNextMessage(uint32_t stripeId);
+ void remapQueueAfterDiskMove(const document::Bucket& bucket) override;
+ void remapQueueAfterJoin(const RemapInfo& source, RemapInfo& target) override;
+ void remapQueueAfterSplit(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2) override;
enum Operation { MOVE, SPLIT, JOIN };
void remapQueue(const RemapInfo& source, RemapInfo& target, Operation op);
void remapQueue(const RemapInfo& source, RemapInfo& target1, RemapInfo& target2, Operation op);
- void failOperations(const document::Bucket & bucket, const api::ReturnCode & code) {
+ void failOperations(const document::Bucket & bucket, const api::ReturnCode & code) override {
stripe(bucket).failOperations(bucket, code);
}
+
+ // Implements MessageSender
void sendCommand(const std::shared_ptr<api::StorageCommand>&) override;
void sendReply(const std::shared_ptr<api::StorageReply>&) override;
void sendReplyDirectly(const api::StorageReply::SP& msg) override;
- void getStatus(std::ostream& out, const framework::HttpUrlPath& path) const;
+ void getStatus(std::ostream& out, const framework::HttpUrlPath& path) const override;
- uint32_t getQueueSize() const;
- uint32_t getNextStripeId() {
+ uint32_t getQueueSize() const override;
+ uint32_t getNextStripeId() override {
return (_nextStripeId++) % _stripes.size();
}
std::shared_ptr<FileStorHandler::BucketLockInterface>
- lock(const document::Bucket & bucket, api::LockingRequirements lockReq) {
+ lock(const document::Bucket & bucket, api::LockingRequirements lockReq) override {
return stripe(bucket).lock(bucket, lockReq);
}
- void addMergeStatus(const document::Bucket&, MergeStatus::SP);
- MergeStatus& editMergeStatus(const document::Bucket&);
- bool isMerging(const document::Bucket&) const;
- uint32_t getNumActiveMerges() const;
+ void addMergeStatus(const document::Bucket&, MergeStatus::SP) override;
+ MergeStatus& editMergeStatus(const document::Bucket&) override;
+ bool isMerging(const document::Bucket&) const override;
+ uint32_t getNumActiveMerges() const override;
+ void clearMergeStatus(const document::Bucket& bucket) override;
+ void clearMergeStatus(const document::Bucket& bucket, const api::ReturnCode& code) override;
+
void clearMergeStatus(const document::Bucket&, const api::ReturnCode*);
- std::string dumpQueue() const;
- ResumeGuard pause();
+ std::string dumpQueue() const override;
+ ResumeGuard pause() override;
+ void abortQueuedOperations(const AbortBucketOperationsCommand& cmd) override;
+
+ // Implements ResumeGuard::Callback
void resume() override;
- void abortQueuedOperations(const AbortBucketOperationsCommand& cmd);
private:
ServiceLayerComponent _component;
@@ -269,7 +281,7 @@ private:
static std::unique_ptr<api::StorageReply> makeQueueTimeoutReply(api::StorageMessage& msg);
static bool messageMayBeAborted(const api::StorageMessage& msg);
- // Update hook
+ // Implements framework::MetricUpdateHook
void updateMetrics(const MetricLockGuard &) override;
document::Bucket
diff --git a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
index 97d7feac5e9..fdb52635292 100644
--- a/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
+++ b/storage/src/vespa/storage/persistence/filestorage/filestormanager.cpp
@@ -1,5 +1,6 @@
// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "filestorhandlerimpl.h"
#include "filestormanager.h"
#include <vespa/storage/bucketdb/lockablemap.hpp>
@@ -126,7 +127,7 @@ FileStorManager::configure(std::unique_ptr<vespa::config::content::StorFilestorC
size_t numStripes = std::max(size_t(1u), numThreads / 2);
_metrics->initDiskMetrics(1, _component.getLoadTypes()->getMetricLoadTypes(), numStripes, numThreads);
- _filestorHandler = std::make_unique<FileStorHandler>(numThreads, numStripes, *this, *_metrics, _compReg);
+ _filestorHandler = std::make_unique<FileStorHandlerImpl>(numThreads, numStripes, *this, *_metrics, _compReg);
uint32_t numResponseThreads = computeNumResponseThreads(_config->numResponseThreads);
_sequencedExecutor = vespalib::SequencedTaskExecutor::create(numResponseThreads, 10000, selectSequencer(_config->responseSequencerType));
assert(_sequencedExecutor);