diff options
author | Tor Egge <Tor.Egge@oath.com> | 2018-05-29 09:40:20 +0000 |
---|---|---|
committer | Tor Egge <Tor.Egge@oath.com> | 2018-05-29 11:05:35 +0000 |
commit | 302146b7ca558f99b5d990abc75c2d0c2ebeeef3 (patch) | |
tree | 7793bcec9bf4f9c5256e6f271a85cffc8ca7e9c0 | |
parent | 6cd4e8945facda874bf1ada7ea8694c2c633f9da (diff) |
Perform fixup on put and update operations using non-current document type repo.
This ensures that document type used do access fields is the same for live
feeding and replay.
7 files changed, 113 insertions, 35 deletions
diff --git a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp index 5c0edce0b94..153b1ae2867 100644 --- a/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp +++ b/searchcore/src/tests/proton/documentdb/feedhandler/feedhandler_test.cpp @@ -2,6 +2,7 @@ #include <vespa/persistence/spi/result.h> #include <vespa/document/update/assignvalueupdate.h> +#include <vespa/document/repo/documenttyperepo.h> #include <vespa/searchcore/proton/bucketdb/bucketdbhandler.h> #include <vespa/searchcore/proton/test/bucketfactory.h> #include <vespa/searchcore/proton/common/feedtoken.h> @@ -35,6 +36,7 @@ LOG_SETUP("feedhandler_test"); using document::BucketId; using document::Document; using document::DocumentId; +using document::DocumentType; using document::DocumentTypeRepo; using document::DocumentUpdate; using document::GlobalId; @@ -181,7 +183,9 @@ struct MyFeedView : public test::DummyFeedView { int prune_removed_count; int update_count; SerialNum update_serial; - MyFeedView(const std::shared_ptr<const DocumentTypeRepo> &dtr); + const DocumentType *documentType; + MyFeedView(const std::shared_ptr<const DocumentTypeRepo> &dtr, + const DocTypeName &docTypeName); ~MyFeedView() override; void resetPutLatch(uint32_t count) { putLatch.reset(new vespalib::CountDownLatch(count)); } void preparePut(PutOperation &op) override { @@ -203,6 +207,8 @@ struct MyFeedView : public test::DummyFeedView { if (usePutRdz) { putRdz.run(); } + EXPECT_EQUAL(_docTypeRepo.get(), putOp.getDocument()->getRepo()); + EXPECT_EQUAL(documentType, &putOp.getDocument()->getType()); ++put_count; put_serial = putOp.getSerialNum(); metaStore.allocate(putOp.getDocument()->getId().getGlobalId()); @@ -216,6 +222,7 @@ struct MyFeedView : public test::DummyFeedView { void handleUpdate(FeedToken token, const UpdateOperation &op) override { (void) token; + EXPECT_EQUAL(documentType, &op.getUpdate()->getType()); ++update_count; update_serial = op.getSerialNum(); } @@ -237,7 +244,7 @@ struct MyFeedView : public test::DummyFeedView { } }; -MyFeedView::MyFeedView(const std::shared_ptr<const DocumentTypeRepo> &dtr) +MyFeedView::MyFeedView(const std::shared_ptr<const DocumentTypeRepo> &dtr, const DocTypeName &docTypeName) : test::DummyFeedView(dtr), putRdz(), usePutRdz(false), @@ -250,7 +257,8 @@ MyFeedView::MyFeedView(const std::shared_ptr<const DocumentTypeRepo> &dtr) move_count(0), prune_removed_count(0), update_count(0), - update_serial(0) + update_serial(0), + documentType(dtr->getDocumentType(docTypeName.getName())) {} MyFeedView::~MyFeedView() {} @@ -294,6 +302,13 @@ struct DocumentContext { } }; +struct TwoFieldsSchemaContext : public SchemaContext { + TwoFieldsSchemaContext() + : SchemaContext() + { + addField("i2"); + } +}; struct UpdateContext { DocumentUpdate::SP update; @@ -433,7 +448,7 @@ struct FeedHandlerFixture owner(), _state(), replayConfig(), - feedView(schema.getRepo()), + feedView(schema.getRepo(), schema.getDocType()), _bucketDB(), _bucketDBHandler(_bucketDB), handler(writeService, tlsSpec, schema.getDocType(), _state, owner, @@ -714,15 +729,13 @@ TEST_F("require that update with same document type repo is ok", FeedHandlerFixt TEST_F("require that update with different document type repo can be ok", FeedHandlerFixture) { - SchemaContext schema; - schema.addField("i2"); + TwoFieldsSchemaContext schema; checkUpdate(f, schema, "i1", false, true); } TEST_F("require that update with different document type repo can be rejected", FeedHandlerFixture) { - SchemaContext schema; - schema.addField("i2"); + TwoFieldsSchemaContext schema; checkUpdate(f, schema, "i2", true, true); } @@ -733,18 +746,31 @@ TEST_F("require that update with same document type repo is ok, fallback to crea TEST_F("require that update with different document type repo can be ok, fallback to create document", FeedHandlerFixture) { - SchemaContext schema; - schema.addField("i2"); + TwoFieldsSchemaContext schema; checkUpdate(f, schema, "i1", false, false); } TEST_F("require that update with different document type repo can be rejected, preventing fallback to create document", FeedHandlerFixture) { - SchemaContext schema; - schema.addField("i2"); + TwoFieldsSchemaContext schema; checkUpdate(f, schema, "i2", true, false); } +TEST_F("require that put with different document type repo is ok", FeedHandlerFixture) +{ + TwoFieldsSchemaContext schema; + DocumentContext doc_context("doc:test:foo", *schema.builder); + auto op = std::make_unique<PutOperation>(doc_context.bucketId, + Timestamp(10), doc_context.doc); + FeedTokenContext token_context; + EXPECT_EQUAL(schema.getRepo().get(), op->getDocument()->getRepo()); + EXPECT_NOT_EQUAL(f.schema.getRepo().get(), op->getDocument()->getRepo()); + EXPECT_NOT_EQUAL(f.feedView.documentType, &op->getDocument()->getType()); + f.handler.performOperation(std::move(token_context.token), std::move(op)); + EXPECT_EQUAL(1, f.feedView.put_count); + EXPECT_EQUAL(1, f.tls_writer.store_count); +} + } // namespace TEST_MAIN() diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp b/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp index a6d34b696ca..efac7297c67 100644 --- a/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp +++ b/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.cpp @@ -49,6 +49,15 @@ PutOperation::deserialize(vespalib::nbostream &is, _serializedDocSize = oldSize - is.size(); } +void +PutOperation::deserializeDocument(const DocumentTypeRepo &repo) +{ + vespalib::nbostream stream; + _doc->serialize(stream); + auto fixedDoc = std::make_shared<Document>(repo, stream); + _doc = std::move(fixedDoc); +} + vespalib::string PutOperation::toString() const { diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h b/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h index 7e9dee2cbc0..33330692fab 100644 --- a/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h +++ b/searchcore/src/vespa/searchcore/proton/feedoperation/putoperation.h @@ -21,6 +21,7 @@ public: virtual void serialize(vespalib::nbostream &os) const override; virtual void deserialize(vespalib::nbostream &is, const document::DocumentTypeRepo &repo) override; + void deserializeDocument(const document::DocumentTypeRepo &repo); virtual vespalib::string toString() const override; }; diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.cpp b/searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.cpp index fa5bbee8e6d..beaf719dc5c 100644 --- a/searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.cpp +++ b/searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.cpp @@ -8,6 +8,7 @@ LOG_SETUP(".proton.feedoperation.updateoperation"); using document::BucketId; +using document::DocumentType; using document::DocumentTypeRepo; using document::DocumentUpdate; using storage::spi::Timestamp; @@ -46,12 +47,9 @@ UpdateOperation::UpdateOperation(const BucketId &bucketId, { } - void -UpdateOperation::serialize(vespalib::nbostream &os) const +UpdateOperation::serializeUpdate(vespalib::nbostream &os) const { - assertValidBucketId(_upd->getId()); - DocumentOperation::serialize(os); if (getType() == FeedOperation::UPDATE_42) { _upd->serialize42(os); } else { @@ -59,19 +57,33 @@ UpdateOperation::serialize(vespalib::nbostream &os) const } } +void +UpdateOperation::deserializeUpdate(vespalib::nbostream &is, const document::DocumentTypeRepo &repo) +{ + document::ByteBuffer buf(is.peek(), is.size()); + using Version = DocumentUpdate::SerializeVersion; + Version version = ((getType() == FeedOperation::UPDATE_42) ? Version::SERIALIZE_42 : Version::SERIALIZE_HEAD); + DocumentUpdate::SP update(std::make_shared<DocumentUpdate>(repo, buf, version)); + is.adjustReadPos(buf.getPos()); + _upd = std::move(update); +} + +void +UpdateOperation::serialize(vespalib::nbostream &os) const +{ + assertValidBucketId(_upd->getId()); + DocumentOperation::serialize(os); + serializeUpdate(os); +} + void UpdateOperation::deserialize(vespalib::nbostream &is, const DocumentTypeRepo &repo) { DocumentOperation::deserialize(is, repo); - document::ByteBuffer buf(is.peek(), is.size()); - using Version = DocumentUpdate::SerializeVersion; - Version version = ((getType() == FeedOperation::UPDATE_42) ? Version::SERIALIZE_42 : Version::SERIALIZE_HEAD); try { - DocumentUpdate::SP update(std::make_shared<DocumentUpdate>(repo, buf, version)); - is.adjustReadPos(buf.getPos()); - _upd = update; + deserializeUpdate(is, repo); } catch (document::DocumentTypeNotFoundException &e) { LOG(warning, "Failed deserialize update operation using unknown document type '%s'", e.getDocumentTypeName().c_str()); @@ -80,6 +92,14 @@ UpdateOperation::deserialize(vespalib::nbostream &is, } } +void +UpdateOperation::deserializeUpdate(const DocumentTypeRepo &repo) +{ + vespalib::nbostream stream; + serializeUpdate(stream); + deserializeUpdate(stream, repo); +} + vespalib::string UpdateOperation::toString() const { return make_string("%s(%s, %s)", ((getType() == FeedOperation::UPDATE_42) ? "Update42" : "Update"), diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.h b/searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.h index 6e061f79f30..7886231af82 100644 --- a/searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.h +++ b/searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.h @@ -3,7 +3,10 @@ #include "documentoperation.h" -namespace document { class DocumentUpdate; } +namespace document { +class DocumentTypeRepo; +class DocumentUpdate; +} namespace proton { @@ -16,6 +19,8 @@ private: const document::BucketId &bucketId, const storage::spi::Timestamp ×tamp, const DocumentUpdateSP &upd); + void serializeUpdate(vespalib::nbostream &os) const; + void deserializeUpdate(vespalib::nbostream &is, const document::DocumentTypeRepo &repo); public: UpdateOperation(); UpdateOperation(Type type); @@ -26,6 +31,7 @@ public: const DocumentUpdateSP &getUpdate() const { return _upd; } void serialize(vespalib::nbostream &os) const override; void deserialize(vespalib::nbostream &is, const document::DocumentTypeRepo &repo) override; + void deserializeUpdate(const document::DocumentTypeRepo &repo); virtual vespalib::string toString() const override; static UpdateOperation makeOldUpdate(const document::BucketId &bucketId, const storage::spi::Timestamp ×tamp, diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index ad5372b4af6..897e58139c1 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -96,6 +96,9 @@ void FeedHandler::performPut(FeedToken token, PutOperation &op) { } return; } + if (_repo != op.getDocument()->getRepo()) { + op.deserializeDocument(*_repo); + } storeOperation(op, token); if (token) { token->setResult(make_unique<Result>(), false); @@ -344,6 +347,8 @@ FeedHandler::FeedHandler(IThreadingService &writeService, _feedLock(), _feedState(make_shared<InitState>(getDocTypeName())), _activeFeedView(nullptr), + _repo(nullptr), + _documentType(nullptr), _bucketDBHandler(nullptr), _syncLock(), _syncedSerialNum(0), @@ -408,6 +413,14 @@ void FeedHandler::changeToNormalFeedState() { changeFeedState(make_shared<NormalState>(*this)); } +void +FeedHandler::setActiveFeedView(IFeedView *feedView) +{ + _activeFeedView = feedView; + _repo = feedView->getDocumentTypeRepo().get(); + _documentType = _repo->getDocumentType(_docTypeName.getName()); +} + bool FeedHandler::isDoingReplay() const { return _tlsMgr.isDoingReplay(); @@ -492,22 +505,17 @@ FeedHandler::considerWriteOperationForRejection(FeedToken & token, const FeedOpe } bool -FeedHandler::considerUpdateOperationForRejection(FeedToken &token, const UpdateOperation &op) +FeedHandler::considerUpdateOperationForRejection(FeedToken &token, UpdateOperation &op) { - const auto *repo = _activeFeedView->getDocumentTypeRepo().get(); const auto &update = *op.getUpdate(); /* * Check if document types are equal. DocumentTypeRepoFactory::make returns * the same document type repo if document type configs are equal, thus we * can just perform a cheaper identity check here. */ - if (repo->getDocumentType(_docTypeName.getName()) != &update.getType()) { + if (_documentType != &update.getType()) { try { - vespalib::nbostream stream; - op.serialize(stream); - UpdateOperation checkOp(op.getType()); - vespalib::nbostream checkStream(stream.peek(), stream.size()); - checkOp.deserialize(stream, *repo); + op.deserializeUpdate(*_repo); } catch (document::FieldNotFoundException &e) { if (token) { auto message = make_string("Update operation rejected for document '%s' of type '%s': 'Field not found'", @@ -516,6 +524,14 @@ FeedHandler::considerUpdateOperationForRejection(FeedToken &token, const UpdateO token->fail(); } return true; + } catch (document::DocumentTypeNotFoundException &e) { + auto message = make_string("Update operation rejected for document '%s' of type '%s': 'Uknown document type', expected '%s'", + update.getId().toString().c_str(), + e.getDocumentTypeName().c_str(), + _docTypeName.toString().c_str()); + token->setResult(make_unique<UpdateResult>(Result::TRANSIENT_ERROR, message), false); + token->fail(); + return true; } } return false; diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h index faf909080d9..34f979b7115 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.h +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.h @@ -90,6 +90,8 @@ private: FeedStateSP _feedState; // used by master write thread tasks IFeedView *_activeFeedView; + const document::DocumentTypeRepo *_repo; + const document::DocumentType *_documentType; bucketdb::IBucketDBHandler *_bucketDBHandler; std::mutex _syncLock; SerialNum _syncedSerialNum; @@ -102,7 +104,7 @@ private: void doHandleOperation(FeedToken token, FeedOperationUP op); bool considerWriteOperationForRejection(FeedToken & token, const FeedOperation &op); - bool considerUpdateOperationForRejection(FeedToken &token, const UpdateOperation &op); + bool considerUpdateOperationForRejection(FeedToken &token, UpdateOperation &op); /** * Delayed execution of feed operations against feed view, in @@ -203,9 +205,7 @@ public: * Update the active feed view. * Always called by the master write thread so locking is not needed. */ - void setActiveFeedView(IFeedView *feedView) { - _activeFeedView = feedView; - } + void setActiveFeedView(IFeedView *feedView); void setBucketDBHandler(bucketdb::IBucketDBHandler *bucketDBHandler) { _bucketDBHandler = bucketDBHandler; |