From 9c04f6c0508de1aeea5a163262ae14b1a0bb1faf Mon Sep 17 00:00:00 2001 From: Tor Brede Vekterli Date: Wed, 19 Apr 2023 14:05:42 +0000 Subject: Add condition support to distributor `GetOperation` This involves two things: * Propagate input condition to sent Get requests when present * Add condition match status to newest replica metadata aggregation --- .../messagebus/messages/testandsetcondition.h | 4 + storage/src/tests/distributor/getoperationtest.cpp | 137 +++++++++++++++++---- .../operations/external/getoperation.cpp | 9 +- .../operations/external/newest_replica.cpp | 1 + .../operations/external/newest_replica.h | 11 +- .../src/vespa/storageapi/message/persistence.cpp | 12 +- 6 files changed, 143 insertions(+), 31 deletions(-) diff --git a/documentapi/src/vespa/documentapi/messagebus/messages/testandsetcondition.h b/documentapi/src/vespa/documentapi/messagebus/messages/testandsetcondition.h index c06c1767414..1e0bfda986c 100644 --- a/documentapi/src/vespa/documentapi/messagebus/messages/testandsetcondition.h +++ b/documentapi/src/vespa/documentapi/messagebus/messages/testandsetcondition.h @@ -27,6 +27,10 @@ public: const vespalib::string & getSelection() const { return _selection; } bool isPresent() const noexcept { return !_selection.empty(); } + + bool operator==(const TestAndSetCondition& rhs) const noexcept { + return (_selection == rhs._selection); + } }; } diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp index 36a1495579f..6601fcabbeb 100644 --- a/storage/src/tests/distributor/getoperationtest.cpp +++ b/storage/src/tests/distributor/getoperationtest.cpp @@ -19,11 +19,11 @@ #include #include -using std::shared_ptr; using config::ConfigGetter; using config::FileSpec; using document::test::makeDocumentBucket; using document::BucketId; +using documentapi::TestAndSetCondition; using namespace ::testing; namespace storage::distributor { @@ -54,23 +54,28 @@ struct GetOperationTest : Test, DistributorStripeTestUtil { op.reset(); } - void sendGet(api::InternalReadConsistency consistency = api::InternalReadConsistency::Strong) { - auto msg = std::make_shared(makeDocumentBucket(BucketId(0)), docId, document::AllFields::NAME); + void start_operation(std::shared_ptr cmd, api::InternalReadConsistency consistency) { op = std::make_unique( node_context(), getDistributorBucketSpace(), getDistributorBucketSpace().getBucketDatabase().acquire_read_guard(), - msg, metrics().gets, + std::move(cmd), metrics().gets, consistency); op->start(_sender); } + void sendGet(api::InternalReadConsistency consistency = api::InternalReadConsistency::Strong) { + auto msg = std::make_shared(makeDocumentBucket(BucketId(0)), docId, document::AllFields::NAME); + start_operation(std::move(msg), consistency); + } + static constexpr uint32_t LastCommand = UINT32_MAX; void sendReply(uint32_t idx, api::ReturnCode::Result result, std::string authorVal, uint32_t timestamp, - bool is_tombstone = false) + bool is_tombstone = false, + bool condition_matched = false) { if (idx == LastCommand) { idx = _sender.commands().size() - 1; @@ -91,7 +96,7 @@ struct GetOperationTest : Test, DistributorStripeTestUtil { document::StringFieldValue(authorVal)); } - auto reply = std::make_shared(*tmp, doc, timestamp, false, is_tombstone); + auto reply = std::make_shared(*tmp, doc, timestamp, false, is_tombstone, condition_matched); reply->setResult(result); op->receive(_sender, reply); @@ -101,6 +106,10 @@ struct GetOperationTest : Test, DistributorStripeTestUtil { sendReply(idx, api::ReturnCode::OK, "", tombstone_ts, true); } + void reply_with_condition_match(uint32_t idx, uint32_t timestamp, bool condition_match) { + sendReply(idx, api::ReturnCode::OK, "", timestamp, false, condition_match); + } + void replyWithFailure() { sendReply(LastCommand, api::ReturnCode::IO_FAILURE, "", 0); } @@ -147,6 +156,7 @@ struct GetOperationTest : Test, DistributorStripeTestUtil { } void do_test_read_consistency_is_propagated(api::InternalReadConsistency consistency); + void set_up_condition_match_get_operation(); }; GetOperationTest::GetOperationTest() = default; @@ -154,8 +164,10 @@ GetOperationTest::~GetOperationTest() = default; namespace { -NewestReplica replica_of(api::Timestamp ts, const document::BucketId& bucket_id, uint16_t node, bool is_tombstone) { - return NewestReplica::of(ts, bucket_id, node, is_tombstone); +NewestReplica replica_of(api::Timestamp ts, const document::BucketId& bucket_id, uint16_t node, + bool is_tombstone, bool condition_matched) +{ + return NewestReplica::of(ts, bucket_id, node, is_tombstone, condition_matched); } } @@ -177,7 +189,7 @@ TEST_F(GetOperationTest, simple) { EXPECT_FALSE(op->any_replicas_failed()); EXPECT_TRUE(last_reply_had_consistent_replicas()); ASSERT_TRUE(op->newest_replica().has_value()); - EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 0, false), *op->newest_replica()); + EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 0, false, false), *op->newest_replica()); } TEST_F(GetOperationTest, ask_all_checksum_groups_if_inconsistent_even_if_trusted_replica_available) { @@ -198,7 +210,7 @@ TEST_F(GetOperationTest, ask_all_checksum_groups_if_inconsistent_even_if_trusted EXPECT_FALSE(op->any_replicas_failed()); EXPECT_FALSE(last_reply_had_consistent_replicas()); ASSERT_TRUE(op->newest_replica().has_value()); - EXPECT_EQ(replica_of(api::Timestamp(2), bucketId, 0, false), *op->newest_replica()); + EXPECT_EQ(replica_of(api::Timestamp(2), bucketId, 0, false, false), *op->newest_replica()); } TEST_F(GetOperationTest, ask_all_nodes_if_bucket_is_inconsistent) { @@ -221,7 +233,7 @@ TEST_F(GetOperationTest, ask_all_nodes_if_bucket_is_inconsistent) { EXPECT_FALSE(op->any_replicas_failed()); EXPECT_FALSE(last_reply_had_consistent_replicas()); ASSERT_TRUE(op->newest_replica().has_value()); - EXPECT_EQ(replica_of(api::Timestamp(2), bucketId, 1, false), *op->newest_replica()); + EXPECT_EQ(replica_of(api::Timestamp(2), bucketId, 1, false, false), *op->newest_replica()); } TEST_F(GetOperationTest, send_to_all_invalid_copies) { @@ -289,7 +301,7 @@ TEST_F(GetOperationTest, inconsistent_split) { EXPECT_FALSE(last_reply_had_consistent_replicas()); // Bucket with highest timestamp should be returned. In this case it's the one on node 0. ASSERT_TRUE(op->newest_replica().has_value()); - EXPECT_EQ(replica_of(api::Timestamp(2), BucketId(16, 0x0593), 0, false), *op->newest_replica()); + EXPECT_EQ(replica_of(api::Timestamp(2), BucketId(16, 0x0593), 0, false, false), *op->newest_replica()); } TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found) { @@ -333,7 +345,7 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found_deleted) { EXPECT_FALSE(op->any_replicas_failed()); EXPECT_FALSE(last_reply_had_consistent_replicas()); ASSERT_TRUE(op->newest_replica().has_value()); - EXPECT_EQ(replica_of(api::Timestamp(3), bucketId, 1, false), *op->newest_replica()); + EXPECT_EQ(replica_of(api::Timestamp(3), bucketId, 1, false, false), *op->newest_replica()); } TEST_F(GetOperationTest, multi_inconsistent_bucket) { @@ -383,7 +395,7 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_fail) { EXPECT_FALSE(last_reply_had_consistent_replicas()); ASSERT_TRUE(op->newest_replica().has_value()); // First send to node 2 fails, second is to node 3 which returned the highest timestamp - EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 3, false), *op->newest_replica()); + EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 3, false, false), *op->newest_replica()); } TEST_F(GetOperationTest, return_not_found_when_bucket_not_in_db) { @@ -423,7 +435,7 @@ TEST_F(GetOperationTest, not_found) { // the caller may want to perform special logic if all replicas are in sync // but are missing the document. // FIXME make sure all callers are aware of this! - EXPECT_EQ(replica_of(api::Timestamp(0), bucketId, 0, false), *op->newest_replica()); + EXPECT_EQ(replica_of(api::Timestamp(0), bucketId, 0, false, false), *op->newest_replica()); } TEST_F(GetOperationTest, not_found_on_subset_of_replicas_marks_get_as_inconsistent) { @@ -468,7 +480,7 @@ TEST_F(GetOperationTest, resend_on_storage_failure) { // Replica had read failure, but they're still in sync. An immutable Get won't change that fact. EXPECT_TRUE(last_reply_had_consistent_replicas()); ASSERT_TRUE(op->newest_replica().has_value()); - EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 2, false), *op->newest_replica()); + EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 2, false, false), *op->newest_replica()); } TEST_F(GetOperationTest, storage_failure_of_out_of_sync_replica_is_tracked_as_inconsistent) { @@ -485,7 +497,7 @@ TEST_F(GetOperationTest, storage_failure_of_out_of_sync_replica_is_tracked_as_in EXPECT_TRUE(op->any_replicas_failed()); EXPECT_FALSE(last_reply_had_consistent_replicas()); ASSERT_TRUE(op->newest_replica().has_value()); - EXPECT_EQ(replica_of(api::Timestamp(3), bucketId, 2, false), *op->newest_replica()); + EXPECT_EQ(replica_of(api::Timestamp(3), bucketId, 2, false, false), *op->newest_replica()); } TEST_F(GetOperationTest, resend_on_storage_failure_all_fail) { @@ -534,7 +546,7 @@ TEST_F(GetOperationTest, send_to_ideal_copy_if_bucket_in_sync) { _sender.getLastReply()); EXPECT_TRUE(last_reply_had_consistent_replicas()); ASSERT_TRUE(op->newest_replica().has_value()); - EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 1, false), *op->newest_replica()); + EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 1, false, false), *op->newest_replica()); } TEST_F(GetOperationTest, multiple_copies_with_failure_on_local_node) { @@ -565,7 +577,7 @@ TEST_F(GetOperationTest, multiple_copies_with_failure_on_local_node) { EXPECT_TRUE(op->any_replicas_failed()); EXPECT_TRUE(last_reply_had_consistent_replicas()); ASSERT_TRUE(op->newest_replica().has_value()); - EXPECT_EQ(replica_of(api::Timestamp(3), BucketId(16, 0x0593), 2, false), *op->newest_replica()); + EXPECT_EQ(replica_of(api::Timestamp(3), BucketId(16, 0x0593), 2, false, false), *op->newest_replica()); } TEST_F(GetOperationTest, can_get_documents_when_all_replica_nodes_retired) { @@ -610,7 +622,7 @@ TEST_F(GetOperationTest, replicas_considered_consistent_if_all_equal_tombstone_t EXPECT_FALSE(op->any_replicas_failed()); EXPECT_TRUE(last_reply_had_consistent_replicas()); EXPECT_FALSE(last_reply_has_document()); - EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 0, true), *op->newest_replica()); + EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 0, true, false), *op->newest_replica()); } TEST_F(GetOperationTest, newer_tombstone_hides_older_document) { @@ -628,7 +640,7 @@ TEST_F(GetOperationTest, newer_tombstone_hides_older_document) { EXPECT_FALSE(op->any_replicas_failed()); EXPECT_FALSE(last_reply_had_consistent_replicas()); EXPECT_FALSE(last_reply_has_document()); - EXPECT_EQ(replica_of(api::Timestamp(200), bucketId, 1, true), *op->newest_replica()); + EXPECT_EQ(replica_of(api::Timestamp(200), bucketId, 1, true, false), *op->newest_replica()); } TEST_F(GetOperationTest, older_tombstone_does_not_hide_newer_document) { @@ -646,7 +658,88 @@ TEST_F(GetOperationTest, older_tombstone_does_not_hide_newer_document) { EXPECT_FALSE(op->any_replicas_failed()); EXPECT_FALSE(last_reply_had_consistent_replicas()); EXPECT_TRUE(last_reply_has_document()); - EXPECT_EQ(replica_of(api::Timestamp(200), bucketId, 0, false), *op->newest_replica()); + EXPECT_EQ(replica_of(api::Timestamp(200), bucketId, 0, false, false), *op->newest_replica()); +} + +TEST_F(GetOperationTest, provided_condition_is_propagated_to_sent_gets) { + setClusterState("distributor:1 storage:1"); + addNodesToBucketDB(bucketId, "0=123"); + + TestAndSetCondition my_cond("my_cool_condition"); + auto msg = std::make_shared(makeDocumentBucket(BucketId(0)), docId, document::NoFields::NAME); + msg->set_condition(my_cond); + + start_operation(std::move(msg), api::InternalReadConsistency::Strong); + ASSERT_EQ("Get => 0", _sender.getCommands(true)); + auto& cmd = dynamic_cast(*_sender.command(0)); + EXPECT_EQ(cmd.condition().getSelection(), my_cond.getSelection()); +} + +void GetOperationTest::set_up_condition_match_get_operation() { + setClusterState("distributor:1 storage:3"); + addNodesToBucketDB(bucketId, "0=100,2=200,1=300"); + + TestAndSetCondition my_cond("my_cool_condition"); + auto msg = std::make_shared(makeDocumentBucket(BucketId(0)), docId, document::NoFields::NAME); + msg->set_condition(my_cond); + start_operation(std::move(msg), api::InternalReadConsistency::Strong); + + ASSERT_EQ("Get => 0,Get => 2,Get => 1", _sender.getCommands(true)); +} + +TEST_F(GetOperationTest, condition_match_result_is_aggregated_for_newest_replica_mismatch_case) { + ASSERT_NO_FATAL_FAILURE(set_up_condition_match_get_operation()); + // node 0 (send index 0) has an old doc without a match + // node 2 (send index 1) has an old tombstone without match + // node 1 (send index 2) has a new doc without a match + // Newest replica should reflect node 1's results + ASSERT_NO_FATAL_FAILURE(reply_with_condition_match(0, 200, false)); + ASSERT_NO_FATAL_FAILURE(reply_with_tombstone(1, 100)); + ASSERT_NO_FATAL_FAILURE(reply_with_condition_match(2, 300, false)); + + ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, timestamp 300) ReturnCode(NONE)", + _sender.getLastReply()); + EXPECT_FALSE(op->any_replicas_failed()); + EXPECT_FALSE(last_reply_had_consistent_replicas()); + EXPECT_FALSE(last_reply_has_document()); + EXPECT_EQ(replica_of(api::Timestamp(300), bucketId, 1, false, false), *op->newest_replica()); +} + +TEST_F(GetOperationTest, condition_match_result_is_aggregated_for_newest_replica_match_case) { + ASSERT_NO_FATAL_FAILURE(set_up_condition_match_get_operation()); + // node 0 (send index 0) has a new doc with a match + // node 2 (send index 1) has an old tombstone without match + // node 1 (send index 2) has an old doc without a match + // Newest replica should reflect node 0's results + ASSERT_NO_FATAL_FAILURE(reply_with_condition_match(0, 400, true)); + ASSERT_NO_FATAL_FAILURE(reply_with_tombstone(1, 300)); + ASSERT_NO_FATAL_FAILURE(reply_with_condition_match(2, 200, false)); + + ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, timestamp 400) ReturnCode(NONE)", + _sender.getLastReply()); + EXPECT_FALSE(op->any_replicas_failed()); + EXPECT_FALSE(last_reply_had_consistent_replicas()); + EXPECT_FALSE(last_reply_has_document()); + EXPECT_EQ(replica_of(api::Timestamp(400), bucketId, 0, false, true), *op->newest_replica()); +} + +TEST_F(GetOperationTest, condition_match_result_is_aggregated_for_newest_replica_tombstone_case) { + ASSERT_NO_FATAL_FAILURE(set_up_condition_match_get_operation()); + // node 0 (send index 0) has an old doc with a match + // node 2 (send index 1) has a new tombstone without match + // node 1 (send index 2) has an old doc without a match + // Newest replica should reflect node 2's results + ASSERT_NO_FATAL_FAILURE(reply_with_condition_match(0, 400, true)); + ASSERT_NO_FATAL_FAILURE(reply_with_tombstone(1, 500)); + ASSERT_NO_FATAL_FAILURE(reply_with_condition_match(2, 300, false)); + + // Timestamp 0 in reply signals "not found" to clients + ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, timestamp 0) ReturnCode(NONE)", + _sender.getLastReply()); + EXPECT_FALSE(op->any_replicas_failed()); + EXPECT_FALSE(last_reply_had_consistent_replicas()); + EXPECT_FALSE(last_reply_has_document()); + EXPECT_EQ(replica_of(api::Timestamp(500), bucketId, 2, true, false), *op->newest_replica()); } } diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp index 41260115297..44148bdbf25 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp @@ -112,6 +112,9 @@ GetOperation::sendForChecksum(DistributorStripeMessageSender& sender, const docu _msg->getFieldSet(), _msg->getBeforeTimestamp()); copyMessageSettings(*_msg, *command); command->set_internal_read_consistency(_desired_read_consistency); + if (_msg->has_condition()) { + command->set_condition(_msg->condition()); + } LOG(spam, "Sending %s to node %d", command->toString(true).c_str(), res[best].copy.getNode()); @@ -175,9 +178,9 @@ GetOperation::onReceive(DistributorStripeMessageSender& sender, const std::share if (!_newest_replica.has_value() || getreply->getLastModifiedTimestamp() > _newest_replica->timestamp) { _returnCode = getreply->getResult(); assert(response.second[i].to_node != UINT16_MAX); - _newest_replica = NewestReplica::of(getreply->getLastModifiedTimestamp(), bucket_id, - send_state.to_node, getreply->is_tombstone()); - _doc = getreply->getDocument(); // May be empty (tombstones). + _newest_replica = NewestReplica::of(getreply->getLastModifiedTimestamp(), bucket_id, send_state.to_node, + getreply->is_tombstone(), getreply->condition_matched()); + _doc = getreply->getDocument(); // May be empty (tombstones or metadata-only). } } else { _any_replicas_failed = true; diff --git a/storage/src/vespa/storage/distributor/operations/external/newest_replica.cpp b/storage/src/vespa/storage/distributor/operations/external/newest_replica.cpp index 4632501d59b..cfc03882ed3 100644 --- a/storage/src/vespa/storage/distributor/operations/external/newest_replica.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/newest_replica.cpp @@ -9,6 +9,7 @@ std::ostream& operator<<(std::ostream& os, const NewestReplica& nr) { << ", bucket_id " << nr.bucket_id << ", node " << nr.node << ", is_tombstone " << nr.is_tombstone + << ", condition_matched " << nr.condition_matched << ')'; return os; } diff --git a/storage/src/vespa/storage/distributor/operations/external/newest_replica.h b/storage/src/vespa/storage/distributor/operations/external/newest_replica.h index 5fcb719aee2..94274747f30 100644 --- a/storage/src/vespa/storage/distributor/operations/external/newest_replica.h +++ b/storage/src/vespa/storage/distributor/operations/external/newest_replica.h @@ -18,23 +18,26 @@ struct NewestReplica { document::BucketId bucket_id; uint16_t node {UINT16_MAX}; bool is_tombstone {false}; + bool condition_matched {false}; // Only relevant if a condition was initially sent static NewestReplica of(api::Timestamp timestamp, const document::BucketId& bucket_id, uint16_t node, - bool is_tombstone) noexcept { - return {timestamp, bucket_id, node, is_tombstone}; + bool is_tombstone, + bool condition_matched) noexcept { + return {timestamp, bucket_id, node, is_tombstone, condition_matched}; } static NewestReplica make_empty() { - return {api::Timestamp(0), document::BucketId(), 0, false}; + return {api::Timestamp(0), document::BucketId(), 0, false, false}; } bool operator==(const NewestReplica& rhs) const noexcept { return ((timestamp == rhs.timestamp) && (bucket_id == rhs.bucket_id) && (node == rhs.node) && - (is_tombstone == rhs.is_tombstone)); + (is_tombstone == rhs.is_tombstone) && + (condition_matched == rhs.condition_matched)); } bool operator!=(const NewestReplica& rhs) const noexcept { return !(*this == rhs); diff --git a/storage/src/vespa/storageapi/message/persistence.cpp b/storage/src/vespa/storageapi/message/persistence.cpp index 1b09639fd9b..a8fa9a0bba1 100644 --- a/storage/src/vespa/storageapi/message/persistence.cpp +++ b/storage/src/vespa/storageapi/message/persistence.cpp @@ -202,7 +202,11 @@ GetCommand::getSummary() const { vespalib::asciistream stream; stream << "Get(BucketId(" << vespalib::hex << getBucketId().getId() << "), " << _docId.toString() - << ", beforetimestamp " << vespalib::dec << _beforeTimestamp << ')'; + << ", beforetimestamp " << vespalib::dec << _beforeTimestamp; + if (has_condition()) { + stream << ", condition " << condition().getSelection(); + } + stream << ')'; return stream.str(); } @@ -211,7 +215,11 @@ GetCommand::getSummary() const void GetCommand::print(std::ostream& out, bool verbose, const std::string& indent) const { - out << "Get(" << getBucketId() << ", " << _docId << ")"; + out << "Get(" << getBucketId() << ", " << _docId; + if (has_condition()) { + out << ", condition " << condition().getSelection(); + } + out << ")"; if (verbose) { out << " : "; BucketCommand::print(out, verbose, indent); -- cgit v1.2.3