diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2021-11-18 13:38:28 +0000 |
---|---|---|
committer | Henning Baldersheim <balder@yahoo-inc.com> | 2021-11-18 22:17:03 +0000 |
commit | 7465d0401ce84cbd2264e254f3a1a1b3b400ac6f (patch) | |
tree | b810e1622227ab47ce4e61e6a6eef97371660659 /storage | |
parent | 0654fa7787092a5f101572629c11860f5480e058 (diff) |
Let removeAsync handle list of documents.
Diffstat (limited to 'storage')
7 files changed, 29 insertions, 26 deletions
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp index 02b43a32df3..1c47170de6c 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp @@ -106,12 +106,14 @@ PersistenceProviderWrapper::putAsync(const spi::Bucket& bucket, spi::Timestamp t } void -PersistenceProviderWrapper::removeAsync(const spi::Bucket& bucket, spi::Timestamp timestamp, const spi::DocumentId& id, +PersistenceProviderWrapper::removeAsync(const spi::Bucket& bucket, std::vector<TimeStampAndDocumentId> ids, spi::Context& context, spi::OperationComplete::UP onComplete) { - LOG_SPI("remove(" << bucket << ", " << timestamp << ", " << id << ")"); + for (const TimeStampAndDocumentId & stampedId : ids) { + LOG_SPI("remove(" << bucket << ", " << stampedId.first << ", " << stampedId.second << ")"); + } CHECK_ERROR_ASYNC(spi::RemoveResult, FAIL_REMOVE, onComplete); - _spi.removeAsync(bucket, timestamp, id, context, std::move(onComplete)); + _spi.removeAsync(bucket, std::move(ids), context, std::move(onComplete)); } void diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h index cfc7002a643..1552a955221 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h @@ -100,7 +100,7 @@ public: spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override; spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override; void putAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&, spi::OperationComplete::UP) override; - void removeAsync(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; + void removeAsync(const spi::Bucket&, std::vector<TimeStampAndDocumentId> ids, spi::Context&, spi::OperationComplete::UP) override; void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; void updateAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&, spi::OperationComplete::UP) override; spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const spi::DocumentId&, spi::Context&) const override; diff --git a/storage/src/tests/persistence/processalltest.cpp b/storage/src/tests/persistence/processalltest.cpp index 0c862f11b05..51fe4ef4bd3 100644 --- a/storage/src/tests/persistence/processalltest.cpp +++ b/storage/src/tests/persistence/processalltest.cpp @@ -41,11 +41,14 @@ TEST_F(ProcessAllHandlerTest, remove_location) { AsyncHandler handler(getEnv(), getPersistenceProvider(), _bucketOwnershipNotifier, *_sequenceTaskExecutor, _bucketIdFactory); auto tracker = handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)); + std::shared_ptr<api::StorageMessage> msg; + ASSERT_TRUE(_replySender.queue.getNext(msg, 60s)); + EXPECT_EQ("DocEntry(1234, 1, id:mail:testdoctype1:n=4:3619.html)\n" "DocEntry(2345, 1, id:mail:testdoctype1:n=4:4008.html)\n", dumpBucket(bucketId)); - auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(std::move(*tracker).stealReplySP()); + auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(msg); ASSERT_TRUE(reply); EXPECT_EQ(2u, reply->documents_removed()); } @@ -65,6 +68,9 @@ TEST_F(ProcessAllHandlerTest, remove_location_document_subset) { auto cmd = std::make_shared<api::RemoveLocationCommand>("testdoctype1.headerval % 2 == 0", bucket); auto tracker = handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)); + std::shared_ptr<api::StorageMessage> msg; + ASSERT_TRUE(_replySender.queue.getNext(msg, 60s)); + EXPECT_EQ("DocEntry(100, 1, id:mail:testdoctype1:n=4:3619.html)\n" "DocEntry(101, 0, Doc(id:mail:testdoctype1:n=4:33113.html))\n" "DocEntry(102, 1, id:mail:testdoctype1:n=4:62608.html)\n" @@ -77,7 +83,7 @@ TEST_F(ProcessAllHandlerTest, remove_location_document_subset) { "DocEntry(109, 0, Doc(id:mail:testdoctype1:n=4:6925.html))\n", dumpBucket(bucketId)); - auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(std::move(*tracker).stealReplySP()); + auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(msg); ASSERT_TRUE(reply); EXPECT_EQ(5u, reply->documents_removed()); } diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index aac2b0748c4..b5161673af3 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -413,20 +413,14 @@ AsyncHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTrack std::make_shared<document::DocIdOnly>(), processor, spi::NEWEST_DOCUMENT_ONLY,tracker->context()); - std::vector<std::future<std::unique_ptr<spi::Result>>> results; - results.reserve(to_remove.size()); - for (auto & entry : to_remove) { - auto catcher = std::make_unique<spi::CatchResult>(); - results.push_back(catcher->future_result()); - _spi.removeAsync(bucket, entry.first, entry.second, tracker->context(), std::move(catcher)); - } - for (auto & future : results) { - auto result = future.get(); - if (result->getErrorCode() != spi::Result::ErrorType::NONE) { - throw std::runtime_error(fmt("Failed to do remove for removelocation: %s", result->getErrorMessage().c_str())); - } - } - tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, to_remove.size())); + auto task = makeResultTask([&cmd, tracker = std::move(tracker), removed = to_remove.size()](spi::Result::UP response) { + tracker->checkForError(*response); + tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, removed)); + tracker->sendReply(); + }); + + _spi.removeAsync(bucket, std::move(to_remove), tracker->context(), + std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task))); return tracker; } diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 7d7c8b79e0c..254a26aa454 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -506,9 +506,10 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), std::move(docId), "put", _clock, _env._metrics.merge_handler_metrics.put_latency); _spi.putAsync(bucket, timestamp, std::move(doc), context, std::move(complete)); } else { - DocumentId docId(e._docName); - auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), docId, "remove", _clock, _env._metrics.merge_handler_metrics.remove_latency); - _spi.removeAsync(bucket, timestamp, docId, context, std::move(complete)); + std::vector<spi::PersistenceProvider::TimeStampAndDocumentId> ids; + ids.emplace_back(timestamp, e._docName); + auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), ids[0].second, "remove", _clock, _env._metrics.merge_handler_metrics.remove_latency); + _spi.removeAsync(bucket, std::move(ids), context, std::move(complete)); } } diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index 9ccd901744b..752c1175f22 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -154,11 +154,11 @@ ProviderErrorWrapper::putAsync(const spi::Bucket &bucket, spi::Timestamp ts, spi } void -ProviderErrorWrapper::removeAsync(const spi::Bucket &bucket, spi::Timestamp ts, const document::DocumentId &docId, +ProviderErrorWrapper::removeAsync(const spi::Bucket &bucket, std::vector<TimeStampAndDocumentId> ids, spi::Context & context, spi::OperationComplete::UP onComplete) { onComplete->addResultHandler(this); - _impl.removeAsync(bucket, ts, docId, context, std::move(onComplete)); + _impl.removeAsync(bucket, std::move(ids), context, std::move(onComplete)); } void diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index 14d20cf8a52..7285c405d5c 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -58,7 +58,7 @@ public: void register_error_listener(std::shared_ptr<ProviderErrorListener> listener); void putAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentSP, spi::Context &, spi::OperationComplete::UP) override; - void removeAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; + void removeAsync(const spi::Bucket&, std::vector<TimeStampAndDocumentId>, spi::Context&, spi::OperationComplete::UP) override; void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override; void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override; |