summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-05-05 10:06:41 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-05-05 10:06:41 +0000
commitbbfebdcf506f8b638ce40b28e47c3657cf002055 (patch)
treea176b0bf30c7e74e3e792fa393ec3ba805c789cd
parent2b83b031718d466df9fc0c4abd89f7fe0bcdbbf8 (diff)
Add async update and followup on PR comments.
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp39
-rw-r--r--persistence/src/vespa/persistence/dummyimpl/dummypersistence.h1
-rw-r--r--persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp35
-rw-r--r--persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h6
-rw-r--r--persistence/src/vespa/persistence/spi/persistenceprovider.cpp16
-rw-r--r--persistence/src/vespa/persistence/spi/persistenceprovider.h3
-rw-r--r--searchcore/src/apps/proton/downpersistence.cpp2
-rw-r--r--searchcore/src/apps/proton/downpersistence.h2
-rw-r--r--searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp32
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp40
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h2
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.cpp4
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.h5
-rw-r--r--storage/src/tests/persistence/testandsettest.cpp8
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp87
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.cpp44
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.h3
17 files changed, 171 insertions, 158 deletions
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
index 984bbcd845f..5720a0ba662 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.cpp
@@ -4,6 +4,7 @@
#include <vespa/document/select/parser.h>
#include <vespa/document/base/documentid.h>
#include <vespa/document/fieldvalue/document.h>
+#include <vespa/document/update/documentupdate.h>
#include <vespa/document/bucket/fixed_bucket_spaces.h>
#include <vespa/vespalib/util/crc.h>
#include <vespa/document/fieldset/fieldsetrepo.h>
@@ -30,7 +31,7 @@ BucketContent::BucketContent()
_outdatedInfo(true),
_active(false)
{ }
-BucketContent::~BucketContent() { }
+BucketContent::~BucketContent() = default;
uint32_t
BucketContent::computeEntryChecksum(const BucketEntry& e) const
@@ -306,11 +307,10 @@ DummyPersistence::DummyPersistence(
_clusterState()
{}
-DummyPersistence::~DummyPersistence() {}
+DummyPersistence::~DummyPersistence() = default;
document::select::Node::UP
-DummyPersistence::parseDocumentSelection(const string& documentSelection,
- bool allowLeaf)
+DummyPersistence::parseDocumentSelection(const string& documentSelection, bool allowLeaf)
{
document::select::Node::UP ret;
try {
@@ -464,6 +464,37 @@ DummyPersistence::put(const Bucket& b, Timestamp t, Document::SP doc, Context&)
return Result();
}
+UpdateResult
+DummyPersistence::update(const Bucket& bucket, Timestamp ts, DocumentUpdateSP upd, Context& context)
+{
+ GetResult getResult = get(bucket, document::AllFields(), upd->getId(), context);
+
+ if (getResult.hasError()) {
+ return UpdateResult(getResult.getErrorCode(), getResult.getErrorMessage());
+ }
+
+ auto docToUpdate = getResult.getDocumentPtr();
+ Timestamp updatedTs = getResult.getTimestamp();
+ if (!docToUpdate) {
+ if (!upd->getCreateIfNonExistent()) {
+ return UpdateResult();
+ } else {
+ docToUpdate = std::make_shared<document::Document>(upd->getType(), upd->getId());
+ updatedTs = ts;
+ }
+ }
+
+ upd->applyTo(*docToUpdate);
+
+ Result putResult = put(bucket, ts, std::move(docToUpdate), context);
+
+ if (putResult.hasError()) {
+ return UpdateResult(putResult.getErrorCode(), putResult.getErrorMessage());
+ }
+
+ return UpdateResult(updatedTs);
+}
+
RemoveResult
DummyPersistence::remove(const Bucket& b, Timestamp t, const DocumentId& did, Context&)
{
diff --git a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
index 27ed95bd6ee..94c2e1cd9a4 100644
--- a/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
+++ b/persistence/src/vespa/persistence/dummyimpl/dummypersistence.h
@@ -156,6 +156,7 @@ public:
Result put(const Bucket&, Timestamp, DocumentSP, Context&) override;
GetResult get(const Bucket&, const document::FieldSet&, const DocumentId&, Context&) const override;
RemoveResult remove(const Bucket& b, Timestamp t, const DocumentId& did, Context&) override;
+ UpdateResult update(const Bucket&, Timestamp, DocumentUpdateSP, Context&) override;
CreateIteratorResult createIterator(const Bucket&,
const document::FieldSet& fs,
diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp
index 99a8b244431..45c2c7901f9 100644
--- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp
+++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.cpp
@@ -8,39 +8,6 @@
namespace storage::spi {
-UpdateResult
-AbstractPersistenceProvider::update(const Bucket& bucket, Timestamp ts,
- const DocumentUpdate::SP& upd, Context& context)
-{
- GetResult getResult = get(bucket, document::AllFields(), upd->getId(), context);
-
- if (getResult.hasError()) {
- return UpdateResult(getResult.getErrorCode(), getResult.getErrorMessage());
- }
-
- auto docToUpdate = getResult.getDocumentPtr();
- Timestamp updatedTs = getResult.getTimestamp();
- if (!docToUpdate) {
- if (!upd->getCreateIfNonExistent()) {
- return UpdateResult();
- } else {
- docToUpdate = std::make_shared<document::Document>(upd->getType(), upd->getId());
- updatedTs = ts;
- }
- }
-
- upd->applyTo(*docToUpdate);
-
- Result putResult = put(bucket, ts, std::move(docToUpdate), context);
-
- if (putResult.hasError()) {
- return UpdateResult(putResult.getErrorCode(),
- putResult.getErrorMessage());
- }
-
- return UpdateResult(updatedTs);
-}
-
RemoveResult
AbstractPersistenceProvider::removeIfFound(const Bucket& b, Timestamp timestamp,
const DocumentId& id, Context& context)
@@ -50,7 +17,7 @@ AbstractPersistenceProvider::removeIfFound(const Bucket& b, Timestamp timestamp,
void
AbstractPersistenceProvider::removeIfFoundAsync(const Bucket& b, Timestamp timestamp,
- const DocumentId& id, Context& context, OperationComplete::UP onComplete)
+ const DocumentId& id, Context& context, OperationComplete::UP onComplete)
{
removeAsync(b, timestamp, id, context, std::move(onComplete));
}
diff --git a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
index 9de96944c31..813050222a9 100644
--- a/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/abstractpersistenceprovider.h
@@ -20,12 +20,6 @@ public:
Result initialize() override { return Result(); };
/**
- * Updates the document by calling get(), updating the document,
- * then calling put() on the result.
- */
- UpdateResult update(const Bucket&, Timestamp, const DocumentUpdateSP&, Context&) override;
-
- /**
* Default impl empty.
*/
Result createBucket(const Bucket&, Context&) override { return Result(); }
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
index b00ae340fa6..38fcd2fd072 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.cpp
@@ -74,4 +74,20 @@ PersistenceProvider::removeIfFoundAsync(const Bucket &bucket, Timestamp timestam
onComplete->onComplete(std::make_unique<RemoveResult>(result));
}
+UpdateResult
+PersistenceProvider::update(const Bucket& bucket, Timestamp timestamp, DocumentUpdateSP upd, Context& context) {
+ auto catcher = std::make_unique<CatchResult>();
+ auto future = catcher->future_result();
+ updateAsync(bucket, timestamp, std::move(upd), context, std::move(catcher));
+ return dynamic_cast<const UpdateResult &>(*future.get());
+}
+
+void
+PersistenceProvider::updateAsync(const Bucket &bucket, Timestamp timestamp, DocumentUpdateSP upd, Context &context,
+ OperationComplete::UP onComplete)
+{
+ UpdateResult result = update(bucket, timestamp, std::move(upd), context);
+ onComplete->onComplete(std::make_unique<UpdateResult>(result));
+}
+
}
diff --git a/persistence/src/vespa/persistence/spi/persistenceprovider.h b/persistence/src/vespa/persistence/spi/persistenceprovider.h
index 6a5e3e05933..2e7d215fc33 100644
--- a/persistence/src/vespa/persistence/spi/persistenceprovider.h
+++ b/persistence/src/vespa/persistence/spi/persistenceprovider.h
@@ -204,7 +204,8 @@ struct PersistenceProvider
* @param timestamp The timestamp to use for the new update entry.
* @param update The document update to apply to the stored document.
*/
- virtual UpdateResult update(const Bucket&, Timestamp timestamp, const DocumentUpdateSP& update, Context&) = 0;
+ virtual UpdateResult update(const Bucket&, Timestamp timestamp, DocumentUpdateSP update, Context&);
+ virtual void updateAsync(const Bucket&, Timestamp timestamp, DocumentUpdateSP update, Context&, OperationComplete::UP);
/**
* Retrieves the latest version of the document specified by the
diff --git a/searchcore/src/apps/proton/downpersistence.cpp b/searchcore/src/apps/proton/downpersistence.cpp
index 8d8665b645e..aa87c383c33 100644
--- a/searchcore/src/apps/proton/downpersistence.cpp
+++ b/searchcore/src/apps/proton/downpersistence.cpp
@@ -83,7 +83,7 @@ DownPersistence::removeEntry(const Bucket&, Timestamp, Context&)
return errorResult;
}
-UpdateResult DownPersistence::update(const Bucket&, Timestamp, const DocumentUpdate::SP&, Context&)
+UpdateResult DownPersistence::update(const Bucket&, Timestamp, DocumentUpdate::SP, Context&)
{
return UpdateResult(errorResult.getErrorCode(), errorResult.getErrorMessage());
}
diff --git a/searchcore/src/apps/proton/downpersistence.h b/searchcore/src/apps/proton/downpersistence.h
index 2716b0ee3b4..10e3d9c1ad7 100644
--- a/searchcore/src/apps/proton/downpersistence.h
+++ b/searchcore/src/apps/proton/downpersistence.h
@@ -36,7 +36,7 @@ public:
RemoveResult remove(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) override;
RemoveResult removeIfFound(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) override;
Result removeEntry(const Bucket&, Timestamp, Context&) override;
- UpdateResult update(const Bucket&, Timestamp timestamp, const DocumentUpdateSP& update, Context&) override;
+ UpdateResult update(const Bucket&, Timestamp timestamp, DocumentUpdateSP update, Context&) override;
GetResult get(const Bucket&, const document::FieldSet& fieldSet, const DocumentId& id, Context&) const override;
CreateIteratorResult createIterator(const Bucket&, const document::FieldSet& fieldSet,
diff --git a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
index 4fb4abcb7c5..2927457a6a5 100644
--- a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
+++ b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
@@ -458,12 +458,12 @@ TEST_F("require that puts are routed to handler", SimpleFixture)
storage::spi::LoadType loadType(0, "default");
Context context(loadType, storage::spi::Priority(0), storage::spi::Trace::TraceLevel(0));
f.engine.put(bucket1, tstamp1, doc1, context);
- assertHandler(bucket1, tstamp1, docId1, f.hset.handler1);
- assertHandler(bucket0, tstamp0, docId0, f.hset.handler2);
+ TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1));
+ TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler2));
f.engine.put(bucket1, tstamp1, doc2, context);
- assertHandler(bucket1, tstamp1, docId1, f.hset.handler1);
- assertHandler(bucket1, tstamp1, docId2, f.hset.handler2);
+ TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1));
+ TEST_DO(assertHandler(bucket1, tstamp1, docId2, f.hset.handler2));
EXPECT_EQUAL(Result(Result::ErrorType::PERMANENT_ERROR, "No handler for document type 'type3'"),
f.engine.put(bucket1, tstamp1, doc3, context));
@@ -490,14 +490,14 @@ TEST_F("require that updates are routed to handler", SimpleFixture)
Context context(loadType, storage::spi::Priority(0), storage::spi::Trace::TraceLevel(0));
f.hset.handler1.setExistingTimestamp(tstamp2);
UpdateResult ur = f.engine.update(bucket1, tstamp1, upd1, context);
- assertHandler(bucket1, tstamp1, docId1, f.hset.handler1);
- assertHandler(bucket0, tstamp0, docId0, f.hset.handler2);
+ TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1));
+ TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler2));
EXPECT_EQUAL(tstamp2, ur.getExistingTimestamp());
f.hset.handler2.setExistingTimestamp(tstamp3);
ur = f.engine.update(bucket1, tstamp1, upd2, context);
- assertHandler(bucket1, tstamp1, docId1, f.hset.handler1);
- assertHandler(bucket1, tstamp1, docId2, f.hset.handler2);
+ TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1));
+ TEST_DO(assertHandler(bucket1, tstamp1, docId2, f.hset.handler2));
EXPECT_EQUAL(tstamp3, ur.getExistingTimestamp());
EXPECT_EQUAL(Result(Result::ErrorType::PERMANENT_ERROR, "No handler for document type 'type3'"),
@@ -533,31 +533,31 @@ TEST_F("require that removes are routed to handlers", SimpleFixture)
storage::spi::LoadType loadType(0, "default");
Context context(loadType, storage::spi::Priority(0), storage::spi::Trace::TraceLevel(0));
RemoveResult rr = f.engine.remove(bucket1, tstamp1, docId3, context);
- assertHandler(bucket0, tstamp0, docId0, f.hset.handler1);
- assertHandler(bucket0, tstamp0, docId0, f.hset.handler2);
+ TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler1));
+ TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler2));
EXPECT_FALSE(rr.wasFound());
EXPECT_TRUE(rr.hasError());
EXPECT_EQUAL(Result(Result::ErrorType::PERMANENT_ERROR, "No handler for document type 'type3'"), rr);
f.hset.handler1.setExistingTimestamp(tstamp2);
rr = f.engine.remove(bucket1, tstamp1, docId1, context);
- assertHandler(bucket1, tstamp1, docId1, f.hset.handler1);
- assertHandler(bucket0, tstamp0, docId0, f.hset.handler2);
+ TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1));
+ TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler2));
EXPECT_TRUE(rr.wasFound());
EXPECT_FALSE(rr.hasError());
f.hset.handler1.setExistingTimestamp(tstamp0);
f.hset.handler2.setExistingTimestamp(tstamp3);
rr = f.engine.remove(bucket1, tstamp1, docId2, context);
- assertHandler(bucket1, tstamp1, docId1, f.hset.handler1);
- assertHandler(bucket1, tstamp1, docId2, f.hset.handler2);
+ TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1));
+ TEST_DO(assertHandler(bucket1, tstamp1, docId2, f.hset.handler2));
EXPECT_TRUE(rr.wasFound());
EXPECT_FALSE(rr.hasError());
f.hset.handler2.setExistingTimestamp(tstamp0);
rr = f.engine.remove(bucket1, tstamp1, docId2, context);
- assertHandler(bucket1, tstamp1, docId1, f.hset.handler1);
- assertHandler(bucket1, tstamp1, docId2, f.hset.handler2);
+ TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1));
+ TEST_DO(assertHandler(bucket1, tstamp1, docId2, f.hset.handler2));
EXPECT_FALSE(rr.wasFound());
EXPECT_FALSE(rr.hasError());
}
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
index 1b64578607f..20c32e0048a 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
@@ -368,34 +368,34 @@ PersistenceEngine::removeAsync(const Bucket& b, Timestamp t, const DocumentId& d
}
-PersistenceEngine::UpdateResult
-PersistenceEngine::update(const Bucket& b, Timestamp t, const DocumentUpdate::SP& upd, Context&)
+void
+PersistenceEngine::updateAsync(const Bucket& b, Timestamp t, DocumentUpdate::SP upd, Context&, OperationComplete::UP onComplete)
{
if (!_writeFilter.acceptWriteOperation()) {
IResourceWriteFilter::State state = _writeFilter.getAcceptState();
if (!state.acceptWriteOperation()) {
- return UpdateResult(Result::ErrorType::RESOURCE_EXHAUSTED,
+ return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::RESOURCE_EXHAUSTED,
make_string("Update operation rejected for document '%s': '%s'",
- upd->getId().toString().c_str(), state.message().c_str()));
+ upd->getId().toString().c_str(), state.message().c_str())));
}
}
try {
upd->eagerDeserialize();
} catch (document::FieldNotFoundException & e) {
- return UpdateResult(Result::ErrorType::TRANSIENT_ERROR,
+ return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::TRANSIENT_ERROR,
make_string("Update operation rejected for document '%s' of type '%s': 'Field not found'",
- upd->getId().toString().c_str(), upd->getType().getName().c_str()));
+ upd->getId().toString().c_str(), upd->getType().getName().c_str())));
} catch (document::DocumentTypeNotFoundException & e) {
- return UpdateResult(Result::ErrorType::TRANSIENT_ERROR,
+ return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::TRANSIENT_ERROR,
make_string("Update operation rejected for document '%s' of type '%s'.",
- upd->getId().toString().c_str(), e.getDocumentTypeName().c_str()));
+ upd->getId().toString().c_str(), e.getDocumentTypeName().c_str())));
} catch (document::WrongTensorTypeException &e) {
- return UpdateResult(Result::ErrorType::TRANSIENT_ERROR,
+ return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::TRANSIENT_ERROR,
make_string("Update operation rejected for document '%s' of type '%s': 'Wrong tensor type: %s'",
upd->getId().toString().c_str(),
upd->getType().getName().c_str(),
- e.getMessage().c_str()));
+ e.getMessage().c_str())));
}
std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex);
DocTypeName docType(upd->getType());
@@ -403,24 +403,20 @@ PersistenceEngine::update(const Bucket& b, Timestamp t, const DocumentUpdate::SP
b.toString().c_str(), static_cast<uint64_t>(t.getValue()), docType.toString().c_str(),
upd->getId().toString().c_str(), (upd->getCreateIfNonExistent() ? "true" : "false"));
if (!upd->getId().hasDocType()) {
- return UpdateResult(Result::ErrorType::PERMANENT_ERROR,
- make_string("Old id scheme not supported in elastic mode (%s)", upd->getId().toString().c_str()));
+ return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR,
+ make_string("Old id scheme not supported in elastic mode (%s)", upd->getId().toString().c_str())));
}
if (upd->getId().getDocType() != docType.getName()) {
- return UpdateResult(Result::ErrorType::PERMANENT_ERROR,
- make_string("Update operation rejected due to bad id (%s, %s)", upd->getId().toString().c_str(), docType.getName().c_str()));
+ return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR,
+ make_string("Update operation rejected due to bad id (%s, %s)", upd->getId().toString().c_str(), docType.getName().c_str())));
}
IPersistenceHandler * handler = getHandler(rguard, b.getBucketSpace(), docType);
- if (handler) {
- TransportLatch latch(1);
- LOG(debug, "update = %s", upd->toXml().c_str());
- handler->handleUpdate(feedtoken::make(latch), b, t, upd);
- latch.await();
- return latch.getUpdateResult();
- } else {
- return UpdateResult(Result::ErrorType::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str()));
+ if ( handler == nullptr) {
+ return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str())));
}
+ auto transportContext = std::make_unique<AsyncTranportContext>(1, std::move(onComplete));
+ handler->handleUpdate(feedtoken::make(std::move(transportContext)), b, t, std::move(upd));
}
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
index a5a0c164499..230f8c411aa 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
@@ -103,7 +103,7 @@ public:
BucketInfoResult getBucketInfo(const Bucket&) const override;
void putAsync(const Bucket &, Timestamp, storage::spi::DocumentSP, Context &context, OperationComplete::UP) override;
void removeAsync(const Bucket&, Timestamp, const document::DocumentId&, Context&, OperationComplete::UP) override;
- UpdateResult update(const Bucket&, Timestamp, const std::shared_ptr<document::DocumentUpdate>&, Context&) override;
+ void updateAsync(const Bucket&, Timestamp, storage::spi::DocumentUpdateSP, Context&, OperationComplete::UP) override;
GetResult get(const Bucket&, const document::FieldSet&, const document::DocumentId&, Context&) const override;
CreateIteratorResult createIterator(const Bucket&, const document::FieldSet&, const Selection&,
IncludedVersions, Context&) override;
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
index 862d1fb758a..13a254e7a76 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
@@ -124,12 +124,12 @@ PersistenceProviderWrapper::removeIfFound(const spi::Bucket& bucket,
spi::UpdateResult
PersistenceProviderWrapper::update(const spi::Bucket& bucket,
spi::Timestamp timestamp,
- const document::DocumentUpdate::SP& upd,
+ document::DocumentUpdate::SP upd,
spi::Context& context)
{
LOG_SPI("update(" << bucket << ", " << timestamp << ", " << upd->getId() << ")");
CHECK_ERROR(spi::UpdateResult, FAIL_UPDATE);
- return _spi.update(bucket, timestamp, upd, context);
+ return _spi.update(bucket, timestamp, std::move(upd), context);
}
spi::GetResult
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
index 25365e64bfc..9ca08becabe 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
@@ -91,9 +91,8 @@ public:
spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override;
spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override;
spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override;
- spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, const spi::DocumentUpdateSP&, spi::Context&) override;
- spi::GetResult get(const spi::Bucket&, const document::FieldSet&,
- const spi::DocumentId&, spi::Context&) const override ;
+ spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&) override;
+ spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const spi::DocumentId&, spi::Context&) const override;
spi::CreateIteratorResult createIterator(const spi::Bucket&, const document::FieldSet&, const spi::Selection&,
spi::IncludedVersions versions, spi::Context&) override;
diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp
index ca441c71816..578a90081c7 100644
--- a/storage/src/tests/persistence/testandsettest.cpp
+++ b/storage/src/tests/persistence/testandsettest.cpp
@@ -171,7 +171,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_on_condition_mismatch) {
putTestDocument(false, timestampOne);
auto updateUp = conditional_update_test(false, timestampTwo);
- ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(),
+ ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID));
@@ -184,7 +184,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_on_condition_match) {
putTestDocument(true, timestampOne);
auto updateUp = conditional_update_test(false, timestampTwo);
- ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) +
expectedDocEntryString(timestampTwo, testDocId),
dumpBucket(BUCKET_ID));
@@ -196,7 +196,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_when_no_document_and_no_a
api::Timestamp updateTimestamp = 200;
auto updateUp = conditional_update_test(false, updateTimestamp);
- ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(),
+ ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ("", dumpBucket(BUCKET_ID));
}
@@ -205,7 +205,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_when_no_document_but_auto_cre
api::Timestamp updateTimestamp = 200;
auto updateUp = conditional_update_test(true, updateTimestamp);
- ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(updateTimestamp, testDocId), dumpBucket(BUCKET_ID));
assertTestDocumentFoundAndMatchesContent(NEW_CONTENT);
}
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index f281f14e884..a77eea1073e 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -176,19 +176,17 @@ PersistenceThread::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP)
return trackerUP;
}
+ spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket());
if (_sequencedExecutor == nullptr) {
- spi::Result response = _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()),
- tracker.context());
+ spi::Result response = _spi.put(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()),tracker.context());
tracker.checkForError(response);
} else {
- _spi.putAsync(getBucket(cmd.getDocumentId(), cmd.getBucket()), spi::Timestamp(cmd.getTimestamp()),
- std::move(cmd.getDocument()), tracker.context(),
- std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(),
- makeResultTask([tracker = std::move(trackerUP)](spi::Result::UP response) {
- tracker->checkForError(*response);
- tracker->sendReply();
- })));
+ auto task = makeResultTask([tracker = std::move(trackerUP)](spi::Result::UP response) {
+ tracker->checkForError(*response);
+ tracker->sendReply();
+ });
+ _spi.putAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), tracker.context(),
+ std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(), std::move(task)));
}
return trackerUP;
}
@@ -205,10 +203,9 @@ PersistenceThread::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP trac
return trackerUP;
}
+ spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket());
if (_sequencedExecutor == nullptr) {
- spi::RemoveResult response = _spi.removeIfFound(getBucket(cmd.getDocumentId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(),
- tracker.context());
+ spi::RemoveResult response = _spi.removeIfFound(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(),tracker.context());
if (tracker.checkForError(response)) {
tracker.setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0));
}
@@ -216,42 +213,56 @@ PersistenceThread::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP trac
metrics.notFound.inc();
}
} else {
- _spi.removeIfFoundAsync(getBucket(cmd.getDocumentId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker.context(),
- std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(),
- makeResultTask([&metrics, &cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) {
- const spi::RemoveResult & response = dynamic_cast<const spi::RemoveResult &>(*responseUP);
- if (tracker->checkForError(response)) {
- tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0));
- }
- if (!response.wasFound()) {
- metrics.notFound.inc();
- }
- tracker->sendReply();
- })));
+ auto task = makeResultTask([&metrics, &cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) {
+ const spi::RemoveResult & response = dynamic_cast<const spi::RemoveResult &>(*responseUP);
+ if (tracker->checkForError(response)) {
+ tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0));
+ }
+ if (!response.wasFound()) {
+ metrics.notFound.inc();
+ }
+ tracker->sendReply();
+ });
+ _spi.removeIfFoundAsync(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker.context(),
+ std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(), std::move(task)));
}
return trackerUP;
}
MessageTracker::UP
-PersistenceThread::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP tracker)
+PersistenceThread::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP trackerUP)
{
+ MessageTracker & tracker = *trackerUP;
auto& metrics = _env._metrics.update[cmd.getLoadType()];
- tracker->setMetric(metrics);
+ tracker.setMetric(metrics);
metrics.request_size.addValue(cmd.getApproxByteSize());
- if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, tracker->context(), cmd.getUpdate()->getCreateIfNonExistent())) {
- return tracker;
+ if (tasConditionExists(cmd) && !tasConditionMatches(cmd, tracker, tracker.context(), cmd.getUpdate()->getCreateIfNonExistent())) {
+ return trackerUP;
}
-
- spi::UpdateResult response = _spi.update(getBucket(cmd.getUpdate()->getId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), tracker->context());
- if (tracker->checkForError(response)) {
- auto reply = std::make_shared<api::UpdateReply>(cmd);
- reply->setOldTimestamp(response.getExistingTimestamp());
- tracker->setReply(std::move(reply));
+
+ spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket());
+ if (_sequencedExecutor == nullptr) {
+ spi::UpdateResult response = _spi.update(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getUpdate()),tracker.context());
+ if (tracker.checkForError(response)) {
+ auto reply = std::make_shared<api::UpdateReply>(cmd);
+ reply->setOldTimestamp(response.getExistingTimestamp());
+ tracker.setReply(std::move(reply));
+ }
+ } else {
+ auto task = makeResultTask([&cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) {
+ const spi::UpdateResult & response = dynamic_cast<const spi::UpdateResult &>(*responseUP);
+ if (tracker->checkForError(response)) {
+ auto reply = std::make_shared<api::UpdateReply>(cmd);
+ reply->setOldTimestamp(response.getExistingTimestamp());
+ tracker->setReply(std::move(reply));
+ }
+ tracker->sendReply();
+ });
+ _spi.updateAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getUpdate()), tracker.context(),
+ std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(), std::move(task)));
}
- return tracker;
+ return trackerUP;
}
namespace {
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
index 204d32b00ca..a5564282d17 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
@@ -84,47 +84,35 @@ ProviderErrorWrapper::put(const spi::Bucket& bucket, spi::Timestamp ts, spi::Doc
}
spi::RemoveResult
-ProviderErrorWrapper::remove(const spi::Bucket& bucket,
- spi::Timestamp ts,
- const document::DocumentId& docId,
- spi::Context& context)
+ProviderErrorWrapper::remove(const spi::Bucket& bucket, spi::Timestamp ts, const document::DocumentId& docId, spi::Context& context)
{
return checkResult(_impl.remove(bucket, ts, docId, context));
}
spi::RemoveResult
-ProviderErrorWrapper::removeIfFound(const spi::Bucket& bucket,
- spi::Timestamp ts,
- const document::DocumentId& docId,
- spi::Context& context)
+ProviderErrorWrapper::removeIfFound(const spi::Bucket& bucket, spi::Timestamp ts,
+ const document::DocumentId& docId, spi::Context& context)
{
return checkResult(_impl.removeIfFound(bucket, ts, docId, context));
}
spi::UpdateResult
-ProviderErrorWrapper::update(const spi::Bucket& bucket,
- spi::Timestamp ts,
- const spi::DocumentUpdateSP& docUpdate,
- spi::Context& context)
+ProviderErrorWrapper::update(const spi::Bucket& bucket, spi::Timestamp ts,
+ spi::DocumentUpdateSP docUpdate, spi::Context& context)
{
- return checkResult(_impl.update(bucket, ts, docUpdate, context));
+ return checkResult(_impl.update(bucket, ts, std::move(docUpdate), context));
}
spi::GetResult
-ProviderErrorWrapper::get(const spi::Bucket& bucket,
- const document::FieldSet& fieldSet,
- const document::DocumentId& docId,
- spi::Context& context) const
+ProviderErrorWrapper::get(const spi::Bucket& bucket, const document::FieldSet& fieldSet,
+ const document::DocumentId& docId, spi::Context& context) const
{
return checkResult(_impl.get(bucket, fieldSet, docId, context));
}
spi::CreateIteratorResult
-ProviderErrorWrapper::createIterator(const spi::Bucket& bucket,
- const document::FieldSet& fieldSet,
- const spi::Selection& selection,
- spi::IncludedVersions versions,
- spi::Context& context)
+ProviderErrorWrapper::createIterator(const spi::Bucket& bucket, const document::FieldSet& fieldSet,
+ const spi::Selection& selection, spi::IncludedVersions versions, spi::Context& context)
{
return checkResult(_impl.createIterator(bucket, fieldSet, selection, versions, context));
}
@@ -187,7 +175,7 @@ ProviderErrorWrapper::removeEntry(const spi::Bucket& bucket, spi::Timestamp ts,
void
ProviderErrorWrapper::putAsync(const spi::Bucket &bucket, spi::Timestamp ts, spi::DocumentSP doc,
- spi::Context &context, spi::OperationComplete::UP onComplete)
+ spi::Context &context, spi::OperationComplete::UP onComplete)
{
onComplete->addResultHandler(this);
_impl.putAsync(bucket, ts, std::move(doc), context, std::move(onComplete));
@@ -203,10 +191,18 @@ ProviderErrorWrapper::removeAsync(const spi::Bucket &bucket, spi::Timestamp ts,
void
ProviderErrorWrapper::removeIfFoundAsync(const spi::Bucket &bucket, spi::Timestamp ts, const document::DocumentId &docId,
- spi::Context & context, spi::OperationComplete::UP onComplete)
+ spi::Context & context, spi::OperationComplete::UP onComplete)
{
onComplete->addResultHandler(this);
_impl.removeIfFoundAsync(bucket, ts, docId, context, std::move(onComplete));
}
+void
+ProviderErrorWrapper::updateAsync(const spi::Bucket &bucket, spi::Timestamp ts, spi::DocumentUpdateSP upd,
+ spi::Context &context, spi::OperationComplete::UP onComplete)
+{
+ onComplete->addResultHandler(this);
+ _impl.updateAsync(bucket, ts, std::move(upd), context, std::move(onComplete));
+}
+
} // ns storage
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
index 4d25de3cdab..602877e0b02 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
@@ -50,7 +50,7 @@ public:
spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override;
spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override;
spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override;
- spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, const spi::DocumentUpdateSP&, spi::Context&) override;
+ spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&) override;
spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const document::DocumentId&, spi::Context&) const override;
spi::CreateIteratorResult createIterator(const spi::Bucket&, const document::FieldSet&, const spi::Selection&,
spi::IncludedVersions versions, spi::Context&) override;
@@ -76,6 +76,7 @@ public:
void putAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentSP, spi::Context &, spi::OperationComplete::UP) override;
void removeAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
+ void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override;
private:
template <typename ResultType>