summaryrefslogtreecommitdiffstats
path: root/searchcore
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2020-05-05 10:06:41 +0000
committerHenning Baldersheim <balder@yahoo-inc.com>2020-05-05 10:06:41 +0000
commitbbfebdcf506f8b638ce40b28e47c3657cf002055 (patch)
treea176b0bf30c7e74e3e792fa393ec3ba805c789cd /searchcore
parent2b83b031718d466df9fc0c4abd89f7fe0bcdbbf8 (diff)
Add async update and followup on PR comments.
Diffstat (limited to 'searchcore')
-rw-r--r--searchcore/src/apps/proton/downpersistence.cpp2
-rw-r--r--searchcore/src/apps/proton/downpersistence.h2
-rw-r--r--searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp32
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp40
-rw-r--r--searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h2
5 files changed, 37 insertions, 41 deletions
diff --git a/searchcore/src/apps/proton/downpersistence.cpp b/searchcore/src/apps/proton/downpersistence.cpp
index 8d8665b645e..aa87c383c33 100644
--- a/searchcore/src/apps/proton/downpersistence.cpp
+++ b/searchcore/src/apps/proton/downpersistence.cpp
@@ -83,7 +83,7 @@ DownPersistence::removeEntry(const Bucket&, Timestamp, Context&)
return errorResult;
}
-UpdateResult DownPersistence::update(const Bucket&, Timestamp, const DocumentUpdate::SP&, Context&)
+UpdateResult DownPersistence::update(const Bucket&, Timestamp, DocumentUpdate::SP, Context&)
{
return UpdateResult(errorResult.getErrorCode(), errorResult.getErrorMessage());
}
diff --git a/searchcore/src/apps/proton/downpersistence.h b/searchcore/src/apps/proton/downpersistence.h
index 2716b0ee3b4..10e3d9c1ad7 100644
--- a/searchcore/src/apps/proton/downpersistence.h
+++ b/searchcore/src/apps/proton/downpersistence.h
@@ -36,7 +36,7 @@ public:
RemoveResult remove(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) override;
RemoveResult removeIfFound(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) override;
Result removeEntry(const Bucket&, Timestamp, Context&) override;
- UpdateResult update(const Bucket&, Timestamp timestamp, const DocumentUpdateSP& update, Context&) override;
+ UpdateResult update(const Bucket&, Timestamp timestamp, DocumentUpdateSP update, Context&) override;
GetResult get(const Bucket&, const document::FieldSet& fieldSet, const DocumentId& id, Context&) const override;
CreateIteratorResult createIterator(const Bucket&, const document::FieldSet& fieldSet,
diff --git a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
index 4fb4abcb7c5..2927457a6a5 100644
--- a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
+++ b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp
@@ -458,12 +458,12 @@ TEST_F("require that puts are routed to handler", SimpleFixture)
storage::spi::LoadType loadType(0, "default");
Context context(loadType, storage::spi::Priority(0), storage::spi::Trace::TraceLevel(0));
f.engine.put(bucket1, tstamp1, doc1, context);
- assertHandler(bucket1, tstamp1, docId1, f.hset.handler1);
- assertHandler(bucket0, tstamp0, docId0, f.hset.handler2);
+ TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1));
+ TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler2));
f.engine.put(bucket1, tstamp1, doc2, context);
- assertHandler(bucket1, tstamp1, docId1, f.hset.handler1);
- assertHandler(bucket1, tstamp1, docId2, f.hset.handler2);
+ TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1));
+ TEST_DO(assertHandler(bucket1, tstamp1, docId2, f.hset.handler2));
EXPECT_EQUAL(Result(Result::ErrorType::PERMANENT_ERROR, "No handler for document type 'type3'"),
f.engine.put(bucket1, tstamp1, doc3, context));
@@ -490,14 +490,14 @@ TEST_F("require that updates are routed to handler", SimpleFixture)
Context context(loadType, storage::spi::Priority(0), storage::spi::Trace::TraceLevel(0));
f.hset.handler1.setExistingTimestamp(tstamp2);
UpdateResult ur = f.engine.update(bucket1, tstamp1, upd1, context);
- assertHandler(bucket1, tstamp1, docId1, f.hset.handler1);
- assertHandler(bucket0, tstamp0, docId0, f.hset.handler2);
+ TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1));
+ TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler2));
EXPECT_EQUAL(tstamp2, ur.getExistingTimestamp());
f.hset.handler2.setExistingTimestamp(tstamp3);
ur = f.engine.update(bucket1, tstamp1, upd2, context);
- assertHandler(bucket1, tstamp1, docId1, f.hset.handler1);
- assertHandler(bucket1, tstamp1, docId2, f.hset.handler2);
+ TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1));
+ TEST_DO(assertHandler(bucket1, tstamp1, docId2, f.hset.handler2));
EXPECT_EQUAL(tstamp3, ur.getExistingTimestamp());
EXPECT_EQUAL(Result(Result::ErrorType::PERMANENT_ERROR, "No handler for document type 'type3'"),
@@ -533,31 +533,31 @@ TEST_F("require that removes are routed to handlers", SimpleFixture)
storage::spi::LoadType loadType(0, "default");
Context context(loadType, storage::spi::Priority(0), storage::spi::Trace::TraceLevel(0));
RemoveResult rr = f.engine.remove(bucket1, tstamp1, docId3, context);
- assertHandler(bucket0, tstamp0, docId0, f.hset.handler1);
- assertHandler(bucket0, tstamp0, docId0, f.hset.handler2);
+ TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler1));
+ TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler2));
EXPECT_FALSE(rr.wasFound());
EXPECT_TRUE(rr.hasError());
EXPECT_EQUAL(Result(Result::ErrorType::PERMANENT_ERROR, "No handler for document type 'type3'"), rr);
f.hset.handler1.setExistingTimestamp(tstamp2);
rr = f.engine.remove(bucket1, tstamp1, docId1, context);
- assertHandler(bucket1, tstamp1, docId1, f.hset.handler1);
- assertHandler(bucket0, tstamp0, docId0, f.hset.handler2);
+ TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1));
+ TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler2));
EXPECT_TRUE(rr.wasFound());
EXPECT_FALSE(rr.hasError());
f.hset.handler1.setExistingTimestamp(tstamp0);
f.hset.handler2.setExistingTimestamp(tstamp3);
rr = f.engine.remove(bucket1, tstamp1, docId2, context);
- assertHandler(bucket1, tstamp1, docId1, f.hset.handler1);
- assertHandler(bucket1, tstamp1, docId2, f.hset.handler2);
+ TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1));
+ TEST_DO(assertHandler(bucket1, tstamp1, docId2, f.hset.handler2));
EXPECT_TRUE(rr.wasFound());
EXPECT_FALSE(rr.hasError());
f.hset.handler2.setExistingTimestamp(tstamp0);
rr = f.engine.remove(bucket1, tstamp1, docId2, context);
- assertHandler(bucket1, tstamp1, docId1, f.hset.handler1);
- assertHandler(bucket1, tstamp1, docId2, f.hset.handler2);
+ TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1));
+ TEST_DO(assertHandler(bucket1, tstamp1, docId2, f.hset.handler2));
EXPECT_FALSE(rr.wasFound());
EXPECT_FALSE(rr.hasError());
}
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
index 1b64578607f..20c32e0048a 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp
@@ -368,34 +368,34 @@ PersistenceEngine::removeAsync(const Bucket& b, Timestamp t, const DocumentId& d
}
-PersistenceEngine::UpdateResult
-PersistenceEngine::update(const Bucket& b, Timestamp t, const DocumentUpdate::SP& upd, Context&)
+void
+PersistenceEngine::updateAsync(const Bucket& b, Timestamp t, DocumentUpdate::SP upd, Context&, OperationComplete::UP onComplete)
{
if (!_writeFilter.acceptWriteOperation()) {
IResourceWriteFilter::State state = _writeFilter.getAcceptState();
if (!state.acceptWriteOperation()) {
- return UpdateResult(Result::ErrorType::RESOURCE_EXHAUSTED,
+ return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::RESOURCE_EXHAUSTED,
make_string("Update operation rejected for document '%s': '%s'",
- upd->getId().toString().c_str(), state.message().c_str()));
+ upd->getId().toString().c_str(), state.message().c_str())));
}
}
try {
upd->eagerDeserialize();
} catch (document::FieldNotFoundException & e) {
- return UpdateResult(Result::ErrorType::TRANSIENT_ERROR,
+ return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::TRANSIENT_ERROR,
make_string("Update operation rejected for document '%s' of type '%s': 'Field not found'",
- upd->getId().toString().c_str(), upd->getType().getName().c_str()));
+ upd->getId().toString().c_str(), upd->getType().getName().c_str())));
} catch (document::DocumentTypeNotFoundException & e) {
- return UpdateResult(Result::ErrorType::TRANSIENT_ERROR,
+ return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::TRANSIENT_ERROR,
make_string("Update operation rejected for document '%s' of type '%s'.",
- upd->getId().toString().c_str(), e.getDocumentTypeName().c_str()));
+ upd->getId().toString().c_str(), e.getDocumentTypeName().c_str())));
} catch (document::WrongTensorTypeException &e) {
- return UpdateResult(Result::ErrorType::TRANSIENT_ERROR,
+ return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::TRANSIENT_ERROR,
make_string("Update operation rejected for document '%s' of type '%s': 'Wrong tensor type: %s'",
upd->getId().toString().c_str(),
upd->getType().getName().c_str(),
- e.getMessage().c_str()));
+ e.getMessage().c_str())));
}
std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex);
DocTypeName docType(upd->getType());
@@ -403,24 +403,20 @@ PersistenceEngine::update(const Bucket& b, Timestamp t, const DocumentUpdate::SP
b.toString().c_str(), static_cast<uint64_t>(t.getValue()), docType.toString().c_str(),
upd->getId().toString().c_str(), (upd->getCreateIfNonExistent() ? "true" : "false"));
if (!upd->getId().hasDocType()) {
- return UpdateResult(Result::ErrorType::PERMANENT_ERROR,
- make_string("Old id scheme not supported in elastic mode (%s)", upd->getId().toString().c_str()));
+ return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR,
+ make_string("Old id scheme not supported in elastic mode (%s)", upd->getId().toString().c_str())));
}
if (upd->getId().getDocType() != docType.getName()) {
- return UpdateResult(Result::ErrorType::PERMANENT_ERROR,
- make_string("Update operation rejected due to bad id (%s, %s)", upd->getId().toString().c_str(), docType.getName().c_str()));
+ return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR,
+ make_string("Update operation rejected due to bad id (%s, %s)", upd->getId().toString().c_str(), docType.getName().c_str())));
}
IPersistenceHandler * handler = getHandler(rguard, b.getBucketSpace(), docType);
- if (handler) {
- TransportLatch latch(1);
- LOG(debug, "update = %s", upd->toXml().c_str());
- handler->handleUpdate(feedtoken::make(latch), b, t, upd);
- latch.await();
- return latch.getUpdateResult();
- } else {
- return UpdateResult(Result::ErrorType::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str()));
+ if ( handler == nullptr) {
+ return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str())));
}
+ auto transportContext = std::make_unique<AsyncTranportContext>(1, std::move(onComplete));
+ handler->handleUpdate(feedtoken::make(std::move(transportContext)), b, t, std::move(upd));
}
diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
index a5a0c164499..230f8c411aa 100644
--- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
+++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h
@@ -103,7 +103,7 @@ public:
BucketInfoResult getBucketInfo(const Bucket&) const override;
void putAsync(const Bucket &, Timestamp, storage::spi::DocumentSP, Context &context, OperationComplete::UP) override;
void removeAsync(const Bucket&, Timestamp, const document::DocumentId&, Context&, OperationComplete::UP) override;
- UpdateResult update(const Bucket&, Timestamp, const std::shared_ptr<document::DocumentUpdate>&, Context&) override;
+ void updateAsync(const Bucket&, Timestamp, storage::spi::DocumentUpdateSP, Context&, OperationComplete::UP) override;
GetResult get(const Bucket&, const document::FieldSet&, const document::DocumentId&, Context&) const override;
CreateIteratorResult createIterator(const Bucket&, const document::FieldSet&, const Selection&,
IncludedVersions, Context&) override;