summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-05-05 10:06:41 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-05-05 10:06:41 +0000
commitbbfebdcf506f8b638ce40b28e47c3657cf002055 (patch)
treea176b0bf30c7e74e3e792fa393ec3ba805c789cd /storage
parent2b83b031718d466df9fc0c4abd89f7fe0bcdbbf8 (diff)
Add async update and followup on PR comments.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.cpp4
-rw-r--r--storage/src/tests/persistence/common/persistenceproviderwrapper.h5
-rw-r--r--storage/src/tests/persistence/testandsettest.cpp8
-rw-r--r--storage/src/vespa/storage/persistence/persistencethread.cpp87
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.cpp44
-rw-r--r--storage/src/vespa/storage/persistence/provider_error_wrapper.h3
6 files changed, 79 insertions, 72 deletions
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
index 862d1fb758a..13a254e7a76 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.cpp
@@ -124,12 +124,12 @@ PersistenceProviderWrapper::removeIfFound(const spi::Bucket& bucket,
spi::UpdateResult
PersistenceProviderWrapper::update(const spi::Bucket& bucket,
spi::Timestamp timestamp,
- const document::DocumentUpdate::SP& upd,
+ document::DocumentUpdate::SP upd,
spi::Context& context)
{
LOG_SPI("update(" << bucket << ", " << timestamp << ", " << upd->getId() << ")");
CHECK_ERROR(spi::UpdateResult, FAIL_UPDATE);
- return _spi.update(bucket, timestamp, upd, context);
+ return _spi.update(bucket, timestamp, std::move(upd), context);
}
spi::GetResult
diff --git a/storage/src/tests/persistence/common/persistenceproviderwrapper.h b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
index 25365e64bfc..9ca08becabe 100644
--- a/storage/src/tests/persistence/common/persistenceproviderwrapper.h
+++ b/storage/src/tests/persistence/common/persistenceproviderwrapper.h
@@ -91,9 +91,8 @@ public:
spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override;
spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override;
spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const spi::DocumentId&, spi::Context&) override;
- spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, const spi::DocumentUpdateSP&, spi::Context&) override;
- spi::GetResult get(const spi::Bucket&, const document::FieldSet&,
- const spi::DocumentId&, spi::Context&) const override ;
+ spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&) override;
+ spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const spi::DocumentId&, spi::Context&) const override;
spi::CreateIteratorResult createIterator(const spi::Bucket&, const document::FieldSet&, const spi::Selection&,
spi::IncludedVersions versions, spi::Context&) override;
diff --git a/storage/src/tests/persistence/testandsettest.cpp b/storage/src/tests/persistence/testandsettest.cpp
index ca441c71816..578a90081c7 100644
--- a/storage/src/tests/persistence/testandsettest.cpp
+++ b/storage/src/tests/persistence/testandsettest.cpp
@@ -171,7 +171,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_on_condition_mismatch) {
putTestDocument(false, timestampOne);
auto updateUp = conditional_update_test(false, timestampTwo);
- ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(),
+ ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId), dumpBucket(BUCKET_ID));
@@ -184,7 +184,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_on_condition_match) {
putTestDocument(true, timestampOne);
auto updateUp = conditional_update_test(false, timestampTwo);
- ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(timestampOne, testDocId) +
expectedDocEntryString(timestampTwo, testDocId),
dumpBucket(BUCKET_ID));
@@ -196,7 +196,7 @@ TEST_F(TestAndSetTest, conditional_update_not_executed_when_no_document_and_no_a
api::Timestamp updateTimestamp = 200;
auto updateUp = conditional_update_test(false, updateTimestamp);
- ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(),
+ ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(),
api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED);
EXPECT_EQ("", dumpBucket(BUCKET_ID));
}
@@ -205,7 +205,7 @@ TEST_F(TestAndSetTest, conditional_update_executed_when_no_document_but_auto_cre
api::Timestamp updateTimestamp = 200;
auto updateUp = conditional_update_test(true, updateTimestamp);
- ASSERT_EQ(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))->getResult().getResult(), api::ReturnCode::Result::OK);
+ ASSERT_EQ(fetchResult(thread->handleUpdate(*updateUp, createTracker(updateUp, BUCKET))).getResult(), api::ReturnCode::Result::OK);
EXPECT_EQ(expectedDocEntryString(updateTimestamp, testDocId), dumpBucket(BUCKET_ID));
assertTestDocumentFoundAndMatchesContent(NEW_CONTENT);
}
diff --git a/storage/src/vespa/storage/persistence/persistencethread.cpp b/storage/src/vespa/storage/persistence/persistencethread.cpp
index f281f14e884..a77eea1073e 100644
--- a/storage/src/vespa/storage/persistence/persistencethread.cpp
+++ b/storage/src/vespa/storage/persistence/persistencethread.cpp
@@ -176,19 +176,17 @@ PersistenceThread::handlePut(api::PutCommand& cmd, MessageTracker::UP trackerUP)
return trackerUP;
}
+ spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket());
if (_sequencedExecutor == nullptr) {
- spi::Result response = _spi.put(getBucket(cmd.getDocumentId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()),
- tracker.context());
+ spi::Result response = _spi.put(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()),tracker.context());
tracker.checkForError(response);
} else {
- _spi.putAsync(getBucket(cmd.getDocumentId(), cmd.getBucket()), spi::Timestamp(cmd.getTimestamp()),
- std::move(cmd.getDocument()), tracker.context(),
- std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(),
- makeResultTask([tracker = std::move(trackerUP)](spi::Result::UP response) {
- tracker->checkForError(*response);
- tracker->sendReply();
- })));
+ auto task = makeResultTask([tracker = std::move(trackerUP)](spi::Result::UP response) {
+ tracker->checkForError(*response);
+ tracker->sendReply();
+ });
+ _spi.putAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getDocument()), tracker.context(),
+ std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(), std::move(task)));
}
return trackerUP;
}
@@ -205,10 +203,9 @@ PersistenceThread::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP trac
return trackerUP;
}
+ spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket());
if (_sequencedExecutor == nullptr) {
- spi::RemoveResult response = _spi.removeIfFound(getBucket(cmd.getDocumentId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(),
- tracker.context());
+ spi::RemoveResult response = _spi.removeIfFound(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(),tracker.context());
if (tracker.checkForError(response)) {
tracker.setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0));
}
@@ -216,42 +213,56 @@ PersistenceThread::handleRemove(api::RemoveCommand& cmd, MessageTracker::UP trac
metrics.notFound.inc();
}
} else {
- _spi.removeIfFoundAsync(getBucket(cmd.getDocumentId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker.context(),
- std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(),
- makeResultTask([&metrics, &cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) {
- const spi::RemoveResult & response = dynamic_cast<const spi::RemoveResult &>(*responseUP);
- if (tracker->checkForError(response)) {
- tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0));
- }
- if (!response.wasFound()) {
- metrics.notFound.inc();
- }
- tracker->sendReply();
- })));
+ auto task = makeResultTask([&metrics, &cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) {
+ const spi::RemoveResult & response = dynamic_cast<const spi::RemoveResult &>(*responseUP);
+ if (tracker->checkForError(response)) {
+ tracker->setReply(std::make_shared<api::RemoveReply>(cmd, response.wasFound() ? cmd.getTimestamp() : 0));
+ }
+ if (!response.wasFound()) {
+ metrics.notFound.inc();
+ }
+ tracker->sendReply();
+ });
+ _spi.removeIfFoundAsync(bucket, spi::Timestamp(cmd.getTimestamp()), cmd.getDocumentId(), tracker.context(),
+ std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(), std::move(task)));
}
return trackerUP;
}
MessageTracker::UP
-PersistenceThread::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP tracker)
+PersistenceThread::handleUpdate(api::UpdateCommand& cmd, MessageTracker::UP trackerUP)
{
+ MessageTracker & tracker = *trackerUP;
auto& metrics = _env._metrics.update[cmd.getLoadType()];
- tracker->setMetric(metrics);
+ tracker.setMetric(metrics);
metrics.request_size.addValue(cmd.getApproxByteSize());
- if (tasConditionExists(cmd) && !tasConditionMatches(cmd, *tracker, tracker->context(), cmd.getUpdate()->getCreateIfNonExistent())) {
- return tracker;
+ if (tasConditionExists(cmd) && !tasConditionMatches(cmd, tracker, tracker.context(), cmd.getUpdate()->getCreateIfNonExistent())) {
+ return trackerUP;
}
-
- spi::UpdateResult response = _spi.update(getBucket(cmd.getUpdate()->getId(), cmd.getBucket()),
- spi::Timestamp(cmd.getTimestamp()), cmd.getUpdate(), tracker->context());
- if (tracker->checkForError(response)) {
- auto reply = std::make_shared<api::UpdateReply>(cmd);
- reply->setOldTimestamp(response.getExistingTimestamp());
- tracker->setReply(std::move(reply));
+
+ spi::Bucket bucket = getBucket(cmd.getDocumentId(), cmd.getBucket());
+ if (_sequencedExecutor == nullptr) {
+ spi::UpdateResult response = _spi.update(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getUpdate()),tracker.context());
+ if (tracker.checkForError(response)) {
+ auto reply = std::make_shared<api::UpdateReply>(cmd);
+ reply->setOldTimestamp(response.getExistingTimestamp());
+ tracker.setReply(std::move(reply));
+ }
+ } else {
+ auto task = makeResultTask([&cmd, tracker = std::move(trackerUP)](spi::Result::UP responseUP) {
+ const spi::UpdateResult & response = dynamic_cast<const spi::UpdateResult &>(*responseUP);
+ if (tracker->checkForError(response)) {
+ auto reply = std::make_shared<api::UpdateReply>(cmd);
+ reply->setOldTimestamp(response.getExistingTimestamp());
+ tracker->setReply(std::move(reply));
+ }
+ tracker->sendReply();
+ });
+ _spi.updateAsync(bucket, spi::Timestamp(cmd.getTimestamp()), std::move(cmd.getUpdate()), tracker.context(),
+ std::make_unique<ResultTaskOperationDone>(*_sequencedExecutor, cmd.getBucketId(), std::move(task)));
}
- return tracker;
+ return trackerUP;
}
namespace {
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
index 204d32b00ca..a5564282d17 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.cpp
@@ -84,47 +84,35 @@ ProviderErrorWrapper::put(const spi::Bucket& bucket, spi::Timestamp ts, spi::Doc
}
spi::RemoveResult
-ProviderErrorWrapper::remove(const spi::Bucket& bucket,
- spi::Timestamp ts,
- const document::DocumentId& docId,
- spi::Context& context)
+ProviderErrorWrapper::remove(const spi::Bucket& bucket, spi::Timestamp ts, const document::DocumentId& docId, spi::Context& context)
{
return checkResult(_impl.remove(bucket, ts, docId, context));
}
spi::RemoveResult
-ProviderErrorWrapper::removeIfFound(const spi::Bucket& bucket,
- spi::Timestamp ts,
- const document::DocumentId& docId,
- spi::Context& context)
+ProviderErrorWrapper::removeIfFound(const spi::Bucket& bucket, spi::Timestamp ts,
+ const document::DocumentId& docId, spi::Context& context)
{
return checkResult(_impl.removeIfFound(bucket, ts, docId, context));
}
spi::UpdateResult
-ProviderErrorWrapper::update(const spi::Bucket& bucket,
- spi::Timestamp ts,
- const spi::DocumentUpdateSP& docUpdate,
- spi::Context& context)
+ProviderErrorWrapper::update(const spi::Bucket& bucket, spi::Timestamp ts,
+ spi::DocumentUpdateSP docUpdate, spi::Context& context)
{
- return checkResult(_impl.update(bucket, ts, docUpdate, context));
+ return checkResult(_impl.update(bucket, ts, std::move(docUpdate), context));
}
spi::GetResult
-ProviderErrorWrapper::get(const spi::Bucket& bucket,
- const document::FieldSet& fieldSet,
- const document::DocumentId& docId,
- spi::Context& context) const
+ProviderErrorWrapper::get(const spi::Bucket& bucket, const document::FieldSet& fieldSet,
+ const document::DocumentId& docId, spi::Context& context) const
{
return checkResult(_impl.get(bucket, fieldSet, docId, context));
}
spi::CreateIteratorResult
-ProviderErrorWrapper::createIterator(const spi::Bucket& bucket,
- const document::FieldSet& fieldSet,
- const spi::Selection& selection,
- spi::IncludedVersions versions,
- spi::Context& context)
+ProviderErrorWrapper::createIterator(const spi::Bucket& bucket, const document::FieldSet& fieldSet,
+ const spi::Selection& selection, spi::IncludedVersions versions, spi::Context& context)
{
return checkResult(_impl.createIterator(bucket, fieldSet, selection, versions, context));
}
@@ -187,7 +175,7 @@ ProviderErrorWrapper::removeEntry(const spi::Bucket& bucket, spi::Timestamp ts,
void
ProviderErrorWrapper::putAsync(const spi::Bucket &bucket, spi::Timestamp ts, spi::DocumentSP doc,
- spi::Context &context, spi::OperationComplete::UP onComplete)
+ spi::Context &context, spi::OperationComplete::UP onComplete)
{
onComplete->addResultHandler(this);
_impl.putAsync(bucket, ts, std::move(doc), context, std::move(onComplete));
@@ -203,10 +191,18 @@ ProviderErrorWrapper::removeAsync(const spi::Bucket &bucket, spi::Timestamp ts,
void
ProviderErrorWrapper::removeIfFoundAsync(const spi::Bucket &bucket, spi::Timestamp ts, const document::DocumentId &docId,
- spi::Context & context, spi::OperationComplete::UP onComplete)
+ spi::Context & context, spi::OperationComplete::UP onComplete)
{
onComplete->addResultHandler(this);
_impl.removeIfFoundAsync(bucket, ts, docId, context, std::move(onComplete));
}
+void
+ProviderErrorWrapper::updateAsync(const spi::Bucket &bucket, spi::Timestamp ts, spi::DocumentUpdateSP upd,
+ spi::Context &context, spi::OperationComplete::UP onComplete)
+{
+ onComplete->addResultHandler(this);
+ _impl.updateAsync(bucket, ts, std::move(upd), context, std::move(onComplete));
+}
+
} // ns storage
diff --git a/storage/src/vespa/storage/persistence/provider_error_wrapper.h b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
index 4d25de3cdab..602877e0b02 100644
--- a/storage/src/vespa/storage/persistence/provider_error_wrapper.h
+++ b/storage/src/vespa/storage/persistence/provider_error_wrapper.h
@@ -50,7 +50,7 @@ public:
spi::Result put(const spi::Bucket&, spi::Timestamp, spi::DocumentSP, spi::Context&) override;
spi::RemoveResult remove(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override;
spi::RemoveResult removeIfFound(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&) override;
- spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, const spi::DocumentUpdateSP&, spi::Context&) override;
+ spi::UpdateResult update(const spi::Bucket&, spi::Timestamp, spi::DocumentUpdateSP, spi::Context&) override;
spi::GetResult get(const spi::Bucket&, const document::FieldSet&, const document::DocumentId&, spi::Context&) const override;
spi::CreateIteratorResult createIterator(const spi::Bucket&, const document::FieldSet&, const spi::Selection&,
spi::IncludedVersions versions, spi::Context&) override;
@@ -76,6 +76,7 @@ public:
void putAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentSP, spi::Context &, spi::OperationComplete::UP) override;
void removeAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
void removeIfFoundAsync(const spi::Bucket&, spi::Timestamp, const document::DocumentId&, spi::Context&, spi::OperationComplete::UP) override;
+ void updateAsync(const spi::Bucket &, spi::Timestamp, spi::DocumentUpdateSP, spi::Context &, spi::OperationComplete::UP) override;
private:
template <typename ResultType>