From 9dfd191dbd235a524f76615560366aeb9e2ae079 Mon Sep 17 00:00:00 2001 From: Håvard Pettersen Date: Fri, 12 May 2023 12:52:27 +0000 Subject: added write repair logic to RemoveOperation --- .../src/tests/distributor/removeoperationtest.cpp | 162 ++++++++++++++++++++- .../operations/external/removeoperation.cpp | 80 ++++++++-- .../operations/external/removeoperation.h | 15 +- 3 files changed, 241 insertions(+), 16 deletions(-) diff --git a/storage/src/tests/distributor/removeoperationtest.cpp b/storage/src/tests/distributor/removeoperationtest.cpp index ed24b7271b8..d88a0a574cc 100644 --- a/storage/src/tests/distributor/removeoperationtest.cpp +++ b/storage/src/tests/distributor/removeoperationtest.cpp @@ -8,6 +8,7 @@ #include #include +using documentapi::TestAndSetCondition; using document::test::makeDocumentBucket; using namespace ::testing; @@ -18,11 +19,14 @@ struct RemoveOperationTest : Test, DistributorStripeTestUtil { document::BucketId bucketId; std::unique_ptr op; - void SetUp() override { + void minimal_setup() { createLinks(); - docId = document::DocumentId("id:test:test::uri"); bucketId = operation_context().make_split_bit_constrained_bucket_id(docId); + } + + void SetUp() override { + minimal_setup(); enable_cluster_state("distributor:1 storage:4"); }; @@ -30,9 +34,7 @@ struct RemoveOperationTest : Test, DistributorStripeTestUtil { close(); } - void sendRemove(document::DocumentId dId) { - auto msg = std::make_shared(makeDocumentBucket(document::BucketId(0)), dId, 100); - + void sendRemove(std::shared_ptr msg) { op = std::make_unique( node_context(), operation_context(), @@ -43,6 +45,14 @@ struct RemoveOperationTest : Test, DistributorStripeTestUtil { op->start(_sender); } + std::shared_ptr createRemove(document::DocumentId dId) { + return std::make_shared(makeDocumentBucket(document::BucketId(0)), dId, 100); + } + + void sendRemove(document::DocumentId dId) { + sendRemove(createRemove(dId)); + } + void replyToMessage(RemoveOperation& callback, uint32_t 
index, uint64_t oldTimestamp) @@ -62,8 +72,59 @@ struct RemoveOperationTest : Test, DistributorStripeTestUtil { void sendRemove() { sendRemove(docId); } + + void reply_with(auto msg) { op->receive(_sender, std::move(msg)); } + + auto sent_get_command(size_t idx) { return sent_command(idx); } + auto sent_remove_command(size_t idx) { return sent_command(idx); } + + auto make_remove_reply(size_t idx, api::Timestamp old_ts) { + return std::make_shared(*sent_remove_command(idx), old_ts); + } + + auto make_get_reply(size_t idx, api::Timestamp ts, bool is_tombstone, bool condition_matched) { + return std::make_shared(*sent_get_command(idx), std::shared_ptr(), ts, + false, is_tombstone, condition_matched); + } + + auto make_failure_reply(size_t idx) { + auto reply = sent_command(idx)->makeReply(); + reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, "did a bork")); + return reply; + } +}; + +struct ExtRemoveOperationTest : public RemoveOperationTest { + void SetUp() override { minimal_setup(); } + enum class ReplicaState { NONE, CONSISTENT, INCONSISTENT }; + void set_up_tas_remove_with_2_nodes(ReplicaState state); }; +void ExtRemoveOperationTest::set_up_tas_remove_with_2_nodes(ReplicaState replica_state) { + setup_stripe(Redundancy(2), NodeCount(2), "version:1 storage:2 distributor:1"); + config_enable_condition_probing(true); + tag_content_node_supports_condition_probing(0, true); + tag_content_node_supports_condition_probing(1, true); + + switch (replica_state) { + case ReplicaState::CONSISTENT: + addNodesToBucketDB(bucketId, "1=10/20/30,0=10/20/30"); + break; + case ReplicaState::INCONSISTENT: + addNodesToBucketDB(bucketId, "1=10/20/30,0=20/30/40"); + break; + case ReplicaState::NONE: + break; + } + + auto remove = createRemove(docId); + remove->setCondition(TestAndSetCondition("test.foo")); + sendRemove(std::move(remove)); + if (replica_state == ReplicaState::INCONSISTENT) { + ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true)); + } +} + 
TEST_F(RemoveOperationTest, simple) { addNodesToBucketDB(bucketId, "1=0"); @@ -152,4 +213,95 @@ TEST_F(RemoveOperationTest, can_send_remove_when_all_replica_nodes_retired) { _sender.getLastCommand()); } +TEST_F(ExtRemoveOperationTest, conditional_removes_are_forwarded_with_condition_when_replicas_are_in_sync) { + ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::CONSISTENT)); + ASSERT_EQ("Remove => 1,Remove => 0", _sender.getCommands(true)); + EXPECT_EQ(_sender.replies().size(), 0); + auto remove_n1 = sent_remove_command(0); + EXPECT_TRUE(remove_n1->hasTestAndSetCondition()); + auto remove_n0 = sent_remove_command(1); + EXPECT_TRUE(remove_n0->hasTestAndSetCondition()); +} + +TEST_F(ExtRemoveOperationTest, conditional_removes_are_instantly_successful_when_there_are_no_replicas) { + ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::NONE)); + ASSERT_EQ("", _sender.getCommands(true)); + ASSERT_EQ(_sender.replies().size(), 1); + EXPECT_EQ("RemoveReply(BucketId(0x0000000000000000), " + "id:test:test::uri, " + "timestamp 100, not found) " + "ReturnCode(NONE)", + _sender.getLastReply()); +} + +TEST_F(ExtRemoveOperationTest, matching_condition_probe_sends_unconditional_removes_to_all_nodes) { + ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::INCONSISTENT)); + + reply_with(make_get_reply(0, 50, false, true)); + reply_with(make_get_reply(1, 50, false, true)); + + ASSERT_EQ("Get => 1,Get => 0,Remove => 1,Remove => 0", _sender.getCommands(true)); // Note: cumulative message list + + auto remove_n1 = sent_remove_command(2); + EXPECT_FALSE(remove_n1->hasTestAndSetCondition()); + auto remove_n0 = sent_remove_command(3); + EXPECT_FALSE(remove_n0->hasTestAndSetCondition()); + + // Ensure replies are no longer routed to condition checker + ASSERT_TRUE(_sender.replies().empty()); + reply_with(make_remove_reply(2, 50)); // remove from node 1 + ASSERT_TRUE(_sender.replies().empty()); + reply_with(make_remove_reply(3, 50)); // 
remove from node 0 + ASSERT_EQ(_sender.replies().size(), 1); + EXPECT_EQ("RemoveReply(BucketId(0x0000000000000000), " + "id:test:test::uri, " + "timestamp 100, removed doc from 50) " + "ReturnCode(NONE)", + _sender.getLastReply()); +} + +TEST_F(ExtRemoveOperationTest, mismatching_condition_probe_fails_op_with_tas_error) { + ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::INCONSISTENT)); + + reply_with(make_get_reply(0, 50, false, false)); + reply_with(make_get_reply(1, 50, false, false)); + + ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true)); + EXPECT_EQ("RemoveReply(BucketId(0x0000000000000000), " + "id:test:test::uri, " + "timestamp 100, not found) " + "ReturnCode(TEST_AND_SET_CONDITION_FAILED, Condition did not match document)", + _sender.getLastReply()); +} + +// TODO change semantics for Not Found... +TEST_F(ExtRemoveOperationTest, not_found_condition_probe_fails_op_with_tas_error) { + ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::INCONSISTENT)); + + reply_with(make_get_reply(0, 0, false, false)); + reply_with(make_get_reply(1, 0, false, false)); + + ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true)); + EXPECT_EQ("RemoveReply(BucketId(0x0000000000000000), " + "id:test:test::uri, " + "timestamp 100, not found) " + "ReturnCode(TEST_AND_SET_CONDITION_FAILED, Document does not exist)", + _sender.getLastReply()); +} + +TEST_F(ExtRemoveOperationTest, failed_condition_probe_fails_op_with_returned_error) { + ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::INCONSISTENT)); + + reply_with(make_get_reply(0, 0, false, false)); + reply_with(make_failure_reply(1)); + + ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true)); + EXPECT_EQ("RemoveReply(BucketId(0x0000000000000000), " + "id:test:test::uri, " + "timestamp 100, not found) " + "ReturnCode(ABORTED, Failed during write repair condition probe step. 
Reason: " + "One or more replicas failed during test-and-set condition evaluation)", + _sender.getLastReply()); +} + } // storage::distributor diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp index b752a7fadde..d3001c37f7c 100644 --- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp @@ -18,21 +18,50 @@ RemoveOperation::RemoveOperation(const DistributorNodeContext& node_ctx, PersistenceOperationMetricSet& metric, SequencingHandle sequencingHandle) : SequencedOperation(std::move(sequencingHandle)), - _trackerInstance(metric, + _tracker_instance(metric, std::make_shared(*msg), node_ctx, op_ctx, msg->getTimestamp()), - _tracker(_trackerInstance), + _tracker(_tracker_instance), _msg(std::move(msg)), + _doc_id_bucket_id(document::BucketIdFactory{}.getBucketId(_msg->getDocumentId())), _node_ctx(node_ctx), - _bucketSpace(bucketSpace) + _op_ctx(op_ctx), + _temp_metric(metric), // TODO + _bucket_space(bucketSpace), + _check_condition() { } RemoveOperation::~RemoveOperation() = default; -void -RemoveOperation::onStart(DistributorStripeMessageSender& sender) -{ +void RemoveOperation::onStart(DistributorStripeMessageSender& sender) { + LOG(spam, "Received remove on document %s", _msg->getDocumentId().toString().c_str()); + + if (!has_condition()) { + start_direct_remove_dispatch(sender); + } else { + start_conditional_remove(sender); + } +} + +void RemoveOperation::start_conditional_remove(DistributorStripeMessageSender& sender) { + document::Bucket bucket(_msg->getBucket().getBucketSpace(), _doc_id_bucket_id); + _check_condition = CheckCondition::create_if_inconsistent_replicas(bucket, _bucket_space, _msg->getDocumentId(), + _msg->getCondition(), _node_ctx, _op_ctx, + _temp_metric); + if (!_check_condition) { + 
start_direct_remove_dispatch(sender); + } else { + // Inconsistent replicas; need write repair + _check_condition->start_and_send(sender); + const auto& outcome = _check_condition->maybe_outcome(); // Might be done immediately + if (outcome) { + on_completed_check_condition(*outcome, sender); + } + } +} + +void RemoveOperation::start_direct_remove_dispatch(DistributorStripeMessageSender& sender) { LOG(spam, "Started remove on document %s", _msg->getDocumentId().toString().c_str()); document::BucketId bucketId( @@ -40,7 +69,7 @@ RemoveOperation::onStart(DistributorStripeMessageSender& sender) _msg->getDocumentId())); std::vector entries; - _bucketSpace.getBucketDatabase().getParents(bucketId, entries); + _bucket_space.getBucketDatabase().getParents(bucketId, entries); bool sent = false; @@ -68,7 +97,7 @@ RemoveOperation::onStart(DistributorStripeMessageSender& sender) "Remove document %s failed since no available nodes found. " "System state is %s", _msg->getDocumentId().toString().c_str(), - _bucketSpace.getClusterState().toString().c_str()); + _bucket_space.getClusterState().toString().c_str()); _tracker.fail(sender, api::ReturnCode(api::ReturnCode::OK)); } else { @@ -76,10 +105,18 @@ RemoveOperation::onStart(DistributorStripeMessageSender& sender) } }; - void RemoveOperation::onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr & msg) { + if (_check_condition) { + _check_condition->handle_reply(sender, msg); + const auto& outcome = _check_condition->maybe_outcome(); + if (!outcome) { + return; // Condition check not done yet + } + return on_completed_check_condition(*outcome, sender); + } + auto& reply = static_cast(*msg); if (_tracker.getReply().get()) { @@ -94,10 +131,35 @@ RemoveOperation::onReceive(DistributorStripeMessageSender& sender, const std::sh _tracker.receiveReply(sender, reply); } +void RemoveOperation::on_completed_check_condition(const CheckCondition::Outcome& outcome, + DistributorStripeMessageSender& sender) +{ + if 
(outcome.matched_condition()) { + _msg->clear_condition(); // Transform to unconditional Remove + start_direct_remove_dispatch(sender); + } else if (outcome.not_found()) { + // TODO "not found" not a TaS error... + _tracker.fail(sender, api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED, + "Document does not exist")); + } else if (outcome.failed()) { + api::ReturnCode wrapped_error(outcome.error_code().getResult(), + "Failed during write repair condition probe step. Reason: " + outcome.error_code().getMessage()); + _tracker.fail(sender, wrapped_error); + } else { + _tracker.fail(sender, api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED, + "Condition did not match document")); + } + _check_condition.reset(); +} + void RemoveOperation::onClose(DistributorStripeMessageSender& sender) { _tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down")); } +bool RemoveOperation::has_condition() const noexcept { + return _msg->hasTestAndSetCondition(); +} + } diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h index 360ab332f96..ba6d42c5108 100644 --- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. 
#pragma once +#include "check_condition.h" #include #include @@ -29,11 +30,21 @@ public: void onClose(DistributorStripeMessageSender& sender) override; private: - PersistenceMessageTrackerImpl _trackerInstance; + PersistenceMessageTrackerImpl _tracker_instance; PersistenceMessageTracker& _tracker; std::shared_ptr _msg; + document::BucketId _doc_id_bucket_id; const DistributorNodeContext& _node_ctx; - DistributorBucketSpace& _bucketSpace; + DistributorStripeOperationContext& _op_ctx; + PersistenceOperationMetricSet& _temp_metric; + DistributorBucketSpace& _bucket_space; + std::shared_ptr _check_condition; + + void start_direct_remove_dispatch(DistributorStripeMessageSender& sender); + void start_conditional_remove(DistributorStripeMessageSender& sender); + void on_completed_check_condition(const CheckCondition::Outcome& outcome, + DistributorStripeMessageSender& sender); + [[nodiscard]] bool has_condition() const noexcept; }; } -- cgit v1.2.3