diff options
author | Geir Storli <geirstorli@yahoo.no> | 2017-02-28 15:54:03 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-02-28 15:54:03 +0100 |
commit | fcab5ce0fc0a12bb3ae860bd915c67bede4bfeb6 (patch) | |
tree | 765f289573cd37bc555d1bb700a8b5551ec78a13 /searchcore | |
parent | fa8d5cd55cbb7b275360376c70591c208241ce55 (diff) | |
parent | 58b12f743d63df3f36373193dc5319cba4cf06d7 (diff) |
Merge pull request #1885 from yahoo/toregge/add-limited-support-for-field-path-updates-to-proton
Add limited support for field path updates to proton.
Diffstat (limited to 'searchcore')
10 files changed, 167 insertions, 37 deletions
diff --git a/searchcore/src/tests/proton/feedoperation/feedoperation_test.cpp b/searchcore/src/tests/proton/feedoperation/feedoperation_test.cpp index 804927a0e03..a49127a44f4 100644 --- a/searchcore/src/tests/proton/feedoperation/feedoperation_test.cpp +++ b/searchcore/src/tests/proton/feedoperation/feedoperation_test.cpp @@ -10,6 +10,10 @@ LOG_SETUP("feedoperation_test"); #include <vespa/document/datatype/datatype.h> #include <vespa/document/fieldvalue/document.h> #include <vespa/document/update/documentupdate.h> +#include <vespa/document/update/assignvalueupdate.h> +#include <vespa/document/update/fieldupdate.h> +#include <vespa/document/update/valueupdate.h> +#include <vespa/document/fieldvalue/fieldvalues.h> #include <persistence/spi/types.h> #include <vespa/searchcore/proton/feedoperation/compact_lid_space_operation.h> #include <vespa/searchcore/proton/feedoperation/deletebucketoperation.h> @@ -25,13 +29,24 @@ LOG_SETUP("feedoperation_test"); #include <vespa/searchcore/proton/feedoperation/updateoperation.h> #include <vespa/searchcore/proton/feedoperation/wipehistoryoperation.h> #include <vespa/searchlib/query/base.h> +#include <vespa/document/repo/configbuilder.h> +#include <vespa/document/repo/documenttyperepo.h> #include <vespa/vespalib/testkit/testapp.h> using document::BucketId; using document::DataType; using document::Document; +using document::DocumentType; using document::DocumentId; using document::DocumentUpdate; +using document::DocumentTypeRepo; +using document::GlobalId; +using document::StringFieldValue; +using document::AssignValueUpdate; +using document::FieldUpdate; +using document::config_builder::DocumenttypesConfigBuilderHelper; +using document::config_builder::Struct; +using document::config_builder::Map; using search::DocumentIdT; using storage::spi::Timestamp; using namespace proton; @@ -44,6 +59,58 @@ struct MyStreamHandler : NewConfigOperation::IStreamHandler { virtual void deserializeConfig(SerialNum, vespalib::nbostream &) {} }; + +const int32_t doc_type_id = 787121340; +const vespalib::string type_name = "test"; +const vespalib::string header_name = type_name + ".header"; +const vespalib::string body_name = type_name + ".body"; + +const document::DocumentId docId("id::test::1"); + +BucketId toBucket(const GlobalId &gid) +{ + BucketId bucket(gid.convertToBucketId()); + bucket.setUsedBits(8); + return bucket; +} + +DocumentTypeRepo::UP +makeDocTypeRepo(void) +{ + DocumenttypesConfigBuilderHelper builder; + builder.document(doc_type_id, type_name, + Struct(header_name), Struct(body_name). + addField("string", DataType::T_STRING). + addField("struct", Struct("pair"). + addField("x", DataType::T_STRING). + addField("y", DataType::T_STRING)). + addField("map", Map(DataType::T_STRING, + DataType::T_STRING))); + return DocumentTypeRepo::UP(new DocumentTypeRepo(builder.config())); +} + + +struct Fixture +{ + DocumentTypeRepo::SP _repo; + const DocumentType &_docType; + +public: + Fixture() + : _repo(makeDocTypeRepo()), + _docType(*_repo->getDocumentType(type_name)) + { + } + + auto makeUpdate() { + auto upd(std::make_shared<DocumentUpdate>(_docType, docId)); + upd->addUpdate(FieldUpdate(upd->getType().getField("string")). + addUpdate(AssignValueUpdate(StringFieldValue("newval")))); + return upd; + } + +}; + TEST("require that toString() on derived classes are meaningful") { BucketId bucket_id1(42); @@ -167,6 +234,42 @@ TEST("require that serialize/deserialize works for CompactLidSpaceOperation") } } +TEST_F("require that we can serialize and deserialize update operations", Fixture) +{ + vespalib::nbostream stream; + BucketId bucket(toBucket(docId.getGlobalId())); + auto upd(f.makeUpdate()); + { + UpdateOperation op(bucket, Timestamp(10), upd); + op.serialize(stream); + } + { + UpdateOperation op; + op.deserialize(stream, *f._repo); + EXPECT_EQUAL(*upd, *op.getUpdate()); + EXPECT_EQUAL(bucket, op.getBucketId()); + EXPECT_EQUAL(10, op.getTimestamp().getValue()); + } +} + +TEST_F("require that we can deserialize old update operations", Fixture) +{ + vespalib::nbostream stream; + BucketId bucket(toBucket(docId.getGlobalId())); + auto upd(f.makeUpdate()); + { + UpdateOperation op(UpdateOperation::makeOldUpdate(bucket, Timestamp(10), upd)); + op.serialize(stream); + } + { + UpdateOperation op(FeedOperation::UPDATE_42); + op.deserialize(stream, *f._repo); + EXPECT_EQUAL(*upd, *op.getUpdate()); + EXPECT_EQUAL(bucket, op.getBucketId()); + EXPECT_EQUAL(10, op.getTimestamp().getValue()); + } +} + } // namespace TEST_MAIN() { TEST_RUN_ALL(); } diff --git a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp index f1aefced17a..9d0a63677e0 100644 --- a/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp +++ b/searchcore/src/vespa/searchcore/proton/common/feedtoken.cpp @@ -61,6 +61,7 @@ FeedToken::State::ack(const FeedOperation::Type opType, case FeedOperation::REMOVE_BATCH: metrics.RegisterRemove(_startTime); break; + case FeedOperation::UPDATE_42: case FeedOperation::UPDATE: metrics.RegisterUpdate(_startTime); break; diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/feedoperation.h b/searchcore/src/vespa/searchcore/proton/feedoperation/feedoperation.h index c477ed0e5e1..16939f5f165 100644 --- a/searchcore/src/vespa/searchcore/proton/feedoperation/feedoperation.h +++ b/searchcore/src/vespa/searchcore/proton/feedoperation/feedoperation.h @@ -14,11 +14,17 @@ public: typedef std::shared_ptr<FeedOperation> SP; typedef std::unique_ptr<FeedOperation> UP; + /* + * Enumeration of feed operations. UPDATE_42 is partial update + * without support for field path updates (kept to support replay + * of old transaction logs). UPDATE is partial update with + * support for field paths updates. + */ enum Type { PUT = 1, REMOVE = 2, REMOVE_BATCH = 3, - UPDATE = 4, + UPDATE_42 = 4, NOOP = 5, NEW_CONFIG = 6, WIPE_HISTORY = 7, @@ -30,7 +36,8 @@ public: SPOOLER_REPLAY_COMPLETE = 14, MOVE = 15, CREATE_BUCKET = 16, - COMPACT_LID_SPACE = 17 + COMPACT_LID_SPACE = 17, + UPDATE = 18 }; private: diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.cpp b/searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.cpp index 9e2b024806f..e69b64548bc 100644 --- a/searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.cpp +++ b/searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.cpp @@ -16,16 +16,22 @@ using vespalib::make_string; namespace proton { UpdateOperation::UpdateOperation() - : DocumentOperation(FeedOperation::UPDATE), + : UpdateOperation(FeedOperation::UPDATE) +{ +} + +UpdateOperation::UpdateOperation(Type type) + : DocumentOperation(type), _upd() { } -UpdateOperation::UpdateOperation(const BucketId &bucketId, +UpdateOperation::UpdateOperation(Type type, + const BucketId &bucketId, const Timestamp ×tamp, const DocumentUpdate::SP &upd) - : DocumentOperation(FeedOperation::UPDATE, + : DocumentOperation(type, bucketId, timestamp), _upd(upd) @@ -33,19 +39,10 @@ UpdateOperation::UpdateOperation(const BucketId &bucketId, } -UpdateOperation::UpdateOperation(const document::BucketId &bucketId, - const storage::spi::Timestamp ×tamp, - const document::DocumentUpdate::SP &upd, - SerialNum serialNum, - DbDocumentId dbdId, - DbDocumentId prevDbdId) - : DocumentOperation(FeedOperation::UPDATE, - bucketId, - timestamp, - serialNum, - dbdId, - prevDbdId), - _upd(upd) +UpdateOperation::UpdateOperation(const BucketId &bucketId, + const Timestamp ×tamp, + const DocumentUpdate::SP &upd) + : UpdateOperation(FeedOperation::UPDATE, bucketId, timestamp, upd) { } @@ -55,7 +52,11 @@ UpdateOperation::serialize(vespalib::nbostream &os) const { assertValidBucketId(_upd->getId()); DocumentOperation::serialize(os); - _upd->serialize42(os); + if (getType() == FeedOperation::UPDATE_42) { + _upd->serialize42(os); + } else { + _upd->serializeHEAD(os); + } } @@ -65,11 +66,10 @@ UpdateOperation::deserialize(vespalib::nbostream &is, { DocumentOperation::deserialize(is, repo); document::ByteBuffer buf(is.peek(), is.size()); + using Version = DocumentUpdate::SerializeVersion; + Version version = ((getType() == FeedOperation::UPDATE_42) ? Version::SERIALIZE_42 : Version::SERIALIZE_HEAD); try { - DocumentUpdate::SP update(new DocumentUpdate(repo, buf, - DocumentUpdate:: - SerializeVersion:: - SERIALIZE_42)); + DocumentUpdate::SP update(std::make_shared<DocumentUpdate>(repo, buf, version)); is.adjustReadPos(buf.getPos()); _upd = update; } catch (document::DocumentTypeNotFoundException &e) { @@ -81,9 +81,19 @@ UpdateOperation::deserialize(vespalib::nbostream &is, } vespalib::string UpdateOperation::toString() const { - return make_string("Update(%s, %s)", + return make_string("%s(%s, %s)", + ((getType() == FeedOperation::UPDATE_42) ? "Update42" : "Update"), _upd.get() ? _upd->getId().getScheme().toString().c_str() : "NULL", docArgsToString().c_str()); } + +UpdateOperation +UpdateOperation::makeOldUpdate(const document::BucketId &bucketId, + const storage::spi::Timestamp ×tamp, + const document::DocumentUpdate::SP &upd) +{ + return UpdateOperation(FeedOperation::UPDATE_42, bucketId, timestamp, upd); +} + } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.h b/searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.h index c5ae9960b39..32e4bff7d9e 100644 --- a/searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.h +++ b/searchcore/src/vespa/searchcore/proton/feedoperation/updateoperation.h @@ -10,23 +10,25 @@ class UpdateOperation : public DocumentOperation { private: document::DocumentUpdate::SP _upd; + UpdateOperation(Type type, + const document::BucketId &bucketId, + const storage::spi::Timestamp ×tamp, + const document::DocumentUpdate::SP &upd); public: UpdateOperation(); + UpdateOperation(Type type); UpdateOperation(const document::BucketId &bucketId, const storage::spi::Timestamp ×tamp, const document::DocumentUpdate::SP &upd); - UpdateOperation(const document::BucketId &bucketId, - const storage::spi::Timestamp ×tamp, - const document::DocumentUpdate::SP &upd, - SerialNum serialNum, - DbDocumentId dbdId, - DbDocumentId prevDbdId); virtual ~UpdateOperation() {} const document::DocumentUpdate::SP &getUpdate() const { return _upd; } virtual void serialize(vespalib::nbostream &os) const; virtual void deserialize(vespalib::nbostream &is, const document::DocumentTypeRepo &repo); virtual vespalib::string toString() const; + static UpdateOperation makeOldUpdate(const document::BucketId &bucketId, + const storage::spi::Timestamp ×tamp, + const document::DocumentUpdate::SP &upd); }; } // namespace proton diff --git a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp index 798e70dad8e..6cad66a094f 100644 --- a/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/fast_access_feed_view.cpp @@ -24,6 +24,9 @@ FastAccessFeedView::getUpdateScope(const DocumentUpdate &upd) break; } } + if (!upd.getFieldPathUpdates().empty()) { + updateScope._nonAttributeFields = true; + } return updateScope; } diff --git a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp index c485e1a1308..4a96c25d948 100644 --- a/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/feedhandler.cpp @@ -142,7 +142,7 @@ void notifyConfigRejected(FeedToken *token, FeedOperation::Type type, DocTypeName docTypeName) { if (type == FeedOperation::REMOVE) { configRejected<RemoveResult>(token, docTypeName); - } else if (type == FeedOperation::UPDATE) { + } else if ((type == FeedOperation::UPDATE_42) || (type == FeedOperation::UPDATE)) { configRejected<UpdateResult>(token, docTypeName); } else { configRejected<Result>(token, docTypeName); @@ -665,7 +665,7 @@ namespace { bool isRejectableFeedOperation(FeedOperation::Type type) { - return type == FeedOperation::PUT || type == FeedOperation::UPDATE; + return type == FeedOperation::PUT || type == FeedOperation::UPDATE_42 || type == FeedOperation::UPDATE; } template <typename ResultType> @@ -684,7 +684,7 @@ void notifyFeedOperationRejected(FeedToken *token, const FeedOperation &op, DocTypeName docTypeName, const vespalib::string &rejectMessage) { - if (op.getType() == FeedOperation::UPDATE) { + if ((op.getType() == FeedOperation::UPDATE_42) || (op.getType() == FeedOperation::UPDATE)) { vespalib::string docId = (static_cast<const UpdateOperation &>(op)).getUpdate()->getId().toString(); feedOperationRejected<UpdateResult>(token, "Update", docId, docTypeName, rejectMessage); } else if (op.getType() == FeedOperation::PUT) { @@ -727,6 +727,7 @@ FeedHandler::performOperation(FeedToken::UP token, FeedOperation::UP op) case FeedOperation::REMOVE: performRemove(std::move(token), static_cast<RemoveOperation &>(*op)); return; + case FeedOperation::UPDATE_42: case FeedOperation::UPDATE: performUpdate(std::move(token), static_cast<UpdateOperation &>(*op)); return; diff --git a/searchcore/src/vespa/searchcore/proton/server/replaypacketdispatcher.cpp b/searchcore/src/vespa/searchcore/proton/server/replaypacketdispatcher.cpp index 7d44c504b50..3a0dd48f367 100644 --- a/searchcore/src/vespa/searchcore/proton/server/replaypacketdispatcher.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/replaypacketdispatcher.cpp @@ -39,8 +39,9 @@ ReplayPacketDispatcher::replayEntry(const Packet::Entry &entry) RemoveOperation op; replay(op, is, entry); break; - } case FeedOperation::UPDATE: { - UpdateOperation op; + } case FeedOperation::UPDATE_42: + case FeedOperation::UPDATE: { + UpdateOperation op(static_cast<FeedOperation::Type>(entry.type())); replay(op, is, entry); break; } case FeedOperation::NOOP: { diff --git a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp index 2b46abbe1ee..e1e7bd3a024 100644 --- a/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/searchable_feed_view.cpp @@ -176,6 +176,9 @@ SearchableFeedView::getUpdateScope(const DocumentUpdate &upd) updateScope._nonAttributeFields = true; } } + if (!upd.getFieldPathUpdates().empty()) { + updateScope._nonAttributeFields = true; + } return updateScope; } diff --git a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp index f96b13fc51c..d7dd6238d20 100644 --- a/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp +++ b/searchcore/src/vespa/searchcore/proton/server/storeonlyfeedview.cpp @@ -267,8 +267,7 @@ StoreOnlyFeedView::UpdateScope StoreOnlyFeedView::getUpdateScope(const DocumentUpdate &upd) { UpdateScope updateScope; - size_t m(upd.getUpdates().size()); - if (m != 0) { + if (!upd.getUpdates().empty() || !upd.getFieldPathUpdates().empty()) { updateScope._nonAttributeFields = true; } return updateScope; |