diff options
8 files changed, 38 insertions, 1 deletions
diff --git a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp index d5d33a178fe..567e0a947da 100644 --- a/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp +++ b/storage/src/tests/distributor/top_level_bucket_db_updater_test.cpp @@ -2513,6 +2513,7 @@ TEST_F(TopLevelBucketDBUpdaterTest, node_feature_sets_are_aggregated_from_nodes_ EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(i).unordered_merge_chaining); EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(i).two_phase_remove_location); EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(i).no_implicit_indexing_of_active_buckets); + EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(i).document_condition_probe); } } @@ -2526,6 +2527,7 @@ TEST_F(TopLevelBucketDBUpdaterTest, node_feature_sets_are_aggregated_from_nodes_ reply.supported_node_features().unordered_merge_chaining = true; reply.supported_node_features().two_phase_remove_location = true; reply.supported_node_features().no_implicit_indexing_of_active_buckets = true; + reply.supported_node_features().document_condition_probe = true; } })); } @@ -2535,14 +2537,17 @@ TEST_F(TopLevelBucketDBUpdaterTest, node_feature_sets_are_aggregated_from_nodes_ EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(0).unordered_merge_chaining); EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(0).two_phase_remove_location); EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(0).no_implicit_indexing_of_active_buckets); + EXPECT_FALSE(s->node_supported_features_repo().node_supported_features(0).document_condition_probe); EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(1).unordered_merge_chaining); EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(1).two_phase_remove_location); EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(1).no_implicit_indexing_of_active_buckets); + EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(1).document_condition_probe); EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(2).unordered_merge_chaining); EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(2).two_phase_remove_location); EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(2).no_implicit_indexing_of_active_buckets); + EXPECT_TRUE(s->node_supported_features_repo().node_supported_features(2).document_condition_probe); } } diff --git a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp index 6d8c3585726..3a3a3a6b016 100644 --- a/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp +++ b/storage/src/tests/storageapi/mbusprot/storageprotocoltest.cpp @@ -289,6 +289,7 @@ TEST_P(StorageProtocolTest, get) { EXPECT_EQ(Timestamp(123), reply2->getBeforeTimestamp()); EXPECT_EQ(Timestamp(100), reply2->getLastModifiedTimestamp()); EXPECT_FALSE(reply2->is_tombstone()); + EXPECT_FALSE(reply2->condition_matched()); EXPECT_NO_FATAL_FAILURE(assert_bucket_info_reply_fields_propagated(*reply2)); } @@ -310,6 +311,13 @@ TEST_P(StorageProtocolTest, can_set_internal_read_consistency_on_get_commands) { EXPECT_EQ(cmd2->internal_read_consistency(), InternalReadConsistency::Strong); } +TEST_P(StorageProtocolTest, get_command_with_condition) { + auto cmd = std::make_shared<GetCommand>(_bucket, _testDocId, "foo,bar,vekterli", 123); + cmd->set_condition(TestAndSetCondition(CONDITION_STRING)); + auto cmd2 = copyCommand(cmd); + EXPECT_EQ(cmd->condition().getSelection(), cmd2->condition().getSelection()); +} + TEST_P(StorageProtocolTest, tombstones_propagated_for_gets) { auto cmd = std::make_shared<GetCommand>(_bucket, _testDocId, "foo,bar", 123); auto reply = std::make_shared<GetReply>(*cmd, std::shared_ptr<Document>(), 100, false, true); @@ -323,6 +331,14 @@ TEST_P(StorageProtocolTest, tombstones_propagated_for_gets) { EXPECT_TRUE(reply2->is_tombstone()); } +TEST_P(StorageProtocolTest, condition_matched_propagated_for_get_result) { + auto cmd = std::make_shared<GetCommand>(_bucket, _testDocId, "foo,bar", 123); + auto reply = std::make_shared<GetReply>(*cmd, std::shared_ptr<Document>(), 100, false, false, true); + set_dummy_bucket_info_reply_fields(*reply); + auto reply2 = copyReply(reply); + EXPECT_TRUE(reply2->condition_matched()); +} + TEST_P(StorageProtocolTest, remove) { auto cmd = std::make_shared<RemoveCommand>(_bucket, _testDocId, 159); auto cmd2 = copyCommand(cmd); @@ -390,6 +406,7 @@ TEST_P(StorageProtocolTest, request_bucket_info) { EXPECT_TRUE(reply2->supported_node_features().unordered_merge_chaining); EXPECT_TRUE(reply2->supported_node_features().two_phase_remove_location); EXPECT_TRUE(reply2->supported_node_features().no_implicit_indexing_of_active_buckets); + EXPECT_TRUE(reply2->supported_node_features().document_condition_probe); } } diff --git a/storage/src/vespa/storage/distributor/node_supported_features.h b/storage/src/vespa/storage/distributor/node_supported_features.h index bbd17403a6d..f4c9553775b 100644 --- a/storage/src/vespa/storage/distributor/node_supported_features.h +++ b/storage/src/vespa/storage/distributor/node_supported_features.h @@ -14,6 +14,7 @@ struct NodeSupportedFeatures { bool unordered_merge_chaining = false; bool two_phase_remove_location = false; bool no_implicit_indexing_of_active_buckets = false; + bool document_condition_probe = false; bool operator==(const NodeSupportedFeatures& rhs) const noexcept = default; }; diff --git a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp index 36f7af1c1e6..9d25e19deb2 100644 --- a/storage/src/vespa/storage/distributor/pendingclusterstate.cpp +++ b/storage/src/vespa/storage/distributor/pendingclusterstate.cpp @@ -333,6 +333,7 @@ PendingClusterState::update_node_supported_features_from_reply(uint16_t node, co dest_feat.unordered_merge_chaining = src_feat.unordered_merge_chaining; dest_feat.two_phase_remove_location = src_feat.two_phase_remove_location; dest_feat.no_implicit_indexing_of_active_buckets = src_feat.no_implicit_indexing_of_active_buckets; + dest_feat.document_condition_probe = src_feat.document_condition_probe; // This will overwrite per bucket-space reply, but does not matter since it's independent of bucket space. _node_features.insert(std::make_pair(node, dest_feat)); } diff --git a/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto b/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto index 92c0fdc0b87..b02e1eab0aa 100644 --- a/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto +++ b/storage/src/vespa/storageapi/mbusprot/protobuf/feed.proto @@ -66,6 +66,7 @@ message GetRequest { Weak = 1; } InternalReadConsistency internal_read_consistency = 5; + TestAndSetCondition condition = 6; } message GetResponse { @@ -76,6 +77,7 @@ message GetResponse { // Note: last_modified_timestamp and tombstone_timestamp are mutually exclusive. // Tracked separately (rather than being a flag bool) to avoid issues during rolling upgrades. uint64 tombstone_timestamp = 5; + bool condition_matched = 6; } message RevertRequest { diff --git a/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto b/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto index 0c1df005a5a..74b2646463a 100644 --- a/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto +++ b/storage/src/vespa/storageapi/mbusprot/protobuf/maintenance.proto @@ -113,6 +113,7 @@ message SupportedNodeFeatures { bool unordered_merge_chaining = 1; bool two_phase_remove_location = 2; bool no_implicit_indexing_of_active_buckets = 3; + bool document_condition_probe = 4; } message RequestBucketInfoResponse { diff --git a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp index 2083dd2700d..0ede96179e8 100644 --- a/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp +++ b/storage/src/vespa/storageapi/mbusprot/protocolserialization7.cpp @@ -555,6 +555,9 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetCommand& msg) co req.set_field_set(msg.getFieldSet().data(), msg.getFieldSet().size()); } req.set_internal_read_consistency(read_consistency_to_protobuf(msg.internal_read_consistency())); + if (msg.has_condition()) { + set_tas_condition(*req.mutable_condition(), msg.condition()); + } }); } @@ -574,6 +577,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::GetReply& msg) cons // found document for older versions. res.set_last_modified_timestamp(0); } + res.set_condition_matched(msg.condition_matched()); }); } @@ -583,6 +587,9 @@ api::StorageCommand::UP ProtocolSerialization7::onDecodeGetCommand(BBuf& buf) co auto op = std::make_unique<api::GetCommand>(bucket, std::move(doc_id), req.field_set(), req.before_timestamp()); op->set_internal_read_consistency(read_consistency_from_protobuf(req.internal_read_consistency())); + if (req.has_condition()) { + op->set_condition(get_tas_condition(req.condition())); + } return op; }); } @@ -596,7 +603,7 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeGetReply(const SCmd& cmd, : res.last_modified_timestamp()); return std::make_unique<api::GetReply>(static_cast<const api::GetCommand&>(cmd), std::move(document), effective_timestamp, - false, is_tombstone); + false, is_tombstone, res.condition_matched()); } catch (std::exception& e) { auto reply = std::make_unique<api::GetReply>(static_cast<const api::GetCommand&>(cmd), std::shared_ptr<document::Document>(), 0u); @@ -1063,6 +1070,7 @@ void ProtocolSerialization7::onEncode(GBBuf& buf, const api::RequestBucketInfoRe res.mutable_supported_node_features()->set_unordered_merge_chaining(true); res.mutable_supported_node_features()->set_two_phase_remove_location(true); res.mutable_supported_node_features()->set_no_implicit_indexing_of_active_buckets(true); + res.mutable_supported_node_features()->set_document_condition_probe(true); } }); } @@ -1106,6 +1114,7 @@ api::StorageReply::UP ProtocolSerialization7::onDecodeRequestBucketInfoReply(con dest_features.unordered_merge_chaining = src_features.unordered_merge_chaining(); dest_features.two_phase_remove_location = src_features.two_phase_remove_location(); dest_features.no_implicit_indexing_of_active_buckets = src_features.no_implicit_indexing_of_active_buckets(); + dest_features.document_condition_probe = src_features.document_condition_probe(); } return reply; }); diff --git a/storage/src/vespa/storageapi/message/bucket.h b/storage/src/vespa/storageapi/message/bucket.h index 801c75322b3..e02b8fcd672 100644 --- a/storage/src/vespa/storageapi/message/bucket.h +++ b/storage/src/vespa/storageapi/message/bucket.h @@ -396,6 +396,7 @@ public: bool unordered_merge_chaining = false; bool two_phase_remove_location = false; bool no_implicit_indexing_of_active_buckets = false; + bool document_condition_probe = false; }; using EntryVector = std::vector<Entry, vespalib::allocator_large<Entry>>; private: |