From 2f960fa96fc8b09da846b3fc0e7cc80bacf76746 Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Thu, 25 Apr 2024 12:01:53 +0000 Subject: Propagate "create if missing"-flag outside binary Update payload in protocols Avoids potentially having to deserialize the entire update just to get to a single bit of information that is technically metadata existing orthogonally to the document update itself. To ensure backwards/forwards compatibility, the flag is propagated as a Protobuf `enum` where the default value is a special "unspecified" sentinel, implying an old sender. Since the Java protocol implementation always eagerly deserializes messages, it unconditionally assigns the `create_if_missing` field when sending and completely ignores it when receiving. The C++ protocol implementation observes and propagates the field iff set. Otherwise the flag is deferred to the update object as before. This applies to both the DocumentAPI and StorageAPI protocols. --- .../src/tests/distributor/updateoperationtest.cpp | 23 +++++++- .../storageapi/mbusprot/storageprotocoltest.cpp | 34 +++++++++++- .../storageserver/documentapiconvertertest.cpp | 62 ++++++++++++++-------- .../external/twophaseupdateoperation.cpp | 2 +- .../operations/external/updateoperation.cpp | 5 +- .../storage/storageserver/documentapiconverter.cpp | 6 +++ .../vespa/storageapi/mbusprot/protobuf/feed.proto | 17 ++++-- .../storageapi/mbusprot/protocolserialization7.cpp | 7 +++ .../src/vespa/storageapi/message/persistence.cpp | 12 ++++- storage/src/vespa/storageapi/message/persistence.h | 21 ++++++-- 10 files changed, 150 insertions(+), 39 deletions(-) (limited to 'storage') diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp index 31ebbe19cbb..e00ce249298 100644 --- a/storage/src/tests/distributor/updateoperationtest.cpp +++ b/storage/src/tests/distributor/updateoperationtest.cpp @@ -49,13 +49,13 @@ struct UpdateOperationTest : Test, DistributorStripeTestUtil { const api::ReturnCode& result = api::ReturnCode()); std::shared_ptr - sendUpdate(const std::string& bucketState, bool create_if_missing = false); + sendUpdate(const std::string& bucketState, bool create_if_missing = false, bool cache_create_flag = false); document::BucketId _bId; }; std::shared_ptr -UpdateOperationTest::sendUpdate(const std::string& bucketState, bool create_if_missing) +UpdateOperationTest::sendUpdate(const std::string& bucketState, bool create_if_missing, bool cache_create_flag) { auto update = std::make_shared( *_repo, *_html_type, @@ -67,6 +67,9 @@ UpdateOperationTest::sendUpdate(const std::string& bucketState, bool create_if_m addNodesToBucketDB(_bId, bucketState); auto msg = std::make_shared(makeDocumentBucket(document::BucketId(0)), update, 100); + if (cache_create_flag) { + msg->set_cached_create_if_missing(create_if_missing); + } return std::make_shared( node_context(), operation_context(), getDistributorBucketSpace(), msg, std::vector(), @@ -271,4 +274,20 @@ TEST_F(UpdateOperationTest, cancelled_nodes_are_not_updated_in_db) { dumpBucket(_bId)); } +TEST_F(UpdateOperationTest, cached_create_if_missing_is_propagated_to_fanout_requests) { + setup_stripe(1, 1, "distributor:1 storage:1"); + for (bool cache_flag : {false, true}) { + for (bool create_if_missing : {false, true}) { + std::shared_ptr cb(sendUpdate("0=1/2/3", create_if_missing, cache_flag)); + DistributorMessageSenderStub sender; + cb->start(sender); + + ASSERT_EQ("Update => 0", sender.getCommands(true)); + auto& cmd = dynamic_cast(*sender.command(0)); + EXPECT_EQ(cmd.has_cached_create_if_missing(), cache_flag); + EXPECT_EQ(cmd.create_if_missing(), create_if_missing); + } + } +} + } diff --git a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp index 698d8dee573..231f41ffd21 100644 --- a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp +++ b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp @@ -262,7 +262,6 @@ TEST_P(StorageProtocolTest, response_metadata_is_propagated) { TEST_P(StorageProtocolTest, update) { auto update = std::make_shared(_docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId()); update->addUpdate(FieldUpdate(_testDoc->getField("headerval")).addUpdate(std::make_unique(std::make_unique(17)))); - update->addFieldPathUpdate(std::make_unique("headerval", "testdoctype1.headerval > 0")); auto cmd = std::make_shared(_bucket, update, 14); @@ -284,6 +283,37 @@ TEST_P(StorageProtocolTest, update) { EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2)); } +TEST_P(StorageProtocolTest, update_request_create_if_missing_flag_is_propagated) { + auto make_update_cmd = [&](bool create_if_missing, bool cached) { + auto update = std::make_shared( + _docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId()); + update->addUpdate(FieldUpdate(_testDoc->getField("headerval")).addUpdate( + std::make_unique(std::make_unique(17)))); + update->addFieldPathUpdate(std::make_unique("headerval", "testdoctype1.headerval > 0")); + update->setCreateIfNonExistent(create_if_missing); + auto cmd = std::make_shared(_bucket, update, 14); + if (cached) { + cmd->set_cached_create_if_missing(create_if_missing); + } + return cmd; + }; + + auto check_flag_propagation = [&](bool create_if_missing, bool cached) { + auto cmd = make_update_cmd(create_if_missing, cached); + EXPECT_EQ(cmd->has_cached_create_if_missing(), cached); + EXPECT_EQ(cmd->create_if_missing(), create_if_missing); + + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(cmd2->has_cached_create_if_missing(), cached); + EXPECT_EQ(cmd2->create_if_missing(), create_if_missing); + }; + + check_flag_propagation(false, false); + check_flag_propagation(true, false); + check_flag_propagation(false, true); + check_flag_propagation(true, true); +} + TEST_P(StorageProtocolTest, get) { auto cmd = std::make_shared(_bucket, _testDocId, "foo,bar,vekterli", 123); auto cmd2 = copyCommand(cmd); @@ -880,7 +910,7 @@ TEST_P(StorageProtocolTest, track_memory_footprint_for_some_messages) { EXPECT_EQ(sizeof(BucketInfoCommand), sizeof(BucketCommand)); EXPECT_EQ(sizeof(TestAndSetCommand), sizeof(BucketInfoCommand) + sizeof(vespalib::string)); EXPECT_EQ(sizeof(PutCommand), sizeof(TestAndSetCommand) + 40); - EXPECT_EQ(sizeof(UpdateCommand), sizeof(TestAndSetCommand) + 32); + EXPECT_EQ(sizeof(UpdateCommand), sizeof(TestAndSetCommand) + 40); EXPECT_EQ(sizeof(RemoveCommand), sizeof(TestAndSetCommand) + 112); EXPECT_EQ(sizeof(GetCommand), sizeof(BucketInfoCommand) + sizeof(TestAndSetCondition) + 184); } diff --git a/storage/src/tests/storageserver/documentapiconvertertest.cpp b/storage/src/tests/storageserver/documentapiconvertertest.cpp index eb4789b25d4..1eb6bf5dd9a 100644 --- a/storage/src/tests/storageserver/documentapiconvertertest.cpp +++ b/storage/src/tests/storageserver/documentapiconvertertest.cpp @@ -159,28 +159,46 @@ TEST_F(DocumentApiConverterTest, forwarded_put) { } TEST_F(DocumentApiConverterTest, update) { - auto update = std::make_shared(*_repo, _html_type, defaultDocId); - documentapi::UpdateDocumentMessage updateMsg(update); - updateMsg.setOldTimestamp(1234); - updateMsg.setNewTimestamp(5678); - updateMsg.setCondition(my_condition); - - auto updateCmd = toStorageAPI(updateMsg); - EXPECT_EQ(defaultBucket, updateCmd->getBucket()); - ASSERT_EQ(update.get(), updateCmd->getUpdate().get()); - EXPECT_EQ(api::Timestamp(1234), updateCmd->getOldTimestamp()); - EXPECT_EQ(api::Timestamp(5678), updateCmd->getTimestamp()); - EXPECT_EQ(my_condition, updateCmd->getCondition()); - - auto mbusReply = updateMsg.createReply(); - ASSERT_TRUE(mbusReply.get()); - toStorageAPI(*mbusReply, *updateCmd); - - auto mbusUpdate = toDocumentAPI(*updateCmd); - ASSERT_EQ((&mbusUpdate->getDocumentUpdate()), update.get()); - EXPECT_EQ(api::Timestamp(1234), mbusUpdate->getOldTimestamp()); - EXPECT_EQ(api::Timestamp(5678), mbusUpdate->getNewTimestamp()); - EXPECT_EQ(my_condition, mbusUpdate->getCondition()); + auto do_test_update = [&](bool create_if_missing) { + auto update = std::make_shared(*_repo, _html_type, defaultDocId); + update->setCreateIfNonExistent(create_if_missing); + documentapi::UpdateDocumentMessage updateMsg(update); + updateMsg.setOldTimestamp(1234); + updateMsg.setNewTimestamp(5678); + updateMsg.setCondition(my_condition); + EXPECT_FALSE(updateMsg.has_cached_create_if_missing()); + EXPECT_EQ(updateMsg.create_if_missing(), create_if_missing); + + auto updateCmd = toStorageAPI(updateMsg); + EXPECT_EQ(defaultBucket, updateCmd->getBucket()); + ASSERT_EQ(update.get(), updateCmd->getUpdate().get()); + EXPECT_EQ(api::Timestamp(1234), updateCmd->getOldTimestamp()); + EXPECT_EQ(api::Timestamp(5678), updateCmd->getTimestamp()); + EXPECT_EQ(my_condition, updateCmd->getCondition()); + EXPECT_FALSE(updateCmd->has_cached_create_if_missing()); + EXPECT_EQ(updateCmd->create_if_missing(), create_if_missing); + + auto mbusReply = updateMsg.createReply(); + ASSERT_TRUE(mbusReply.get()); + toStorageAPI(*mbusReply, *updateCmd); + + auto mbusUpdate = toDocumentAPI(*updateCmd); + ASSERT_EQ((&mbusUpdate->getDocumentUpdate()), update.get()); + EXPECT_EQ(api::Timestamp(1234), mbusUpdate->getOldTimestamp()); + EXPECT_EQ(api::Timestamp(5678), mbusUpdate->getNewTimestamp()); + EXPECT_EQ(my_condition, mbusUpdate->getCondition()); + EXPECT_EQ(mbusUpdate->create_if_missing(), create_if_missing); + + // Cached value of create_if_missing should override underlying update's value + updateCmd->set_cached_create_if_missing(!create_if_missing); + EXPECT_TRUE(updateCmd->has_cached_create_if_missing()); + EXPECT_EQ(updateCmd->create_if_missing(), !create_if_missing); + mbusUpdate = toDocumentAPI(*updateCmd); + EXPECT_TRUE(mbusUpdate->has_cached_create_if_missing()); + EXPECT_EQ(mbusUpdate->create_if_missing(), !create_if_missing); + }; + do_test_update(false); + do_test_update(true); } TEST_F(DocumentApiConverterTest, remove) { diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp index 84e9ab71bcb..849746416d6 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp @@ -668,7 +668,7 @@ TwoPhaseUpdateOperation::applyUpdateToDocument(document::Document& doc) const bool TwoPhaseUpdateOperation::shouldCreateIfNonExistent() const { - return _updateCmd->getUpdate()->getCreateIfNonExistent(); + return _updateCmd->create_if_missing(); } bool diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp index 7b6833cc299..2b47d53363f 100644 --- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp @@ -29,7 +29,7 @@ UpdateOperation::UpdateOperation(const DistributorNodeContext& node_ctx, _msg(msg), _entries(std::move(entries)), _new_timestamp(_msg->getTimestamp()), - _is_auto_create_update(_msg->getUpdate()->getCreateIfNonExistent()), + _is_auto_create_update(_msg->create_if_missing()), _node_ctx(node_ctx), _op_ctx(op_ctx), _bucketSpace(bucketSpace), @@ -112,6 +112,9 @@ UpdateOperation::onStart(DistributorStripeMessageSender& sender) copyMessageSettings(*_msg, *command); command->setOldTimestamp(_msg->getOldTimestamp()); command->setCondition(_msg->getCondition()); + if (_msg->has_cached_create_if_missing()) { + command->set_cached_create_if_missing(_msg->create_if_missing()); + } messages.emplace_back(std::move(command), node); } diff --git a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp index ca46e87285b..5b8052a05f8 100644 --- a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp +++ b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp @@ -54,6 +54,9 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentMessage& fromMsg) auto to = std::make_unique(bucket, from.stealDocumentUpdate(), from.getNewTimestamp()); to->setOldTimestamp(from.getOldTimestamp()); to->setCondition(from.getCondition()); + if (from.has_cached_create_if_missing()) { + to->set_cached_create_if_missing(from.create_if_missing()); + } toMsg = std::move(to); break; } @@ -217,6 +220,9 @@ DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg) to->setOldTimestamp(from.getOldTimestamp()); to->setNewTimestamp(from.getTimestamp()); to->setCondition(from.getCondition()); + if (from.has_cached_create_if_missing()) { + to->set_cached_create_if_missing(from.create_if_missing()); + } toMsg = std::move(to); break; } diff --git a/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto b/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto index 55d516a017b..403752b0c84 100644 --- a/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto +++ b/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto @@ -31,11 +31,18 @@ message Update { } message UpdateRequest { - Bucket bucket = 1; - Update update = 2; - uint64 new_timestamp = 3; - uint64 expected_old_timestamp = 4; // If zero; no expectation. - TestAndSetCondition condition = 5; + enum CreateIfMissing { + UNSPECIFIED = 0; // Legacy fallback: must deserialize `update` to find flag value + TRUE = 1; + FALSE = 2; + } + + Bucket bucket = 1; + Update update = 2; + uint64 new_timestamp = 3; + uint64 expected_old_timestamp = 4; // If zero; no expectation. + TestAndSetCondition condition = 5; + CreateIfMissing create_if_missing = 6; } message UpdateResponse { diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp index 57047be6037..0f4a34cc775 100644 --- a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp +++ b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp @@ -465,6 +465,10 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::UpdateCommand& msg) if (msg.getCondition().isPresent()) { set_tas_condition(*req.mutable_condition(), msg.getCondition()); } + if (msg.has_cached_create_if_missing()) { + req.set_create_if_missing(msg.create_if_missing() ? protobuf::UpdateRequest_CreateIfMissing_TRUE + : protobuf::UpdateRequest_CreateIfMissing_FALSE); + } }); } @@ -482,6 +486,9 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeUpdateCommand(BBuf& buf) if (req.has_condition()) { cmd->setCondition(get_tas_condition(req.condition())); } + if (req.create_if_missing() != protobuf::UpdateRequest_CreateIfMissing_UNSPECIFIED) { + cmd->set_cached_create_if_missing(req.create_if_missing() == protobuf::UpdateRequest_CreateIfMissing_TRUE); + } return cmd; }); } diff --git a/storage/src/vespa/storageapi/message/persistence.cpp b/storage/src/vespa/storageapi/message/persistence.cpp index 4c24bb74faf..af054855bbe 100644 --- a/storage/src/vespa/storageapi/message/persistence.cpp +++ b/storage/src/vespa/storageapi/message/persistence.cpp @@ -105,13 +105,23 @@ UpdateCommand::UpdateCommand(const document::Bucket &bucket, const document::Doc : TestAndSetCommand(MessageType::UPDATE, bucket), _update(update), _timestamp(time), - _oldTimestamp(0) + _oldTimestamp(0), + _create_if_missing() { if ( ! _update) { throw vespalib::IllegalArgumentException("Cannot update a null update", VESPA_STRLOC); } } +bool +UpdateCommand::create_if_missing() const +{ + if (_create_if_missing.has_value()) { + return *_create_if_missing; + } + return _update->getCreateIfNonExistent(); +} + const document::DocumentType * UpdateCommand::getDocumentType() const { return &_update->getType(); diff --git a/storage/src/vespa/storageapi/message/persistence.h b/storage/src/vespa/storageapi/message/persistence.h index f44ab4e8280..0676e1d0f44 100644 --- a/storage/src/vespa/storageapi/message/persistence.h +++ b/storage/src/vespa/storageapi/message/persistence.h @@ -1,8 +1,6 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** - * @file persistence.h - * - * Persistence related commands, like put, get & remove + * Persistence related commands, like put, get & remove */ #pragma once @@ -10,6 +8,7 @@ #include #include #include +#include namespace document { class DocumentUpdate; @@ -117,20 +116,32 @@ class UpdateCommand : public TestAndSetCommand { std::shared_ptr _update; Timestamp _timestamp; Timestamp _oldTimestamp; + std::optional _create_if_missing; // caches the value held (possibly lazily deserialized) in _update public: UpdateCommand(const document::Bucket &bucket, const std::shared_ptr&, Timestamp); ~UpdateCommand() override; - void setTimestamp(Timestamp ts) { _timestamp = ts; } - void setOldTimestamp(Timestamp ts) { _oldTimestamp = ts; } + void setTimestamp(Timestamp ts) noexcept { _timestamp = ts; } + void setOldTimestamp(Timestamp ts) noexcept { _oldTimestamp = ts; } + + [[nodiscard]] bool has_cached_create_if_missing() const noexcept { + return _create_if_missing.has_value(); + } + // It is the caller's responsibility to ensure this value matches that of _update->getCreateIfNonExisting() + void set_cached_create_if_missing(bool create) noexcept { + _create_if_missing = create; + } const std::shared_ptr& getUpdate() const { return _update; } const document::DocumentId& getDocumentId() const override; Timestamp getTimestamp() const { return _timestamp; } Timestamp getOldTimestamp() const { return _oldTimestamp; } + // May throw iff has_cached_create_if_missing() == false, otherwise noexcept. + [[nodiscard]] bool create_if_missing() const; + const document::DocumentType * getDocumentType() const override; vespalib::string getSummary() const override; -- cgit v1.2.3