diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-01-20 13:02:35 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-20 13:02:35 +0100 |
commit | 669dcc1c35f21672fb63363a5a7f2f7989f20cef (patch) | |
tree | e2dd3036aeda242958c21cd137575cbabfbf1312 /persistence/src | |
parent | 6f649d5dd2234c7dd581f9ac206663cbfa6b8ae8 (diff) | |
parent | ff551559deacaacc3bb77699686bb6c65e08a818 (diff) |
Merge pull request #16098 from vespa-engine/balder/implement-bucketexecutor-for-filestormanager
Implement bucketexecutor interface and.
Diffstat (limited to 'persistence/src')
6 files changed, 67 insertions, 10 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp index 0c1c5db69d6..0865500d3c0 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp @@ -8,6 +8,7 @@ #include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/persistence/spi/i_resource_usage_listener.h> #include <vespa/persistence/spi/resource_usage.h> +#include <vespa/persistence/spi/bucketexecutor.h> #include <vespa/vespalib/util/crc.h> #include <vespa/document/fieldset/fieldsetrepo.h> #include <vespa/vespalib/stllike/asciistream.h> @@ -863,6 +864,30 @@ DummyPersistence::register_resource_usage_listener(IResourceUsageListener &liste return {}; } +namespace { + +class SyncExecutorOnDestruction : public vespalib::IDestructorCallback { +public: + explicit SyncExecutorOnDestruction(std::shared_ptr<BucketExecutor> executor) : _executor(std::move(executor)) { } + ~SyncExecutorOnDestruction() override { + if (_executor) { + _executor->sync(); + } + } +private: + std::shared_ptr<BucketExecutor> _executor; +}; + +} + +std::unique_ptr<vespalib::IDestructorCallback> +DummyPersistence::register_executor(std::shared_ptr<BucketExecutor> executor) +{ + assert(_bucket_executor.expired()); + _bucket_executor = executor; + return std::make_unique<SyncExecutorOnDestruction>(executor); +} + std::string DummyPersistence::dumpBucket(const Bucket& b) const { diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h index c37af0d33eb..ad50648abaf 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h @@ -176,6 +176,8 @@ public: Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override; std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(IResourceUsageListener& listener) override; + std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<BucketExecutor>) override; + std::shared_ptr<BucketExecutor> get_bucket_executor() noexcept { return _bucket_executor.lock(); } /** * The following methods are used only for unit testing. @@ -213,6 +215,7 @@ private: std::condition_variable _cond; std::unique_ptr<ClusterState> _clusterState; + std::weak_ptr<BucketExecutor> _bucket_executor; std::unique_ptr<document::select::Node> parseDocumentSelection( const string& documentSelection, diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp index facdb2cadfa..1d873e9a20e 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp @@ -30,10 +30,4 @@ AbstractPersistenceProvider::getModifiedBuckets(BucketSpace) const return BucketIdListResult(list); } -std::unique_ptr<vespalib::IDestructorCallback> -AbstractPersistenceProvider::register_executor(std::shared_ptr<BucketExecutor>) -{ - return {}; -} - } diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h index 5023febe9a2..016928ab10e 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h @@ -48,8 +48,6 @@ public: * Default impl empty. */ BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override; - - std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<BucketExecutor>) override; }; } diff --git a/persistence/src/vespa/persistence/spi/bucket_tasks.h b/persistence/src/vespa/persistence/spi/bucket_tasks.h new file mode 100644 index 00000000000..0b9c283817d --- /dev/null +++ b/persistence/src/vespa/persistence/spi/bucket_tasks.h @@ -0,0 +1,36 @@ +// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. + +#pragma once + +#include "bucketexecutor.h" + +namespace storage::spi { + +/** + * Simple Bucket task that wraps a lambda that does the job. + */ +template<class FunctionType> +class LambdaBucketTask : public BucketTask { +public: + explicit LambdaBucketTask(FunctionType &&func) + : _func(std::move(func)) + {} + + ~LambdaBucketTask() override = default; + + void run(const Bucket & bucket, std::shared_ptr<vespalib::IDestructorCallback> onComplete) override { + _func(bucket, std::move(onComplete)); + } + +private: + FunctionType _func; +}; + +template<class FunctionType> +std::unique_ptr<BucketTask> +makeBucketTask(FunctionType &&function) { + return std::make_unique<LambdaBucketTask<std::decay_t<FunctionType>>> + (std::forward<FunctionType>(function)); +} + +} diff --git a/persistence/src/vespa/persistence/spi/bucketexecutor.h b/persistence/src/vespa/persistence/spi/bucketexecutor.h index 07c86fd5ffb..8237b78cca0 100644 --- a/persistence/src/vespa/persistence/spi/bucketexecutor.h +++ b/persistence/src/vespa/persistence/spi/bucketexecutor.h @@ -3,7 +3,8 @@ #pragma once #include "bucket.h" -#include "operationcomplete.h" + +namespace vespalib { class IDestructorCallback; } namespace storage::spi { @@ -17,7 +18,7 @@ namespace storage::spi { class BucketTask { public: virtual ~BucketTask() = default; - virtual void run(const Bucket & bucket, std::unique_ptr<OperationComplete> onComplete) = 0; + virtual void run(const Bucket & bucket, std::shared_ptr<vespalib::IDestructorCallback> onComplete) = 0; }; /** |