summaryrefslogtreecommitdiffstats
path: root/persistence
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-01-19 07:46:55 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-01-19 13:54:30 +0000
commit5e676a8ffc0034848366293ad744e0850c585884 (patch)
tree7f440a67a5af24f340dcbf8bd3c0a94087b7ca17 /persistence
parent317fedff48f7211e9d48c7d407d0512bd4ee65b9 (diff)
Implement bucketexecutor interface and.
Diffstat (limited to 'persistence')
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp25
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.h3
-rw-r--r--persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp6
-rw-r--r--persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h2
-rw-r--r--persistence/src/vespa/persistence/spi/bucket_tasks.h33
-rw-r--r--persistence/src/vespa/persistence/spi/bucketexecutor.h5
6 files changed, 64 insertions, 10 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
index 0c1c5db69d6..086191504d8 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
@@ -8,6 +8,7 @@
#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/persistence/spi/i_resource_usage_listener.h>
#include <vespa/persistence/spi/resource_usage.h>
+#include <vespa/persistence/spi/bucketexecutor.h>
#include <vespa/vespalib/util/crc.h>
#include <vespa/document/fieldset/fieldsetrepo.h>
#include <vespa/vespalib/stllike/asciistream.h>
@@ -863,6 +864,30 @@ DummyPersistence::register_resource_usage_listener(IResourceUsageListener &liste
return {};
}
+namespace {
+
+class UnRegisterExecutor : public vespalib::IDestructorCallback {
+public:
+ UnRegisterExecutor(std::shared_ptr<BucketExecutor> executor) : _executor(std::move(executor)) { }
+ ~UnRegisterExecutor() override {
+ if (_executor) {
+ _executor->sync();
+ }
+ }
+private:
+ std::shared_ptr<BucketExecutor> _executor;
+};
+
+}
+
+std::unique_ptr<vespalib::IDestructorCallback>
+DummyPersistence::register_executor(std::shared_ptr<BucketExecutor> executor)
+{
+ assert(_bucket_executor.expired());
+ _bucket_executor = executor;
+ return std::make_unique<UnRegisterExecutor>(executor);
+}
+
std::string
DummyPersistence::dumpBucket(const Bucket& b) const
{
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
index c37af0d33eb..ad50648abaf 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
@@ -176,6 +176,8 @@ public:
Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override;
std::unique_ptr<vespalib::IDestructorCallback> register_resource_usage_listener(IResourceUsageListener& listener) override;
+ std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<BucketExecutor>) override;
+ std::shared_ptr<BucketExecutor> get_bucket_executor() noexcept { return _bucket_executor.lock(); }
/**
* The following methods are used only for unit testing.
@@ -213,6 +215,7 @@ private:
std::condition_variable _cond;
std::unique_ptr<ClusterState> _clusterState;
+ std::weak_ptr<BucketExecutor> _bucket_executor;
std::unique_ptr<document::select::Node> parseDocumentSelection(
const string& documentSelection,
diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp
index facdb2cadfa..1d873e9a20e 100644
--- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp
+++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp
@@ -30,10 +30,4 @@ AbstractPersistenceProvider::getModifiedBuckets(BucketSpace) const
return BucketIdListResult(list);
}
-std::unique_ptr<vespalib::IDestructorCallback>
-AbstractPersistenceProvider::register_executor(std::shared_ptr<BucketExecutor>)
-{
- return {};
-}
-
}
diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
index 5023febe9a2..016928ab10e 100644
--- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
@@ -48,8 +48,6 @@ public:
* Default impl empty.
*/
BucketIdListResult getModifiedBuckets(BucketSpace bucketSpace) const override;
-
- std::unique_ptr<vespalib::IDestructorCallback> register_executor(std::shared_ptr<BucketExecutor>) override;
};
}
diff --git a/persistence/src/vespa/persistence/spi/bucket_tasks.h b/persistence/src/vespa/persistence/spi/bucket_tasks.h
new file mode 100644
index 00000000000..12daa463e2b
--- /dev/null
+++ b/persistence/src/vespa/persistence/spi/bucket_tasks.h
@@ -0,0 +1,33 @@
+// Copyright Verizon Media. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+
+#pragma once
+
+#include "bucketexecutor.h"
+
+namespace storage::spi {
+
+template<class FunctionType>
+class LambdaBucketTask : public BucketTask {
+public:
+ explicit LambdaBucketTask(FunctionType &&func)
+ : _func(std::move(func))
+ {}
+
+ ~LambdaBucketTask() override = default;
+
+ void run(const Bucket & bucket, std::shared_ptr<vespalib::IDestructorCallback> onComplete) override {
+ _func(bucket, std::move(onComplete));
+ }
+
+private:
+ FunctionType _func;
+};
+
+template<class FunctionType>
+std::unique_ptr<BucketTask>
+makeBucketTask(FunctionType &&function) {
+ return std::make_unique<LambdaBucketTask<std::decay_t<FunctionType>>>
+ (std::forward<FunctionType>(function));
+}
+
+}
diff --git a/persistence/src/vespa/persistence/spi/bucketexecutor.h b/persistence/src/vespa/persistence/spi/bucketexecutor.h
index 07c86fd5ffb..8237b78cca0 100644
--- a/persistence/src/vespa/persistence/spi/bucketexecutor.h
+++ b/persistence/src/vespa/persistence/spi/bucketexecutor.h
@@ -3,7 +3,8 @@
#pragma once
#include "bucket.h"
-#include "operationcomplete.h"
+
+namespace vespalib { class IDestructorCallback; }
namespace storage::spi {
@@ -17,7 +18,7 @@ namespace storage::spi {
class BucketTask {
public:
virtual ~BucketTask() = default;
- virtual void run(const Bucket & bucket, std::unique_ptr<OperationComplete> onComplete) = 0;
+ virtual void run(const Bucket & bucket, std::shared_ptr<vespalib::IDestructorCallback> onComplete) = 0;
};
/**