diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-01-19 07:46:55 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-01-19 13:54:30 +0000 |
commit | 5e676a8ffc0034848366293ad744e0850c585884 (patch) | |
tree | 7f440a67a5af24f340dcbf8bd3c0a94087b7ca17 /persistence | |
parent | 317fedff48f7211e9d48c7d407d0512bd4ee65b9 (diff) |
Implement bucketexecutor interface and.
Diffstat (limited to 'persistence')
6 files changed, 64 insertions, 10 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp index 0c1c5db69d6..086191504d8 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp @@ -8,6 +8,7 @@ #include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/persistence/spi/i_resource_usage_listener.h> #include <vespa/persistence/spi/resource_usage.h> +#include <vespa/persistence/spi/bucketexecutor.h> #include <vespa/vespalib/util/crc.h> #include <vespa/document/fieldset/fieldsetrepo.h> #include <vespa/vespalib/stllike/asciistream.h> @@ -863,6 +864,30 @@ DummyPersistence::register_resource_usage_listener(IResourceUsageListener &liste return {}; } +namespace { + +class UnRegisterExecutor : public vespalib::IDestructorCallback { +public: + UnRegisterExecutor(std::shared_ptr<BucketExecutor> executor) : _executor(std::move(executor)) { } + ~UnRegisterExecutor() override { + if (_executor) { + _executor->sync(); + } + } +private: + std::shared_ptr<BucketExecutor> _executor; +}; + +} + +std::unique_ptr<vespalib::IDestructorCallback> +DummyPersistence::register_executor(std::shared_ptr<BucketExecutor> executor) +{ + assert(_bucket_executor.expired()); + _bucket_executor = executor; + return std::make_unique<UnRegisterExecutor>(executor); +} + std::string DummyPersistence::dumpBucket(const Bucket& b) const { diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h index c37af0d33eb..ad50648abaf 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h @@ -176,6 +176,8 @@ public: Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override; std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(IResourceUsageListener& listener) override; + std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<BucketExecutor>) override; + std::shared_ptr<BucketExecutor> get_bucket_executor() noexcept { return _bucket_executor.lock(); } /** * The following methods are used only for unit testing. @@ -213,6 +215,7 @@ private: std::condition_variable _cond; std::unique_ptr<ClusterState> _clusterState; + std::weak_ptr<BucketExecutor> _bucket_executor; std::unique_ptr<document::select::Node> parseDocumentSelection( const string& documentSelection, diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp index facdb2cadfa..1d873e9a20e 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp @@ -30,10 +30,4 @@ AbstractPersistenceProvider::getModifiedBuckets(BucketSpace) const return BucketIdListResult(list); } -std::unique_ptr<vespalib::IDestructorCallback> -AbstractPersistenceProvider::register_executor(std::shared_ptr<BucketExecutor>) -{ - return {}; -} - } diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h index 5023febe9a2..016928ab10e 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h @@ -48,8 +48,6 @@ public: * Default impl empty. */ BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; - - std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<BucketExecutor>) override; }; } diff --git a/persistence/src/vespa/persistence/spi/bucket_tasks.h b/persistence/src/vespa/persistence/spi/bucket_tasks.h new file mode 100644 index 00000000000..12daa463e2b --- /dev/null +++ b/persistence/src/vespa/persistence/spi/bucket_tasks.h @@ -0,0 +1,33 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "bucketexecutor.h" + +namespace storage::spi { + +template<class FunctionType> +class LambdaBucketTask : public BucketTask { +public: + explicit LambdaBucketTask(FunctionType &&func) + : _func(std::move(func)) + {} + + ~LambdaBucketTask() override = default; + + void run(const Bucket & bucket, std::shared_ptr<vespalib::IDestructorCallback> onComplete) override { + _func(bucket, std::move(onComplete)); + } + +private: + FunctionType _func; +}; + +template<class FunctionType> +std::unique_ptr<BucketTask> +makeBucketTask(FunctionType &&function) { + return std::make_unique<LambdaBucketTask<std::decay_t<FunctionType>>> + (std::forward<FunctionType>(function)); +} + +} diff --git a/persistence/src/vespa/persistence/spi/bucketexecutor.h b/persistence/src/vespa/persistence/spi/bucketexecutor.h index 07c86fd5ffb..8237b78cca0 100644 --- a/persistence/src/vespa/persistence/spi/bucketexecutor.h +++ b/persistence/src/vespa/persistence/spi/bucketexecutor.h @@ -3,7 +3,8 @@ #pragma once #include "bucket.h" -#include "operationcomplete.h" + +namespace vespalib { class IDestructorCallback; } namespace storage::spi { @@ -17,7 +18,7 @@ namespace storage::spi { class BucketTask { public: virtual ~BucketTask() = default; - virtual void run(const Bucket & bucket, std::unique_ptr<OperationComplete> onComplete) = 0; + virtual void run(const Bucket & bucket, std::shared_ptr<vespalib::IDestructorCallback> onComplete) = 0; }; /** |