diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-04-30 15:33:30 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-30 15:33:30 +0200 |
commit | 48ff6fe2efd6901796a9b8a0ceb8161232bcea15 (patch) | |
tree | 23fe33f72f246bb19b938acd0b719e5515dada9d /searchcore | |
parent | 6e35291a4a5d190e0c09e627e99a5af09b00e33b (diff) | |
parent | 704b6635543a6e0b1489f6371de865383575c6e3 (diff) |
Merge pull request #13113 from vespa-engine/balder/make-put-async
- Add async interface to put
Diffstat (limited to 'searchcore')
21 files changed, 228 insertions, 138 deletions
diff --git a/searchcore/src/apps/proton/downpersistence.cpp b/searchcore/src/apps/proton/downpersistence.cpp index d4ec9cee395..511d0e3fee8 100644 --- a/searchcore/src/apps/proton/downpersistence.cpp +++ b/searchcore/src/apps/proton/downpersistence.cpp @@ -19,9 +19,7 @@ DownPersistence::DownPersistence(const vespalib::string &downReason) { } -DownPersistence::~DownPersistence() -{ -} +DownPersistence::~DownPersistence() = default; Result DownPersistence::initialize() @@ -40,8 +38,7 @@ DownPersistence::getPartitionStates() const BucketIdListResult DownPersistence::listBuckets(BucketSpace, PartitionId) const { - return BucketIdListResult(errorResult.getErrorCode(), - errorResult.getErrorMessage()); + return BucketIdListResult(errorResult.getErrorCode(), errorResult.getErrorMessage()); } Result @@ -59,30 +56,25 @@ DownPersistence:: setActiveState(const Bucket&, BucketInfo::ActiveState) BucketInfoResult DownPersistence:: getBucketInfo(const Bucket&) const { - return BucketInfoResult(errorResult.getErrorCode(), - errorResult.getErrorMessage()); + return BucketInfoResult(errorResult.getErrorCode(), errorResult.getErrorMessage()); } Result -DownPersistence::put(const Bucket&, Timestamp, const Document::SP&, Context&) +DownPersistence::put(const Bucket&, Timestamp, Document::SP, Context&) { return errorResult; } RemoveResult -DownPersistence:: remove(const Bucket&, Timestamp, - const DocumentId&, Context&) +DownPersistence:: remove(const Bucket&, Timestamp, const DocumentId&, Context&) { - return RemoveResult(errorResult.getErrorCode(), - errorResult.getErrorMessage()); + return RemoveResult(errorResult.getErrorCode(), errorResult.getErrorMessage()); } RemoveResult -DownPersistence::removeIfFound(const Bucket&, Timestamp, - const DocumentId&, Context&) +DownPersistence::removeIfFound(const Bucket&, Timestamp,const DocumentId&, Context&) { - return RemoveResult(errorResult.getErrorCode(), - errorResult.getErrorMessage()); + return RemoveResult(errorResult.getErrorCode(), errorResult.getErrorMessage()); } Result @@ -91,35 +83,28 @@ DownPersistence::removeEntry(const Bucket&, Timestamp, Context&) return errorResult; } -UpdateResult DownPersistence::update(const Bucket&, Timestamp, - const DocumentUpdate::SP&, Context&) +UpdateResult DownPersistence::update(const Bucket&, Timestamp, const DocumentUpdate::SP&, Context&) { - return UpdateResult(errorResult.getErrorCode(), - errorResult.getErrorMessage()); + return UpdateResult(errorResult.getErrorCode(), errorResult.getErrorMessage()); } GetResult -DownPersistence::get(const Bucket&, const document::FieldSet&, - const DocumentId&, Context&) const +DownPersistence::get(const Bucket&, const document::FieldSet&, const DocumentId&, Context&) const { - return GetResult(errorResult.getErrorCode(), - errorResult.getErrorMessage()); + return GetResult(errorResult.getErrorCode(), errorResult.getErrorMessage()); } CreateIteratorResult DownPersistence::createIterator(const Bucket&, const document::FieldSet&, - const Selection&, IncludedVersions, - Context&) + const Selection&, IncludedVersions, Context&) { - return CreateIteratorResult(errorResult.getErrorCode(), - errorResult.getErrorMessage()); + return CreateIteratorResult(errorResult.getErrorCode(), errorResult.getErrorMessage()); } IterateResult DownPersistence::iterate(IteratorId, uint64_t, Context&) const { - return IterateResult(errorResult.getErrorCode(), - errorResult.getErrorMessage()); + return IterateResult(errorResult.getErrorCode(), errorResult.getErrorMessage()); } Result @@ -144,8 +129,7 @@ DownPersistence::deleteBucket(const Bucket&, Context&) BucketIdListResult DownPersistence::getModifiedBuckets(BucketSpace) const { - return BucketIdListResult(errorResult.getErrorCode(), - errorResult.getErrorMessage()); + return BucketIdListResult(errorResult.getErrorCode(), errorResult.getErrorMessage()); } diff --git a/searchcore/src/apps/proton/downpersistence.h b/searchcore/src/apps/proton/downpersistence.h index 8cdac7aaa1b..2ae0605fdb6 100644 --- a/searchcore/src/apps/proton/downpersistence.h +++ b/searchcore/src/apps/proton/downpersistence.h @@ -32,7 +32,7 @@ public: Result setClusterState(BucketSpace, const ClusterState&) override; Result setActiveState(const Bucket&, BucketInfo::ActiveState) override; BucketInfoResult getBucketInfo(const Bucket&) const override; - Result put(const Bucket&, Timestamp, const DocumentSP&, Context&) override; + Result put(const Bucket&, Timestamp, DocumentSP, Context&) override; RemoveResult remove(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) override; RemoveResult removeIfFound(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) override; Result removeEntry(const Bucket&, Timestamp, Context&) override; diff --git a/searchcore/src/tests/proton/docsummary/docsummary.cpp b/searchcore/src/tests/proton/docsummary/docsummary.cpp index 55d524519b8..e1785e1e48d 100644 --- a/searchcore/src/tests/proton/docsummary/docsummary.cpp +++ b/searchcore/src/tests/proton/docsummary/docsummary.cpp @@ -255,8 +255,7 @@ public: storage::spi::Timestamp ts(0); DbDocumentId dbdId(lid); DbDocumentId prevDbdId(0); - document::Document::SP xdoc(new document::Document(doc)); - PutOperation op(bucketId, ts, xdoc); + PutOperation op(bucketId, ts, std::make_shared<document::Document>(doc)); op.setSerialNum(serialNum); op.setDbDocumentId(dbdId); op.setPrevDbDocumentId(prevDbdId); diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp index 796ec4436fe..18235116d27 100644 --- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp @@ -544,7 +544,7 @@ TEST_F("require that outdated remove is ignored", FeedHandlerFixture) TEST_F("require that outdated put is ignored", FeedHandlerFixture) { DocumentContext doc_context("id:ns:searchdocument::foo", *f.schema.builder); - auto op =std::make_unique<PutOperation>(doc_context.bucketId, Timestamp(10), doc_context.doc); + auto op =std::make_unique<PutOperation>(doc_context.bucketId, Timestamp(10), std::move(doc_context.doc)); static_cast<DocumentOperation &>(*op).setPrevTimestamp(Timestamp(10000)); FeedTokenContext token_context; f.handler.performOperation(std::move(token_context.token), std::move(op)); @@ -673,7 +673,7 @@ TEST_F("require that put is rejected if resource limit is reached", FeedHandlerF f.writeFilter._message = "Attribute resource limit reached"; DocumentContext docCtx("id:test:searchdocument::foo", *f.schema.builder); - auto op = std::make_unique<PutOperation>(docCtx.bucketId, Timestamp(10), docCtx.doc); + auto op = std::make_unique<PutOperation>(docCtx.bucketId, Timestamp(10), std::move(docCtx.doc)); FeedTokenContext token; f.handler.performOperation(std::move(token.token), std::move(op)); EXPECT_EQUAL(0, f.feedView.put_count); @@ -801,7 +801,7 @@ TEST_F("require that put with different document type repo is ok", FeedHandlerFi TwoFieldsSchemaContext schema; DocumentContext doc_context("id:ns:searchdocument::foo", *schema.builder); auto op = std::make_unique<PutOperation>(doc_context.bucketId, - Timestamp(10), doc_context.doc); + Timestamp(10), std::move(doc_context.doc)); FeedTokenContext token_context; EXPECT_EQUAL(schema.getRepo().get(), op->getDocument()->getRepo()); EXPECT_NOT_EQUAL(f.schema.getRepo().get(), op->getDocument()->getRepo()); diff --git a/searchcore/src/tests/proton/feedoperation/feedoperation_test.cpp b/searchcore/src/tests/proton/feedoperation/feedoperation_test.cpp index 5fffd70f11d..53fdcb7757f 100644 --- a/searchcore/src/tests/proton/feedoperation/feedoperation_test.cpp +++ b/searchcore/src/tests/proton/feedoperation/feedoperation_test.cpp @@ -190,7 +190,7 @@ TEST("require that toString() on derived classes are meaningful") PutOperation().toString()); EXPECT_EQUAL("Put(id::::, BucketId(0x000000000000002a), timestamp=10, dbdId=(subDbId=0, lid=0), " "prevDbdId=(subDbId=0, lid=0), prevMarkedAsRemoved=false, prevTimestamp=0, serialNum=0)", - PutOperation(bucket_id1, timestamp, doc).toString()); + PutOperation(bucket_id1, timestamp, std::move(doc)).toString()); EXPECT_EQUAL("Remove(id::::, BucketId(0x0000000000000000), timestamp=0, dbdId=(subDbId=0, lid=0), " "prevDbdId=(subDbId=0, lid=0), prevMarkedAsRemoved=false, prevTimestamp=0, serialNum=0)", diff --git a/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp index 35590cc68f6..0c72d11899a 100644 --- a/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp +++ b/searchcore/src/tests/proton/persistenceengine/persistence_handler_map/persistence_handler_map_test.cpp @@ -15,7 +15,7 @@ using HandlerSnapshot = PersistenceHandlerMap::HandlerSnapshot; struct DummyPersistenceHandler : public IPersistenceHandler { using SP = std::shared_ptr<DummyPersistenceHandler>; void initialize() override {} - void handlePut(FeedToken, const storage::spi::Bucket &, storage::spi::Timestamp, const document::Document::SP &) override {} + void handlePut(FeedToken, const storage::spi::Bucket &, storage::spi::Timestamp, DocumentSP) override {} void handleUpdate(FeedToken, const storage::spi::Bucket &, storage::spi::Timestamp, const document::DocumentUpdate::SP &) override {} void handleRemove(FeedToken, const storage::spi::Bucket &, storage::spi::Timestamp, const document::DocumentId &) override {} void handleListBuckets(IBucketIdListResultHandler &) override {} diff --git a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp index 19d9d41c3e4..4fb4abcb7c5 100644 --- a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp +++ b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp @@ -9,7 +9,6 @@ #include <vespa/document/update/documentupdate.h> #include <vespa/persistence/spi/documentselection.h> #include <vespa/persistence/spi/test.h> -#include <vespa/persistence/spi/test.h> #include <vespa/searchcore/proton/persistenceengine/bucket_guard.h> #include <vespa/searchcore/proton/persistenceengine/ipersistenceengineowner.h> #include <vespa/searchcore/proton/persistenceengine/persistenceengine.h> @@ -199,7 +198,7 @@ struct MyHandler : public IPersistenceHandler, IBucketFreezer { void initialize() override { initialized = true; } void handlePut(FeedToken token, const Bucket& bucket, - Timestamp timestamp, const document::Document::SP& doc) override { + Timestamp timestamp, DocumentSP doc) override { token->setResult(ResultUP(new storage::spi::Result()), false); handle(token, bucket, timestamp, doc->getId()); } diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp index b6223dd41ab..e0bc7cb551f 100644 --- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp @@ -35,4 +35,8 @@ State::fail() } } +OwningState::~OwningState() { + ack(); +} + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h index ab3fdea3345..cf6c1f5a7e9 100644 --- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.h +++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.h @@ -9,42 +9,68 @@ namespace proton { typedef std::unique_ptr<storage::spi::Result> ResultUP; +namespace feedtoken { + /** * This class is used by the FeedEngine to encapsulate the necessary information * for an IFeedHandler to perform an async reply to an operation. A unique * instance of this class is passed to every invokation of the IFeedHandler. */ -namespace feedtoken { - class ITransport { - public: - virtual ~ITransport() { } - virtual void send(ResultUP result, bool documentWasFound) = 0; - }; - - class State : public search::IDestructorCallback { - public: - State(const State &) = delete; - State & operator = (const State &) = delete; - State(ITransport & transport); - ~State() override; - void fail(); - void setResult(ResultUP result, bool documentWasFound) { - _documentWasFound = documentWasFound; - _result = std::move(result); - } - const storage::spi::Result &getResult() { return *_result; } - private: - void ack(); - ITransport &_transport; - ResultUP _result; - bool _documentWasFound; - std::atomic<bool> _alreadySent; - }; - - inline std::shared_ptr<State> - make(ITransport & latch) { - return std::make_shared<State>(latch); +class ITransport { +public: + virtual ~ITransport() { } + virtual void send(ResultUP result, bool documentWasFound) = 0; +}; + +/** + * This holds the result of the feed operation until it is either failed or acked. + * Guarantees that the result is propagated back to the invoker via ITransport interface. + */ +class State : public search::IDestructorCallback { +public: + State(const State &) = delete; + State & operator = (const State &) = delete; + State(ITransport & transport); + ~State() override; + void fail(); + void setResult(ResultUP result, bool documentWasFound) { + _documentWasFound = documentWasFound; + _result = std::move(result); } + const storage::spi::Result &getResult() { return *_result; } +protected: + void ack(); +private: + ITransport &_transport; + ResultUP _result; + bool _documentWasFound; + std::atomic<bool> _alreadySent; +}; + +/** + * This takes ownership ov the transport object, so that it can be used fully asynchronous + * without invoker needing to hold any state. + */ +class OwningState : public State { +public: + OwningState(std::unique_ptr<ITransport> transport) + : State(*transport), + _owned(std::move(transport)) + {} + ~OwningState() override; +private: + std::unique_ptr<ITransport> _owned; +}; + +inline std::shared_ptr<State> +make(ITransport & latch) { + return std::make_shared<State>(latch); +} +inline std::shared_ptr<State> +make(std::unique_ptr<ITransport> transport) { + return std::make_shared<OwningState>(std::move(transport)); +} + } using FeedToken = std::shared_ptr<feedtoken::State>; diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.cpp b/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.cpp index f1161c8ebdd..56224889b78 100644 --- a/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.cpp +++ b/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.cpp @@ -26,7 +26,7 @@ DocumentOperation::DocumentOperation(Type type) } -DocumentOperation::DocumentOperation(Type type, const BucketId &bucketId, const Timestamp ×tamp) +DocumentOperation::DocumentOperation(Type type, BucketId bucketId, Timestamp timestamp) : FeedOperation(type), _bucketId(bucketId), _timestamp(timestamp), @@ -38,6 +38,8 @@ DocumentOperation::DocumentOperation(Type type, const BucketId &bucketId, const { } +DocumentOperation::~DocumentOperation() = default; + void DocumentOperation::assertValidBucketId(const document::DocumentId &docId) const { diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.h b/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.h index 6847dbfd943..044d44b8276 100644 --- a/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.h +++ b/searchcore/src/vespa/searchcore/proton/feedoperation/documentoperation.h @@ -22,15 +22,14 @@ protected: DocumentOperation(Type type); - DocumentOperation(Type type, const document::BucketId &bucketId, - const storage::spi::Timestamp ×tamp); + DocumentOperation(Type type, document::BucketId bucketId, storage::spi::Timestamp timestamp); void assertValidBucketId(const document::DocumentId &docId) const; void assertValidBucketId(const document::GlobalId &docId) const; vespalib::string docArgsToString() const; public: - ~DocumentOperation() override {} + ~DocumentOperation() override; const document::BucketId &getBucketId() const { return _bucketId; } storage::spi::Timestamp getTimestamp() const { return _timestamp; } diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp b/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp index efac7297c67..dd001348093 100644 --- a/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp +++ b/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp @@ -17,16 +17,12 @@ PutOperation::PutOperation() { } -PutOperation::PutOperation(const BucketId &bucketId, - const Timestamp ×tamp, - const Document::SP &doc) - : DocumentOperation(FeedOperation::PUT, - bucketId, - timestamp), - _doc(doc) +PutOperation::PutOperation(BucketId bucketId, Timestamp timestamp, Document::SP doc) + : DocumentOperation(FeedOperation::PUT, bucketId, timestamp), + _doc(std::move(doc)) { } -PutOperation::~PutOperation() { } +PutOperation::~PutOperation() = default; void PutOperation::serialize(vespalib::nbostream &os) const @@ -40,8 +36,7 @@ PutOperation::serialize(vespalib::nbostream &os) const void -PutOperation::deserialize(vespalib::nbostream &is, - const DocumentTypeRepo &repo) +PutOperation::deserialize(vespalib::nbostream &is, const DocumentTypeRepo &repo) { DocumentOperation::deserialize(is, repo); size_t oldSize = is.size(); diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h b/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h index 33330692fab..5ca6d755e49 100644 --- a/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h +++ b/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h @@ -12,17 +12,16 @@ class PutOperation : public DocumentOperation public: PutOperation(); - PutOperation(const document::BucketId &bucketId, - const storage::spi::Timestamp ×tamp, - const DocumentSP &doc); - virtual ~PutOperation(); + PutOperation(document::BucketId bucketId, + storage::spi::Timestamp timestamp, + DocumentSP doc); + ~PutOperation() override; const DocumentSP &getDocument() const { return _doc; } void assertValid() const; - virtual void serialize(vespalib::nbostream &os) const override; - virtual void deserialize(vespalib::nbostream &is, - const document::DocumentTypeRepo &repo) override; + void serialize(vespalib::nbostream &os) const override; + void deserialize(vespalib::nbostream &is, const document::DocumentTypeRepo &repo) override; void deserializeDocument(const document::DocumentTypeRepo &repo); - virtual vespalib::string toString() const override; + vespalib::string toString() const override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h index 82a9c8174fa..c95f51b29dc 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/ipersistencehandler.h @@ -42,7 +42,7 @@ public: virtual void initialize() = 0; virtual void handlePut(FeedToken token, const storage::spi::Bucket &bucket, - storage::spi::Timestamp timestamp, const DocumentSP &doc) = 0; + storage::spi::Timestamp timestamp, DocumentSP doc) = 0; virtual void handleUpdate(FeedToken token, const storage::spi::Bucket &bucket, storage::spi::Timestamp timestamp, const DocumentUpdateSP &upd) = 0; diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 2ede5d45f7e..48074d491c8 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -64,7 +64,7 @@ public: if (result.hasError()) { std::lock_guard<std::mutex> guard(_lock); if (_result.hasError()) { - _result = TransportLatch::mergeErrorResults(_result, result); + _result = TransportMerger::mergeErrorResults(_result, result); } else { _result = result; } @@ -319,34 +319,32 @@ PersistenceEngine::getBucketInfo(const Bucket& b) const } -Result -PersistenceEngine::put(const Bucket& b, Timestamp t, const document::Document::SP& doc, Context&) +void +PersistenceEngine::putAsync(const Bucket &bucket, Timestamp ts, storage::spi::DocumentSP doc, Context &, OperationComplete::UP onComplete) { if (!_writeFilter.acceptWriteOperation()) { IResourceWriteFilter::State state = _writeFilter.getAcceptState(); if (!state.acceptWriteOperation()) { - return Result(Result::ErrorType::RESOURCE_EXHAUSTED, + return onComplete->onComplete(std::make_unique<Result>(Result::ErrorType::RESOURCE_EXHAUSTED, make_string("Put operation rejected for document '%s': '%s'", - doc->getId().toString().c_str(), state.message().c_str())); + doc->getId().toString().c_str(), state.message().c_str()))); } } std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex); DocTypeName docType(doc->getType()); - LOG(spam, "put(%s, %" PRIu64 ", (\"%s\", \"%s\"))", b.toString().c_str(), static_cast<uint64_t>(t.getValue()), + LOG(spam, "putAsync(%s, %" PRIu64 ", (\"%s\", \"%s\"))", bucket.toString().c_str(), static_cast<uint64_t>(ts.getValue()), docType.toString().c_str(), doc->getId().toString().c_str()); if (!doc->getId().hasDocType()) { - return Result(Result::ErrorType::PERMANENT_ERROR, - make_string("Old id scheme not supported in elastic mode (%s)", doc->getId().toString().c_str())); + return onComplete->onComplete(std::make_unique<Result>(Result::ErrorType::PERMANENT_ERROR, + make_string("Old id scheme not supported in elastic mode (%s)", doc->getId().toString().c_str()))); } - IPersistenceHandler * handler = getHandler(rguard, b.getBucketSpace(), docType); + IPersistenceHandler * handler = getHandler(rguard, bucket.getBucketSpace(), docType); if (!handler) { - return Result(Result::ErrorType::PERMANENT_ERROR, - make_string("No handler for document type '%s'", docType.toString().c_str())); + return onComplete->onComplete(std::make_unique<Result>(Result::ErrorType::PERMANENT_ERROR, + make_string("No handler for document type '%s'", docType.toString().c_str()))); } - TransportLatch latch(1); - handler->handlePut(feedtoken::make(latch), b, t, doc); - latch.await(); - return latch.getResult(); + auto transportContext = std::make_unique<AsyncTranportContext>(1, std::move(onComplete)); + handler->handlePut(feedtoken::make(std::move(transportContext)), bucket, ts, std::move(doc)); } PersistenceEngine::RemoveResult diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h index 5d3be07c532..d28a00b909e 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h @@ -38,6 +38,7 @@ private: using Timestamp = storage::spi::Timestamp; using TimestampList = storage::spi::TimestampList; using UpdateResult = storage::spi::UpdateResult; + using OperationComplete = storage::spi::OperationComplete; struct IteratorEntry { PersistenceHandlerSequence handler_sequence; @@ -100,7 +101,7 @@ public: Result setClusterState(BucketSpace bucketSpace, const ClusterState& calc) override; Result setActiveState(const Bucket& bucket, BucketInfo::ActiveState newState) override; BucketInfoResult getBucketInfo(const Bucket&) const override; - Result put(const Bucket&, Timestamp, const std::shared_ptr<document::Document>&, Context&) override; + void putAsync(const Bucket &, Timestamp, storage::spi::DocumentSP, Context &context, OperationComplete::UP) override; RemoveResult remove(const Bucket&, Timestamp, const document::DocumentId&, Context&) override; UpdateResult update(const Bucket&, Timestamp, const std::shared_ptr<document::DocumentUpdate>&, Context&) override; diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp index 4a5dfb0a2a5..64159841efe 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp @@ -8,10 +8,50 @@ using storage::spi::Result; namespace proton { +std::unique_ptr<std::mutex> createOptionalLock(bool needLocking) { + return needLocking + ? std::make_unique<std::mutex>() + : std::unique_ptr<std::mutex>(); +} +TransportMerger::TransportMerger(bool needLocking) + : _result(), + _lock(createOptionalLock(needLocking)) +{ +} +TransportMerger::~TransportMerger() = default; + +void +TransportMerger::mergeResult(ResultUP result, bool documentWasFound) { + if (_lock) { + std::lock_guard<std::mutex> guard(*_lock); + mergeWithLock(std::move(result), documentWasFound); + } else { + mergeWithLock(std::move(result), documentWasFound); + } +} + +void +TransportMerger::mergeWithLock(ResultUP result, bool documentWasFound) { + if (!_result) { + _result = std::move(result); + } else if (result->hasError()) { + _result = std::make_unique<Result>(mergeErrorResults(*_result, *result)); + } else if (documentWasFound) { + _result = std::move(result); + } + completeIfDone(); +} + +Result +TransportMerger::mergeErrorResults(const Result &lhs, const Result &rhs) +{ + Result::ErrorType error = (lhs.getErrorCode() > rhs.getErrorCode() ? lhs : rhs).getErrorCode(); + return Result(error, make_string("%s, %s", lhs.getErrorMessage().c_str(), rhs.getErrorMessage().c_str())); +} + TransportLatch::TransportLatch(uint32_t cnt) - : _latch(cnt), - _lock(), - _result() + : TransportMerger(cnt > 1), + _latch(cnt) { if (cnt == 0u) { _result = std::make_unique<Result>(); @@ -23,24 +63,33 @@ TransportLatch::~TransportLatch() = default; void TransportLatch::send(ResultUP result, bool documentWasFound) { - { - std::lock_guard<std::mutex> guard(_lock); - if (!_result) { - _result = std::move(result); - } else if (result->hasError()) { - _result.reset(new Result(mergeErrorResults(*_result, *result))); - } else if (documentWasFound) { - _result = std::move(result); - } - } + mergeResult(std::move(result), documentWasFound); _latch.countDown(); } -Result -TransportLatch::mergeErrorResults(const Result &lhs, const Result &rhs) +AsyncTranportContext::AsyncTranportContext(uint32_t cnt, OperationComplete::UP onComplete) + : TransportMerger(cnt > 1), + _countDown(cnt), + _onComplete(std::move(onComplete)) { - Result::ErrorType error = (lhs.getErrorCode() > rhs.getErrorCode() ? lhs : rhs).getErrorCode(); - return Result(error, make_string("%s, %s", lhs.getErrorMessage().c_str(), rhs.getErrorMessage().c_str())); + if (cnt == 0u) { + _onComplete->onComplete(std::make_unique<Result>()); + } +} + +void +AsyncTranportContext::completeIfDone() { + _countDown--; + if (_countDown == 0) { + _onComplete->onComplete(std::make_unique<Result>()); + } +} +AsyncTranportContext::~AsyncTranportContext() = default; + +void +AsyncTranportContext::send(ResultUP result, bool documentWasFound) +{ + mergeResult(std::move(result), documentWasFound); } } // proton diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h index 2fb55b14fdf..da4e19d3584 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h @@ -10,21 +10,38 @@ namespace proton { /** + * Base implementation for merging results from multiple sources. + */ + +class TransportMerger : public feedtoken::ITransport { +public: + using Result = storage::spi::Result; + static Result mergeErrorResults(const Result &lhs, const Result &rhs); +protected: + TransportMerger(bool needLocking); + ~TransportMerger() override; + void mergeResult(ResultUP result, bool documentWasFound); + virtual void completeIfDone() { } // Called with lock held if necessary on every merge + ResultUP _result; + +private: + void mergeWithLock(ResultUP result, bool documentWasFound); + std::unique_ptr<std::mutex> _lock; +}; + +/** * Implementation of FeedToken::ITransport for handling the async reply for an operation. * Uses an internal count down latch to keep track the number of outstanding replies. */ -class TransportLatch : public feedtoken::ITransport { +class TransportLatch : public TransportMerger { private: - using Result = storage::spi::Result; using UpdateResult = storage::spi::UpdateResult; using RemoveResult = storage::spi::RemoveResult; vespalib::CountDownLatch _latch; - std::mutex _lock; - ResultUP _result; public: TransportLatch(uint32_t cnt); - ~TransportLatch(); + ~TransportLatch() override; void send(ResultUP result, bool documentWasFound) override; void await() { _latch.await(); @@ -38,7 +55,25 @@ public: const RemoveResult &getRemoveResult() const { return dynamic_cast<const RemoveResult &>(*_result); } - static Result mergeErrorResults(const Result &lhs, const Result &rhs); + +}; + +/** + * Implementation of FeedToken::ITransport for async handling of the async reply for an operation. + * Uses an internal count to keep track the number of the outstanding replies. + */ +class AsyncTranportContext : public TransportMerger { +private: + using Result = storage::spi::Result; + using OperationComplete = storage::spi::OperationComplete; + + int _countDown; + OperationComplete::UP _onComplete; + void completeIfDone() override; +public: + AsyncTranportContext(uint32_t cnt, OperationComplete::UP); + ~AsyncTranportContext() override; + void send(ResultUP result, bool documentWasFound) override; }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index 362de7ee780..4a87ee0009c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -154,7 +154,7 @@ FeedHandler::createNonExistingDocument(FeedToken token, const UpdateOperation &o auto doc = make_shared<Document>(op.getUpdate()->getType(), op.getUpdate()->getId()); doc->setRepo(*_activeFeedView->getDocumentTypeRepo()); op.getUpdate()->applyTo(*doc); - PutOperation putOp(op.getBucketId(), op.getTimestamp(), doc); + PutOperation putOp(op.getBucketId(), op.getTimestamp(), std::move(doc)); _activeFeedView->preparePut(putOp); storeOperation(putOp, token); if (token) { diff --git a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp index 16af4e87795..ba0db2bb77d 100644 --- a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.cpp @@ -39,9 +39,9 @@ PersistenceHandlerProxy::initialize() } void -PersistenceHandlerProxy::handlePut(FeedToken token, const Bucket &bucket, Timestamp timestamp, const DocumentSP &doc) +PersistenceHandlerProxy::handlePut(FeedToken token, const Bucket &bucket, Timestamp timestamp, DocumentSP doc) { - auto op = std::make_unique<PutOperation>(bucket.getBucketId().stripUnused(), timestamp, doc); + auto op = std::make_unique<PutOperation>(bucket.getBucketId().stripUnused(), timestamp, std::move(doc)); _feedHandler.handleOperation(token, std::move(op)); } diff --git a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h index 3e3b3dd6fb6..5a82b46f91c 100644 --- a/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h +++ b/searchcore/src/vespa/searchcore/proton/server/persistencehandlerproxy.h @@ -25,7 +25,7 @@ public: void initialize() override; void handlePut(FeedToken token, const storage::spi::Bucket &bucket, - storage::spi::Timestamp timestamp, const DocumentSP &doc) override; + storage::spi::Timestamp timestamp, DocumentSP doc) override; void handleUpdate(FeedToken token, const storage::spi::Bucket &bucket, storage::spi::Timestamp timestamp, const DocumentUpdateSP &upd) override; |