diff options
author | Tor Brede Vekterli <vekterli@vespa.ai> | 2024-04-25 12:01:53 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@vespa.ai> | 2024-04-26 12:19:50 +0000 |
commit | 2f960fa96fc8b09da846b3fc0e7cc80bacf76746 (patch) | |
tree | 94e1044dc3ad6022840f4382550022c2e0404e46 /storage | |
parent | e045e41b93f74f05bbcfc5be244ed6901fea8b68 (diff) |
Propagate "create if missing"-flag outside binary Update payload in protocols
Avoids potentially having to deserialize the entire update just
to get to a single bit of information that is technically
metadata existing orthogonally to the document update itself.
To ensure backwards/forwards compatibility, the flag is
propagated as a Protobuf `enum` where the default value is
a special "unspecified" sentinel, implying an old sender.
Since the Java protocol implementation always eagerly
deserializes messages, it unconditionally assigns the
`create_if_missing` field when sending and completely ignores
it when receiving.
The C++ protocol implementation observes and propagates the
field iff set. Otherwise the flag is deferred to the update
object as before. This applies to both the DocumentAPI and
StorageAPI protocols.
Diffstat (limited to 'storage')
10 files changed, 150 insertions, 39 deletions
diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp index 31ebbe19cbb..e00ce249298 100644 --- a/storage/src/tests/distributor/updateoperationtest.cpp +++ b/storage/src/tests/distributor/updateoperationtest.cpp @@ -49,13 +49,13 @@ struct UpdateOperationTest : Test, DistributorStripeTestUtil { const api::ReturnCode& result = api::ReturnCode()); std::shared_ptr<UpdateOperation> - sendUpdate(const std::string& bucketState, bool create_if_missing = false); + sendUpdate(const std::string& bucketState, bool create_if_missing = false, bool cache_create_flag = false); document::BucketId _bId; }; std::shared_ptr<UpdateOperation> -UpdateOperationTest::sendUpdate(const std::string& bucketState, bool create_if_missing) +UpdateOperationTest::sendUpdate(const std::string& bucketState, bool create_if_missing, bool cache_create_flag) { auto update = std::make_shared<document::DocumentUpdate>( *_repo, *_html_type, @@ -67,6 +67,9 @@ UpdateOperationTest::sendUpdate(const std::string& bucketState, bool create_if_m addNodesToBucketDB(_bId, bucketState); auto msg = std::make_shared<api::UpdateCommand>(makeDocumentBucket(document::BucketId(0)), update, 100); + if (cache_create_flag) { + msg->set_cached_create_if_missing(create_if_missing); + } return std::make_shared<UpdateOperation>( node_context(), operation_context(), getDistributorBucketSpace(), msg, std::vector<BucketDatabase::Entry>(), @@ -271,4 +274,20 @@ TEST_F(UpdateOperationTest, cancelled_nodes_are_not_updated_in_db) { dumpBucket(_bId)); } +TEST_F(UpdateOperationTest, cached_create_if_missing_is_propagated_to_fanout_requests) { + setup_stripe(1, 1, "distributor:1 storage:1"); + for (bool cache_flag : {false, true}) { + for (bool create_if_missing : {false, true}) { + std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3", create_if_missing, cache_flag)); + DistributorMessageSenderStub sender; + cb->start(sender); + + ASSERT_EQ("Update => 0", sender.getCommands(true)); + auto& cmd = dynamic_cast<api::UpdateCommand&>(*sender.command(0)); + EXPECT_EQ(cmd.has_cached_create_if_missing(), cache_flag); + EXPECT_EQ(cmd.create_if_missing(), create_if_missing); + } + } +} + } diff --git a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp index 698d8dee573..231f41ffd21 100644 --- a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp +++ b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp @@ -262,7 +262,6 @@ TEST_P(StorageProtocolTest, response_metadata_is_propagated) { TEST_P(StorageProtocolTest, update) { auto update = std::make_shared<document::DocumentUpdate>(_docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId()); update->addUpdate(FieldUpdate(_testDoc->getField("headerval")).addUpdate(std::make_unique<AssignValueUpdate>(std::make_unique<IntFieldValue>(17)))); - update->addFieldPathUpdate(std::make_unique<RemoveFieldPathUpdate>("headerval", "testdoctype1.headerval > 0")); auto cmd = std::make_shared<UpdateCommand>(_bucket, update, 14); @@ -284,6 +283,37 @@ TEST_P(StorageProtocolTest, update) { EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2)); } +TEST_P(StorageProtocolTest, update_request_create_if_missing_flag_is_propagated) { + auto make_update_cmd = [&](bool create_if_missing, bool cached) { + auto update = std::make_shared<document::DocumentUpdate>( + _docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId()); + update->addUpdate(FieldUpdate(_testDoc->getField("headerval")).addUpdate( + std::make_unique<AssignValueUpdate>(std::make_unique<IntFieldValue>(17)))); + update->addFieldPathUpdate(std::make_unique<RemoveFieldPathUpdate>("headerval", "testdoctype1.headerval > 0")); + update->setCreateIfNonExistent(create_if_missing); + auto cmd = std::make_shared<UpdateCommand>(_bucket, update, 14); + if (cached) { + cmd->set_cached_create_if_missing(create_if_missing); + } + return cmd; + }; + + auto check_flag_propagation = [&](bool create_if_missing, bool cached) { + auto cmd = make_update_cmd(create_if_missing, cached); + EXPECT_EQ(cmd->has_cached_create_if_missing(), cached); + EXPECT_EQ(cmd->create_if_missing(), create_if_missing); + + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(cmd2->has_cached_create_if_missing(), cached); + EXPECT_EQ(cmd2->create_if_missing(), create_if_missing); + }; + + check_flag_propagation(false, false); + check_flag_propagation(true, false); + check_flag_propagation(false, true); + check_flag_propagation(true, true); +} + TEST_P(StorageProtocolTest, get) { auto cmd = std::make_shared<GetCommand>(_bucket, _testDocId, "foo,bar,vekterli", 123); auto cmd2 = copyCommand(cmd); @@ -880,7 +910,7 @@ TEST_P(StorageProtocolTest, track_memory_footprint_for_some_messages) { EXPECT_EQ(sizeof(BucketInfoCommand), sizeof(BucketCommand)); EXPECT_EQ(sizeof(TestAndSetCommand), sizeof(BucketInfoCommand) + sizeof(vespalib::string)); EXPECT_EQ(sizeof(PutCommand), sizeof(TestAndSetCommand) + 40); - EXPECT_EQ(sizeof(UpdateCommand), sizeof(TestAndSetCommand) + 32); + EXPECT_EQ(sizeof(UpdateCommand), sizeof(TestAndSetCommand) + 40); EXPECT_EQ(sizeof(RemoveCommand), sizeof(TestAndSetCommand) + 112); EXPECT_EQ(sizeof(GetCommand), sizeof(BucketInfoCommand) + sizeof(TestAndSetCondition) + 184); } diff --git a/storage/src/tests/storageserver/documentapiconvertertest.cpp b/storage/src/tests/storageserver/documentapiconvertertest.cpp index eb4789b25d4..1eb6bf5dd9a 100644 --- a/storage/src/tests/storageserver/documentapiconvertertest.cpp +++ b/storage/src/tests/storageserver/documentapiconvertertest.cpp @@ -159,28 +159,46 @@ TEST_F(DocumentApiConverterTest, forwarded_put) { } TEST_F(DocumentApiConverterTest, update) { - auto update = std::make_shared<document::DocumentUpdate>(*_repo, _html_type, defaultDocId); - documentapi::UpdateDocumentMessage updateMsg(update); - updateMsg.setOldTimestamp(1234); - updateMsg.setNewTimestamp(5678); - updateMsg.setCondition(my_condition); - - auto updateCmd = toStorageAPI<api::UpdateCommand>(updateMsg); - EXPECT_EQ(defaultBucket, updateCmd->getBucket()); - ASSERT_EQ(update.get(), updateCmd->getUpdate().get()); - EXPECT_EQ(api::Timestamp(1234), updateCmd->getOldTimestamp()); - EXPECT_EQ(api::Timestamp(5678), updateCmd->getTimestamp()); - EXPECT_EQ(my_condition, updateCmd->getCondition()); - - auto mbusReply = updateMsg.createReply(); - ASSERT_TRUE(mbusReply.get()); - toStorageAPI<api::UpdateReply>(*mbusReply, *updateCmd); - - auto mbusUpdate = toDocumentAPI<documentapi::UpdateDocumentMessage>(*updateCmd); - ASSERT_EQ((&mbusUpdate->getDocumentUpdate()), update.get()); - EXPECT_EQ(api::Timestamp(1234), mbusUpdate->getOldTimestamp()); - EXPECT_EQ(api::Timestamp(5678), mbusUpdate->getNewTimestamp()); - EXPECT_EQ(my_condition, mbusUpdate->getCondition()); + auto do_test_update = [&](bool create_if_missing) { + auto update = std::make_shared<document::DocumentUpdate>(*_repo, _html_type, defaultDocId); + update->setCreateIfNonExistent(create_if_missing); + documentapi::UpdateDocumentMessage updateMsg(update); + updateMsg.setOldTimestamp(1234); + updateMsg.setNewTimestamp(5678); + updateMsg.setCondition(my_condition); + EXPECT_FALSE(updateMsg.has_cached_create_if_missing()); + EXPECT_EQ(updateMsg.create_if_missing(), create_if_missing); + + auto updateCmd = toStorageAPI<api::UpdateCommand>(updateMsg); + EXPECT_EQ(defaultBucket, updateCmd->getBucket()); + ASSERT_EQ(update.get(), updateCmd->getUpdate().get()); + EXPECT_EQ(api::Timestamp(1234), updateCmd->getOldTimestamp()); + EXPECT_EQ(api::Timestamp(5678), updateCmd->getTimestamp()); + EXPECT_EQ(my_condition, updateCmd->getCondition()); + EXPECT_FALSE(updateCmd->has_cached_create_if_missing()); + EXPECT_EQ(updateCmd->create_if_missing(), create_if_missing); + + auto mbusReply = updateMsg.createReply(); + ASSERT_TRUE(mbusReply.get()); + toStorageAPI<api::UpdateReply>(*mbusReply, *updateCmd); + + auto mbusUpdate = toDocumentAPI<documentapi::UpdateDocumentMessage>(*updateCmd); + ASSERT_EQ((&mbusUpdate->getDocumentUpdate()), update.get()); + EXPECT_EQ(api::Timestamp(1234), mbusUpdate->getOldTimestamp()); + EXPECT_EQ(api::Timestamp(5678), mbusUpdate->getNewTimestamp()); + EXPECT_EQ(my_condition, mbusUpdate->getCondition()); + EXPECT_EQ(mbusUpdate->create_if_missing(), create_if_missing); + + // Cached value of create_if_missing should override underlying update's value + updateCmd->set_cached_create_if_missing(!create_if_missing); + EXPECT_TRUE(updateCmd->has_cached_create_if_missing()); + EXPECT_EQ(updateCmd->create_if_missing(), !create_if_missing); + mbusUpdate = toDocumentAPI<documentapi::UpdateDocumentMessage>(*updateCmd); + EXPECT_TRUE(mbusUpdate->has_cached_create_if_missing()); + EXPECT_EQ(mbusUpdate->create_if_missing(), !create_if_missing); + }; + do_test_update(false); + do_test_update(true); } TEST_F(DocumentApiConverterTest, remove) { diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp index 84e9ab71bcb..849746416d6 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp @@ -668,7 +668,7 @@ TwoPhaseUpdateOperation::applyUpdateToDocument(document::Document& doc) const bool TwoPhaseUpdateOperation::shouldCreateIfNonExistent() const { - return _updateCmd->getUpdate()->getCreateIfNonExistent(); + return _updateCmd->create_if_missing(); } bool diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp index 7b6833cc299..2b47d53363f 100644 --- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp @@ -29,7 +29,7 @@ UpdateOperation::UpdateOperation(const DistributorNodeContext& node_ctx, _msg(msg), _entries(std::move(entries)), _new_timestamp(_msg->getTimestamp()), - _is_auto_create_update(_msg->getUpdate()->getCreateIfNonExistent()), + _is_auto_create_update(_msg->create_if_missing()), _node_ctx(node_ctx), _op_ctx(op_ctx), _bucketSpace(bucketSpace), @@ -112,6 +112,9 @@ UpdateOperation::onStart(DistributorStripeMessageSender& sender) copyMessageSettings(*_msg, *command); command->setOldTimestamp(_msg->getOldTimestamp()); command->setCondition(_msg->getCondition()); + if (_msg->has_cached_create_if_missing()) { + command->set_cached_create_if_missing(_msg->create_if_missing()); + } messages.emplace_back(std::move(command), node); } diff --git a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp index ca46e87285b..5b8052a05f8 100644 --- a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp +++ b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp @@ -54,6 +54,9 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentMessage& fromMsg) auto to = std::make_unique<api::UpdateCommand>(bucket, from.stealDocumentUpdate(), from.getNewTimestamp()); to->setOldTimestamp(from.getOldTimestamp()); to->setCondition(from.getCondition()); + if (from.has_cached_create_if_missing()) { + to->set_cached_create_if_missing(from.create_if_missing()); + } toMsg = std::move(to); break; } @@ -217,6 +220,9 @@ DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg) to->setOldTimestamp(from.getOldTimestamp()); to->setNewTimestamp(from.getTimestamp()); to->setCondition(from.getCondition()); + if (from.has_cached_create_if_missing()) { + to->set_cached_create_if_missing(from.create_if_missing()); + } toMsg = std::move(to); break; } diff --git a/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto b/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto index 55d516a017b..403752b0c84 100644 --- a/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto +++ b/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto @@ -31,11 +31,18 @@ message Update { } message UpdateRequest { - Bucket bucket = 1; - Update update = 2; - uint64 new_timestamp = 3; - uint64 expected_old_timestamp = 4; // If zero; no expectation. - TestAndSetCondition condition = 5; + enum CreateIfMissing { + UNSPECIFIED = 0; // Legacy fallback: must deserialize `update` to find flag value + TRUE = 1; + FALSE = 2; + } + + Bucket bucket = 1; + Update update = 2; + uint64 new_timestamp = 3; + uint64 expected_old_timestamp = 4; // If zero; no expectation. + TestAndSetCondition condition = 5; + CreateIfMissing create_if_missing = 6; } message UpdateResponse { diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp index 57047be6037..0f4a34cc775 100644 --- a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp +++ b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp @@ -465,6 +465,10 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::UpdateCommand& msg) if (msg.getCondition().isPresent()) { set_tas_condition(*req.mutable_condition(), msg.getCondition()); } + if (msg.has_cached_create_if_missing()) { + req.set_create_if_missing(msg.create_if_missing() ? protobuf::UpdateRequest_CreateIfMissing_TRUE + : protobuf::UpdateRequest_CreateIfMissing_FALSE); + } }); } @@ -482,6 +486,9 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeUpdateCommand(BBuf& buf) if (req.has_condition()) { cmd->setCondition(get_tas_condition(req.condition())); } + if (req.create_if_missing() != protobuf::UpdateRequest_CreateIfMissing_UNSPECIFIED) { + cmd->set_cached_create_if_missing(req.create_if_missing() == protobuf::UpdateRequest_CreateIfMissing_TRUE); + } return cmd; }); } diff --git a/storage/src/vespa/storageapi/message/persistence.cpp b/storage/src/vespa/storageapi/message/persistence.cpp index 4c24bb74faf..af054855bbe 100644 --- a/storage/src/vespa/storageapi/message/persistence.cpp +++ b/storage/src/vespa/storageapi/message/persistence.cpp @@ -105,13 +105,23 @@ UpdateCommand::UpdateCommand(const document::Bucket &bucket, const document::Doc : TestAndSetCommand(MessageType::UPDATE, bucket), _update(update), _timestamp(time), - _oldTimestamp(0) + _oldTimestamp(0), + _create_if_missing() { if ( ! _update) { throw vespalib::IllegalArgumentException("Cannot update a null update", VESPA_STRLOC); } } +bool +UpdateCommand::create_if_missing() const +{ + if (_create_if_missing.has_value()) { + return *_create_if_missing; + } + return _update->getCreateIfNonExistent(); +} + const document::DocumentType * UpdateCommand::getDocumentType() const { return &_update->getType(); diff --git a/storage/src/vespa/storageapi/message/persistence.h b/storage/src/vespa/storageapi/message/persistence.h index f44ab4e8280..0676e1d0f44 100644 --- a/storage/src/vespa/storageapi/message/persistence.h +++ b/storage/src/vespa/storageapi/message/persistence.h @@ -1,8 +1,6 @@ // Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. /** - * @file persistence.h - * - * Persistence related commands, like put, get & remove + * Persistence related commands, like put, get & remove */ #pragma once @@ -10,6 +8,7 @@ #include <vespa/storageapi/defs.h> #include <vespa/document/base/documentid.h> #include <vespa/documentapi/messagebus/messages/testandsetcondition.h> +#include <optional> namespace document { class DocumentUpdate; @@ -117,20 +116,32 @@ class UpdateCommand : public TestAndSetCommand { std::shared_ptr<document::DocumentUpdate> _update; Timestamp _timestamp; Timestamp _oldTimestamp; + std::optional<bool> _create_if_missing; // caches the value held (possibly lazily deserialized) in _update public: UpdateCommand(const document::Bucket &bucket, const std::shared_ptr<document::DocumentUpdate>&, Timestamp); ~UpdateCommand() override; - void setTimestamp(Timestamp ts) { _timestamp = ts; } - void setOldTimestamp(Timestamp ts) { _oldTimestamp = ts; } + void setTimestamp(Timestamp ts) noexcept { _timestamp = ts; } + void setOldTimestamp(Timestamp ts) noexcept { _oldTimestamp = ts; } + + [[nodiscard]] bool has_cached_create_if_missing() const noexcept { + return _create_if_missing.has_value(); + } + // It is the caller's responsibility to ensure this value matches that of _update->getCreateIfNonExisting() + void set_cached_create_if_missing(bool create) noexcept { + _create_if_missing = create; + } const std::shared_ptr<document::DocumentUpdate>& getUpdate() const { return _update; } const document::DocumentId& getDocumentId() const override; Timestamp getTimestamp() const { return _timestamp; } Timestamp getOldTimestamp() const { return _oldTimestamp; } + // May throw iff has_cached_create_if_missing() == false, otherwise noexcept. + [[nodiscard]] bool create_if_missing() const; + const document::DocumentType * getDocumentType() const override; vespalib::string getSummary() const override; |