From bbfebdcf506f8b638ce40b28e47c3657cf002055 Mon Sep 17 00:00:00 2001 From: Henning Baldersheim Date: Tue, 5 May 2020 10:06:41 +0000 Subject: Add async update and followup on PR comments. --- .../common/persistenceproviderwrapper.cpp | 4 +- .../common/persistenceproviderwrapper.h | 5 +- storage/src/tests/persistence/testandsettest.cpp | 8 +- .../storage/persistence/persistencethread.cpp | 87 ++++++++++++---------- .../storage/persistence/provider_error_wrapper.cpp | 44 +++++------ .../storage/persistence/provider_error_wrapper.h | 3 +- 6 files changed, 79 insertions(+), 72 deletions(-) (limited to 'storage/src') diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp index 862d1fb758a..13a254e7a76 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp @@ -124,12 +124,12 @@ PersistenceProviderWrapper::removeIfFound(const spi::Bucket& bucket, spi::UpdateResult PersistenceProviderWrapper::update(const spi::Bucket& bucket, spi::Timestamp timestamp, - const document::DocumentUpdate::SP& upd, + document::DocumentUpdate::SP upd, spi::Context& context) { LOG_SPI("update(" << bucket << ", " << timestamp << ", " << upd->getId() << ")"); CHECK_ERROR(spi::UpdateResult, FAIL_UPDATE); - return _spi.update(bucket, timestamp, upd, context); + return _spi.update(bucket, timestamp, std::move(upd), context); } spi::GetResult diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h index 25365e64bfc..9ca08becabe 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h @@ -91,9 +91,8 @@ public: spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override; spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override; spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override; - spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, const spi::DocumentUpdateSP&, spi::Context&) override; - spi::GetResult get(const spi::Bucket&, const document::FieldSet&, - const spi::DocumentId&, spi::Context&) const override ; + spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&) override; + spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const spi::DocumentId&, spi::Context&) const override; spi::CreateIteratorResult createIterator(const spi::Bucket&, const document::FieldSet&, const spi::Selection&, spi::IncludedVersions versions, spi::Context&) override; diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp index ca441c71816..578a90081c7 100644 --- a/storage/src/tests/persistence/testandsettest.cpp +++ b/storage/src/tests/persistence/testandsettest.cpp @@ -171,7 +171,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_on_condition_mismatch) { putTestDocument(false, timestampOne); auto updateUp = conditional_update_test(false, timestampTwo); - ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), + ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID)); @@ -184,7 +184,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_on_condition_match) { putTestDocument(true, timestampOne); auto updateUp = conditional_update_test(false, timestampTwo); - ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) + expectedDocEntryString(timestampTwo, testDocId), dumpBucket(BUCKET_ID)); @@ -196,7 +196,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_when_no_document_and_no_a api::Timestamp updateTimestamp = 200; auto updateUp = conditional_update_test(false, updateTimestamp); - ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), + ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED); EXPECT_EQ("", dumpBucket(BUCKET_ID)); } @@ -205,7 +205,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_when_no_document_but_auto_cre api::Timestamp updateTimestamp = 200; auto updateUp = conditional_update_test(true, updateTimestamp); - ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK); + ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK); EXPECT_EQ(expectedDocEntryString(updateTimestamp, testDocId), dumpBucket(BUCKET_ID)); assertTestDocumentFoundAndMatchesContent(NEW_CONTENT); } diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp index f281f14e884..a77eea1073e 100644 --- a/storage/src/vespa/storage/persistence/persistencethread.cpp +++ b/storage/src/vespa/storage/persistence/persistencethread.cpp @@ -176,19 +176,17 @@ PersistenceThread::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP) return trackerUP; } + spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket()); if (_sequencedExecutor == nullptr) { - spi::Result response = _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), - tracker.context()); + spi::Result response = _spi.put(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()),tracker.context()); tracker.checkForError(response); } else { - _spi.putAsync(getBucket(cmd.getDocumentId(), cmd.getBucket()), spi::Timestamp(cmd.getTimestamp()), - std::move(cmd.getDocument()), tracker.context(), - std::make_unique(*_sequencedExecutor, cmd.getBucketId(), - makeResultTask([tracker = std::move(trackerUP)](spi::Result::UP response) { - tracker->checkForError(*response); - tracker->sendReply(); - }))); + auto task = makeResultTask([tracker = std::move(trackerUP)](spi::Result::UP response) { + tracker->checkForError(*response); + tracker->sendReply(); + }); + _spi.putAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), tracker.context(), + std::make_unique(*_sequencedExecutor, cmd.getBucketId(), std::move(task))); } return trackerUP; } @@ -205,10 +203,9 @@ PersistenceThread::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP trac return trackerUP; } + spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket()); if (_sequencedExecutor == nullptr) { - spi::RemoveResult response = _spi.removeIfFound(getBucket(cmd.getDocumentId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), - tracker.context()); + spi::RemoveResult response = _spi.removeIfFound(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(),tracker.context()); if (tracker.checkForError(response)) { tracker.setReply(std::make_shared(cmd, response.wasFound() ? cmd.getTimestamp() : 0)); } @@ -216,42 +213,56 @@ PersistenceThread::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP trac metrics.notFound.inc(); } } else { - _spi.removeIfFoundAsync(getBucket(cmd.getDocumentId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker.context(), - std::make_unique(*_sequencedExecutor, cmd.getBucketId(), - makeResultTask([&metrics, &cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) { - const spi::RemoveResult & response = dynamic_cast(*responseUP); - if (tracker->checkForError(response)) { - tracker->setReply(std::make_shared(cmd, response.wasFound() ? cmd.getTimestamp() : 0)); - } - if (!response.wasFound()) { - metrics.notFound.inc(); - } - tracker->sendReply(); - }))); + auto task = makeResultTask([&metrics, &cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) { + const spi::RemoveResult & response = dynamic_cast(*responseUP); + if (tracker->checkForError(response)) { + tracker->setReply(std::make_shared(cmd, response.wasFound() ? cmd.getTimestamp() : 0)); + } + if (!response.wasFound()) { + metrics.notFound.inc(); + } + tracker->sendReply(); + }); + _spi.removeIfFoundAsync(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker.context(), + std::make_unique(*_sequencedExecutor, cmd.getBucketId(), std::move(task))); } return trackerUP; } MessageTracker::UP -PersistenceThread::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP tracker) +PersistenceThread::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP trackerUP) { + MessageTracker & tracker = *trackerUP; auto& metrics = _env._metrics.update[cmd.getLoadType()]; - tracker->setMetric(metrics); + tracker.setMetric(metrics); metrics.request_size.addValue(cmd.getApproxByteSize()); - if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, tracker->context(), cmd.getUpdate()->getCreateIfNonExistent())) { - return tracker; + if (tasConditionExists(cmd) && !tasConditionMatches(cmd, tracker, tracker.context(), cmd.getUpdate()->getCreateIfNonExistent())) { + return trackerUP; } - - spi::UpdateResult response = _spi.update(getBucket(cmd.getUpdate()->getId(), cmd.getBucket()), - spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), tracker->context()); - if (tracker->checkForError(response)) { - auto reply = std::make_shared(cmd); - reply->setOldTimestamp(response.getExistingTimestamp()); - tracker->setReply(std::move(reply)); + + spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket()); + if (_sequencedExecutor == nullptr) { + spi::UpdateResult response = _spi.update(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getUpdate()),tracker.context()); + if (tracker.checkForError(response)) { + auto reply = std::make_shared(cmd); + reply->setOldTimestamp(response.getExistingTimestamp()); + tracker.setReply(std::move(reply)); + } + } else { + auto task = makeResultTask([&cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) { + const spi::UpdateResult & response = dynamic_cast(*responseUP); + if (tracker->checkForError(response)) { + auto reply = std::make_shared(cmd); + reply->setOldTimestamp(response.getExistingTimestamp()); + tracker->setReply(std::move(reply)); + } + tracker->sendReply(); + }); + _spi.updateAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getUpdate()), tracker.context(), + std::make_unique(*_sequencedExecutor, cmd.getBucketId(), std::move(task))); } - return tracker; + return trackerUP; } namespace { diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index 204d32b00ca..a5564282d17 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -84,47 +84,35 @@ ProviderErrorWrapper::put(const spi::Bucket& bucket, spi::Timestamp ts, spi::Doc } spi::RemoveResult -ProviderErrorWrapper::remove(const spi::Bucket& bucket, - spi::Timestamp ts, - const document::DocumentId& docId, - spi::Context& context) +ProviderErrorWrapper::remove(const spi::Bucket& bucket, spi::Timestamp ts, const document::DocumentId& docId, spi::Context& context) { return checkResult(_impl.remove(bucket, ts, docId, context)); } spi::RemoveResult -ProviderErrorWrapper::removeIfFound(const spi::Bucket& bucket, - spi::Timestamp ts, - const document::DocumentId& docId, - spi::Context& context) +ProviderErrorWrapper::removeIfFound(const spi::Bucket& bucket, spi::Timestamp ts, + const document::DocumentId& docId, spi::Context& context) { return checkResult(_impl.removeIfFound(bucket, ts, docId, context)); } spi::UpdateResult -ProviderErrorWrapper::update(const spi::Bucket& bucket, - spi::Timestamp ts, - const spi::DocumentUpdateSP& docUpdate, - spi::Context& context) +ProviderErrorWrapper::update(const spi::Bucket& bucket, spi::Timestamp ts, + spi::DocumentUpdateSP docUpdate, spi::Context& context) { - return checkResult(_impl.update(bucket, ts, docUpdate, context)); + return checkResult(_impl.update(bucket, ts, std::move(docUpdate), context)); } spi::GetResult -ProviderErrorWrapper::get(const spi::Bucket& bucket, - const document::FieldSet& fieldSet, - const document::DocumentId& docId, - spi::Context& context) const +ProviderErrorWrapper::get(const spi::Bucket& bucket, const document::FieldSet& fieldSet, + const document::DocumentId& docId, spi::Context& context) const { return checkResult(_impl.get(bucket, fieldSet, docId, context)); } spi::CreateIteratorResult -ProviderErrorWrapper::createIterator(const spi::Bucket& bucket, - const document::FieldSet& fieldSet, - const spi::Selection& selection, - spi::IncludedVersions versions, - spi::Context& context) +ProviderErrorWrapper::createIterator(const spi::Bucket& bucket, const document::FieldSet& fieldSet, + const spi::Selection& selection, spi::IncludedVersions versions, spi::Context& context) { return checkResult(_impl.createIterator(bucket, fieldSet, selection, versions, context)); } @@ -187,7 +175,7 @@ ProviderErrorWrapper::removeEntry(const spi::Bucket& bucket, spi::Timestamp ts, void ProviderErrorWrapper::putAsync(const spi::Bucket &bucket, spi::Timestamp ts, spi::DocumentSP doc, - spi::Context &context, spi::OperationComplete::UP onComplete) + spi::Context &context, spi::OperationComplete::UP onComplete) { onComplete->addResultHandler(this); _impl.putAsync(bucket, ts, std::move(doc), context, std::move(onComplete)); @@ -203,10 +191,18 @@ ProviderErrorWrapper::removeAsync(const spi::Bucket &bucket, spi::Timestamp ts, void ProviderErrorWrapper::removeIfFoundAsync(const spi::Bucket &bucket, spi::Timestamp ts, const document::DocumentId &docId, - spi::Context & context, spi::OperationComplete::UP onComplete) + spi::Context & context, spi::OperationComplete::UP onComplete) { onComplete->addResultHandler(this); _impl.removeIfFoundAsync(bucket, ts, docId, context, std::move(onComplete)); } +void +ProviderErrorWrapper::updateAsync(const spi::Bucket &bucket, spi::Timestamp ts, spi::DocumentUpdateSP upd, + spi::Context &context, spi::OperationComplete::UP onComplete) +{ + onComplete->addResultHandler(this); + _impl.updateAsync(bucket, ts, std::move(upd), context, std::move(onComplete)); +} + } // ns storage diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index 4d25de3cdab..602877e0b02 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -50,7 +50,7 @@ public: spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override; spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override; spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override; - spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, const spi::DocumentUpdateSP&, spi::Context&) override; + spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&) override; spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const document::DocumentId&, spi::Context&) const override; spi::CreateIteratorResult createIterator(const spi::Bucket&, const document::FieldSet&, const spi::Selection&, spi::IncludedVersions versions, spi::Context&) override; @@ -76,6 +76,7 @@ public: void putAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentSP, spi::Context &, spi::OperationComplete::UP) override; void removeAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; + void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override; private: template -- cgit v1.2.3