summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@vespa.ai>2024-04-25 12:01:53 +0000
committerTor Brede Vekterli <vekterli@vespa.ai>2024-04-26 12:19:50 +0000
commit2f960fa96fc8b09da846b3fc0e7cc80bacf76746 (patch)
tree94e1044dc3ad6022840f4382550022c2e0404e46 /storage
parente045e41b93f74f05bbcfc5be244ed6901fea8b68 (diff)
Propagate "create if missing"-flag outside binary Update payload in protocols
Avoids potentially having to deserialize the entire update just to get to a single bit of information that is technically metadata existing orthogonally to the document update itself. To ensure backwards/forwards compatibility, the flag is propagated as a Protobuf `enum` where the default value is a special "unspecified" sentinel, implying an old sender. Since the Java protocol implementation always eagerly deserializes messages, it unconditionally assigns the `create_if_missing` field when sending and completely ignores it when receiving. The C++ protocol implementation observes and propagates the field iff set. Otherwise the flag is deferred to the update object as before. This applies to both the DocumentAPI and StorageAPI protocols.
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/updateoperationtest.cpp23
-rw-r--r--storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp34
-rw-r--r--storage/src/tests/storageserver/documentapiconvertertest.cpp62
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp2
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp5
-rw-r--r--storage/src/vespa/storage/storageserver/documentapiconverter.cpp6
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto17
-rw-r--r--storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp7
-rw-r--r--storage/src/vespa/storageapi/message/persistence.cpp12
-rw-r--r--storage/src/vespa/storageapi/message/persistence.h21
10 files changed, 150 insertions, 39 deletions
diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp
index 31ebbe19cbb..e00ce249298 100644
--- a/storage/src/tests/distributor/updateoperationtest.cpp
+++ b/storage/src/tests/distributor/updateoperationtest.cpp
@@ -49,13 +49,13 @@ struct UpdateOperationTest : Test, DistributorStripeTestUtil {
const api::ReturnCode& result = api::ReturnCode());
std::shared_ptr<UpdateOperation>
- sendUpdate(const std::string& bucketState, bool create_if_missing = false);
+ sendUpdate(const std::string& bucketState, bool create_if_missing = false, bool cache_create_flag = false);
document::BucketId _bId;
};
std::shared_ptr<UpdateOperation>
-UpdateOperationTest::sendUpdate(const std::string& bucketState, bool create_if_missing)
+UpdateOperationTest::sendUpdate(const std::string& bucketState, bool create_if_missing, bool cache_create_flag)
{
auto update = std::make_shared<document::DocumentUpdate>(
*_repo, *_html_type,
@@ -67,6 +67,9 @@ UpdateOperationTest::sendUpdate(const std::string& bucketState, bool create_if_m
addNodesToBucketDB(_bId, bucketState);
auto msg = std::make_shared<api::UpdateCommand>(makeDocumentBucket(document::BucketId(0)), update, 100);
+ if (cache_create_flag) {
+ msg->set_cached_create_if_missing(create_if_missing);
+ }
return std::make_shared<UpdateOperation>(
node_context(), operation_context(), getDistributorBucketSpace(), msg, std::vector<BucketDatabase::Entry>(),
@@ -271,4 +274,20 @@ TEST_F(UpdateOperationTest, cancelled_nodes_are_not_updated_in_db) {
dumpBucket(_bId));
}
+TEST_F(UpdateOperationTest, cached_create_if_missing_is_propagated_to_fanout_requests) {
+ setup_stripe(1, 1, "distributor:1 storage:1");
+ for (bool cache_flag : {false, true}) {
+ for (bool create_if_missing : {false, true}) {
+ std::shared_ptr<UpdateOperation> cb(sendUpdate("0=1/2/3", create_if_missing, cache_flag));
+ DistributorMessageSenderStub sender;
+ cb->start(sender);
+
+ ASSERT_EQ("Update => 0", sender.getCommands(true));
+ auto& cmd = dynamic_cast<api::UpdateCommand&>(*sender.command(0));
+ EXPECT_EQ(cmd.has_cached_create_if_missing(), cache_flag);
+ EXPECT_EQ(cmd.create_if_missing(), create_if_missing);
+ }
+ }
+}
+
}
diff --git a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
index 698d8dee573..231f41ffd21 100644
--- a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
+++ b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp
@@ -262,7 +262,6 @@ TEST_P(StorageProtocolTest, response_metadata_is_propagated) {
TEST_P(StorageProtocolTest, update) {
auto update = std::make_shared<document::DocumentUpdate>(_docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId());
update->addUpdate(FieldUpdate(_testDoc->getField("headerval")).addUpdate(std::make_unique<AssignValueUpdate>(std::make_unique<IntFieldValue>(17))));
-
update->addFieldPathUpdate(std::make_unique<RemoveFieldPathUpdate>("headerval", "testdoctype1.headerval > 0"));
auto cmd = std::make_shared<UpdateCommand>(_bucket, update, 14);
@@ -284,6 +283,37 @@ TEST_P(StorageProtocolTest, update) {
EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2));
}
+TEST_P(StorageProtocolTest, update_request_create_if_missing_flag_is_propagated) {
+ auto make_update_cmd = [&](bool create_if_missing, bool cached) {
+ auto update = std::make_shared<document::DocumentUpdate>(
+ _docMan.getTypeRepo(), *_testDoc->getDataType(), _testDoc->getId());
+ update->addUpdate(FieldUpdate(_testDoc->getField("headerval")).addUpdate(
+ std::make_unique<AssignValueUpdate>(std::make_unique<IntFieldValue>(17))));
+ update->addFieldPathUpdate(std::make_unique<RemoveFieldPathUpdate>("headerval", "testdoctype1.headerval > 0"));
+ update->setCreateIfNonExistent(create_if_missing);
+ auto cmd = std::make_shared<UpdateCommand>(_bucket, update, 14);
+ if (cached) {
+ cmd->set_cached_create_if_missing(create_if_missing);
+ }
+ return cmd;
+ };
+
+ auto check_flag_propagation = [&](bool create_if_missing, bool cached) {
+ auto cmd = make_update_cmd(create_if_missing, cached);
+ EXPECT_EQ(cmd->has_cached_create_if_missing(), cached);
+ EXPECT_EQ(cmd->create_if_missing(), create_if_missing);
+
+ auto cmd2 = copyCommand(cmd);
+ EXPECT_EQ(cmd2->has_cached_create_if_missing(), cached);
+ EXPECT_EQ(cmd2->create_if_missing(), create_if_missing);
+ };
+
+ check_flag_propagation(false, false);
+ check_flag_propagation(true, false);
+ check_flag_propagation(false, true);
+ check_flag_propagation(true, true);
+}
+
TEST_P(StorageProtocolTest, get) {
auto cmd = std::make_shared<GetCommand>(_bucket, _testDocId, "foo,bar,vekterli", 123);
auto cmd2 = copyCommand(cmd);
@@ -880,7 +910,7 @@ TEST_P(StorageProtocolTest, track_memory_footprint_for_some_messages) {
EXPECT_EQ(sizeof(BucketInfoCommand), sizeof(BucketCommand));
EXPECT_EQ(sizeof(TestAndSetCommand), sizeof(BucketInfoCommand) + sizeof(vespalib::string));
EXPECT_EQ(sizeof(PutCommand), sizeof(TestAndSetCommand) + 40);
- EXPECT_EQ(sizeof(UpdateCommand), sizeof(TestAndSetCommand) + 32);
+ EXPECT_EQ(sizeof(UpdateCommand), sizeof(TestAndSetCommand) + 40);
EXPECT_EQ(sizeof(RemoveCommand), sizeof(TestAndSetCommand) + 112);
EXPECT_EQ(sizeof(GetCommand), sizeof(BucketInfoCommand) + sizeof(TestAndSetCondition) + 184);
}
diff --git a/storage/src/tests/storageserver/documentapiconvertertest.cpp b/storage/src/tests/storageserver/documentapiconvertertest.cpp
index eb4789b25d4..1eb6bf5dd9a 100644
--- a/storage/src/tests/storageserver/documentapiconvertertest.cpp
+++ b/storage/src/tests/storageserver/documentapiconvertertest.cpp
@@ -159,28 +159,46 @@ TEST_F(DocumentApiConverterTest, forwarded_put) {
}
TEST_F(DocumentApiConverterTest, update) {
- auto update = std::make_shared<document::DocumentUpdate>(*_repo, _html_type, defaultDocId);
- documentapi::UpdateDocumentMessage updateMsg(update);
- updateMsg.setOldTimestamp(1234);
- updateMsg.setNewTimestamp(5678);
- updateMsg.setCondition(my_condition);
-
- auto updateCmd = toStorageAPI<api::UpdateCommand>(updateMsg);
- EXPECT_EQ(defaultBucket, updateCmd->getBucket());
- ASSERT_EQ(update.get(), updateCmd->getUpdate().get());
- EXPECT_EQ(api::Timestamp(1234), updateCmd->getOldTimestamp());
- EXPECT_EQ(api::Timestamp(5678), updateCmd->getTimestamp());
- EXPECT_EQ(my_condition, updateCmd->getCondition());
-
- auto mbusReply = updateMsg.createReply();
- ASSERT_TRUE(mbusReply.get());
- toStorageAPI<api::UpdateReply>(*mbusReply, *updateCmd);
-
- auto mbusUpdate = toDocumentAPI<documentapi::UpdateDocumentMessage>(*updateCmd);
- ASSERT_EQ((&mbusUpdate->getDocumentUpdate()), update.get());
- EXPECT_EQ(api::Timestamp(1234), mbusUpdate->getOldTimestamp());
- EXPECT_EQ(api::Timestamp(5678), mbusUpdate->getNewTimestamp());
- EXPECT_EQ(my_condition, mbusUpdate->getCondition());
+ auto do_test_update = [&](bool create_if_missing) {
+ auto update = std::make_shared<document::DocumentUpdate>(*_repo, _html_type, defaultDocId);
+ update->setCreateIfNonExistent(create_if_missing);
+ documentapi::UpdateDocumentMessage updateMsg(update);
+ updateMsg.setOldTimestamp(1234);
+ updateMsg.setNewTimestamp(5678);
+ updateMsg.setCondition(my_condition);
+ EXPECT_FALSE(updateMsg.has_cached_create_if_missing());
+ EXPECT_EQ(updateMsg.create_if_missing(), create_if_missing);
+
+ auto updateCmd = toStorageAPI<api::UpdateCommand>(updateMsg);
+ EXPECT_EQ(defaultBucket, updateCmd->getBucket());
+ ASSERT_EQ(update.get(), updateCmd->getUpdate().get());
+ EXPECT_EQ(api::Timestamp(1234), updateCmd->getOldTimestamp());
+ EXPECT_EQ(api::Timestamp(5678), updateCmd->getTimestamp());
+ EXPECT_EQ(my_condition, updateCmd->getCondition());
+ EXPECT_FALSE(updateCmd->has_cached_create_if_missing());
+ EXPECT_EQ(updateCmd->create_if_missing(), create_if_missing);
+
+ auto mbusReply = updateMsg.createReply();
+ ASSERT_TRUE(mbusReply.get());
+ toStorageAPI<api::UpdateReply>(*mbusReply, *updateCmd);
+
+ auto mbusUpdate = toDocumentAPI<documentapi::UpdateDocumentMessage>(*updateCmd);
+ ASSERT_EQ((&mbusUpdate->getDocumentUpdate()), update.get());
+ EXPECT_EQ(api::Timestamp(1234), mbusUpdate->getOldTimestamp());
+ EXPECT_EQ(api::Timestamp(5678), mbusUpdate->getNewTimestamp());
+ EXPECT_EQ(my_condition, mbusUpdate->getCondition());
+ EXPECT_EQ(mbusUpdate->create_if_missing(), create_if_missing);
+
+ // Cached value of create_if_missing should override underlying update's value
+ updateCmd->set_cached_create_if_missing(!create_if_missing);
+ EXPECT_TRUE(updateCmd->has_cached_create_if_missing());
+ EXPECT_EQ(updateCmd->create_if_missing(), !create_if_missing);
+ mbusUpdate = toDocumentAPI<documentapi::UpdateDocumentMessage>(*updateCmd);
+ EXPECT_TRUE(mbusUpdate->has_cached_create_if_missing());
+ EXPECT_EQ(mbusUpdate->create_if_missing(), !create_if_missing);
+ };
+ do_test_update(false);
+ do_test_update(true);
}
TEST_F(DocumentApiConverterTest, remove) {
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index 84e9ab71bcb..849746416d6 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -668,7 +668,7 @@ TwoPhaseUpdateOperation::applyUpdateToDocument(document::Document& doc) const
bool
TwoPhaseUpdateOperation::shouldCreateIfNonExistent() const
{
- return _updateCmd->getUpdate()->getCreateIfNonExistent();
+ return _updateCmd->create_if_missing();
}
bool
diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
index 7b6833cc299..2b47d53363f 100644
--- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
@@ -29,7 +29,7 @@ UpdateOperation::UpdateOperation(const DistributorNodeContext& node_ctx,
_msg(msg),
_entries(std::move(entries)),
_new_timestamp(_msg->getTimestamp()),
- _is_auto_create_update(_msg->getUpdate()->getCreateIfNonExistent()),
+ _is_auto_create_update(_msg->create_if_missing()),
_node_ctx(node_ctx),
_op_ctx(op_ctx),
_bucketSpace(bucketSpace),
@@ -112,6 +112,9 @@ UpdateOperation::onStart(DistributorStripeMessageSender& sender)
copyMessageSettings(*_msg, *command);
command->setOldTimestamp(_msg->getOldTimestamp());
command->setCondition(_msg->getCondition());
+ if (_msg->has_cached_create_if_missing()) {
+ command->set_cached_create_if_missing(_msg->create_if_missing());
+ }
messages.emplace_back(std::move(command), node);
}
diff --git a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp
index ca46e87285b..5b8052a05f8 100644
--- a/storage/src/vespa/storage/storageserver/documentapiconverter.cpp
+++ b/storage/src/vespa/storage/storageserver/documentapiconverter.cpp
@@ -54,6 +54,9 @@ DocumentApiConverter::toStorageAPI(documentapi::DocumentMessage& fromMsg)
auto to = std::make_unique<api::UpdateCommand>(bucket, from.stealDocumentUpdate(), from.getNewTimestamp());
to->setOldTimestamp(from.getOldTimestamp());
to->setCondition(from.getCondition());
+ if (from.has_cached_create_if_missing()) {
+ to->set_cached_create_if_missing(from.create_if_missing());
+ }
toMsg = std::move(to);
break;
}
@@ -217,6 +220,9 @@ DocumentApiConverter::toDocumentAPI(api::StorageCommand& fromMsg)
to->setOldTimestamp(from.getOldTimestamp());
to->setNewTimestamp(from.getTimestamp());
to->setCondition(from.getCondition());
+ if (from.has_cached_create_if_missing()) {
+ to->set_cached_create_if_missing(from.create_if_missing());
+ }
toMsg = std::move(to);
break;
}
diff --git a/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto b/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto
index 55d516a017b..403752b0c84 100644
--- a/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto
+++ b/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto
@@ -31,11 +31,18 @@ message Update {
}
message UpdateRequest {
- Bucket bucket = 1;
- Update update = 2;
- uint64 new_timestamp = 3;
- uint64 expected_old_timestamp = 4; // If zero; no expectation.
- TestAndSetCondition condition = 5;
+ enum CreateIfMissing {
+ UNSPECIFIED = 0; // Legacy fallback: must deserialize `update` to find flag value
+ TRUE = 1;
+ FALSE = 2;
+ }
+
+ Bucket bucket = 1;
+ Update update = 2;
+ uint64 new_timestamp = 3;
+ uint64 expected_old_timestamp = 4; // If zero; no expectation.
+ TestAndSetCondition condition = 5;
+ CreateIfMissing create_if_missing = 6;
}
message UpdateResponse {
diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
index 57047be6037..0f4a34cc775 100644
--- a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
+++ b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp
@@ -465,6 +465,10 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::UpdateCommand& msg)
if (msg.getCondition().isPresent()) {
set_tas_condition(*req.mutable_condition(), msg.getCondition());
}
+ if (msg.has_cached_create_if_missing()) {
+ req.set_create_if_missing(msg.create_if_missing() ? protobuf::UpdateRequest_CreateIfMissing_TRUE
+ : protobuf::UpdateRequest_CreateIfMissing_FALSE);
+ }
});
}
@@ -482,6 +486,9 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeUpdateCommand(BBuf& buf)
if (req.has_condition()) {
cmd->setCondition(get_tas_condition(req.condition()));
}
+ if (req.create_if_missing() != protobuf::UpdateRequest_CreateIfMissing_UNSPECIFIED) {
+ cmd->set_cached_create_if_missing(req.create_if_missing() == protobuf::UpdateRequest_CreateIfMissing_TRUE);
+ }
return cmd;
});
}
diff --git a/storage/src/vespa/storageapi/message/persistence.cpp b/storage/src/vespa/storageapi/message/persistence.cpp
index 4c24bb74faf..af054855bbe 100644
--- a/storage/src/vespa/storageapi/message/persistence.cpp
+++ b/storage/src/vespa/storageapi/message/persistence.cpp
@@ -105,13 +105,23 @@ UpdateCommand::UpdateCommand(const document::Bucket &bucket, const document::Doc
: TestAndSetCommand(MessageType::UPDATE, bucket),
_update(update),
_timestamp(time),
- _oldTimestamp(0)
+ _oldTimestamp(0),
+ _create_if_missing()
{
if ( ! _update) {
throw vespalib::IllegalArgumentException("Cannot update a null update", VESPA_STRLOC);
}
}
+bool
+UpdateCommand::create_if_missing() const
+{
+ if (_create_if_missing.has_value()) {
+ return *_create_if_missing;
+ }
+ return _update->getCreateIfNonExistent();
+}
+
const document::DocumentType *
UpdateCommand::getDocumentType() const {
return &_update->getType();
diff --git a/storage/src/vespa/storageapi/message/persistence.h b/storage/src/vespa/storageapi/message/persistence.h
index f44ab4e8280..0676e1d0f44 100644
--- a/storage/src/vespa/storageapi/message/persistence.h
+++ b/storage/src/vespa/storageapi/message/persistence.h
@@ -1,8 +1,6 @@
// Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
/**
- * @file persistence.h
- *
- * Persistence related commands, like put, get & remove
+ * Persistence related commands, like put, get & remove
*/
#pragma once
@@ -10,6 +8,7 @@
#include <vespa/storageapi/defs.h>
#include <vespa/document/base/documentid.h>
#include <vespa/documentapi/messagebus/messages/testandsetcondition.h>
+#include <optional>
namespace document {
class DocumentUpdate;
@@ -117,20 +116,32 @@ class UpdateCommand : public TestAndSetCommand {
std::shared_ptr<document::DocumentUpdate> _update;
Timestamp _timestamp;
Timestamp _oldTimestamp;
+ std::optional<bool> _create_if_missing; // caches the value held (possibly lazily deserialized) in _update
public:
UpdateCommand(const document::Bucket &bucket,
const std::shared_ptr<document::DocumentUpdate>&, Timestamp);
~UpdateCommand() override;
- void setTimestamp(Timestamp ts) { _timestamp = ts; }
- void setOldTimestamp(Timestamp ts) { _oldTimestamp = ts; }
+ void setTimestamp(Timestamp ts) noexcept { _timestamp = ts; }
+ void setOldTimestamp(Timestamp ts) noexcept { _oldTimestamp = ts; }
+
+ [[nodiscard]] bool has_cached_create_if_missing() const noexcept {
+ return _create_if_missing.has_value();
+ }
+ // It is the caller's responsibility to ensure this value matches that of _update->getCreateIfNonExisting()
+ void set_cached_create_if_missing(bool create) noexcept {
+ _create_if_missing = create;
+ }
const std::shared_ptr<document::DocumentUpdate>& getUpdate() const { return _update; }
const document::DocumentId& getDocumentId() const override;
Timestamp getTimestamp() const { return _timestamp; }
Timestamp getOldTimestamp() const { return _oldTimestamp; }
+ // May throw iff has_cached_create_if_missing() == false, otherwise noexcept.
+ [[nodiscard]] bool create_if_missing() const;
+
const document::DocumentType * getDocumentType() const override;
vespalib::string getSummary() const override;