aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--persistence/src/vespa/persistence/conformancetest/conformancetest.cpp37
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp39
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.h2
-rw-r--r--persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp9
-rw-r--r--persistence/src/vespa/persistence/spi/persistenceprovider.cpp5
-rw-r--r--persistence/src/vespa/persistence/spi/persistenceprovider.h3
-rw-r--r--persistence/src/vespa/persistence/spi/result.h1
-rw-r--r--searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp4
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp50
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h5
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp17
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h16
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.cpp8
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.h2
-rw-r--r--storage/src/tests/persistence/processalltest.cpp10
-rw-r--r--storage/src/vespa/storage/persistence/asynchandler.cpp22
-rw-r--r--storage/src/vespa/storage/persistence/mergehandler.cpp7
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.cpp4
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.h2
19 files changed, 171 insertions, 72 deletions
diff --git a/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp b/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp
index 495522d3cf5..810f2ad2356 100644
--- a/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp
+++ b/persistence/src/vespa/persistence/conformancetest/conformancetest.cpp
@@ -3,13 +3,13 @@
#include <vespa/document/base/testdocman.h>
#include <vespa/persistence/conformancetest/conformancetest.h>
#include <vespa/persistence/spi/test.h>
+#include <vespa/persistence/spi/catchresult.h>
#include <vespa/persistence/spi/resource_usage_listener.h>
#include <vespa/document/fieldset/fieldsets.h>
#include <vespa/document/update/documentupdate.h>
#include <vespa/document/update/assignvalueupdate.h>
#include <vespa/document/repo/documenttyperepo.h>
#include <vespa/document/test/make_bucket_space.h>
-#include <vespa/document/util/bytebuffer.h>
#include <vespa/vdslib/state/state.h>
#include <vespa/vdslib/state/node.h>
#include <vespa/vdslib/state/nodestate.h>
@@ -18,7 +18,6 @@
#include <vespa/vespalib/util/idestructorcallback.h>
#include <vespa/vespalib/util/size_literals.h>
#include <vespa/config-stor-distribution.h>
-#include <algorithm>
#include <limits>
#include <gtest/gtest.h>
@@ -796,6 +795,40 @@ TEST_F(ConformanceTest, testRemove)
EXPECT_FALSE(getResult.hasDocument());
}
+TEST_F(ConformanceTest, testRemoveMulti)
+{
+ document::TestDocMan testDocMan;
+ _factory->clear();
+ PersistenceProviderUP spi(getSpi(*_factory, testDocMan));
+
+ BucketId bucketId1(8, 0x01);
+ Bucket bucket1(makeSpiBucket(bucketId1));
+ Context context(Priority(0), Trace::TraceLevel(0));
+ spi->createBucket(bucket1, context);
+
+ std::vector<Document::SP> docs;
+ for (size_t i(0); i < 30; i++) {
+ docs.push_back(testDocMan.createRandomDocumentAtLocation(0x01, i));
+ }
+
+ std::vector<PersistenceProvider::TimeStampAndDocumentId> ids;
+ for (size_t i(0); i < docs.size(); i++) {
+ spi->put(bucket1, Timestamp(i), docs[i], context);
+ if (i & 0x1) {
+ ids.emplace_back(Timestamp(i), docs[i]->getId());
+ }
+ }
+
+ auto onDone = std::make_unique<CatchResult>();
+ auto future = onDone->future_result();
+ spi->removeAsync(bucket1, ids, context, std::move(onDone));
+ auto result = future.get();
+ ASSERT_TRUE(result);
+ auto removeResult = dynamic_cast<spi::RemoveResult *>(result.get());
+ ASSERT_TRUE(removeResult != nullptr);
+ EXPECT_EQ(15u, removeResult->num_removed());
+}
+
TEST_F(ConformanceTest, testRemoveMerge)
{
document::TestDocMan testDocMan;
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
index c7365f39f02..9d9f31b63a3 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
@@ -471,30 +471,33 @@ DummyPersistence::updateAsync(const Bucket& bucket, Timestamp ts, DocumentUpdate
}
void
-DummyPersistence::removeAsync(const Bucket& b, Timestamp t, const DocumentId& did, Context &, OperationComplete::UP onComplete)
+DummyPersistence::removeAsync(const Bucket& b, std::vector<TimeStampAndDocumentId> ids, Context &, OperationComplete::UP onComplete)
{
DUMMYPERSISTENCE_VERIFY_INITIALIZED;
- LOG(debug, "remove(%s, %" PRIu64 ", %s)",
- b.toString().c_str(),
- uint64_t(t),
- did.toString().c_str());
assert(b.getBucketSpace() == FixedBucketSpaces::default_space());
-
BucketContentGuard::UP bc(acquireBucketWithLock(b));
- while (!bc) {
- internal_create_bucket(b);
- bc = acquireBucketWithLock(b);
- }
- DocEntry::SP entry((*bc)->getEntry(did));
- bool foundPut(entry.get() && !entry->isRemove());
- auto remEntry = std::make_unique<DocEntry>(t, REMOVE_ENTRY, did);
-
- if ((*bc)->hasTimestamp(t)) {
- (*bc)->eraseEntry(t);
+
+ uint32_t numRemoves(0);
+ for (const TimeStampAndDocumentId & stampedId : ids) {
+ const DocumentId & id = stampedId.second;
+ Timestamp t = stampedId.first;
+ LOG(debug, "remove(%s, %" PRIu64 ", %s)", b.toString().c_str(), uint64_t(t), id.toString().c_str());
+
+ while (!bc) {
+ internal_create_bucket(b);
+ bc = acquireBucketWithLock(b);
+ }
+ DocEntry::SP entry((*bc)->getEntry(id));
+ numRemoves += (entry.get() && !entry->isRemove()) ? 1 : 0;
+ auto remEntry = std::make_unique<DocEntry>(t, REMOVE_ENTRY, id);
+
+ if ((*bc)->hasTimestamp(t)) {
+ (*bc)->eraseEntry(t);
+ }
+ (*bc)->insert(std::move(remEntry));
}
- (*bc)->insert(std::move(remEntry));
bc.reset();
- onComplete->onComplete(std::make_unique<RemoveResult>(foundPut));
+ onComplete->onComplete(std::make_unique<RemoveResult>(numRemoves));
}
GetResult
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
index 4e0d088d5d6..a7a784d0479 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
@@ -159,7 +159,7 @@ public:
BucketInfoResult getBucketInfo(const Bucket&) const override;
GetResult get(const Bucket&, const document::FieldSet&, const DocumentId&, Context&) const override;
void putAsync(const Bucket&, Timestamp, DocumentSP, Context&, OperationComplete::UP) override;
- void removeAsync(const Bucket& b, Timestamp t, const DocumentId& did, Context&, OperationComplete::UP) override;
+ void removeAsync(const Bucket& b, std::vector<TimeStampAndDocumentId> ids, Context&, OperationComplete::UP) override;
void updateAsync(const Bucket&, Timestamp, DocumentUpdateSP, Context&, OperationComplete::UP) override;
CreateIteratorResult
diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp
index 951dbf97cff..35654240ec7 100644
--- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp
+++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp
@@ -1,10 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "abstractpersistenceprovider.h"
-#include <vespa/document/datatype/documenttype.h>
-#include <vespa/document/update/documentupdate.h>
-#include <vespa/document/fieldset/fieldsets.h>
-#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/base/documentid.h>
#include <vespa/vespalib/util/idestructorcallback.h>
namespace storage::spi {
@@ -13,7 +10,9 @@ void
AbstractPersistenceProvider::removeIfFoundAsync(const Bucket& b, Timestamp timestamp,
const DocumentId& id, Context& context, OperationComplete::UP onComplete)
{
- removeAsync(b, timestamp, id, context, std::move(onComplete));
+ std::vector<TimeStampAndDocumentId> ids;
+ ids.emplace_back(timestamp, id);
+ removeAsync(b, std::move(ids), context, std::move(onComplete));
}
BucketIdListResult
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
index 31db08a6f4f..e6733dc4150 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
@@ -2,6 +2,7 @@
#include "persistenceprovider.h"
#include "catchresult.h"
+#include <vespa/document/base/documentid.h>
#include <future>
namespace storage::spi {
@@ -44,7 +45,9 @@ RemoveResult
PersistenceProvider::remove(const Bucket& bucket, Timestamp timestamp, const DocumentId & docId, Context& context) {
auto catcher = std::make_unique<CatchResult>();
auto future = catcher->future_result();
- removeAsync(bucket, timestamp, docId, context, std::move(catcher));
+ std::vector<TimeStampAndDocumentId> ids;
+ ids.emplace_back(timestamp, docId);
+ removeAsync(bucket, std::move(ids), context, std::move(catcher));
return dynamic_cast<const RemoveResult &>(*future.get());
}
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h
index 269175f7d26..8c62e691daf 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h
@@ -54,6 +54,7 @@ struct PersistenceProvider
{
using BucketSpace = document::BucketSpace;
using FieldSetSP = std::shared_ptr<document::FieldSet>;
+ using TimeStampAndDocumentId = std::pair<Timestamp, DocumentId>;
virtual ~PersistenceProvider();
@@ -168,7 +169,7 @@ struct PersistenceProvider
* @param timestamp The timestamp for the new bucket entry.
* @param id The ID to remove
*/
- virtual void removeAsync(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&, OperationComplete::UP) = 0;
+ virtual void removeAsync(const Bucket&, std::vector<TimeStampAndDocumentId> ids, Context&, OperationComplete::UP) = 0;
/**
* @see remove()
diff --git a/persistence/src/vespa/persistence/spi/result.h b/persistence/src/vespa/persistence/spi/result.h
index 73838e99a18..70bd37590a1 100644
--- a/persistence/src/vespa/persistence/spi/result.h
+++ b/persistence/src/vespa/persistence/spi/result.h
@@ -142,6 +142,7 @@ public:
: _numRemoved(numRemoved) { }
bool wasFound() const { return _numRemoved > 0; }
uint32_t num_removed() const { return _numRemoved; }
+ void inc_num_removed(uint32_t add) { _numRemoved += add; }
private:
uint32_t _numRemoved;
diff --git a/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp
index eccae8fe8ab..d5421eaaeca 100644
--- a/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp
+++ b/searchcore/src/vespa/searchcore/bmcluster/spi_bm_feed_handler.cpp
@@ -134,7 +134,9 @@ SpiBmFeedHandler::remove(const document::Bucket& bucket, const DocumentId& docum
auto provider = get_provider(bucket);
if (provider) {
Bucket spi_bucket(bucket);
- provider->removeAsync(spi_bucket, Timestamp(timestamp), document_id, context, std::make_unique<MyOperationComplete>(provider, _errors, spi_bucket, tracker));
+ std::vector<storage::spi::PersistenceProvider::TimeStampAndDocumentId> ids;
+ ids.emplace_back(Timestamp(timestamp), document_id);
+ provider->removeAsync(spi_bucket, std::move(ids), context, std::make_unique<MyOperationComplete>(provider, _errors, spi_bucket, tracker));
} else {
++_errors;
}
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
index 9d9e1e0a344..7e21e619419 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
@@ -368,23 +368,61 @@ PersistenceEngine::putAsync(const Bucket &bucket, Timestamp ts, storage::spi::Do
}
void
-PersistenceEngine::removeAsync(const Bucket& b, Timestamp t, const DocumentId& did, Context&, OperationComplete::UP onComplete)
+PersistenceEngine::removeAsync(const Bucket& b, std::vector<TimeStampAndDocumentId> ids, Context&, OperationComplete::UP onComplete)
+{
+ if (ids.size() == 1) {
+ removeAsyncSingle(b, ids[0].first, ids[0].second, std::move(onComplete));
+ } else {
+ removeAsyncMulti(b, std::move(ids), std::move(onComplete));
+ }
+}
+
+void
+PersistenceEngine::removeAsyncMulti(const Bucket& b, std::vector<TimeStampAndDocumentId> ids, OperationComplete::UP onComplete) {
+ ReadGuard rguard(_rwMutex);
+ //TODO Group per document type/handler and handle in one go.
+ for (const TimeStampAndDocumentId & stampedId : ids) {
+ const document::DocumentId & id = stampedId.second;
+ if (!id.hasDocType()) {
+ return onComplete->onComplete(
+ std::make_unique<RemoveResult>(Result::ErrorType::PERMANENT_ERROR,
+ fmt("Old id scheme not supported in elastic mode (%s)", id.toString().c_str())));
+ }
+ DocTypeName docType(id.getDocType());
+ IPersistenceHandler *handler = getHandler(rguard, b.getBucketSpace(), docType);
+ if (!handler) {
+ return onComplete->onComplete(std::make_unique<RemoveResult>(Result::ErrorType::PERMANENT_ERROR,
+ fmt("No handler for document type '%s'",
+ docType.toString().c_str())));
+ }
+ }
+ auto transportContext = std::make_shared<AsyncRemoveTransportContext>(ids.size(), std::move(onComplete));
+ for (const TimeStampAndDocumentId & stampedId : ids) {
+ const document::DocumentId & id = stampedId.second;
+ DocTypeName docType(id.getDocType());
+ IPersistenceHandler *handler = getHandler(rguard, b.getBucketSpace(), docType);
+ handler->handleRemove(feedtoken::make(transportContext), b, stampedId.first, id);
+ }
+}
+
+void
+PersistenceEngine::removeAsyncSingle(const Bucket& b, Timestamp t, const DocumentId& id, OperationComplete::UP onComplete)
{
ReadGuard rguard(_rwMutex);
LOG(spam, "remove(%s, %" PRIu64 ", \"%s\")", b.toString().c_str(),
- static_cast<uint64_t>(t.getValue()), did.toString().c_str());
- if (!did.hasDocType()) {
+ static_cast<uint64_t>(t.getValue()), id.toString().c_str());
+ if (!id.hasDocType()) {
return onComplete->onComplete(std::make_unique<RemoveResult>(Result::ErrorType::PERMANENT_ERROR,
- fmt("Old id scheme not supported in elastic mode (%s)", did.toString().c_str())));
+ fmt("Old id scheme not supported in elastic mode (%s)", id.toString().c_str())));
}
- DocTypeName docType(did.getDocType());
+ DocTypeName docType(id.getDocType());
IPersistenceHandler * handler = getHandler(rguard, b.getBucketSpace(), docType);
if (!handler) {
return onComplete->onComplete(std::make_unique<RemoveResult>(Result::ErrorType::PERMANENT_ERROR,
fmt("No handler for document type '%s'", docType.toString().c_str())));
}
auto transportContext = std::make_shared<AsyncTransportContext>(1, std::move(onComplete));
- handler->handleRemove(feedtoken::make(std::move(transportContext)), b, t, did);
+ handler->handleRemove(feedtoken::make(std::move(transportContext)), b, t, id);
}
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
index fe564d01459..7c8040fae9d 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
@@ -88,7 +88,8 @@ private:
void saveClusterState(BucketSpace bucketSpace, const ClusterState &calc);
ClusterState::SP savedClusterState(BucketSpace bucketSpace) const;
std::shared_ptr<BucketExecutor> get_bucket_executor() noexcept { return _bucket_executor.lock(); }
-
+ void removeAsyncSingle(const Bucket&, Timestamp, const document::DocumentId &id, OperationComplete::UP);
+ void removeAsyncMulti(const Bucket&, std::vector<TimeStampAndDocumentId> ids, OperationComplete::UP);
public:
typedef std::unique_ptr<PersistenceEngine> UP;
@@ -106,7 +107,7 @@ public:
void setActiveStateAsync(const Bucket&, BucketInfo::ActiveState, OperationComplete::UP) override;
BucketInfoResult getBucketInfo(const Bucket&) const override;
void putAsync(const Bucket &, Timestamp, storage::spi::DocumentSP, Context &context, OperationComplete::UP) override;
- void removeAsync(const Bucket&, Timestamp, const document::DocumentId&, Context&, OperationComplete::UP) override;
+ void removeAsync(const Bucket&, std::vector<TimeStampAndDocumentId> ids, Context&, OperationComplete::UP) override;
void updateAsync(const Bucket&, Timestamp, storage::spi::DocumentUpdateSP, Context&, OperationComplete::UP) override;
GetResult get(const Bucket&, const document::FieldSet&, const document::DocumentId&, Context&) const override;
CreateIteratorResult
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp
index 8a0955b3147..e40c72fb3c1 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.cpp
@@ -5,6 +5,7 @@
using vespalib::make_string;
using storage::spi::Result;
+using storage::spi::RemoveResult;
namespace proton {
@@ -30,14 +31,19 @@ TransportMerger::mergeResult(ResultUP result, bool documentWasFound) {
}
}
+Result::UP
+TransportMerger::merge(ResultUP accum, ResultUP incomming, bool documentWasFound) {
+ return documentWasFound ? std::move(incomming) : std::move(accum);
+}
+
void
TransportMerger::mergeWithLock(ResultUP result, bool documentWasFound) {
if (!_result) {
_result = std::move(result);
} else if (result->hasError()) {
_result = std::make_unique<Result>(mergeErrorResults(*_result, *result));
- } else if (documentWasFound) {
- _result = std::move(result);
+ } else {
+ _result = merge(std::move(_result), std::move(result), documentWasFound);
}
completeIfDone();
}
@@ -93,4 +99,11 @@ AsyncTransportContext::send(ResultUP result, bool documentWasFound)
mergeResult(std::move(result), documentWasFound);
}
+Result::UP
+AsyncRemoveTransportContext::merge(ResultUP accum, ResultUP incomming, bool) {
+ // TODO This can be static cast if necessary.
+ dynamic_cast<RemoveResult *>(accum.get())->inc_num_removed(dynamic_cast<RemoveResult *>(incomming.get())->num_removed());
+ return accum;
+}
+
} // proton
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h
index 91cadfedcd5..9e83102f464 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/transport_latch.h
@@ -23,6 +23,7 @@ protected:
~TransportMerger() override;
void mergeResult(ResultUP result, bool documentWasFound);
virtual void completeIfDone() { } // Called with lock held if necessary on every merge
+ virtual ResultUP merge(ResultUP accum, ResultUP incomming, bool documentWasFound);
ResultUP _result;
private:
@@ -47,15 +48,10 @@ public:
void await() {
_latch.await();
}
- const UpdateResult &getUpdateResult() const {
- return dynamic_cast<const UpdateResult &>(*_result);
- }
+
const Result &getResult() const {
return *_result;
}
- const RemoveResult &getRemoveResult() const {
- return dynamic_cast<const RemoveResult &>(*_result);
- }
};
@@ -77,5 +73,11 @@ public:
void send(ResultUP result, bool documentWasFound) override;
};
-} // namespace proton
+class AsyncRemoveTransportContext : public AsyncTransportContext {
+public:
+ using AsyncTransportContext::AsyncTransportContext;
+protected:
+ ResultUP merge(ResultUP accum, ResultUP incomming, bool documentWasFound) override;
+};
+}
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
index 02b43a32df3..1c47170de6c 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
@@ -106,12 +106,14 @@ PersistenceProviderWrapper::putAsync(const spi::Bucket& bucket, spi::Timestamp t
}
void
-PersistenceProviderWrapper::removeAsync(const spi::Bucket& bucket, spi::Timestamp timestamp, const spi::DocumentId& id,
+PersistenceProviderWrapper::removeAsync(const spi::Bucket& bucket, std::vector<TimeStampAndDocumentId> ids,
spi::Context& context, spi::OperationComplete::UP onComplete)
{
- LOG_SPI("remove(" << bucket << ", " << timestamp << ", " << id << ")");
+ for (const TimeStampAndDocumentId & stampedId : ids) {
+ LOG_SPI("remove(" << bucket << ", " << stampedId.first << ", " << stampedId.second << ")");
+ }
CHECK_ERROR_ASYNC(spi::RemoveResult, FAIL_REMOVE, onComplete);
- _spi.removeAsync(bucket, timestamp, id, context, std::move(onComplete));
+ _spi.removeAsync(bucket, std::move(ids), context, std::move(onComplete));
}
void
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
index cfc7002a643..1552a955221 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
@@ -100,7 +100,7 @@ public:
spi::BucketIdListResult listBuckets(BucketSpace bucketSpace) const override;
spi::BucketInfoResult getBucketInfo(const spi::Bucket&) const override;
void putAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&, spi::OperationComplete::UP) override;
- void removeAsync(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
+ void removeAsync(const spi::Bucket&, std::vector<TimeStampAndDocumentId> ids, spi::Context&, spi::OperationComplete::UP) override;
void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
void updateAsync(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&, spi::OperationComplete::UP) override;
spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const spi::DocumentId&, spi::Context&) const override;
diff --git a/storage/src/tests/persistence/processalltest.cpp b/storage/src/tests/persistence/processalltest.cpp
index 0c862f11b05..51fe4ef4bd3 100644
--- a/storage/src/tests/persistence/processalltest.cpp
+++ b/storage/src/tests/persistence/processalltest.cpp
@@ -41,11 +41,14 @@ TEST_F(ProcessAllHandlerTest, remove_location) {
AsyncHandler handler(getEnv(), getPersistenceProvider(), _bucketOwnershipNotifier, *_sequenceTaskExecutor, _bucketIdFactory);
auto tracker = handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket));
+ std::shared_ptr<api::StorageMessage> msg;
+ ASSERT_TRUE(_replySender.queue.getNext(msg, 60s));
+
EXPECT_EQ("DocEntry(1234, 1, id:mail:testdoctype1:n=4:3619.html)\n"
"DocEntry(2345, 1, id:mail:testdoctype1:n=4:4008.html)\n",
dumpBucket(bucketId));
- auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(std::move(*tracker).stealReplySP());
+ auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(msg);
ASSERT_TRUE(reply);
EXPECT_EQ(2u, reply->documents_removed());
}
@@ -65,6 +68,9 @@ TEST_F(ProcessAllHandlerTest, remove_location_document_subset) {
auto cmd = std::make_shared<api::RemoveLocationCommand>("testdoctype1.headerval % 2 == 0", bucket);
auto tracker = handler.handleRemoveLocation(*cmd, createTracker(cmd, bucket));
+ std::shared_ptr<api::StorageMessage> msg;
+ ASSERT_TRUE(_replySender.queue.getNext(msg, 60s));
+
EXPECT_EQ("DocEntry(100, 1, id:mail:testdoctype1:n=4:3619.html)\n"
"DocEntry(101, 0, Doc(id:mail:testdoctype1:n=4:33113.html))\n"
"DocEntry(102, 1, id:mail:testdoctype1:n=4:62608.html)\n"
@@ -77,7 +83,7 @@ TEST_F(ProcessAllHandlerTest, remove_location_document_subset) {
"DocEntry(109, 0, Doc(id:mail:testdoctype1:n=4:6925.html))\n",
dumpBucket(bucketId));
- auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(std::move(*tracker).stealReplySP());
+ auto reply = std::dynamic_pointer_cast<api::RemoveLocationReply>(msg);
ASSERT_TRUE(reply);
EXPECT_EQ(5u, reply->documents_removed());
}
diff --git a/storage/src/vespa/storage/persistence/asynchandler.cpp b/storage/src/vespa/storage/persistence/asynchandler.cpp
index aac2b0748c4..b5161673af3 100644
--- a/storage/src/vespa/storage/persistence/asynchandler.cpp
+++ b/storage/src/vespa/storage/persistence/asynchandler.cpp
@@ -413,20 +413,14 @@ AsyncHandler::handleRemoveLocation(api::RemoveLocationCommand& cmd, MessageTrack
std::make_shared<document::DocIdOnly>(),
processor, spi::NEWEST_DOCUMENT_ONLY,tracker->context());
- std::vector<std::future<std::unique_ptr<spi::Result>>> results;
- results.reserve(to_remove.size());
- for (auto & entry : to_remove) {
- auto catcher = std::make_unique<spi::CatchResult>();
- results.push_back(catcher->future_result());
- _spi.removeAsync(bucket, entry.first, entry.second, tracker->context(), std::move(catcher));
- }
- for (auto & future : results) {
- auto result = future.get();
- if (result->getErrorCode() != spi::Result::ErrorType::NONE) {
- throw std::runtime_error(fmt("Failed to do remove for removelocation: %s", result->getErrorMessage().c_str()));
- }
- }
- tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, to_remove.size()));
+ auto task = makeResultTask([&cmd, tracker = std::move(tracker), removed = to_remove.size()](spi::Result::UP response) {
+ tracker->checkForError(*response);
+ tracker->setReply(std::make_shared<api::RemoveLocationReply>(cmd, removed));
+ tracker->sendReply();
+ });
+
+ _spi.removeAsync(bucket, std::move(to_remove), tracker->context(),
+ std::make_unique<ResultTaskOperationDone>(_sequencedExecutor, cmd.getBucketId(), std::move(task)));
return tracker;
}
diff --git a/storage/src/vespa/storage/persistence/mergehandler.cpp b/storage/src/vespa/storage/persistence/mergehandler.cpp
index 7d7c8b79e0c..254a26aa454 100644
--- a/storage/src/vespa/storage/persistence/mergehandler.cpp
+++ b/storage/src/vespa/storage/persistence/mergehandler.cpp
@@ -506,9 +506,10 @@ MergeHandler::applyDiffEntry(std::shared_ptr<ApplyBucketDiffState> async_results
auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), std::move(docId), "put", _clock, _env._metrics.merge_handler_metrics.put_latency);
_spi.putAsync(bucket, timestamp, std::move(doc), context, std::move(complete));
} else {
- DocumentId docId(e._docName);
- auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), docId, "remove", _clock, _env._metrics.merge_handler_metrics.remove_latency);
- _spi.removeAsync(bucket, timestamp, docId, context, std::move(complete));
+ std::vector<spi::PersistenceProvider::TimeStampAndDocumentId> ids;
+ ids.emplace_back(timestamp, e._docName);
+ auto complete = std::make_unique<ApplyBucketDiffEntryComplete>(std::move(async_results), ids[0].second, "remove", _clock, _env._metrics.merge_handler_metrics.remove_latency);
+ _spi.removeAsync(bucket, std::move(ids), context, std::move(complete));
}
}
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
index 9ccd901744b..752c1175f22 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
@@ -154,11 +154,11 @@ ProviderErrorWrapper::putAsync(const spi::Bucket &bucket, spi::Timestamp ts, spi
}
void
-ProviderErrorWrapper::removeAsync(const spi::Bucket &bucket, spi::Timestamp ts, const document::DocumentId &docId,
+ProviderErrorWrapper::removeAsync(const spi::Bucket &bucket, std::vector<TimeStampAndDocumentId> ids,
spi::Context & context, spi::OperationComplete::UP onComplete)
{
onComplete->addResultHandler(this);
- _impl.removeAsync(bucket, ts, docId, context, std::move(onComplete));
+ _impl.removeAsync(bucket, std::move(ids), context, std::move(onComplete));
}
void
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
index 14d20cf8a52..7285c405d5c 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
@@ -58,7 +58,7 @@ public:
void register_error_listener(std::shared_ptr<ProviderErrorListener> listener);
void putAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentSP, spi::Context &, spi::OperationComplete::UP) override;
- void removeAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
+ void removeAsync(const spi::Bucket&, std::vector<TimeStampAndDocumentId>, spi::Context&, spi::OperationComplete::UP) override;
void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override;
void setActiveStateAsync(const spi::Bucket& b, spi::BucketInfo::ActiveState newState, spi::OperationComplete::UP onComplete) override;