diff options
Diffstat (limited to 'searchcore')
5 files changed, 37 insertions, 41 deletions
diff --git a/searchcore/src/apps/proton/downpersistence.cpp b/searchcore/src/apps/proton/downpersistence.cpp index 8d8665b645e..aa87c383c33 100644 --- a/searchcore/src/apps/proton/downpersistence.cpp +++ b/searchcore/src/apps/proton/downpersistence.cpp @@ -83,7 +83,7 @@ DownPersistence::removeEntry(const Bucket&, Timestamp, Context&) return errorResult; } -UpdateResult DownPersistence::update(const Bucket&, Timestamp, const DocumentUpdate::SP&, Context&) +UpdateResult DownPersistence::update(const Bucket&, Timestamp, DocumentUpdate::SP, Context&) { return UpdateResult(errorResult.getErrorCode(), errorResult.getErrorMessage()); } diff --git a/searchcore/src/apps/proton/downpersistence.h b/searchcore/src/apps/proton/downpersistence.h index 2716b0ee3b4..10e3d9c1ad7 100644 --- a/searchcore/src/apps/proton/downpersistence.h +++ b/searchcore/src/apps/proton/downpersistence.h @@ -36,7 +36,7 @@ public: RemoveResult remove(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) override; RemoveResult removeIfFound(const Bucket&, Timestamp timestamp, const DocumentId& id, Context&) override; Result removeEntry(const Bucket&, Timestamp, Context&) override; - UpdateResult update(const Bucket&, Timestamp timestamp, const DocumentUpdateSP& update, Context&) override; + UpdateResult update(const Bucket&, Timestamp timestamp, DocumentUpdateSP update, Context&) override; GetResult get(const Bucket&, const document::FieldSet& fieldSet, const DocumentId& id, Context&) const override; CreateIteratorResult createIterator(const Bucket&, const document::FieldSet& fieldSet, diff --git a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp index 4fb4abcb7c5..2927457a6a5 100644 --- a/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp +++ b/searchcore/src/tests/proton/persistenceengine/persistenceengine_test.cpp @@ -458,12 +458,12 @@ TEST_F("require that puts are routed to handler", SimpleFixture) storage::spi::LoadType loadType(0, "default"); Context context(loadType, storage::spi::Priority(0), storage::spi::Trace::TraceLevel(0)); f.engine.put(bucket1, tstamp1, doc1, context); - assertHandler(bucket1, tstamp1, docId1, f.hset.handler1); - assertHandler(bucket0, tstamp0, docId0, f.hset.handler2); + TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1)); + TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler2)); f.engine.put(bucket1, tstamp1, doc2, context); - assertHandler(bucket1, tstamp1, docId1, f.hset.handler1); - assertHandler(bucket1, tstamp1, docId2, f.hset.handler2); + TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1)); + TEST_DO(assertHandler(bucket1, tstamp1, docId2, f.hset.handler2)); EXPECT_EQUAL(Result(Result::ErrorType::PERMANENT_ERROR, "No handler for document type 'type3'"), f.engine.put(bucket1, tstamp1, doc3, context)); @@ -490,14 +490,14 @@ TEST_F("require that updates are routed to handler", SimpleFixture) Context context(loadType, storage::spi::Priority(0), storage::spi::Trace::TraceLevel(0)); f.hset.handler1.setExistingTimestamp(tstamp2); UpdateResult ur = f.engine.update(bucket1, tstamp1, upd1, context); - assertHandler(bucket1, tstamp1, docId1, f.hset.handler1); - assertHandler(bucket0, tstamp0, docId0, f.hset.handler2); + TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1)); + TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler2)); EXPECT_EQUAL(tstamp2, ur.getExistingTimestamp()); f.hset.handler2.setExistingTimestamp(tstamp3); ur = f.engine.update(bucket1, tstamp1, upd2, context); - assertHandler(bucket1, tstamp1, docId1, f.hset.handler1); - assertHandler(bucket1, tstamp1, docId2, f.hset.handler2); + TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1)); + TEST_DO(assertHandler(bucket1, tstamp1, docId2, f.hset.handler2)); EXPECT_EQUAL(tstamp3, ur.getExistingTimestamp()); EXPECT_EQUAL(Result(Result::ErrorType::PERMANENT_ERROR, "No handler for document type 'type3'"), @@ -533,31 +533,31 @@ TEST_F("require that removes are routed to handlers", SimpleFixture) storage::spi::LoadType loadType(0, "default"); Context context(loadType, storage::spi::Priority(0), storage::spi::Trace::TraceLevel(0)); RemoveResult rr = f.engine.remove(bucket1, tstamp1, docId3, context); - assertHandler(bucket0, tstamp0, docId0, f.hset.handler1); - assertHandler(bucket0, tstamp0, docId0, f.hset.handler2); + TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler1)); + TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler2)); EXPECT_FALSE(rr.wasFound()); EXPECT_TRUE(rr.hasError()); EXPECT_EQUAL(Result(Result::ErrorType::PERMANENT_ERROR, "No handler for document type 'type3'"), rr); f.hset.handler1.setExistingTimestamp(tstamp2); rr = f.engine.remove(bucket1, tstamp1, docId1, context); - assertHandler(bucket1, tstamp1, docId1, f.hset.handler1); - assertHandler(bucket0, tstamp0, docId0, f.hset.handler2); + TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1)); + TEST_DO(assertHandler(bucket0, tstamp0, docId0, f.hset.handler2)); EXPECT_TRUE(rr.wasFound()); EXPECT_FALSE(rr.hasError()); f.hset.handler1.setExistingTimestamp(tstamp0); f.hset.handler2.setExistingTimestamp(tstamp3); rr = f.engine.remove(bucket1, tstamp1, docId2, context); - assertHandler(bucket1, tstamp1, docId1, f.hset.handler1); - assertHandler(bucket1, tstamp1, docId2, f.hset.handler2); + TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1)); + TEST_DO(assertHandler(bucket1, tstamp1, docId2, f.hset.handler2)); EXPECT_TRUE(rr.wasFound()); EXPECT_FALSE(rr.hasError()); f.hset.handler2.setExistingTimestamp(tstamp0); rr = f.engine.remove(bucket1, tstamp1, docId2, context); - assertHandler(bucket1, tstamp1, docId1, f.hset.handler1); - assertHandler(bucket1, tstamp1, docId2, f.hset.handler2); + TEST_DO(assertHandler(bucket1, tstamp1, docId1, f.hset.handler1)); + TEST_DO(assertHandler(bucket1, tstamp1, docId2, f.hset.handler2)); EXPECT_FALSE(rr.wasFound()); EXPECT_FALSE(rr.hasError()); } diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp index 1b64578607f..20c32e0048a 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.cpp @@ -368,34 +368,34 @@ PersistenceEngine::removeAsync(const Bucket& b, Timestamp t, const DocumentId& d } -PersistenceEngine::UpdateResult -PersistenceEngine::update(const Bucket& b, Timestamp t, const DocumentUpdate::SP& upd, Context&) +void +PersistenceEngine::updateAsync(const Bucket& b, Timestamp t, DocumentUpdate::SP upd, Context&, OperationComplete::UP onComplete) { if (!_writeFilter.acceptWriteOperation()) { IResourceWriteFilter::State state = _writeFilter.getAcceptState(); if (!state.acceptWriteOperation()) { - return UpdateResult(Result::ErrorType::RESOURCE_EXHAUSTED, + return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::RESOURCE_EXHAUSTED, make_string("Update operation rejected for document '%s': '%s'", - upd->getId().toString().c_str(), state.message().c_str())); + upd->getId().toString().c_str(), state.message().c_str()))); } } try { upd->eagerDeserialize(); } catch (document::FieldNotFoundException & e) { - return UpdateResult(Result::ErrorType::TRANSIENT_ERROR, + return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::TRANSIENT_ERROR, make_string("Update operation rejected for document '%s' of type '%s': 'Field not found'", - upd->getId().toString().c_str(), upd->getType().getName().c_str())); + upd->getId().toString().c_str(), upd->getType().getName().c_str()))); } catch (document::DocumentTypeNotFoundException & e) { - return UpdateResult(Result::ErrorType::TRANSIENT_ERROR, + return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::TRANSIENT_ERROR, make_string("Update operation rejected for document '%s' of type '%s'.", - upd->getId().toString().c_str(), e.getDocumentTypeName().c_str())); + upd->getId().toString().c_str(), e.getDocumentTypeName().c_str()))); } catch (document::WrongTensorTypeException &e) { - return UpdateResult(Result::ErrorType::TRANSIENT_ERROR, + return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::TRANSIENT_ERROR, make_string("Update operation rejected for document '%s' of type '%s': 'Wrong tensor type: %s'", upd->getId().toString().c_str(), upd->getType().getName().c_str(), - e.getMessage().c_str())); + e.getMessage().c_str()))); } std::shared_lock<std::shared_timed_mutex> rguard(_rwMutex); DocTypeName docType(upd->getType()); @@ -403,24 +403,20 @@ PersistenceEngine::update(const Bucket& b, Timestamp t, const DocumentUpdate::SP b.toString().c_str(), static_cast<uint64_t>(t.getValue()), docType.toString().c_str(), upd->getId().toString().c_str(), (upd->getCreateIfNonExistent() ? "true" : "false")); if (!upd->getId().hasDocType()) { - return UpdateResult(Result::ErrorType::PERMANENT_ERROR, - make_string("Old id scheme not supported in elastic mode (%s)", upd->getId().toString().c_str())); + return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR, + make_string("Old id scheme not supported in elastic mode (%s)", upd->getId().toString().c_str()))); } if (upd->getId().getDocType() != docType.getName()) { - return UpdateResult(Result::ErrorType::PERMANENT_ERROR, - make_string("Update operation rejected due to bad id (%s, %s)", upd->getId().toString().c_str(), docType.getName().c_str())); + return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR, + make_string("Update operation rejected due to bad id (%s, %s)", upd->getId().toString().c_str(), docType.getName().c_str()))); } IPersistenceHandler * handler = getHandler(rguard, b.getBucketSpace(), docType); - if (handler) { - TransportLatch latch(1); - LOG(debug, "update = %s", upd->toXml().c_str()); - handler->handleUpdate(feedtoken::make(latch), b, t, upd); - latch.await(); - return latch.getUpdateResult(); - } else { - return UpdateResult(Result::ErrorType::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str())); + if ( handler == nullptr) { + return onComplete->onComplete(std::make_unique<UpdateResult>(Result::ErrorType::PERMANENT_ERROR, make_string("No handler for document type '%s'", docType.toString().c_str()))); } + auto transportContext = std::make_unique<AsyncTranportContext>(1, std::move(onComplete)); + handler->handleUpdate(feedtoken::make(std::move(transportContext)), b, t, std::move(upd)); } diff --git a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h index a5a0c164499..230f8c411aa 100644 --- a/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h +++ b/searchcore/src/vespa/searchcore/proton/persistenceengine/persistenceengine.h @@ -103,7 +103,7 @@ public: BucketInfoResult getBucketInfo(const Bucket&) const override; void putAsync(const Bucket &, Timestamp, storage::spi::DocumentSP, Context &context, OperationComplete::UP) override; void removeAsync(const Bucket&, Timestamp, const document::DocumentId&, Context&, OperationComplete::UP) override; - UpdateResult update(const Bucket&, Timestamp, const std::shared_ptr<document::DocumentUpdate>&, Context&) override; + void updateAsync(const Bucket&, Timestamp, storage::spi::DocumentUpdateSP, Context&, OperationComplete::UP) override; GetResult get(const Bucket&, const document::FieldSet&, const document::DocumentId&, Context&) const override; CreateIteratorResult createIterator(const Bucket&, const document::FieldSet&, const Selection&, IncludedVersions, Context&) override; |