aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2023-05-12 18:10:59 +0200
committerGitHub <noreply@github.com>2023-05-12 18:10:59 +0200
commit24630ef218beb0051f09a64d6d215de7918a676e (patch)
tree97617e4a55b64ccc26dc0295e396c849fe0c2a05
parentead796f94c126e51b9a1edaaed5819d9a4eb6999 (diff)
parent9dfd191dbd235a524f76615560366aeb9e2ae079 (diff)
Merge pull request #27096 from vespa-engine/havardpe/write-repair-for-conditional-remove
added write repair logic to RemoveOperation
-rw-r--r--storage/src/tests/distributor/removeoperationtest.cpp162
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp80
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/removeoperation.h15
3 files changed, 241 insertions, 16 deletions
diff --git a/storage/src/tests/distributor/removeoperationtest.cpp b/storage/src/tests/distributor/removeoperationtest.cpp
index ed24b7271b8..d88a0a574cc 100644
--- a/storage/src/tests/distributor/removeoperationtest.cpp
+++ b/storage/src/tests/distributor/removeoperationtest.cpp
@@ -8,6 +8,7 @@
#include <vespa/storageapi/message/persistence.h>
#include <vespa/vespalib/gtest/gtest.h>
+using documentapi::TestAndSetCondition;
using document::test::makeDocumentBucket;
using namespace ::testing;
@@ -18,11 +19,14 @@ struct RemoveOperationTest : Test, DistributorStripeTestUtil {
document::BucketId bucketId;
std::unique_ptr<RemoveOperation> op;
- void SetUp() override {
+ void minimal_setup() {
createLinks();
-
docId = document::DocumentId("id:test:test::uri");
bucketId = operation_context().make_split_bit_constrained_bucket_id(docId);
+ }
+
+ void SetUp() override {
+ minimal_setup();
enable_cluster_state("distributor:1 storage:4");
};
@@ -30,9 +34,7 @@ struct RemoveOperationTest : Test, DistributorStripeTestUtil {
close();
}
- void sendRemove(document::DocumentId dId) {
- auto msg = std::make_shared<api::RemoveCommand>(makeDocumentBucket(document::BucketId(0)), dId, 100);
-
+ void sendRemove(std::shared_ptr<api::RemoveCommand> msg) {
op = std::make_unique<RemoveOperation>(
node_context(),
operation_context(),
@@ -43,6 +45,14 @@ struct RemoveOperationTest : Test, DistributorStripeTestUtil {
op->start(_sender);
}
+ std::shared_ptr<api::RemoveCommand> createRemove(document::DocumentId dId) {
+ return std::make_shared<api::RemoveCommand>(makeDocumentBucket(document::BucketId(0)), dId, 100);
+ }
+
+ void sendRemove(document::DocumentId dId) {
+ sendRemove(createRemove(dId));
+ }
+
void replyToMessage(RemoveOperation& callback,
uint32_t index,
uint64_t oldTimestamp)
@@ -62,8 +72,59 @@ struct RemoveOperationTest : Test, DistributorStripeTestUtil {
void sendRemove() {
sendRemove(docId);
}
+
+ void reply_with(auto msg) { op->receive(_sender, std::move(msg)); }
+
+ auto sent_get_command(size_t idx) { return sent_command<api::GetCommand>(idx); }
+ auto sent_remove_command(size_t idx) { return sent_command<api::RemoveCommand>(idx); }
+
+ auto make_remove_reply(size_t idx, api::Timestamp old_ts) {
+ return std::make_shared<api::RemoveReply>(*sent_remove_command(idx), old_ts);
+ }
+
+ auto make_get_reply(size_t idx, api::Timestamp ts, bool is_tombstone, bool condition_matched) {
+ return std::make_shared<api::GetReply>(*sent_get_command(idx), std::shared_ptr<document::Document>(), ts,
+ false, is_tombstone, condition_matched);
+ }
+
+ auto make_failure_reply(size_t idx) {
+ auto reply = sent_command<api::StorageCommand>(idx)->makeReply();
+ reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, "did a bork"));
+ return reply;
+ }
+};
+
+struct ExtRemoveOperationTest : public RemoveOperationTest {
+ void SetUp() override { minimal_setup(); }
+ enum class ReplicaState { NONE, CONSISTENT, INCONSISTENT };
+ void set_up_tas_remove_with_2_nodes(ReplicaState state);
};
+void ExtRemoveOperationTest::set_up_tas_remove_with_2_nodes(ReplicaState replica_state) {
+ setup_stripe(Redundancy(2), NodeCount(2), "version:1 storage:2 distributor:1");
+ config_enable_condition_probing(true);
+ tag_content_node_supports_condition_probing(0, true);
+ tag_content_node_supports_condition_probing(1, true);
+
+ switch (replica_state) {
+ case ReplicaState::CONSISTENT:
+ addNodesToBucketDB(bucketId, "1=10/20/30,0=10/20/30");
+ break;
+ case ReplicaState::INCONSISTENT:
+ addNodesToBucketDB(bucketId, "1=10/20/30,0=20/30/40");
+ break;
+ case ReplicaState::NONE:
+ break;
+ }
+
+ auto remove = createRemove(docId);
+ remove->setCondition(TestAndSetCondition("test.foo"));
+ sendRemove(std::move(remove));
+ if (replica_state == ReplicaState::INCONSISTENT) {
+ ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+ }
+}
+
TEST_F(RemoveOperationTest, simple) {
addNodesToBucketDB(bucketId, "1=0");
@@ -152,4 +213,95 @@ TEST_F(RemoveOperationTest, can_send_remove_when_all_replica_nodes_retired) {
_sender.getLastCommand());
}
+TEST_F(ExtRemoveOperationTest, conditional_removes_are_forwarded_with_condition_when_replicas_are_in_sync) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::CONSISTENT));
+ ASSERT_EQ("Remove => 1,Remove => 0", _sender.getCommands(true));
+ EXPECT_EQ(_sender.replies().size(), 0);
+ auto remove_n1 = sent_remove_command(0);
+ EXPECT_TRUE(remove_n1->hasTestAndSetCondition());
+ auto remove_n0 = sent_remove_command(1);
+ EXPECT_TRUE(remove_n0->hasTestAndSetCondition());
+}
+
+TEST_F(ExtRemoveOperationTest, conditional_removes_are_instantly_successful_when_there_are_no_replicas) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::NONE));
+ ASSERT_EQ("", _sender.getCommands(true));
+ ASSERT_EQ(_sender.replies().size(), 1);
+ EXPECT_EQ("RemoveReply(BucketId(0x0000000000000000), "
+ "id:test:test::uri, "
+ "timestamp 100, not found) "
+ "ReturnCode(NONE)",
+ _sender.getLastReply());
+}
+
+TEST_F(ExtRemoveOperationTest, matching_condition_probe_sends_unconditional_removes_to_all_nodes) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::INCONSISTENT));
+
+ reply_with(make_get_reply(0, 50, false, true));
+ reply_with(make_get_reply(1, 50, false, true));
+
+ ASSERT_EQ("Get => 1,Get => 0,Remove => 1,Remove => 0", _sender.getCommands(true)); // Note: cumulative message list
+
+ auto remove_n1 = sent_remove_command(2);
+ EXPECT_FALSE(remove_n1->hasTestAndSetCondition());
+ auto remove_n0 = sent_remove_command(3);
+ EXPECT_FALSE(remove_n0->hasTestAndSetCondition());
+
+ // Ensure replies are no longer routed to condition checker
+ ASSERT_TRUE(_sender.replies().empty());
+ reply_with(make_remove_reply(2, 50)); // remove from node 1
+ ASSERT_TRUE(_sender.replies().empty());
+ reply_with(make_remove_reply(3, 50)); // remove from node 0
+ ASSERT_EQ(_sender.replies().size(), 1);
+ EXPECT_EQ("RemoveReply(BucketId(0x0000000000000000), "
+ "id:test:test::uri, "
+ "timestamp 100, removed doc from 50) "
+ "ReturnCode(NONE)",
+ _sender.getLastReply());
+}
+
+TEST_F(ExtRemoveOperationTest, mismatching_condition_probe_fails_op_with_tas_error) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::INCONSISTENT));
+
+ reply_with(make_get_reply(0, 50, false, false));
+ reply_with(make_get_reply(1, 50, false, false));
+
+ ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+ EXPECT_EQ("RemoveReply(BucketId(0x0000000000000000), "
+ "id:test:test::uri, "
+ "timestamp 100, not found) "
+ "ReturnCode(TEST_AND_SET_CONDITION_FAILED, Condition did not match document)",
+ _sender.getLastReply());
+}
+
+// TODO change semantics for Not Found...
+TEST_F(ExtRemoveOperationTest, not_found_condition_probe_fails_op_with_tas_error) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::INCONSISTENT));
+
+ reply_with(make_get_reply(0, 0, false, false));
+ reply_with(make_get_reply(1, 0, false, false));
+
+ ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+ EXPECT_EQ("RemoveReply(BucketId(0x0000000000000000), "
+ "id:test:test::uri, "
+ "timestamp 100, not found) "
+ "ReturnCode(TEST_AND_SET_CONDITION_FAILED, Document does not exist)",
+ _sender.getLastReply());
+}
+
+TEST_F(ExtRemoveOperationTest, failed_condition_probe_fails_op_with_returned_error) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::INCONSISTENT));
+
+ reply_with(make_get_reply(0, 0, false, false));
+ reply_with(make_failure_reply(1));
+
+ ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+ EXPECT_EQ("RemoveReply(BucketId(0x0000000000000000), "
+ "id:test:test::uri, "
+ "timestamp 100, not found) "
+ "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: "
+ "One or more replicas failed during test-and-set condition evaluation)",
+ _sender.getLastReply());
+}
+
} // storage::distributor
diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
index b752a7fadde..d3001c37f7c 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.cpp
@@ -18,21 +18,50 @@ RemoveOperation::RemoveOperation(const DistributorNodeContext& node_ctx,
PersistenceOperationMetricSet& metric,
SequencingHandle sequencingHandle)
: SequencedOperation(std::move(sequencingHandle)),
- _trackerInstance(metric,
+ _tracker_instance(metric,
std::make_shared<api::RemoveReply>(*msg),
node_ctx, op_ctx, msg->getTimestamp()),
- _tracker(_trackerInstance),
+ _tracker(_tracker_instance),
_msg(std::move(msg)),
+ _doc_id_bucket_id(document::BucketIdFactory{}.getBucketId(_msg->getDocumentId())),
_node_ctx(node_ctx),
- _bucketSpace(bucketSpace)
+ _op_ctx(op_ctx),
+ _temp_metric(metric), // TODO
+ _bucket_space(bucketSpace),
+ _check_condition()
{
}
RemoveOperation::~RemoveOperation() = default;
-void
-RemoveOperation::onStart(DistributorStripeMessageSender& sender)
-{
+void RemoveOperation::onStart(DistributorStripeMessageSender& sender) {
+ LOG(spam, "Received remove on document %s", _msg->getDocumentId().toString().c_str());
+
+ if (!has_condition()) {
+ start_direct_remove_dispatch(sender);
+ } else {
+ start_conditional_remove(sender);
+ }
+}
+
+void RemoveOperation::start_conditional_remove(DistributorStripeMessageSender& sender) {
+ document::Bucket bucket(_msg->getBucket().getBucketSpace(), _doc_id_bucket_id);
+ _check_condition = CheckCondition::create_if_inconsistent_replicas(bucket, _bucket_space, _msg->getDocumentId(),
+ _msg->getCondition(), _node_ctx, _op_ctx,
+ _temp_metric);
+ if (!_check_condition) {
+ start_direct_remove_dispatch(sender);
+ } else {
+ // Inconsistent replicas; need write repair
+ _check_condition->start_and_send(sender);
+ const auto& outcome = _check_condition->maybe_outcome(); // Might be done immediately
+ if (outcome) {
+ on_completed_check_condition(*outcome, sender);
+ }
+ }
+}
+
+void RemoveOperation::start_direct_remove_dispatch(DistributorStripeMessageSender& sender) {
LOG(spam, "Started remove on document %s", _msg->getDocumentId().toString().c_str());
document::BucketId bucketId(
@@ -40,7 +69,7 @@ RemoveOperation::onStart(DistributorStripeMessageSender& sender)
_msg->getDocumentId()));
std::vector<BucketDatabase::Entry> entries;
- _bucketSpace.getBucketDatabase().getParents(bucketId, entries);
+ _bucket_space.getBucketDatabase().getParents(bucketId, entries);
bool sent = false;
@@ -68,7 +97,7 @@ RemoveOperation::onStart(DistributorStripeMessageSender& sender)
"Remove document %s failed since no available nodes found. "
"System state is %s",
_msg->getDocumentId().toString().c_str(),
- _bucketSpace.getClusterState().toString().c_str());
+ _bucket_space.getClusterState().toString().c_str());
_tracker.fail(sender, api::ReturnCode(api::ReturnCode::OK));
} else {
@@ -76,10 +105,18 @@ RemoveOperation::onStart(DistributorStripeMessageSender& sender)
}
};
-
void
RemoveOperation::onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply> & msg)
{
+ if (_check_condition) {
+ _check_condition->handle_reply(sender, msg);
+ const auto& outcome = _check_condition->maybe_outcome();
+ if (!outcome) {
+ return; // Condition check not done yet
+ }
+ return on_completed_check_condition(*outcome, sender);
+ }
+
auto& reply = static_cast<api::RemoveReply&>(*msg);
if (_tracker.getReply().get()) {
@@ -94,10 +131,35 @@ RemoveOperation::onReceive(DistributorStripeMessageSender& sender, const std::sh
_tracker.receiveReply(sender, reply);
}
+void RemoveOperation::on_completed_check_condition(const CheckCondition::Outcome& outcome,
+ DistributorStripeMessageSender& sender)
+{
+ if (outcome.matched_condition()) {
+ _msg->clear_condition(); // Transform to unconditional Remove
+ start_direct_remove_dispatch(sender);
+ } else if (outcome.not_found()) {
+ // TODO "not found" not a TaS error...
+ _tracker.fail(sender, api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED,
+ "Document does not exist"));
+ } else if (outcome.failed()) {
+ api::ReturnCode wrapped_error(outcome.error_code().getResult(),
+ "Failed during write repair condition probe step. Reason: " + outcome.error_code().getMessage());
+ _tracker.fail(sender, wrapped_error);
+ } else {
+ _tracker.fail(sender, api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED,
+ "Condition did not match document"));
+ }
+ _check_condition.reset();
+}
+
void
RemoveOperation::onClose(DistributorStripeMessageSender& sender)
{
_tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down"));
}
+bool RemoveOperation::has_condition() const noexcept {
+ return _msg->hasTestAndSetCondition();
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
index 360ab332f96..ba6d42c5108 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/removeoperation.h
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#pragma once
+#include "check_condition.h"
#include <vespa/storage/distributor/operations/sequenced_operation.h>
#include <vespa/storage/distributor/persistencemessagetracker.h>
@@ -29,11 +30,21 @@ public:
void onClose(DistributorStripeMessageSender& sender) override;
private:
- PersistenceMessageTrackerImpl _trackerInstance;
+ PersistenceMessageTrackerImpl _tracker_instance;
PersistenceMessageTracker& _tracker;
std::shared_ptr<api::RemoveCommand> _msg;
+ document::BucketId _doc_id_bucket_id;
const DistributorNodeContext& _node_ctx;
- DistributorBucketSpace& _bucketSpace;
+ DistributorStripeOperationContext& _op_ctx;
+ PersistenceOperationMetricSet& _temp_metric;
+ DistributorBucketSpace& _bucket_space;
+ std::shared_ptr<CheckCondition> _check_condition;
+
+ void start_direct_remove_dispatch(DistributorStripeMessageSender& sender);
+ void start_conditional_remove(DistributorStripeMessageSender& sender);
+ void on_completed_check_condition(const CheckCondition::Outcome& outcome,
+ DistributorStripeMessageSender& sender);
+ [[nodiscard]] bool has_condition() const noexcept;
};
}