summaryrefslogtreecommitdiffstats
path: root/searchcore/src
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-01-20 13:02:35 +0100
committerGitHub <noreply@github.com>2021-01-20 13:02:35 +0100
commit669dcc1c35f21672fb63363a5a7f2f7989f20cef (patch)
treee2dd3036aeda242958c21cd137575cbabfbf1312 /searchcore/src
parent6f649d5dd2234c7dd581f9ac206663cbfa6b8ae8 (diff)
parentff551559deacaacc3bb77699686bb6c65e08a818 (diff)
Merge pull request #16098 from vespa-engine/balder/implement-bucketexecutor-for-filestormanager
Implement bucketexecutor interface and.
Diffstat (limited to 'searchcore/src')
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp26
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h4
2 files changed, 30 insertions, 0 deletions
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
index 0d5d8ad3144..8bec6f9dd68 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
@@ -3,6 +3,7 @@
#include "persistenceengine.h"
#include "ipersistenceengineowner.h"
#include "transport_latch.h"
+#include <vespa/persistence/spi/bucketexecutor.h>
#include <vespa/vespalib/stllike/hash_set.h>
#include <vespa/document/fieldvalue/document.h>
#include <vespa/document/datatype/documenttype.h>
@@ -18,6 +19,7 @@ LOG_SETUP(".proton.persistenceengine.persistenceengine");
using document::Document;
using document::DocumentId;
using storage::spi::BucketChecksum;
+using storage::spi::BucketExecutor;
using storage::spi::BucketIdListResult;
using storage::spi::BucketInfo;
using storage::spi::BucketInfoResult;
@@ -737,4 +739,28 @@ PersistenceEngine::getWLock() const
return WriteGuard(_rwMutex);
}
+namespace {
+
+class SyncExecutorOnDestruction : public vespalib::IDestructorCallback {
+public:
+ explicit SyncExecutorOnDestruction(std::shared_ptr<BucketExecutor> executor) : _executor(std::move(executor)) { }
+ ~SyncExecutorOnDestruction() override {
+ if (_executor) {
+ _executor->sync();
+ }
+ }
+private:
+ std::shared_ptr<BucketExecutor> _executor;
+};
+
+}
+
+std::unique_ptr<vespalib::IDestructorCallback>
+PersistenceEngine::register_executor(std::shared_ptr<BucketExecutor> executor)
+{
+ assert(_bucket_executor.expired());
+ _bucket_executor = executor;
+ return std::make_unique<SyncExecutorOnDestruction>(executor);
+}
+
} // storage
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
index 659156fdea0..b5a99525575 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
@@ -39,6 +39,7 @@ private:
using TimestampList = storage::spi::TimestampList;
using UpdateResult = storage::spi::UpdateResult;
using OperationComplete = storage::spi::OperationComplete;
+ using BucketExecutor = storage::spi::BucketExecutor;
struct IteratorEntry {
PersistenceHandlerSequence handler_sequence;
@@ -73,6 +74,7 @@ private:
mutable ExtraModifiedBuckets _extraModifiedBuckets;
mutable std::shared_mutex _rwMutex;
std::shared_ptr<ResourceUsageTracker> _resource_usage_tracker;
+ std::weak_ptr<BucketExecutor> _bucket_executor;
using ReadGuard = std::shared_lock<std::shared_mutex>;
using WriteGuard = std::unique_lock<std::shared_mutex>;
@@ -116,12 +118,14 @@ public:
Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override;
Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override;
std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(IResourceUsageListener& listener) override;
+ std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<BucketExecutor>) override;
void destroyIterators();
void propagateSavedClusterState(BucketSpace bucketSpace, IPersistenceHandler &handler);
void grabExtraModifiedBuckets(BucketSpace bucketSpace, IPersistenceHandler &handler);
void populateInitialBucketDB(const WriteGuard & guard, BucketSpace bucketSpace, IPersistenceHandler &targetHandler);
WriteGuard getWLock() const;
ResourceUsageTracker &get_resource_usage_tracker() noexcept { return *_resource_usage_tracker; }
+ std::shared_ptr<BucketExecutor> get_bucket_executor() noexcept { return _bucket_executor.lock(); }
};
}