From eb7b71781ca079b5577a13b300beafee388bc1ce Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 28 Apr 2020 08:24:57 +0000 Subject: - Add async interface to put - Use MessageTracker for keeping context. - implement putAsync, but still use it synchronously. --- .../persistence/dummyimpl/dummypersistence.cpp | 18 +++++---------- .../vespa/persistence/dummyimpl/dummypersistence.h | 23 +++++-------------- .../spi/abstractpersistenceprovider.cpp | 2 +- .../src/vespa/persistence/spi/operationcomplete.h | 22 ++++++++++++++++++ .../vespa/persistence/spi/persistenceprovider.cpp | 26 ++++++++++++++++++++++ .../vespa/persistence/spi/persistenceprovider.h | 4 +++- 6 files changed, 62 insertions(+), 33 deletions(-) create mode 100644 persistence/src/vespa/persistence/spi/operationcomplete.h (limited to 'persistence/src') diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp index 6834f453695..3f05ed36802 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp @@ -435,8 +435,7 @@ DummyPersistence::getBucketInfo(const Bucket& b) const } Result -DummyPersistence::put(const Bucket& b, Timestamp t, const Document::SP& doc, - Context&) +DummyPersistence::put(const Bucket& b, Timestamp t, Document::SP doc, Context&) { DUMMYPERSISTENCE_VERIFY_INITIALIZED; LOG(debug, "put(%s, %" PRIu64 ", %s)", @@ -461,14 +460,13 @@ DummyPersistence::put(const Bucket& b, Timestamp t, const Document::SP& doc, LOG(spam, "Inserting document %s", doc->toString(true).c_str()); - DocEntry::UP entry(new DocEntry(t, NONE, Document::UP(doc->clone()))); + auto entry = std::make_unique(t, NONE, Document::UP(doc->clone())); (*bc)->insert(std::move(entry)); return Result(); } Result -DummyPersistence::maintain(const Bucket& b, - MaintenanceLevel) +DummyPersistence::maintain(const Bucket& b, MaintenanceLevel) { assert(b.getBucketSpace() == FixedBucketSpaces::default_space()); if (_simulateMaintainFailure) { @@ -489,10 +487,7 @@ DummyPersistence::maintain(const Bucket& b, } RemoveResult -DummyPersistence::remove(const Bucket& b, - Timestamp t, - const DocumentId& did, - Context&) +DummyPersistence::remove(const Bucket& b, Timestamp t, const DocumentId& did, Context&) { DUMMYPERSISTENCE_VERIFY_INITIALIZED; LOG(debug, "remove(%s, %" PRIu64 ", %s)", @@ -518,10 +513,7 @@ DummyPersistence::remove(const Bucket& b, } GetResult -DummyPersistence::get(const Bucket& b, - const document::FieldSet& fieldSet, - const DocumentId& did, - Context&) const +DummyPersistence::get(const Bucket& b, const document::FieldSet& fieldSet, const DocumentId& did, Context&) const { DUMMYPERSISTENCE_VERIFY_INITIALIZED; LOG(debug, "get(%s, %s)", diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h index c97aab822ac..88e17a90a98 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h @@ -153,16 +153,9 @@ public: Result setClusterState(BucketSpace bucketSpace, const ClusterState& newState) override; Result setActiveState(const Bucket& bucket, BucketInfo::ActiveState newState) override; BucketInfoResult getBucketInfo(const Bucket&) const override; - Result put(const Bucket&, Timestamp, const DocumentSP&, Context&) override; - GetResult get(const Bucket&, - const document::FieldSet& fieldSet, - const DocumentId&, - Context&) const override; - - RemoveResult remove(const Bucket& b, - Timestamp t, - const DocumentId& did, - Context&) override; + Result put(const Bucket&, Timestamp, DocumentSP, Context&) override; + GetResult get(const Bucket&, const document::FieldSet&, const DocumentId&, Context&) const override; + RemoveResult remove(const Bucket& b, Timestamp t, const DocumentId& did, Context&) override; CreateIteratorResult createIterator(const Bucket&, const document::FieldSet& fs, @@ -176,15 +169,9 @@ public: Result createBucket(const Bucket&, Context&) override; Result deleteBucket(const Bucket&, Context&) override; - Result split(const Bucket& source, - const Bucket& target1, - const Bucket& target2, - Context&) override; + Result split(const Bucket& source, const Bucket& target1, const Bucket& target2, Context&) override; - Result join(const Bucket& source1, - const Bucket& source2, - const Bucket& target, - Context&) override; + Result join(const Bucket& source1, const Bucket& source2, const Bucket& target, Context&) override; Result revert(const Bucket&, Timestamp, Context&); Result maintain(const Bucket& bucket, MaintenanceLevel level) override; diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp index e35a6a74bde..e7abe137b89 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp @@ -31,7 +31,7 @@ AbstractPersistenceProvider::update(const Bucket& bucket, Timestamp ts, upd->applyTo(*docToUpdate); - Result putResult = put(bucket, ts, docToUpdate, context); + Result putResult = put(bucket, ts, std::move(docToUpdate), context); if (putResult.hasError()) { return UpdateResult(putResult.getErrorCode(), diff --git a/persistence/src/vespa/persistence/spi/operationcomplete.h b/persistence/src/vespa/persistence/spi/operationcomplete.h new file mode 100644 index 00000000000..1a548e613dd --- /dev/null +++ b/persistence/src/vespa/persistence/spi/operationcomplete.h @@ -0,0 +1,22 @@ +// Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include + +namespace storage::spi { + +class Result; + +/** + * This is the callback interface when using the async operations + * in the persistence provider. + */ +class OperationComplete +{ +public: + using UP = std::unique_ptr; + virtual ~OperationComplete() = default; + virtual void onComplete(std::unique_ptr result) = 0; +}; + +} \ No newline at end of file diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp index 61d141c0229..02fb1bb4719 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp @@ -1,9 +1,35 @@ // Copyright 2017 Yahoo Holdings. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "persistenceprovider.h" +#include namespace storage::spi { PersistenceProvider::~PersistenceProvider() = default; +class CatchResult : public OperationComplete { +public: + std::future waitResult() { + return promisedResult.get_future(); + } + void onComplete(Result::UP result) override { + promisedResult.set_value(std::move(result)); + } +private: + std::promise promisedResult; +}; +Result +PersistenceProvider::put(const Bucket& bucket, Timestamp timestamp, DocumentSP doc, Context& context) { + auto catcher = std::make_unique(); + auto future = catcher->waitResult(); + putAsync(bucket, timestamp, std::move(doc), context, std::move(catcher)); + return *future.get(); +} +void +PersistenceProvider::putAsync(const Bucket &bucket, Timestamp timestamp, DocumentSP doc, Context &context, + OperationComplete::UP onComplete) { + Result result = put(bucket, timestamp, std::move(doc), context); + onComplete->onComplete(std::make_unique(result)); +} + } diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h index c70d5e3f1c3..70645b31902 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h @@ -10,6 +10,7 @@ #include "result.h" #include "selection.h" #include "clusterstate.h" +#include "operationcomplete.h" namespace document { class FieldSet; } @@ -109,7 +110,8 @@ struct PersistenceProvider /** * Store the given document at the given microsecond time. */ - virtual Result put(const Bucket&, Timestamp, const DocumentSP&, Context&) = 0; + virtual Result put(const Bucket&, Timestamp, DocumentSP, Context&); + virtual void putAsync(const Bucket &, Timestamp , DocumentSP , Context &, OperationComplete::UP ); /** * This remove function assumes that there exist something to be removed. -- cgit v1.2.3