summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2021-11-18 13:38:28 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2021-11-18 22:17:03 +0000
commit7465d0401ce84cbd2264e254f3a1a1b3b400ac6f (patch)
treeb810e1622227ab47ce4e61e6a6eef97371660659 /storage
parent0654fa7787092a5f101572629c11860f5480e058 (diff)
Let removeAsync handle list of documents.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.cpp8
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.h2
-rw-r--r--storage/src/tests/persistence/processalltest.cpp10
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp22
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp7
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.cpp4
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.h2
7 files changed, 29 insertions, 26 deletions
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
index 02b43a32df3..1c47170de6c 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
@@ -106,12 +106,14 @@ PersistenceProviderWrapper::putAsync(const spi::Bucket& bucket, spi::Timestamp t
}
void
-PersistenceProviderWrapper::removeAsync(const spi::Bucket& bucket, spi::Timestamp timestamp, const spi::DocumentId& id,
+PersistenceProviderWrapper::removeAsync(const spi::Bucket& bucket, std::vector<TimeStampAndDocumentId> ids,
spi::Context& context, spi::OperationComplete::UP onComplete)
{
- LOG_SPI("remove(" << bucket << ", " << timestamp << ", " << id << ")");
+ for (const TimeStampAndDocumentId & stampedId : ids) {
+ LOG_SPI("remove(" << bucket << ", " << stampedId.first << ", " << stampedId.second << ")");
+ }
CHECK_ERROR_ASYNC(spi::RemoveResult, FAIL_REMOVE, onComplete);
- _spi.removeAsync(bucket, timestamp, id, context, std::move(onComplete));
+ _spi.removeAsync(bucket, std::move(ids), context, std::move(onComplete));
}
void
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
index cfc7002a643..1552a955221 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
@@ -100,7 +100,7 @@ public:
spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override;
spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override;
void putAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&, spi::OperationComplete::UP) override;
- void removeAsync(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
+ void removeAsync(const spi::Bucket&, std::vector<TimeStampAndDocumentId> ids, spi::Context&, spi::OperationComplete::UP) override;
void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
void updateAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&, spi::OperationComplete::UP) override;
spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const spi::DocumentId&, spi::Context&) const override;
diff --git a/storage/src/tests/persistence/processalltest.cpp b/storage/src/tests/persistence/processalltest.cpp
index 0c862f11b05..51fe4ef4bd3 100644
--- a/storage/src/tests/persistence/processalltest.cpp
+++ b/storage/src/tests/persistence/processalltest.cpp
@@ -41,11 +41,14 @@ TEST_F(ProcessAllHandlerTest, remove_location) {
AsyncHandler handler(getEnv(), getPersistenceProvider(), _bucketOwnershipNotifier, *_sequenceTaskExecutor, _bucketIdFactory);
auto tracker = handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket));
+ std::shared_ptr<api::StorageMessage> msg;
+ ASSERT_TRUE(_replySender.queue.getNext(msg, 60s));
+
EXPECT_EQ("DocEntry(1234, 1, id:mail:testdoctype1:n=4:3619.html)\n"
"DocEntry(2345, 1, id:mail:testdoctype1:n=4:4008.html)\n",
dumpBucket(bucketId));
- auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(std::move(*tracker).stealReplySP());
+ auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(msg);
ASSERT_TRUE(reply);
EXPECT_EQ(2u, reply->documents_removed());
}
@@ -65,6 +68,9 @@ TEST_F(ProcessAllHandlerTest, remove_location_document_subset) {
auto cmd = std::make_shared<api::RemoveLocationCommand>("testdoctype1.headerval % 2 == 0", bucket);
auto tracker = handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket));
+ std::shared_ptr<api::StorageMessage> msg;
+ ASSERT_TRUE(_replySender.queue.getNext(msg, 60s));
+
EXPECT_EQ("DocEntry(100, 1, id:mail:testdoctype1:n=4:3619.html)\n"
"DocEntry(101, 0, Doc(id:mail:testdoctype1:n=4:33113.html))\n"
"DocEntry(102, 1, id:mail:testdoctype1:n=4:62608.html)\n"
@@ -77,7 +83,7 @@ TEST_F(ProcessAllHandlerTest, remove_location_document_subset) {
"DocEntry(109, 0, Doc(id:mail:testdoctype1:n=4:6925.html))\n",
dumpBucket(bucketId));
- auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(std::move(*tracker).stealReplySP());
+ auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(msg);
ASSERT_TRUE(reply);
EXPECT_EQ(5u, reply->documents_removed());
}
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index aac2b0748c4..b5161673af3 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -413,20 +413,14 @@ AsyncHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTrack
std::make_shared<document::DocIdOnly>(),
processor, spi::NEWEST_DOCUMENT_ONLY,tracker->context());
- std::vector<std::future<std::unique_ptr<spi::Result>>> results;
- results.reserve(to_remove.size());
- for (auto & entry : to_remove) {
- auto catcher = std::make_unique<spi::CatchResult>();
- results.push_back(catcher->future_result());
- _spi.removeAsync(bucket, entry.first, entry.second, tracker->context(), std::move(catcher));
- }
- for (auto & future : results) {
- auto result = future.get();
- if (result->getErrorCode() != spi::Result::ErrorType::NONE) {
- throw std::runtime_error(fmt("Failed to do remove for removelocation: %s", result->getErrorMessage().c_str()));
- }
- }
- tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, to_remove.size()));
+ auto task = makeResultTask([&cmd, tracker = std::move(tracker), removed = to_remove.size()](spi::Result::UP response) {
+ tracker->checkForError(*response);
+ tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, removed));
+ tracker->sendReply();
+ });
+
+ _spi.removeAsync(bucket, std::move(to_remove), tracker->context(),
+ std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
return tracker;
}
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 7d7c8b79e0c..254a26aa454 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -506,9 +506,10 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results
auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), std::move(docId), "put", _clock, _env._metrics.merge_handler_metrics.put_latency);
_spi.putAsync(bucket, timestamp, std::move(doc), context, std::move(complete));
} else {
- DocumentId docId(e._docName);
- auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), docId, "remove", _clock, _env._metrics.merge_handler_metrics.remove_latency);
- _spi.removeAsync(bucket, timestamp, docId, context, std::move(complete));
+ std::vector<spi::PersistenceProvider::TimeStampAndDocumentId> ids;
+ ids.emplace_back(timestamp, e._docName);
+ auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), ids[0].second, "remove", _clock, _env._metrics.merge_handler_metrics.remove_latency);
+ _spi.removeAsync(bucket, std::move(ids), context, std::move(complete));
}
}
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
index 9ccd901744b..752c1175f22 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
@@ -154,11 +154,11 @@ ProviderErrorWrapper::putAsync(const spi::Bucket &bucket, spi::Timestamp ts, spi
}
void
-ProviderErrorWrapper::removeAsync(const spi::Bucket &bucket, spi::Timestamp ts, const document::DocumentId &docId,
+ProviderErrorWrapper::removeAsync(const spi::Bucket &bucket, std::vector<TimeStampAndDocumentId> ids,
spi::Context & context, spi::OperationComplete::UP onComplete)
{
onComplete->addResultHandler(this);
- _impl.removeAsync(bucket, ts, docId, context, std::move(onComplete));
+ _impl.removeAsync(bucket, std::move(ids), context, std::move(onComplete));
}
void
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
index 14d20cf8a52..7285c405d5c 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
@@ -58,7 +58,7 @@ public:
void register_error_listener(std::shared_ptr<ProviderErrorListener> listener);
void putAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentSP, spi::Context &, spi::OperationComplete::UP) override;
- void removeAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
+ void removeAsync(const spi::Bucket&, std::vector<TimeStampAndDocumentId>, spi::Context&, spi::OperationComplete::UP) override;
void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override;
void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override;