diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-05-04 21:46:32 +0200 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-05-04 22:24:49 +0000 |
commit | 9d769dca411691680c16e73094ef27958b260959 (patch) | |
tree | ef92f32222f18bb53e73d37554e989be3495b360 /persistence | |
parent | 8bf5ae859e0664c8fd797243328baf6dc1717f7e (diff) |
Implement async put
Implement async remove.
Diffstat (limited to 'persistence')
5 files changed, 64 insertions, 6 deletions
diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp index e7abe137b89..99a8b244431 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp @@ -48,6 +48,13 @@ AbstractPersistenceProvider::removeIfFound(const Bucket& b, Timestamp timestamp, return remove(b, timestamp, id, context); } +void +AbstractPersistenceProvider::removeIfFoundAsync(const Bucket& b, Timestamp timestamp, + const DocumentId& id, Context& context, OperationComplete::UP onComplete) +{ + removeAsync(b, timestamp, id, context, std::move(onComplete)); +} + BucketIdListResult AbstractPersistenceProvider::getModifiedBuckets(BucketSpace) const { diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h index e346fdaa3cb..9de96944c31 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h @@ -39,6 +39,7 @@ public: * Default impl is remove(). */ RemoveResult removeIfFound(const Bucket&, Timestamp, const DocumentId&, Context&) override; + void removeIfFoundAsync(const Bucket&, Timestamp, const DocumentId&, Context&, OperationComplete::UP) override; /** * Default impl empty. diff --git a/persistence/src/vespa/persistence/spi/operationcomplete.h b/persistence/src/vespa/persistence/spi/operationcomplete.h index fa386e274f2..18a3c250e24 100644 --- a/persistence/src/vespa/persistence/spi/operationcomplete.h +++ b/persistence/src/vespa/persistence/spi/operationcomplete.h @@ -8,6 +8,12 @@ namespace storage::spi { class Result; +class ResultHandler { +public: + virtual ~ResultHandler() = default; + virtual void handle(const Result &) const = 0; +}; + /** * This is the callback interface when using the async operations * in the persistence provider. @@ -18,6 +24,7 @@ public: using UP = std::unique_ptr<OperationComplete>; virtual ~OperationComplete() = default; virtual void onComplete(std::unique_ptr<Result> result) = 0; + virtual void addResultHandler(const ResultHandler * resultHandler) = 0; }; }
\ No newline at end of file diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp index c60ac615644..b00ae340fa6 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp @@ -2,6 +2,7 @@ #include "persistenceprovider.h" #include <future> +#include <cassert> namespace storage::spi { @@ -9,14 +10,20 @@ PersistenceProvider::~PersistenceProvider() = default; class CatchResult : public OperationComplete { public: + CatchResult() : _promisedResult(), _resulthandler(nullptr) {} std::future<Result::UP> future_result() { - return promisedResult.get_future(); + return _promisedResult.get_future(); } void onComplete(Result::UP result) override { - promisedResult.set_value(std::move(result)); + _promisedResult.set_value(std::move(result)); + } + void addResultHandler(const ResultHandler * resultHandler) override { + assert(_resulthandler == nullptr); + _resulthandler = resultHandler; } private: - std::promise<Result::UP> promisedResult; + std::promise<Result::UP> _promisedResult; + const ResultHandler *_resulthandler; }; Result @@ -29,9 +36,42 @@ PersistenceProvider::put(const Bucket& bucket, Timestamp timestamp, DocumentSP d void PersistenceProvider::putAsync(const Bucket &bucket, Timestamp timestamp, DocumentSP doc, Context &context, - OperationComplete::UP onComplete) { + OperationComplete::UP onComplete) +{ Result result = put(bucket, timestamp, std::move(doc), context); onComplete->onComplete(std::make_unique<Result>(result)); } +RemoveResult +PersistenceProvider::remove(const Bucket& bucket, Timestamp timestamp, const DocumentId & docId, Context& context) { + auto catcher = std::make_unique<CatchResult>(); + auto future = catcher->future_result(); + removeAsync(bucket, timestamp, docId, context, std::move(catcher)); + return dynamic_cast<const RemoveResult &>(*future.get()); +} + +void +PersistenceProvider::removeAsync(const Bucket &bucket, Timestamp timestamp, const DocumentId & docId, Context &context, + OperationComplete::UP onComplete) +{ + RemoveResult result = remove(bucket, timestamp, docId, context); + onComplete->onComplete(std::make_unique<RemoveResult>(result)); +} + +RemoveResult +PersistenceProvider::removeIfFound(const Bucket& bucket, Timestamp timestamp, const DocumentId & docId, Context& context) { + auto catcher = std::make_unique<CatchResult>(); + auto future = catcher->future_result(); + removeIfFoundAsync(bucket, timestamp, docId, context, std::move(catcher)); + return dynamic_cast<const RemoveResult &>(*future.get()); +} + +void +PersistenceProvider::removeIfFoundAsync(const Bucket &bucket, Timestamp timestamp, const DocumentId & docId, Context &context, + OperationComplete::UP onComplete) +{ + RemoveResult result = removeIfFound(bucket, timestamp, docId, context); + onComplete->onComplete(std::make_unique<RemoveResult>(result)); +} + } diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h index 338fa6e03b0..6a5e3e05933 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h @@ -169,7 +169,9 @@ struct PersistenceProvider * @param timestamp The timestamp for the new bucket entry. * @param id The ID to remove */ - virtual RemoveResult remove(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) = 0; + virtual RemoveResult remove(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&); + virtual void removeAsync(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&, OperationComplete::UP); + /** * @see remove() * <p/> @@ -184,7 +186,8 @@ struct PersistenceProvider * @param timestamp The timestamp for the new bucket entry. * @param id The ID to remove */ - virtual RemoveResult removeIfFound(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) = 0; + virtual RemoveResult removeIfFound(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&); + virtual void removeIfFoundAsync(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&, OperationComplete::UP); /** * Remove any trace of the entry with the given timestamp. (Be it a document |