From 5e676a8ffc0034848366293ad744e0850c585884 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 19 Jan 2021 07:46:55 +0000 Subject: Implement bucketexecutor interface and. --- .../persistence/dummyimpl/dummypersistence.cpp | 25 ++++++++++++++++ .../vespa/persistence/dummyimpl/dummypersistence.h | 3 ++ .../spi/abstractpersistenceprovider.cpp | 6 ---- .../persistence/spi/abstractpersistenceprovider.h | 2 -- .../src/vespa/persistence/spi/bucket_tasks.h | 33 ++++++++++++++++++++++ .../src/vespa/persistence/spi/bucketexecutor.h | 5 ++-- 6 files changed, 64 insertions(+), 10 deletions(-) create mode 100644 persistence/src/vespa/persistence/spi/bucket_tasks.h (limited to 'persistence') diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp index 0c1c5db69d6..086191504d8 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include #include #include @@ -863,6 +864,30 @@ DummyPersistence::register_resource_usage_listener(IResourceUsageListener &liste return {}; } +namespace { + +class UnRegisterExecutor : public vespalib::IDestructorCallback { +public: + UnRegisterExecutor(std::shared_ptr executor) : _executor(std::move(executor)) { } + ~UnRegisterExecutor() override { + if (_executor) { + _executor->sync(); + } + } +private: + std::shared_ptr _executor; +}; + +} + +std::unique_ptr +DummyPersistence::register_executor(std::shared_ptr executor) +{ + assert(_bucket_executor.expired()); + _bucket_executor = executor; + return std::make_unique(executor); +} + std::string DummyPersistence::dumpBucket(const Bucket& b) const { diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h index c37af0d33eb..ad50648abaf 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h @@ -176,6 +176,8 @@ public: Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override; std::unique_ptr register_resource_usage_listener(IResourceUsageListener& listener) override; + std::unique_ptr register_executor(std::shared_ptr) override; + std::shared_ptr get_bucket_executor() noexcept { return _bucket_executor.lock(); } /** * The following methods are used only for unit testing. @@ -213,6 +215,7 @@ private: std::condition_variable _cond; std::unique_ptr _clusterState; + std::weak_ptr _bucket_executor; std::unique_ptr parseDocumentSelection( const string& documentSelection, diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp index facdb2cadfa..1d873e9a20e 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp @@ -30,10 +30,4 @@ AbstractPersistenceProvider::getModifiedBuckets(BucketSpace) const return BucketIdListResult(list); } -std::unique_ptr -AbstractPersistenceProvider::register_executor(std::shared_ptr) -{ - return {}; -} - } diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h index 5023febe9a2..016928ab10e 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h @@ -48,8 +48,6 @@ public: * Default impl empty. */ BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; - - std::unique_ptr register_executor(std::shared_ptr) override; }; } diff --git a/persistence/src/vespa/persistence/spi/bucket_tasks.h b/persistence/src/vespa/persistence/spi/bucket_tasks.h new file mode 100644 index 00000000000..12daa463e2b --- /dev/null +++ b/persistence/src/vespa/persistence/spi/bucket_tasks.h @@ -0,0 +1,33 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "bucketexecutor.h" + +namespace storage::spi { + +template +class LambdaBucketTask : public BucketTask { +public: + explicit LambdaBucketTask(FunctionType &&func) + : _func(std::move(func)) + {} + + ~LambdaBucketTask() override = default; + + void run(const Bucket & bucket, std::shared_ptr onComplete) override { + _func(bucket, std::move(onComplete)); + } + +private: + FunctionType _func; +}; + +template +std::unique_ptr +makeBucketTask(FunctionType &&function) { + return std::make_unique>> + (std::forward(function)); +} + +} diff --git a/persistence/src/vespa/persistence/spi/bucketexecutor.h b/persistence/src/vespa/persistence/spi/bucketexecutor.h index 07c86fd5ffb..8237b78cca0 100644 --- a/persistence/src/vespa/persistence/spi/bucketexecutor.h +++ b/persistence/src/vespa/persistence/spi/bucketexecutor.h @@ -3,7 +3,8 @@ #pragma once #include "bucket.h" -#include "operationcomplete.h" + +namespace vespalib { class IDestructorCallback; } namespace storage::spi { @@ -17,7 +18,7 @@ namespace storage::spi { class BucketTask { public: virtual ~BucketTask() = default; - virtual void run(const Bucket & bucket, std::unique_ptr onComplete) = 0; + virtual void run(const Bucket & bucket, std::shared_ptr onComplete) = 0; }; /** -- cgit v1.2.3 From c5c86dd6b6f0baf2b807ed9cd2dbc16bb507cab8 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Wed, 20 Jan 2021 09:47:08 +0000 Subject: Add class comments. --- persistence/src/vespa/persistence/spi/bucket_tasks.h | 3 +++ storage/src/vespa/storage/persistence/messages.h | 7 +++++-- 2 files changed, 8 insertions(+), 2 deletions(-) (limited to 'persistence') diff --git a/persistence/src/vespa/persistence/spi/bucket_tasks.h b/persistence/src/vespa/persistence/spi/bucket_tasks.h index 12daa463e2b..0b9c283817d 100644 --- a/persistence/src/vespa/persistence/spi/bucket_tasks.h +++ b/persistence/src/vespa/persistence/spi/bucket_tasks.h @@ -6,6 +6,9 @@ namespace storage::spi { +/** + * Simple Bucket task that wraps a lambda that does the job. + */ template class LambdaBucketTask : public BucketTask { public: diff --git a/storage/src/vespa/storage/persistence/messages.h b/storage/src/vespa/storage/persistence/messages.h index 55ca87a1941..3b49365d560 100644 --- a/storage/src/vespa/storage/persistence/messages.h +++ b/storage/src/vespa/storage/persistence/messages.h @@ -238,6 +238,9 @@ public: void print(std::ostream& out, bool verbose, const std::string& indent) const override; }; + +// Internal Command task for bringing along a Bucket and a BucketTask in +// the inner workings of the storagelink chain. class RunTaskCommand : public api::InternalCommand { public: static const uint32_t ID = 1011; @@ -252,9 +255,10 @@ public: private: std::unique_ptr _task; - spi::Bucket _bucket; + spi::Bucket _bucket; }; +// Simple reply for matching the RunTaskCommand class RunTaskReply : public api::InternalReply { public: @@ -263,6 +267,5 @@ private: static const uint32_t ID = 1012; }; - } // ns storage -- cgit v1.2.3 From ff551559deacaacc3bb77699686bb6c65e08a818 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Wed, 20 Jan 2021 12:01:13 +0000 Subject: Add debug dumping and other minor followup on PR comments. --- .../persistence/dummyimpl/dummypersistence.cpp | 8 +++---- .../proton/persistenceengine/persistenceengine.cpp | 8 +++---- .../filestorage/filestormanagertest.cpp | 6 ++--- storage/src/vespa/storage/persistence/messages.cpp | 20 +++++++++++++++++ storage/src/vespa/storage/persistence/messages.h | 26 ++++++++++++---------- 5 files changed, 45 insertions(+), 23 deletions(-) (limited to 'persistence') diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp index 086191504d8..0865500d3c0 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp @@ -866,10 +866,10 @@ DummyPersistence::register_resource_usage_listener(IResourceUsageListener &liste namespace { -class UnRegisterExecutor : public vespalib::IDestructorCallback { +class SyncExecutorOnDestruction : public vespalib::IDestructorCallback { public: - UnRegisterExecutor(std::shared_ptr executor) : _executor(std::move(executor)) { } - ~UnRegisterExecutor() override { + explicit SyncExecutorOnDestruction(std::shared_ptr executor) : _executor(std::move(executor)) { } + ~SyncExecutorOnDestruction() override { if (_executor) { _executor->sync(); } @@ -885,7 +885,7 @@ DummyPersistence::register_executor(std::shared_ptr executor) { assert(_bucket_executor.expired()); _bucket_executor = executor; - return std::make_unique(executor); + return std::make_unique(executor); } std::string diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 308b5e67f61..8bec6f9dd68 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -741,10 +741,10 @@ PersistenceEngine::getWLock() const namespace { -class UnRegisterExecutor : public vespalib::IDestructorCallback { +class SyncExecutorOnDestruction : public vespalib::IDestructorCallback { public: - UnRegisterExecutor(std::shared_ptr executor) : _executor(std::move(executor)) { } - ~UnRegisterExecutor() override { + explicit SyncExecutorOnDestruction(std::shared_ptr executor) : _executor(std::move(executor)) { } + ~SyncExecutorOnDestruction() override { if (_executor) { _executor->sync(); } @@ -760,7 +760,7 @@ PersistenceEngine::register_executor(std::shared_ptr executor) { assert(_bucket_executor.expired()); _bucket_executor = executor; - return std::make_unique(executor); + return std::make_unique(executor); } } // storage diff --git a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp index 2cd9f34c0f3..acccbb8b9b9 100644 --- a/storage/src/tests/persistence/filestorage/filestormanagertest.cpp +++ b/storage/src/tests/persistence/filestorage/filestormanagertest.cpp @@ -419,14 +419,14 @@ TEST_F(FileStorManagerTest, running_task_against_unknown_bucket_fails) { EXPECT_TRUE(getDummyPersistence().getClusterState().nodeUp()); auto executor = getDummyPersistence().get_bucket_executor(); - EXPECT_TRUE(executor); + ASSERT_TRUE(executor); spi::Bucket b1 = makeSpiBucket(document::BucketId(1)); std::atomic numInvocations(0); auto response = executor->execute(b1, spi::makeBucketTask([&numInvocations](const spi::Bucket &, std::shared_ptr) { numInvocations++; })); - EXPECT_TRUE(response); + ASSERT_TRUE(response); EXPECT_EQ(0, numInvocations); response->run(spi::Bucket(), {}); EXPECT_EQ(1, numInvocations); @@ -439,7 +439,7 @@ TEST_F(FileStorManagerTest, running_task_against_existing_bucket_works) { EXPECT_TRUE(getDummyPersistence().getClusterState().nodeUp()); auto executor = getDummyPersistence().get_bucket_executor(); - EXPECT_TRUE(executor); + ASSERT_TRUE(executor); spi::Bucket b1 = makeSpiBucket(document::BucketId(1)); diff --git a/storage/src/vespa/storage/persistence/messages.cpp b/storage/src/vespa/storage/persistence/messages.cpp index 528a19ba7dc..7ccb3ee895d 100644 --- a/storage/src/vespa/storage/persistence/messages.cpp +++ b/storage/src/vespa/storage/persistence/messages.cpp @@ -193,8 +193,28 @@ RunTaskCommand::RunTaskCommand(const spi::Bucket &bucket, std::unique_ptr UP; typedef std::shared_ptr SP; @@ -51,7 +51,7 @@ private: public: typedef std::unique_ptr UP; typedef std::shared_ptr SP; - static const uint32_t ID = 1002; + static constexpr uint32_t ID = 1002; explicit GetIterReply(GetIterCommand& cmd); ~GetIterReply() override; @@ -81,7 +81,7 @@ class CreateIteratorCommand : public api::InternalCommand spi::ReadConsistency _readConsistency; public: - static const uint32_t ID = 1003; + static constexpr uint32_t ID = 1003; typedef std::unique_ptr UP; typedef std::shared_ptr SP; @@ -115,7 +115,7 @@ class CreateIteratorReply : public api::InternalReply document::Bucket _bucket; spi::IteratorId _iteratorId; public: - static const uint32_t ID = 1004; + static constexpr uint32_t ID = 1004; typedef std::unique_ptr UP; typedef std::shared_ptr SP; @@ -133,7 +133,7 @@ class DestroyIteratorCommand : public api::InternalCommand { spi::IteratorId _iteratorId; public: - static const uint32_t ID = 1005; + static constexpr uint32_t ID = 1005; typedef std::unique_ptr UP; typedef std::shared_ptr SP; @@ -151,7 +151,7 @@ class DestroyIteratorReply : public api::InternalReply { spi::IteratorId _iteratorId; public: - static const uint32_t ID = 1006; + static constexpr uint32_t ID = 1006; typedef std::unique_ptr UP; typedef std::shared_ptr SP; @@ -165,7 +165,7 @@ class RecheckBucketInfoCommand : public api::InternalCommand { document::Bucket _bucket; public: - static const uint32_t ID = 1007; + static constexpr uint32_t ID = 1007; typedef std::shared_ptr SP; typedef std::unique_ptr UP; @@ -183,7 +183,7 @@ class RecheckBucketInfoReply : public api::InternalReply { document::Bucket _bucket; public: - static const uint32_t ID = 1008; + static constexpr uint32_t ID = 1008; typedef std::shared_ptr SP; typedef std::unique_ptr UP; @@ -207,7 +207,7 @@ public: } }; - static const uint32_t ID = 1009; + static constexpr uint32_t ID = 1009; typedef std::shared_ptr SP; typedef std::shared_ptr CSP; private: @@ -228,7 +228,7 @@ public: class AbortBucketOperationsReply : public api::InternalReply { public: - static const uint32_t ID = 1010; + static constexpr uint32_t ID = 1010; typedef std::shared_ptr SP; typedef std::shared_ptr CSP; @@ -243,7 +243,7 @@ public: // the inner workings of the storagelink chain. class RunTaskCommand : public api::InternalCommand { public: - static const uint32_t ID = 1011; + static constexpr uint32_t ID = 1011; RunTaskCommand(const spi::Bucket &bucket, std::unique_ptr task); ~RunTaskCommand(); @@ -253,6 +253,7 @@ public: return *_task; } + void print(std::ostream& out, bool verbose, const std::string& indent) const override; private: std::unique_ptr _task; spi::Bucket _bucket; @@ -263,8 +264,9 @@ class RunTaskReply : public api::InternalReply { public: explicit RunTaskReply(const RunTaskCommand&); + void print(std::ostream& out, bool verbose, const std::string& indent) const override; private: - static const uint32_t ID = 1012; + static constexpr uint32_t ID = 1012; }; } // ns storage -- cgit v1.2.3