diff options
19 files changed, 171 insertions, 72 deletions
diff --git a/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp b/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp index 495522d3cf5..810f2ad2356 100644 --- a/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp +++ b/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp @@ -3,13 +3,13 @@ #include <vespa/document/base/testdocman.h> #include <vespa/persistence/conformancetest/conformancetest.h> #include <vespa/persistence/spi/test.h> +#include <vespa/persistence/spi/catchresult.h> #include <vespa/persistence/spi/resource_usage_listener.h> #include <vespa/document/fieldset/fieldsets.h> #include <vespa/document/update/documentupdate.h> #include <vespa/document/update/assignvalueupdate.h> #include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/test/make_bucket_space.h> -#include <vespa/document/util/bytebuffer.h> #include <vespa/vdslib/state/state.h> #include <vespa/vdslib/state/node.h> #include <vespa/vdslib/state/nodestate.h> @@ -18,7 +18,6 @@ #include <vespa/vespalib/util/idestructorcallback.h> #include <vespa/vespalib/util/size_literals.h> #include <vespa/config-stor-distribution.h> -#include <algorithm> #include <limits> #include <gtest/gtest.h> @@ -796,6 +795,40 @@ TEST_F(ConformanceTest, testRemove) EXPECT_FALSE(getResult.hasDocument()); } +TEST_F(ConformanceTest, testRemoveMulti) +{ + document::TestDocMan testDocMan; + _factory->clear(); + PersistenceProviderUP spi(getSpi(*_factory, testDocMan)); + + BucketId bucketId1(8, 0x01); + Bucket bucket1(makeSpiBucket(bucketId1)); + Context context(Priority(0), Trace::TraceLevel(0)); + spi->createBucket(bucket1, context); + + std::vector<Document::SP> docs; + for (size_t i(0); i < 30; i++) { + docs.push_back(testDocMan.createRandomDocumentAtLocation(0x01, i)); + } + + std::vector<PersistenceProvider::TimeStampAndDocumentId> ids; + for (size_t i(0); i < docs.size(); i++) { + spi->put(bucket1, Timestamp(i), docs[i], context); + if (i & 0x1) { + ids.emplace_back(Timestamp(i), docs[i]->getId()); + } + } + + auto onDone = std::make_unique<CatchResult>(); + auto future = onDone->future_result(); + spi->removeAsync(bucket1, ids, context, std::move(onDone)); + auto result = future.get(); + ASSERT_TRUE(result); + auto removeResult = dynamic_cast<spi::RemoveResult *>(result.get()); + ASSERT_TRUE(removeResult != nullptr); + EXPECT_EQ(15u, removeResult->num_removed()); +} + TEST_F(ConformanceTest, testRemoveMerge) { document::TestDocMan testDocMan; diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp index c7365f39f02..9d9f31b63a3 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp @@ -471,30 +471,33 @@ DummyPersistence::updateAsync(const Bucket& bucket, Timestamp ts, DocumentUpdate } void -DummyPersistence::removeAsync(const Bucket& b, Timestamp t, const DocumentId& did, Context &, OperationComplete::UP onComplete) +DummyPersistence::removeAsync(const Bucket& b, std::vector<TimeStampAndDocumentId> ids, Context &, OperationComplete::UP onComplete) { DUMMYPERSISTENCE_VERIFY_INITIALIZED; - LOG(debug, "remove(%s, %" PRIu64 ", %s)", - b.toString().c_str(), - uint64_t(t), - did.toString().c_str()); assert(b.getBucketSpace() == FixedBucketSpaces::default_space()); - BucketContentGuard::UP bc(acquireBucketWithLock(b)); - while (!bc) { - internal_create_bucket(b); - bc = acquireBucketWithLock(b); - } - DocEntry::SP entry((*bc)->getEntry(did)); - bool foundPut(entry.get() && !entry->isRemove()); - auto remEntry = std::make_unique<DocEntry>(t, REMOVE_ENTRY, did); - - if ((*bc)->hasTimestamp(t)) { - (*bc)->eraseEntry(t); + + uint32_t numRemoves(0); + for (const TimeStampAndDocumentId & stampedId : ids) { + const DocumentId & id = stampedId.second; + Timestamp t = stampedId.first; + LOG(debug, "remove(%s, %" PRIu64 ", %s)", b.toString().c_str(), uint64_t(t), id.toString().c_str()); + + while (!bc) { + internal_create_bucket(b); + bc = acquireBucketWithLock(b); + } + DocEntry::SP entry((*bc)->getEntry(id)); + numRemoves += (entry.get() && !entry->isRemove()) ? 1 : 0; + auto remEntry = std::make_unique<DocEntry>(t, REMOVE_ENTRY, id); + + if ((*bc)->hasTimestamp(t)) { + (*bc)->eraseEntry(t); + } + (*bc)->insert(std::move(remEntry)); } - (*bc)->insert(std::move(remEntry)); bc.reset(); - onComplete->onComplete(std::make_unique<RemoveResult>(foundPut)); + onComplete->onComplete(std::make_unique<RemoveResult>(numRemoves)); } GetResult diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h index 4e0d088d5d6..a7a784d0479 100644 --- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h +++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h @@ -159,7 +159,7 @@ public: BucketInfoResult getBucketInfo(const Bucket&) const override; GetResult get(const Bucket&, const document::FieldSet&, const DocumentId&, Context&) const override; void putAsync(const Bucket&, Timestamp, DocumentSP, Context&, OperationComplete::UP) override; - void removeAsync(const Bucket& b, Timestamp t, const DocumentId& did, Context&, OperationComplete::UP) override; + void removeAsync(const Bucket& b, std::vector<TimeStampAndDocumentId> ids, Context&, OperationComplete::UP) override; void updateAsync(const Bucket&, Timestamp, DocumentUpdateSP, Context&, OperationComplete::UP) override; CreateIteratorResult diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp index 951dbf97cff..35654240ec7 100644 --- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp @@ -1,10 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "abstractpersistenceprovider.h" -#include <vespa/document/datatype/documenttype.h> -#include <vespa/document/update/documentupdate.h> -#include <vespa/document/fieldset/fieldsets.h> -#include <vespa/document/fieldvalue/document.h> +#include <vespa/document/base/documentid.h> #include <vespa/vespalib/util/idestructorcallback.h> namespace storage::spi { @@ -13,7 +10,9 @@ void AbstractPersistenceProvider::removeIfFoundAsync(const Bucket& b, Timestamp timestamp, const DocumentId& id, Context& context, OperationComplete::UP onComplete) { - removeAsync(b, timestamp, id, context, std::move(onComplete)); + std::vector<TimeStampAndDocumentId> ids; + ids.emplace_back(timestamp, id); + removeAsync(b, std::move(ids), context, std::move(onComplete)); } BucketIdListResult diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp index 31db08a6f4f..e6733dc4150 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp @@ -2,6 +2,7 @@ #include "persistenceprovider.h" #include "catchresult.h" +#include <vespa/document/base/documentid.h> #include <future> namespace storage::spi { @@ -44,7 +45,9 @@ RemoveResult PersistenceProvider::remove(const Bucket& bucket, Timestamp timestamp, const DocumentId & docId, Context& context) { auto catcher = std::make_unique<CatchResult>(); auto future = catcher->future_result(); - removeAsync(bucket, timestamp, docId, context, std::move(catcher)); + std::vector<TimeStampAndDocumentId> ids; + ids.emplace_back(timestamp, docId); + removeAsync(bucket, std::move(ids), context, std::move(catcher)); return dynamic_cast<const RemoveResult &>(*future.get()); } diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h index 269175f7d26..8c62e691daf 100644 --- a/persistence/src/vespa/persistence/spi/persistenceprovider.h +++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h @@ -54,6 +54,7 @@ struct PersistenceProvider { using BucketSpace = document::BucketSpace; using FieldSetSP = std::shared_ptr<document::FieldSet>; + using TimeStampAndDocumentId = std::pair<Timestamp, DocumentId>; virtual ~PersistenceProvider(); @@ -168,7 +169,7 @@ struct PersistenceProvider * @param timestamp The timestamp for the new bucket entry. * @param id The ID to remove */ - virtual void removeAsync(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&, OperationComplete::UP) = 0; + virtual void removeAsync(const Bucket&, std::vector<TimeStampAndDocumentId> ids, Context&, OperationComplete::UP) = 0; /** * @see remove() diff --git a/persistence/src/vespa/persistence/spi/result.h b/persistence/src/vespa/persistence/spi/result.h index 73838e99a18..70bd37590a1 100644 --- a/persistence/src/vespa/persistence/spi/result.h +++ b/persistence/src/vespa/persistence/spi/result.h @@ -142,6 +142,7 @@ public: : _numRemoved(numRemoved) { } bool wasFound() const { return _numRemoved > 0; } uint32_t num_removed() const { return _numRemoved; } + void inc_num_removed(uint32_t add) { _numRemoved += add; } private: uint32_t _numRemoved; diff --git a/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp index eccae8fe8ab..d5421eaaeca 100644 --- a/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp +++ b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp @@ -134,7 +134,9 @@ SpiBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& docum auto provider = get_provider(bucket); if (provider) { Bucket spi_bucket(bucket); - provider->removeAsync(spi_bucket, Timestamp(timestamp), document_id, context, std::make_unique<MyOperationComplete>(provider, _errors, spi_bucket, tracker)); + std::vector<storage::spi::PersistenceProvider::TimeStampAndDocumentId> ids; + ids.emplace_back(Timestamp(timestamp), document_id); + provider->removeAsync(spi_bucket, std::move(ids), context, std::make_unique<MyOperationComplete>(provider, _errors, spi_bucket, tracker)); } else { ++_errors; } diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 9d9e1e0a344..7e21e619419 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -368,23 +368,61 @@ PersistenceEngine::putAsync(const Bucket &bucket, Timestamp ts, storage::spi::Do } void -PersistenceEngine::removeAsync(const Bucket& b, Timestamp t, const DocumentId& did, Context&, OperationComplete::UP onComplete) +PersistenceEngine::removeAsync(const Bucket& b, std::vector<TimeStampAndDocumentId> ids, Context&, OperationComplete::UP onComplete) +{ + if (ids.size() == 1) { + removeAsyncSingle(b, ids[0].first, ids[0].second, std::move(onComplete)); + } else { + removeAsyncMulti(b, std::move(ids), std::move(onComplete)); + } +} + +void +PersistenceEngine::removeAsyncMulti(const Bucket& b, std::vector<TimeStampAndDocumentId> ids, OperationComplete::UP onComplete) { + ReadGuard rguard(_rwMutex); + //TODO Group per document type/handler and handle in one go. + for (const TimeStampAndDocumentId & stampedId : ids) { + const document::DocumentId & id = stampedId.second; + if (!id.hasDocType()) { + return onComplete->onComplete( + std::make_unique<RemoveResult>(Result::ErrorType::PERMANENT_ERROR, + fmt("Old id scheme not supported in elastic mode (%s)", id.toString().c_str()))); + } + DocTypeName docType(id.getDocType()); + IPersistenceHandler *handler = getHandler(rguard, b.getBucketSpace(), docType); + if (!handler) { + return onComplete->onComplete(std::make_unique<RemoveResult>(Result::ErrorType::PERMANENT_ERROR, + fmt("No handler for document type '%s'", + docType.toString().c_str()))); + } + } + auto transportContext = std::make_shared<AsyncRemoveTransportContext>(ids.size(), std::move(onComplete)); + for (const TimeStampAndDocumentId & stampedId : ids) { + const document::DocumentId & id = stampedId.second; + DocTypeName docType(id.getDocType()); + IPersistenceHandler *handler = getHandler(rguard, b.getBucketSpace(), docType); + handler->handleRemove(feedtoken::make(transportContext), b, stampedId.first, id); + } +} + +void +PersistenceEngine::removeAsyncSingle(const Bucket& b, Timestamp t, const DocumentId& id, OperationComplete::UP onComplete) { ReadGuard rguard(_rwMutex); LOG(spam, "remove(%s, %" PRIu64 ", \"%s\")", b.toString().c_str(), - static_cast<uint64_t>(t.getValue()), did.toString().c_str()); - if (!did.hasDocType()) { + static_cast<uint64_t>(t.getValue()), id.toString().c_str()); + if (!id.hasDocType()) { return onComplete->onComplete(std::make_unique<RemoveResult>(Result::ErrorType::PERMANENT_ERROR, - fmt("Old id scheme not supported in elastic mode (%s)", did.toString().c_str()))); + fmt("Old id scheme not supported in elastic mode (%s)", id.toString().c_str()))); } - DocTypeName docType(did.getDocType()); + DocTypeName docType(id.getDocType()); IPersistenceHandler * handler = getHandler(rguard, b.getBucketSpace(), docType); if (!handler) { return onComplete->onComplete(std::make_unique<RemoveResult>(Result::ErrorType::PERMANENT_ERROR, fmt("No handler for document type '%s'", docType.toString().c_str()))); } auto transportContext = std::make_shared<AsyncTransportContext>(1, std::move(onComplete)); - handler->handleRemove(feedtoken::make(std::move(transportContext)), b, t, did); + handler->handleRemove(feedtoken::make(std::move(transportContext)), b, t, id); } diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h index fe564d01459..7c8040fae9d 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h @@ -88,7 +88,8 @@ private: void saveClusterState(BucketSpace bucketSpace, const ClusterState &calc); ClusterState::SP savedClusterState(BucketSpace bucketSpace) const; std::shared_ptr<BucketExecutor> get_bucket_executor() noexcept { return _bucket_executor.lock(); } - + void removeAsyncSingle(const Bucket&, Timestamp, const document::DocumentId &id, OperationComplete::UP); + void removeAsyncMulti(const Bucket&, std::vector<TimeStampAndDocumentId> ids, OperationComplete::UP); public: typedef std::unique_ptr<PersistenceEngine> UP; @@ -106,7 +107,7 @@ public: void setActiveStateAsync(const Bucket&, BucketInfo::ActiveState, OperationComplete::UP) override; BucketInfoResult getBucketInfo(const Bucket&) const override; void putAsync(const Bucket &, Timestamp, storage::spi::DocumentSP, Context &context, OperationComplete::UP) override; - void removeAsync(const Bucket&, Timestamp, const document::DocumentId&, Context&, OperationComplete::UP) override; + void removeAsync(const Bucket&, std::vector<TimeStampAndDocumentId> ids, Context&, OperationComplete::UP) override; void updateAsync(const Bucket&, Timestamp, storage::spi::DocumentUpdateSP, Context&, OperationComplete::UP) override; GetResult get(const Bucket&, const document::FieldSet&, const document::DocumentId&, Context&) const override; CreateIteratorResult diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp index 8a0955b3147..e40c72fb3c1 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp @@ -5,6 +5,7 @@ using vespalib::make_string; using storage::spi::Result; +using storage::spi::RemoveResult; namespace proton { @@ -30,14 +31,19 @@ TransportMerger::mergeResult(ResultUP result, bool documentWasFound) { } } +Result::UP +TransportMerger::merge(ResultUP accum, ResultUP incomming, bool documentWasFound) { + return documentWasFound ? std::move(incomming) : std::move(accum); +} + void TransportMerger::mergeWithLock(ResultUP result, bool documentWasFound) { if (!_result) { _result = std::move(result); } else if (result->hasError()) { _result = std::make_unique<Result>(mergeErrorResults(*_result, *result)); - } else if (documentWasFound) { - _result = std::move(result); + } else { + _result = merge(std::move(_result), std::move(result), documentWasFound); } completeIfDone(); } @@ -93,4 +99,11 @@ AsyncTransportContext::send(ResultUP result, bool documentWasFound) mergeResult(std::move(result), documentWasFound); } +Result::UP +AsyncRemoveTransportContext::merge(ResultUP accum, ResultUP incomming, bool) { + // TODO This can be static cast if necessary. + dynamic_cast<RemoveResult *>(accum.get())->inc_num_removed(dynamic_cast<RemoveResult *>(incomming.get())->num_removed()); + return accum; +} + } // proton diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h index 91cadfedcd5..9e83102f464 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h @@ -23,6 +23,7 @@ protected: ~TransportMerger() override; void mergeResult(ResultUP result, bool documentWasFound); virtual void completeIfDone() { } // Called with lock held if necessary on every merge + virtual ResultUP merge(ResultUP accum, ResultUP incomming, bool documentWasFound); ResultUP _result; private: @@ -47,15 +48,10 @@ public: void await() { _latch.await(); } - const UpdateResult &getUpdateResult() const { - return dynamic_cast<const UpdateResult &>(*_result); - } + const Result &getResult() const { return *_result; } - const RemoveResult &getRemoveResult() const { - return dynamic_cast<const RemoveResult &>(*_result); - } }; @@ -77,5 +73,11 @@ public: void send(ResultUP result, bool documentWasFound) override; }; -} // namespace proton +class AsyncRemoveTransportContext : public AsyncTransportContext { +public: + using AsyncTransportContext::AsyncTransportContext; +protected: + ResultUP merge(ResultUP accum, ResultUP incomming, bool documentWasFound) override; +}; +} diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp index 02b43a32df3..1c47170de6c 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp @@ -106,12 +106,14 @@ PersistenceProviderWrapper::putAsync(const spi::Bucket& bucket, spi::Timestamp t } void -PersistenceProviderWrapper::removeAsync(const spi::Bucket& bucket, spi::Timestamp timestamp, const spi::DocumentId& id, +PersistenceProviderWrapper::removeAsync(const spi::Bucket& bucket, std::vector<TimeStampAndDocumentId> ids, spi::Context& context, spi::OperationComplete::UP onComplete) { - LOG_SPI("remove(" << bucket << ", " << timestamp << ", " << id << ")"); + for (const TimeStampAndDocumentId & stampedId : ids) { + LOG_SPI("remove(" << bucket << ", " << stampedId.first << ", " << stampedId.second << ")"); + } CHECK_ERROR_ASYNC(spi::RemoveResult, FAIL_REMOVE, onComplete); - _spi.removeAsync(bucket, timestamp, id, context, std::move(onComplete)); + _spi.removeAsync(bucket, std::move(ids), context, std::move(onComplete)); } void diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h index cfc7002a643..1552a955221 100644 --- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h +++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h @@ -100,7 +100,7 @@ public: spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override; spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override; void putAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&, spi::OperationComplete::UP) override; - void removeAsync(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; + void removeAsync(const spi::Bucket&, std::vector<TimeStampAndDocumentId> ids, spi::Context&, spi::OperationComplete::UP) override; void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; void updateAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&, spi::OperationComplete::UP) override; spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const spi::DocumentId&, spi::Context&) const override; diff --git a/storage/src/tests/persistence/processalltest.cpp b/storage/src/tests/persistence/processalltest.cpp index 0c862f11b05..51fe4ef4bd3 100644 --- a/storage/src/tests/persistence/processalltest.cpp +++ b/storage/src/tests/persistence/processalltest.cpp @@ -41,11 +41,14 @@ TEST_F(ProcessAllHandlerTest, remove_location) { AsyncHandler handler(getEnv(), getPersistenceProvider(), _bucketOwnershipNotifier, *_sequenceTaskExecutor, _bucketIdFactory); auto tracker = handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)); + std::shared_ptr<api::StorageMessage> msg; + ASSERT_TRUE(_replySender.queue.getNext(msg, 60s)); + EXPECT_EQ("DocEntry(1234, 1, id:mail:testdoctype1:n=4:3619.html)\n" "DocEntry(2345, 1, id:mail:testdoctype1:n=4:4008.html)\n", dumpBucket(bucketId)); - auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(std::move(*tracker).stealReplySP()); + auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(msg); ASSERT_TRUE(reply); EXPECT_EQ(2u, reply->documents_removed()); } @@ -65,6 +68,9 @@ TEST_F(ProcessAllHandlerTest, remove_location_document_subset) { auto cmd = std::make_shared<api::RemoveLocationCommand>("testdoctype1.headerval % 2 == 0", bucket); auto tracker = handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket)); + std::shared_ptr<api::StorageMessage> msg; + ASSERT_TRUE(_replySender.queue.getNext(msg, 60s)); + EXPECT_EQ("DocEntry(100, 1, id:mail:testdoctype1:n=4:3619.html)\n" "DocEntry(101, 0, Doc(id:mail:testdoctype1:n=4:33113.html))\n" "DocEntry(102, 1, id:mail:testdoctype1:n=4:62608.html)\n" @@ -77,7 +83,7 @@ TEST_F(ProcessAllHandlerTest, remove_location_document_subset) { "DocEntry(109, 0, Doc(id:mail:testdoctype1:n=4:6925.html))\n", dumpBucket(bucketId)); - auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(std::move(*tracker).stealReplySP()); + auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(msg); ASSERT_TRUE(reply); EXPECT_EQ(5u, reply->documents_removed()); } diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp index aac2b0748c4..b5161673af3 100644 --- a/storage/src/vespa/storage/persistence/asynchandler.cpp +++ b/storage/src/vespa/storage/persistence/asynchandler.cpp @@ -413,20 +413,14 @@ AsyncHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTrack std::make_shared<document::DocIdOnly>(), processor, spi::NEWEST_DOCUMENT_ONLY,tracker->context()); - std::vector<std::future<std::unique_ptr<spi::Result>>> results; - results.reserve(to_remove.size()); - for (auto & entry : to_remove) { - auto catcher = std::make_unique<spi::CatchResult>(); - results.push_back(catcher->future_result()); - _spi.removeAsync(bucket, entry.first, entry.second, tracker->context(), std::move(catcher)); - } - for (auto & future : results) { - auto result = future.get(); - if (result->getErrorCode() != spi::Result::ErrorType::NONE) { - throw std::runtime_error(fmt("Failed to do remove for removelocation: %s", result->getErrorMessage().c_str())); - } - } - tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, to_remove.size())); + auto task = makeResultTask([&cmd, tracker = std::move(tracker), removed = to_remove.size()](spi::Result::UP response) { + tracker->checkForError(*response); + tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, removed)); + tracker->sendReply(); + }); + + _spi.removeAsync(bucket, std::move(to_remove), tracker->context(), + std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task))); return tracker; } diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp index 7d7c8b79e0c..254a26aa454 100644 --- a/storage/src/vespa/storage/persistence/mergehandler.cpp +++ b/storage/src/vespa/storage/persistence/mergehandler.cpp @@ -506,9 +506,10 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), std::move(docId), "put", _clock, _env._metrics.merge_handler_metrics.put_latency); _spi.putAsync(bucket, timestamp, std::move(doc), context, std::move(complete)); } else { - DocumentId docId(e._docName); - auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), docId, "remove", _clock, _env._metrics.merge_handler_metrics.remove_latency); - _spi.removeAsync(bucket, timestamp, docId, context, std::move(complete)); + std::vector<spi::PersistenceProvider::TimeStampAndDocumentId> ids; + ids.emplace_back(timestamp, e._docName); + auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), ids[0].second, "remove", _clock, _env._metrics.merge_handler_metrics.remove_latency); + _spi.removeAsync(bucket, std::move(ids), context, std::move(complete)); } } diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp index 9ccd901744b..752c1175f22 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp @@ -154,11 +154,11 @@ ProviderErrorWrapper::putAsync(const spi::Bucket &bucket, spi::Timestamp ts, spi } void -ProviderErrorWrapper::removeAsync(const spi::Bucket &bucket, spi::Timestamp ts, const document::DocumentId &docId, +ProviderErrorWrapper::removeAsync(const spi::Bucket &bucket, std::vector<TimeStampAndDocumentId> ids, spi::Context & context, spi::OperationComplete::UP onComplete) { onComplete->addResultHandler(this); - _impl.removeAsync(bucket, ts, docId, context, std::move(onComplete)); + _impl.removeAsync(bucket, std::move(ids), context, std::move(onComplete)); } void diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h index 14d20cf8a52..7285c405d5c 100644 --- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h +++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h @@ -58,7 +58,7 @@ public: void register_error_listener(std::shared_ptr<ProviderErrorListener> listener); void putAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentSP, spi::Context &, spi::OperationComplete::UP) override; - void removeAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; + void removeAsync(const spi::Bucket&, std::vector<TimeStampAndDocumentId>, spi::Context&, spi::OperationComplete::UP) override; void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override; void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override; void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override; |