diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2020-05-05 10:06:41 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2020-05-05 10:06:41 +0000 |
commit | bbfebdcf506f8b638ce40b28e47c3657cf002055 (patch) | |
tree | a176b0bf30c7e74e3e792fa393ec3ba805c789cd | |
parent | 2b83b031718d466df9fc0c4abd89f7fe0bcdbbf8 (diff) |
Add async update and followup on PR comments.
17 files changed, 171 insertions, 158 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp index 984bbcd845f..5720a0ba662 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp @@ -4,6 +4,7 @@ #include <vespa/document/select/parser.h> #include <vespa/document/base/documentid.h> #include <vespa/document/fieldvalue/document.h> +#include <vespa/document/update/documentupdate.h> #include <vespa/document/bucket/fixed_bucket_spaces.h> #include <vespa/vespalib/util/crc.h> #include <vespa/document/fieldset/fieldsetrepo.h> @@ -30,7 +31,7 @@ BucketContent::BucketContent() _outdatedInfo(true), _active(false) { } -BucketContent::~BucketContent() { } +BucketContent::~BucketContent() = default; uint32_t BucketContent::computeEntryChecksum(const BucketEntry& e) const @@ -306,11 +307,10 @@ DummyPersistence::DummyPersistence( _clusterState() {} -DummyPersistence::~DummyPersistence() {} +DummyPersistence::~DummyPersistence() = default; document::select::Node::UP -DummyPersistence::parseDocumentSelection(const string& documentSelection, - bool allowLeaf) +DummyPersistence::parseDocumentSelection(const string& documentSelection, bool allowLeaf) { document::select::Node::UP ret; try { @@ -464,6 +464,37 @@ DummyPersistence::put(const Bucket& b, Timestamp t, Document::SP doc, Context&) return Result(); } +UpdateResult +DummyPersistence::update(const Bucket& bucket, Timestamp ts, DocumentUpdateSP upd, Context& context) +{ + GetResult getResult = get(bucket, document::AllFields(), upd->getId(), context); + + if (getResult.hasError()) { + return UpdateResult(getResult.getErrorCode(), getResult.getErrorMessage()); + } + + auto docToUpdate = getResult.getDocumentPtr(); + Timestamp updatedTs = getResult.getTimestamp(); + if (!docToUpdate) { + if (!upd->getCreateIfNonExistent()) { + return UpdateResult(); + } else { + docToUpdate = std::make_shared<document::Document>(upd->getType(), upd->getId()); + updatedTs = ts; + } + } + + upd->applyTo(*docToUpdate); + + Result putResult = put(bucket, ts, std::move(docToUpdate), context); + + if (putResult.hasError()) { + return UpdateResult(putResult.getErrorCode(), putResult.getErrorMessage()); + } + + return UpdateResult(updatedTs); +} + RemoveResult DummyPersistence::remove(const Bucket& b, Timestamp t, const DocumentId& did, Context&) { diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h index 27ed95bd6ee..94c2e1cd9a4 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h @@ -156,6 +156,7 @@ public: Result put(const Bucket&, Timestamp, DocumentSP, Context&) override; GetResult get(const Bucket&, const document::FieldSet&, const DocumentId&, Context&) const override; RemoveResult remove(const Bucket& b, Timestamp t, const DocumentId& did, Context&) override; + UpdateResult update(const Bucket&, Timestamp, DocumentUpdateSP, Context&) override; CreateIteratorResult createIterator(const Bucket&, const document::FieldSet& fs, diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp index 99a8b244431..45c2c7901f9 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp @@ -8,39 +8,6 @@ namespace storage::spi { -UpdateResult -AbstractPersistenceProvider::update(const Bucket& bucket, Timestamp ts, - const DocumentUpdate::SP& upd, Context& context) -{ - GetResult getResult = get(bucket, document::AllFields(), upd->getId(), context); - - if (getResult.hasError()) { - return UpdateResult(getResult.getErrorCode(), getResult.getErrorMessage()); - } - - auto docToUpdate = getResult.getDocumentPtr(); - Timestamp updatedTs = getResult.getTimestamp(); - if (!docToUpdate) { - if (!upd->getCreateIfNonExistent()) { - return UpdateResult(); - } else { - docToUpdate = std::make_shared<document::Document>(upd->getType(), upd->getId()); - updatedTs = ts; - } - } - - upd->applyTo(*docToUpdate); - - Result putResult = put(bucket, ts, std::move(docToUpdate), context); - - if (putResult.hasError()) { - return UpdateResult(putResult.getErrorCode(), - putResult.getErrorMessage()); - } - - return UpdateResult(updatedTs); -} - RemoveResult AbstractPersistenceProvider::removeIfFound(const Bucket& b, Timestamp timestamp, const DocumentId& id, Context& context) @@ -50,7 +17,7 @@ AbstractPersistenceProvider::removeIfFound(const Bucket& b, Timestamp timestamp, void AbstractPersistenceProvider::removeIfFoundAsync(const Bucket& b, Timestamp timestamp, - const DocumentId& id, Context& context, OperationComplete::UP onComplete) + const DocumentId& id, Context& context, OperationComplete::UP onComplete) { removeAsync(b, timestamp, id, context, std::move(onComplete)); } diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h index 9de96944c31..813050222a9 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h @@ -20,12 +20,6 @@ public: Result initialize() override { return Result(); }; /** - * Updates the document by calling get(), updating the document, - * then calling put() on the result. - */ - UpdateResult update(const Bucket&, Timestamp, const DocumentUpdateSP&, Context&) override; - - /** * Default impl empty. */ Result createBucket(const Bucket&, Context&) override { return Result(); } diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp index b00ae340fa6..38fcd2fd072 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp @@ -74,4 +74,20 @@ PersistenceProvider::removeIfFoundAsync(const Bucket &bucket, Timestamp timestam onComplete->onComplete(std::make_unique<RemoveResult>(result)); } +UpdateResult +PersistenceProvider::update(const Bucket& bucket, Timestamp timestamp, DocumentUpdateSP upd, Context& context) { + auto catcher = std::make_unique<CatchResult>(); + auto future = catcher->future_result(); + updateAsync(bucket, timestamp, std::move(upd), context, std::move(catcher)); + return dynamic_cast<const UpdateResult &>(*future.get()); +} + +void +PersistenceProvider::updateAsync(const Bucket &bucket, Timestamp timestamp, DocumentUpdateSP upd, Context &context, + OperationComplete::UP onComplete) +{ + UpdateResult result = update(bucket, timestamp, std::move(upd), context); + onComplete->onComplete(std::make_unique<UpdateResult>(result)); +} + } diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h index 6a5e3e05933..2e7d215fc33 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h @@ -204,7 +204,8 @@ struct PersistenceProvider * @param timestamp The timestamp to use for the new update entry. * @param update The document update to apply to the stored document. */ - virtual UpdateResult update(const Bucket&, Timestamp timestamp, const DocumentUpdateSP& update, Context&) = 0; + virtual UpdateResult update(const Bucket&, Timestamp timestamp, DocumentUpdateSP update, Context&); + virtual void updateAsync(const Bucket&, Timestamp timestamp, DocumentUpdateSP update, Context&, OperationComplete::UP); /** * Retrieves the latest version of the document specified by the diff --git a/searchcore/src/apps/proton/downpersistence.cpp b/searchcore/src/apps/proton/downpersistence.cpp index 8d8665b645e..aa87c383c33 100644 --- a/searchcore/src/apps/proton/downpersistence.cpp +++ b/searchcore/src/apps/proton/downpersistence.cpp @@ -83,7 +83,7 @@ DownPersistence::removeEntry(const Bucket&, Timestamp, Context&) return errorResult; } -UpdateResult DownPersistence::update(const Bucket&, Timestamp, const DocumentUpdate::SP&, Context&) +UpdateResult DownPersistence::update(const Bucket&, Timestamp, DocumentUpdate::SP, Context&) { return UpdateResult(errorResult.getErrorCode(), errorResult.getErrorMessage()); } diff --git a/searchcore/src/apps/proton/downpersistence.h b/searchcore/src/apps/proton/downpersistence.h index 2716b0ee3b4..10e3d9c1ad7 100644 --- a/searchcore/src/apps/proton/downpersistence.h +++ b/searchcore/src/apps/proton/downpersistence.h @@ -36,7 +36,7 @@ public: RemoveResult remove(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) override; RemoveResult removeIfFound(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) override; Result removeEntry(const Bucket&, Timestamp, Context&) override; - UpdateResult update(const Bucket&, Timestamp timestamp, const DocumentUpdateSP& update, Context&) override; + UpdateResult update(const Bucket&, Timestamp timestamp, DocumentUpdateSP update, Context&) override; GetResult get(const Bucket&, const document::FieldSet& fieldSet, const DocumentId& id, Context&) const override; CreateIteratorResult createIterator(const Bucket&, const document::FieldSet& fieldSet, diff --git a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp index 4fb4abcb7c5..2927457a6a5 100644 --- a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp +++ b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp @@ -458,12 +458,12 @@ TEST_F("require that puts are routed to handler", SimpleFixture) storage::spi::LoadType loadType(0, "default"); Context context(loadType, storage::spi::Priority(0), storage::spi::Trace::TraceLevel(0)); f.engine.put(bucket1, tstamp1, doc1, context); - assertHandler(bucket1, tstamp1, docId1, f.hset.handler1); - assertHandler(bucket0, tstamp0, docId0, f.hset.handler2); + TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1)); + TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler2)); f.engine.put(bucket1, tstamp1, doc2, context); - assertHandler(bucket1, tstamp1, docId1, f.hset.handler1); - assertHandler(bucket1, tstamp1, docId2, f.hset.handler2); + TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1)); + TEST_DO(assertHandler(bucket1, tstamp1, docId2, f.hset.handler2)); EXPECT_EQUAL(Result(Result::ErrorType::PERMANENT_ERROR, "No handler for document type 'type3'"), f.engine.put(bucket1, tstamp1, doc3, context)); @@ -490,14 +490,14 @@ TEST_F("require that updates are routed to handler", SimpleFixture) Context context(loadType, storage::spi::Priority(0), storage::spi::Trace::TraceLevel(0)); f.hset.handler1.setExistingTimestamp(tstamp2); UpdateResult ur = f.engine.update(bucket1, tstamp1, upd1, context); - assertHandler(bucket1, tstamp1, docId1, f.hset.handler1); - assertHandler(bucket0, tstamp0, docId0, f.hset.handler2); + TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1)); + TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler2)); EXPECT_EQUAL(tstamp2, ur.getExistingTimestamp()); f.hset.handler2.setExistingTimestamp(tstamp3); ur = f.engine.update(bucket1, tstamp1, upd2, context); - assertHandler(bucket1, tstamp1, docId1, f.hset.handler1); - assertHandler(bucket1, tstamp1, docId2, f.hset.handler2); + TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1)); + TEST_DO(assertHandler(bucket1, tstamp1, docId2, f.hset.handler2)); EXPECT_EQUAL(tstamp3, ur.getExistingTimestamp()); EXPECT_EQUAL(Result(Result::ErrorType::PERMANENT_ERROR, "No handler for document type 'type3'"), @@ -533,31 +533,31 @@ TEST_F("require that removes are routed to handlers", SimpleFixture) storage::spi::LoadType loadType(0, "default"); Context context(loadType, storage::spi::Priority(0), storage::spi::Trace::TraceLevel(0)); RemoveResult rr = f.engine.remove(bucket1, tstamp1, docId3, context); - assertHandler(bucket0, tstamp0, docId0, f.hset.handler1); - assertHandler(bucket0, tstamp0, docId0, f.hset.handler2); + TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler1)); + TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler2)); EXPECT_FALSE(rr.wasFound()); EXPECT_TRUE(rr.hasError()); EXPECT_EQUAL(Result(Result::ErrorType::PERMANENT_ERROR, "No handler for document type 'type3'"), rr); f.hset.handler1.setExistingTimestamp(tstamp2); rr = f.engine.remove(bucket1, tstamp1, docId1, context); - assertHandler(bucket1, tstamp1, docId1, f.hset.handler1); - assertHandler(bucket0, tstamp0, docId0, f.hset.handler2); + TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1)); + TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler2)); EXPECT_TRUE(rr.wasFound()); EXPECT_FALSE(rr.hasError()); f.hset.handler1.setExistingTimestamp(tstamp0); f.hset.handler2.setExistingTimestamp(tstamp3); rr = f.engine.remove(bucket1, tstamp1, docId2, context); - assertHandler(bucket1, tstamp1, docId1, f.hset.handler1); - assertHandler(bucket1, tstamp1, docId2, f.hset.handler2); + TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1)); + TEST_DO(assertHandler(bucket1, tstamp1, docId2, f.hset.handler2)); EXPECT_TRUE(rr.wasFound()); EXPECT_FALSE(rr.hasError()); f.hset.handler2.setExistingTimestamp(tstamp0); rr = f.engine.remove(bucket1, tstamp1, docId2, context); - assertHandler(bucket1, tstamp1, docId1, f.hset.handler1); - assertHandler(bucket1, tstamp1, docId2, f.hset.handler2); + TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1)); + TEST_DO(assertHandler(bucket1, tstamp1, docId2, f.hset.handler2)); EXPECT_FALSE(rr.wasFound()); EXPECT_FALSE(rr.hasError()); } diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 1b64578607f..20c32e0048a 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -368,34 +368,34 @@ PersistenceEngine::removeAsync(const Bucket& b, Timestamp t, const DocumentId& d } -PersistenceEngine::UpdateResult -PersistenceEngine::update(const Bucket& b, Timestamp t, const DocumentUpdate::SP& upd, Context&) +void +PersistenceEngine::updateAsync(const Bucket& b, Timestamp t, DocumentUpdate::SP upd, Context&, OperationComplete::UP onComplete) { if (!_writeFilter.acceptWriteOperation()) { IResourceWriteFilter::State state = _writeFilter.getAcceptState(); if (!state.acceptWriteOperation()) { - return UpdateResult(Result::ErrorType::RESOURCE_EXHAUSTED, + return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::RESOURCE_EXHAUSTED, make_string("Update operation rejected for document '%s': '%s'", - upd->getId().toString().c_str(), state.message().c_str())); + upd->getId().toString().c_str(), state.message().c_str()))); } } try { upd->eagerDeserialize(); } catch (document::FieldNotFoundException & e) { - return UpdateResult(Result::ErrorType::TRANSIENT_ERROR, + return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::TRANSIENT_ERROR, make_string("Update operation rejected for document '%s' of type '%s': 'Field not found'", - upd->getId().toString().c_str(), upd->getType().getName().c_str())); + upd->getId().toString().c_str(), upd->getType().getName().c_str()))); } catch (document::DocumentTypeNotFoundException & e) { - return UpdateResult(Result::ErrorType::TRANSIENT_ERROR, + return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::TRANSIENT_ERROR, make_string("Update operation rejected for document '%s' of type '%s'.", - upd->getId().toString().c_str(), e.getDocumentTypeName().c_str())); + upd->getId().toString().c_str(), e.getDocumentTypeName().c_str()))); } catch (document::WrongTensorTypeException &e) { - return UpdateResult(Result::ErrorType::TRANSIENT_ERROR, + return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::TRANSIENT_ERROR, make_string("Update operation rejected for document '%s' of type '%s': 'Wrong tensor type: %s'", upd->getId().toString().c_str(), upd->getType().getName().c_str(), - e.getMessage().c_str())); + e.getMessage().c_str()))); } std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex); DocTypeName docType(upd->getType()); @@ -403,24 +403,20 @@ PersistenceEngine::update(const Bucket& b, Timestamp t, const DocumentUpdate::SP b.toString().c_str(), static_cast<uint64_t>(t.getValue()), docType.toString().c_str(), upd->getId().toString().c_str(), (upd->getCreateIfNonExistent() ? "true" : "false")); if (!upd->getId().hasDocType()) { - return UpdateResult(Result::ErrorType::PERMANENT_ERROR, - make_string("Old id scheme not supported in elastic mode (%s)", upd->getId().toString().c_str())); + return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR, + make_string("Old id scheme not supported in elastic mode (%s)", upd->getId().toString().c_str()))); } if (upd->getId().getDocType() != docType.getName()) { - return UpdateResult(Result::ErrorType::PERMANENT_ERROR, - make_string("Update operation rejected due to bad id (%s, %s)", upd->getId().toString().c_str(), docType.getName().c_str())); + return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR, + make_string("Update operation rejected due to bad id (%s, %s)", upd->getId().toString().c_str(), docType.getName().c_str()))); } IPersistenceHandler * handler = getHandler(rguard, b.getBucketSpace(), docType); - if (handler) { - TransportLatch latch(1); - LOG(debug, "update = %s", upd->toXml().c_str()); - handler->handleUpdate(feedtoken::make(latch), b, t, upd); - latch.await(); - return latch.getUpdateResult(); - } else { - return UpdateResult(Result::ErrorType::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str())); + if ( handler == nullptr) { + return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str()))); } + auto transportContext = std::make_unique<AsyncTranportContext>(1, std::move(onComplete)); + handler->handleUpdate(feedtoken::make(std::move(transportContext)), b, t, std::move(upd)); } diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h index a5a0c164499..230f8c411aa 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h @@ -103,7 +103,7 @@ public: BucketInfoResult getBucketInfo(const Bucket&) const override; void putAsync(const Bucket &, Timestamp, storage::spi::DocumentSP, Context &context, OperationComplete::UP) override; void removeAsync(const Bucket&, Timestamp, const document::DocumentId&, Context&, OperationComplete::UP) override; - UpdateResult update(const Bucket&, Timestamp, const std::shared_ptr<document::DocumentUpdate>&, Context&) override; + void updateAsync(const Bucket&, Timestamp, storage::spi::DocumentUpdateSP, Context&, OperationComplete::UP) override; GetResult get(const Bucket&, const document::FieldSet&, const document::DocumentId&, Context&) const override; CreateIteratorResult createIterator(const Bucket&, const document::FieldSet&, const Selection&, IncludedVersions, Context&) override; diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp index 862d1fb758a..13a254e7a76 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp @@ -124,12 +124,12 @@ PersistenceProviderWrapper::removeIfFound(const spi::Bucket& bucket, spi::UpdateResult PersistenceProviderWrapper::update(const spi::Bucket& bucket, spi::Timestamp timestamp, - const document::DocumentUpdate::SP& upd, + document::DocumentUpdate::SP upd, spi::Context& context) { LOG_SPI("update(" << bucket << ", " << timestamp << ", " << upd->getId() << ")"); CHECK_ERROR(spi::UpdateResult, FAIL_UPDATE); - return _spi.update(bucket, timestamp, upd, context); + return _spi.update(bucket, timestamp, std::move(upd), context); } spi::GetResult diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h index 25365e64bfc..9ca08becabe 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h @@ -91,9 +91,8 @@ public: spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override; spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override; spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override; - spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, const spi::DocumentUpdateSP&, spi::Context&) override; - spi::GetResult get(const spi::Bucket&, const document::FieldSet&, - const spi::DocumentId&, spi::Context&) const override ; + spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&) override; + spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const spi::DocumentId&, spi::Context&) const override; spi::CreateIteratorResult createIterator(const spi::Bucket&, const document::FieldSet&, const spi::Selection&, spi::IncludedVersions versions, spi::Context&) override; diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp index ca441c71816..578a90081c7 100644 --- a/storage/src/tests/persistence/testandsettest.cpp +++ b/storage/src/tests/persistence/testandsettest.cpp @@ -171,7 +171,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_on_condition_mismatch) { putTestDocument(false, timestampOne); auto updateUp = conditional_update_test(false, timestampTwo); - ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), + ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); @@ -184,7 +184,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_on_condition_match) { putTestDocument(true, timestampOne); auto updateUp = conditional_update_test(false, timestampTwo); - ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId), dumpBucket(BUCKET_ID)); @@ -196,7 +196,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_when_no_document_and_no_a api::Timestamp updateTimestamp = 200; auto updateUp = conditional_update_test(false, updateTimestamp); - ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), + ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -205,7 +205,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_when_no_document_but_auto_cre api::Timestamp updateTimestamp = 200; auto updateUp = conditional_update_test(true, updateTimestamp); - ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(updateTimestamp, testDocId), dumpBucket(BUCKET_ID)); assertTestDocumentFoundAndMatchesContent(NEW_CONTENT); } diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index f281f14e884..a77eea1073e 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -176,19 +176,17 @@ PersistenceThread::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) return trackerUP; } + spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket()); if (_sequencedExecutor == nullptr) { - spi::Result response = _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), - tracker.context()); + spi::Result response = _spi.put(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()),tracker.context()); tracker.checkForError(response); } else { - _spi.putAsync(getBucket(cmd.getDocumentId(), cmd.getBucket()), spi::Timestamp(cmd.getTimestamp()), - std::move(cmd.getDocument()), tracker.context(), - std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(), - makeResultTask([tracker = std::move(trackerUP)](spi::Result::UP response) { - tracker->checkForError(*response); - tracker->sendReply(); - }))); + auto task = makeResultTask([tracker = std::move(trackerUP)](spi::Result::UP response) { + tracker->checkForError(*response); + tracker->sendReply(); + }); + _spi.putAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), tracker.context(), + std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(), std::move(task))); } return trackerUP; } @@ -205,10 +203,9 @@ PersistenceThread::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP trac return trackerUP; } + spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket()); if (_sequencedExecutor == nullptr) { - spi::RemoveResult response = _spi.removeIfFound(getBucket(cmd.getDocumentId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), - tracker.context()); + spi::RemoveResult response = _spi.removeIfFound(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(),tracker.context()); if (tracker.checkForError(response)) { tracker.setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0)); } @@ -216,42 +213,56 @@ PersistenceThread::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP trac metrics.notFound.inc(); } } else { - _spi.removeIfFoundAsync(getBucket(cmd.getDocumentId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker.context(), - std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(), - makeResultTask([&metrics, &cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) { - const spi::RemoveResult & response = dynamic_cast<const spi::RemoveResult &>(*responseUP); - if (tracker->checkForError(response)) { - tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0)); - } - if (!response.wasFound()) { - metrics.notFound.inc(); - } - tracker->sendReply(); - }))); + auto task = makeResultTask([&metrics, &cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) { + const spi::RemoveResult & response = dynamic_cast<const spi::RemoveResult &>(*responseUP); + if (tracker->checkForError(response)) { + tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0)); + } + if (!response.wasFound()) { + metrics.notFound.inc(); + } + tracker->sendReply(); + }); + _spi.removeIfFoundAsync(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker.context(), + std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(), std::move(task))); } return trackerUP; } MessageTracker::UP -PersistenceThread::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP tracker) +PersistenceThread::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP trackerUP) { + MessageTracker & tracker = *trackerUP; auto& metrics = _env._metrics.update[cmd.getLoadType()]; - tracker->setMetric(metrics); + tracker.setMetric(metrics); metrics.request_size.addValue(cmd.getApproxByteSize()); - if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, tracker->context(), cmd.getUpdate()->getCreateIfNonExistent())) { - return tracker; + if (tasConditionExists(cmd) && !tasConditionMatches(cmd, tracker, tracker.context(), cmd.getUpdate()->getCreateIfNonExistent())) { + return trackerUP; } - - spi::UpdateResult response = _spi.update(getBucket(cmd.getUpdate()->getId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), tracker->context()); - if (tracker->checkForError(response)) { - auto reply = std::make_shared<api::UpdateReply>(cmd); - reply->setOldTimestamp(response.getExistingTimestamp()); - tracker->setReply(std::move(reply)); + + spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket()); + if (_sequencedExecutor == nullptr) { + spi::UpdateResult response = _spi.update(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getUpdate()),tracker.context()); + if (tracker.checkForError(response)) { + auto reply = std::make_shared<api::UpdateReply>(cmd); + reply->setOldTimestamp(response.getExistingTimestamp()); + tracker.setReply(std::move(reply)); + } + } else { + auto task = makeResultTask([&cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) { + const spi::UpdateResult & response = dynamic_cast<const spi::UpdateResult &>(*responseUP); + if (tracker->checkForError(response)) { + auto reply = std::make_shared<api::UpdateReply>(cmd); + reply->setOldTimestamp(response.getExistingTimestamp()); + tracker->setReply(std::move(reply)); + } + tracker->sendReply(); + }); + _spi.updateAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getUpdate()), tracker.context(), + std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(), std::move(task))); } - return tracker; + return trackerUP; } namespace { diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index 204d32b00ca..a5564282d17 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -84,47 +84,35 @@ ProviderErrorWrapper::put(const spi::Bucket& bucket, spi::Timestamp ts, spi::Doc } spi::RemoveResult -ProviderErrorWrapper::remove(const spi::Bucket& bucket, - spi::Timestamp ts, - const document::DocumentId& docId, - spi::Context& context) +ProviderErrorWrapper::remove(const spi::Bucket& bucket, spi::Timestamp ts, const document::DocumentId& docId, spi::Context& context) { return checkResult(_impl.remove(bucket, ts, docId, context)); } spi::RemoveResult -ProviderErrorWrapper::removeIfFound(const spi::Bucket& bucket, - spi::Timestamp ts, - const document::DocumentId& docId, - spi::Context& context) +ProviderErrorWrapper::removeIfFound(const spi::Bucket& bucket, spi::Timestamp ts, + const document::DocumentId& docId, spi::Context& context) { return checkResult(_impl.removeIfFound(bucket, ts, docId, context)); } spi::UpdateResult -ProviderErrorWrapper::update(const spi::Bucket& bucket, - spi::Timestamp ts, - const spi::DocumentUpdateSP& docUpdate, - spi::Context& context) +ProviderErrorWrapper::update(const spi::Bucket& bucket, spi::Timestamp ts, + spi::DocumentUpdateSP docUpdate, spi::Context& context) { - return checkResult(_impl.update(bucket, ts, docUpdate, context)); + return checkResult(_impl.update(bucket, ts, std::move(docUpdate), context)); } spi::GetResult -ProviderErrorWrapper::get(const spi::Bucket& bucket, - const document::FieldSet& fieldSet, - const document::DocumentId& docId, - spi::Context& context) const +ProviderErrorWrapper::get(const spi::Bucket& bucket, const document::FieldSet& fieldSet, + const document::DocumentId& docId, spi::Context& context) const { return checkResult(_impl.get(bucket, fieldSet, docId, context)); } spi::CreateIteratorResult -ProviderErrorWrapper::createIterator(const spi::Bucket& bucket, - const document::FieldSet& fieldSet, - const spi::Selection& selection, - spi::IncludedVersions versions, - spi::Context& context) +ProviderErrorWrapper::createIterator(const spi::Bucket& bucket, const document::FieldSet& fieldSet, + const spi::Selection& selection, spi::IncludedVersions versions, spi::Context& context) { return checkResult(_impl.createIterator(bucket, fieldSet, selection, versions, context)); } @@ -187,7 +175,7 @@ ProviderErrorWrapper::removeEntry(const spi::Bucket& bucket, spi::Timestamp ts, void ProviderErrorWrapper::putAsync(const spi::Bucket &bucket, spi::Timestamp ts, spi::DocumentSP doc, - spi::Context &context, spi::OperationComplete::UP onComplete) + spi::Context &context, spi::OperationComplete::UP onComplete) { onComplete->addResultHandler(this); _impl.putAsync(bucket, ts, std::move(doc), context, std::move(onComplete)); @@ -203,10 +191,18 @@ ProviderErrorWrapper::removeAsync(const spi::Bucket &bucket, spi::Timestamp ts, void ProviderErrorWrapper::removeIfFoundAsync(const spi::Bucket &bucket, spi::Timestamp ts, const document::DocumentId &docId, - spi::Context & context, spi::OperationComplete::UP onComplete) + spi::Context & context, spi::OperationComplete::UP onComplete) { onComplete->addResultHandler(this); _impl.removeIfFoundAsync(bucket, ts, docId, context, std::move(onComplete)); } +void +ProviderErrorWrapper::updateAsync(const spi::Bucket &bucket, spi::Timestamp ts, spi::DocumentUpdateSP upd, + spi::Context &context, spi::OperationComplete::UP onComplete) +{ + onComplete->addResultHandler(this); + _impl.updateAsync(bucket, ts, std::move(upd), context, std::move(onComplete)); +} + } // ns storage diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index 4d25de3cdab..602877e0b02 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -50,7 +50,7 @@ public: spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override; spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override; spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override; - spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, const spi::DocumentUpdateSP&, spi::Context&) override; + spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&) override; spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const document::DocumentId&, spi::Context&) const override; spi::CreateIteratorResult createIterator(const spi::Bucket&, const document::FieldSet&, const spi::Selection&, spi::IncludedVersions versions, spi::Context&) override; @@ -76,6 +76,7 @@ public: void putAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentSP, spi::Context &, spi::OperationComplete::UP) override; void removeAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; + void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override; private: template <typename ResultType> |