diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-01-20 13:02:35 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-01-20 13:02:35 +0100 |
commit | 669dcc1c35f21672fb63363a5a7f2f7989f20cef (patch) | |
tree | e2dd3036aeda242958c21cd137575cbabfbf1312 /searchcore/src | |
parent | 6f649d5dd2234c7dd581f9ac206663cbfa6b8ae8 (diff) | |
parent | ff551559deacaacc3bb77699686bb6c65e08a818 (diff) |
Merge pull request #16098 from vespa-engine/balder/implement-bucketexecutor-for-filestormanager
Implement bucketexecutor interface and.
Diffstat (limited to 'searchcore/src')
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp | 26 | ||||
-rw-r--r-- | searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h | 4 |
2 files changed, 30 insertions, 0 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 0d5d8ad3144..8bec6f9dd68 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -3,6 +3,7 @@ #include "persistenceengine.h" #include "ipersistenceengineowner.h" #include "transport_latch.h" +#include <vespa/persistence/spi/bucketexecutor.h> #include <vespa/vespalib/stllike/hash_set.h> #include <vespa/document/fieldvalue/document.h> #include <vespa/document/datatype/documenttype.h> @@ -18,6 +19,7 @@ LOG_SETUP(".proton.persistenceengine.persistenceengine"); using document::Document; using document::DocumentId; using storage::spi::BucketChecksum; +using storage::spi::BucketExecutor; using storage::spi::BucketIdListResult; using storage::spi::BucketInfo; using storage::spi::BucketInfoResult; @@ -737,4 +739,28 @@ PersistenceEngine::getWLock() const return WriteGuard(_rwMutex); } +namespace { + +class SyncExecutorOnDestruction : public vespalib::IDestructorCallback { +public: + explicit SyncExecutorOnDestruction(std::shared_ptr<BucketExecutor> executor) : _executor(std::move(executor)) { } + ~SyncExecutorOnDestruction() override { + if (_executor) { + _executor->sync(); + } + } +private: + std::shared_ptr<BucketExecutor> _executor; +}; + +} + +std::unique_ptr<vespalib::IDestructorCallback> +PersistenceEngine::register_executor(std::shared_ptr<BucketExecutor> executor) +{ + assert(_bucket_executor.expired()); + _bucket_executor = executor; + return std::make_unique<SyncExecutorOnDestruction>(executor); +} + } // storage diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h index 659156fdea0..b5a99525575 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h @@ -39,6 +39,7 @@ private: using TimestampList = storage::spi::TimestampList; using UpdateResult = storage::spi::UpdateResult; using OperationComplete = storage::spi::OperationComplete; + using BucketExecutor = storage::spi::BucketExecutor; struct IteratorEntry { PersistenceHandlerSequence handler_sequence; @@ -73,6 +74,7 @@ private: mutable ExtraModifiedBuckets _extraModifiedBuckets; mutable std::shared_mutex _rwMutex; std::shared_ptr<ResourceUsageTracker> _resource_usage_tracker; + std::weak_ptr<BucketExecutor> _bucket_executor; using ReadGuard = std::shared_lock<std::shared_mutex>; using WriteGuard = std::unique_lock<std::shared_mutex>; @@ -116,12 +118,14 @@ public: Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override; Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override; std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(IResourceUsageListener& listener) override; + std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<BucketExecutor>) override; void destroyIterators(); void propagateSavedClusterState(BucketSpace bucketSpace, IPersistenceHandler &handler); void grabExtraModifiedBuckets(BucketSpace bucketSpace, IPersistenceHandler &handler); void populateInitialBucketDB(const WriteGuard & guard, BucketSpace bucketSpace, IPersistenceHandler &targetHandler); WriteGuard getWLock() const; ResourceUsageTracker &get_resource_usage_tracker() noexcept { return *_resource_usage_tracker; } + std::shared_ptr<BucketExecutor> get_bucket_executor() noexcept { return _bucket_executor.lock(); } }; } |