aboutsummaryrefslogtreecommitdiffstats
path: root/storage/src/tests
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2023-04-25 11:38:51 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2023-05-04 11:58:02 +0000
commit36c29c6a56ba24a5ec0e3b6de565a19ebd3b2b03 (patch)
tree58bcca93297ff7b5ac5039b04dec308c9522ad1c /storage/src/tests
parentf4442c9332f73e21be58cc538fca9851120b7553 (diff)
Support write-repair for conditional Put operations
This adds a generic `CheckCondition` utility class that can be explicitly invoked by operations requiring a distinct condition evaluation phase. For now, this is only used by `PutOperation`. Distributed condition checking is only used _iff_ the target bucket's replica set is inconsistent and all involved content nodes support condition probing. Write-repair is currently disabled by default, as there does not yet exist any config wired in that enables it (coming as part of a follow-up). Also currently missing is proper metric wiring and MBus reply trace propagation for condition-checking sub-operations.
Diffstat (limited to 'storage/src/tests')
-rw-r--r--storage/src/tests/distributor/CMakeLists.txt1
-rw-r--r--storage/src/tests/distributor/check_condition_test.cpp222
-rw-r--r--storage/src/tests/distributor/distributor_stripe_test_util.cpp14
-rw-r--r--storage/src/tests/distributor/distributor_stripe_test_util.h14
-rw-r--r--storage/src/tests/distributor/putoperationtest.cpp106
5 files changed, 355 insertions, 2 deletions
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt
index 11bbf2c241a..88ffa4e764a 100644
--- a/storage/src/tests/distributor/CMakeLists.txt
+++ b/storage/src/tests/distributor/CMakeLists.txt
@@ -9,6 +9,7 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST
bucketdbmetricupdatertest.cpp
bucketgctimecalculatortest.cpp
bucketstateoperationtest.cpp
+ check_condition_test.cpp
distributor_bucket_space_repo_test.cpp
distributor_bucket_space_test.cpp
distributor_host_info_reporter_test.cpp
diff --git a/storage/src/tests/distributor/check_condition_test.cpp b/storage/src/tests/distributor/check_condition_test.cpp
new file mode 100644
index 00000000000..7e42236f660
--- /dev/null
+++ b/storage/src/tests/distributor/check_condition_test.cpp
@@ -0,0 +1,222 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/document/bucket/bucket.h>
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/document/base/documentid.h>
+#include <vespa/document/fieldset/fieldsets.h>
+#include <vespa/documentapi/messagebus/messages/testandsetcondition.h>
+#include <vespa/storage/distributor/node_supported_features.h>
+#include <vespa/storage/distributor/operations/external/check_condition.h>
+#include <vespa/storage/distributor/persistence_operation_metric_set.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <tests/distributor/distributor_stripe_test_util.h>
+#include <vespa/vespalib/gtest/gtest.h>
+
+using namespace ::testing;
+
+namespace storage::distributor {
+
+using namespace document;
+using documentapi::TestAndSetCondition;
+
+class CheckConditionTest
+ : public Test,
+ public DistributorStripeTestUtil
+{
+public:
+ DocumentId _doc_id{"id:foo:testdoctype1:n=1234:bar"};
+ BucketId _bucket_id{16, 1234};
+ TestAndSetCondition _tas_cond{"foo or bar"};
+ PersistenceOperationMetricSet _metrics{"dummy_metrics", nullptr};
+
+ CheckConditionTest();
+ ~CheckConditionTest() override;
+
+ void SetUp() override {
+ createLinks();
+ // By default, set up 2 nodes {0, 1} with mutually out of sync replica state
+ // and with both reporting that they support condition probing.
+ setup_stripe(2, 2, "version:1 storage:2 distributor:1");
+ config_enable_condition_probing(true);
+ tag_content_node_supports_condition_probing(0, true);
+ tag_content_node_supports_condition_probing(1, true);
+ addNodesToBucketDB(_bucket_id, "0=10/20/30/t,1=40/50/60");
+ };
+
+ void TearDown() override {
+ close();
+ }
+
+ std::shared_ptr<CheckCondition> create_check_condition() {
+ auto& bucket_space = getDistributorBucketSpace();
+ auto doc_bucket = BucketIdFactory{}.getBucketId(_doc_id);
+ auto bucket = Bucket(FixedBucketSpaces::default_space(), _bucket_id);
+ assert(_bucket_id.contains(doc_bucket));
+ return CheckCondition::create_if_inconsistent_replicas(bucket, bucket_space, _doc_id, _tas_cond,
+ node_context(), operation_context(), _metrics);
+ }
+
+ std::shared_ptr<api::GetCommand> sent_get_command(size_t idx) {
+ return sent_command<api::GetCommand>(idx);
+ }
+
+ std::shared_ptr<api::PutCommand> sent_put_command(size_t idx) {
+ return sent_command<api::PutCommand>(idx);
+ }
+
+ static std::shared_ptr<api::GetReply> make_reply(const api::GetCommand& cmd, api::Timestamp ts,
+ bool is_tombstone, bool condition_matched)
+ {
+ return std::make_shared<api::GetReply>(cmd, std::shared_ptr<document::Document>(), ts,
+ false, is_tombstone, condition_matched);
+ }
+
+ std::shared_ptr<api::GetReply> make_matched_reply(size_t cmd_idx, api::Timestamp ts = 1000) {
+ return make_reply(*sent_get_command(cmd_idx), ts, false, true);
+ }
+
+ std::shared_ptr<api::GetReply> make_mismatched_reply(size_t cmd_idx, api::Timestamp ts = 1000) {
+ return make_reply(*sent_get_command(cmd_idx), ts, false, false);
+ }
+
+ std::shared_ptr<api::GetReply> make_not_found_non_tombstone_reply(size_t cmd_idx) {
+ return make_reply(*sent_get_command(cmd_idx), 0, false, false);
+ }
+
+ std::shared_ptr<api::GetReply> make_tombstone_reply(size_t cmd_idx, api::Timestamp ts = 1000) {
+ return make_reply(*sent_get_command(cmd_idx), ts, true, false);
+ }
+
+ std::shared_ptr<api::GetReply> make_failed_reply(size_t cmd_idx) {
+ auto reply = make_reply(*sent_get_command(cmd_idx), 0, false, false);
+ reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "did a bork"));
+ return reply;
+ }
+
+ void test_cond_with_2_gets_sent(const std::function<void(CheckCondition&)>& reply_invoker,
+ const std::function<void(const CheckCondition::Outcome&)>& outcome_checker)
+ {
+ auto cond = create_check_condition();
+ ASSERT_TRUE(cond);
+ cond->start_and_send(_sender);
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_invoker(*cond);
+ auto& outcome = cond->maybe_outcome();
+ ASSERT_TRUE(outcome);
+ outcome_checker(*outcome);
+ }
+};
+
+CheckConditionTest::CheckConditionTest() = default;
+CheckConditionTest::~CheckConditionTest() = default;
+
+TEST_F(CheckConditionTest, no_checker_returned_when_config_disabled) {
+ config_enable_condition_probing(false);
+ auto cond = create_check_condition();
+ EXPECT_FALSE(cond);
+}
+
+TEST_F(CheckConditionTest, no_checker_returned_when_probing_not_supported_on_at_least_one_node) {
+ tag_content_node_supports_condition_probing(1, false);
+ auto cond = create_check_condition();
+ EXPECT_FALSE(cond);
+}
+
+TEST_F(CheckConditionTest, no_checker_returned_when_bucket_replicas_are_consistent) {
+ addNodesToBucketDB(_bucket_id, "0=10/20/30/t,1=10/20/30");
+ auto cond = create_check_condition();
+ EXPECT_FALSE(cond);
+}
+
+TEST_F(CheckConditionTest, no_checker_returned_when_empty_replica_set) {
+ removeFromBucketDB(_bucket_id);
+ auto cond = create_check_condition();
+ EXPECT_FALSE(cond);
+}
+
+TEST_F(CheckConditionTest, starting_sends_condition_probe_gets) {
+ auto cond = create_check_condition();
+ ASSERT_TRUE(cond);
+ EXPECT_FALSE(cond->maybe_outcome());
+ // We don't test too much of the Get functionality, as that's already covered by GetOperation tests.
+ // But we test the main binding glue between the two components.
+ cond->start_and_send(_sender);
+ EXPECT_FALSE(cond->maybe_outcome());
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ auto cmd = sent_get_command(0);
+ EXPECT_EQ(cmd->getDocumentId(), _doc_id);
+ EXPECT_EQ(cmd->condition(), _tas_cond);
+ EXPECT_EQ(cmd->getFieldSet(), NoFields::NAME);
+ EXPECT_EQ(cmd->internal_read_consistency(), api::InternalReadConsistency::Strong);
+}
+
+TEST_F(CheckConditionTest, condition_matching_completes_check_with_match_outcome) {
+ test_cond_with_2_gets_sent([&](auto& cond) {
+ cond.handle_reply(_sender, make_matched_reply(0));
+ cond.handle_reply(_sender, make_matched_reply(1));
+ }, [&](auto& outcome) {
+ EXPECT_TRUE(outcome.matched_condition());
+ EXPECT_FALSE(outcome.not_found());
+ EXPECT_FALSE(outcome.failed());
+ });
+}
+
+TEST_F(CheckConditionTest, condition_mismatching_completes_check_with_mismatch_outcome) {
+ test_cond_with_2_gets_sent([&](auto& cond) {
+ cond.handle_reply(_sender, make_matched_reply(0, 1000));
+ cond.handle_reply(_sender, make_mismatched_reply(1, 1001));
+ }, [&](auto& outcome) {
+ EXPECT_FALSE(outcome.matched_condition());
+ EXPECT_FALSE(outcome.not_found());
+ EXPECT_FALSE(outcome.failed());
+ });
+}
+
+TEST_F(CheckConditionTest, not_found_non_tombstone_completes_check_with_not_found_outcome) {
+ test_cond_with_2_gets_sent([&](auto& cond) {
+ cond.handle_reply(_sender, make_not_found_non_tombstone_reply(0));
+ cond.handle_reply(_sender, make_not_found_non_tombstone_reply(1));
+ }, [&](auto& outcome) {
+ EXPECT_FALSE(outcome.matched_condition());
+ EXPECT_TRUE(outcome.not_found());
+ EXPECT_FALSE(outcome.failed());
+ });
+}
+
+TEST_F(CheckConditionTest, not_found_with_tombstone_completes_check_with_not_found_outcome) {
+ test_cond_with_2_gets_sent([&](auto& cond) {
+ cond.handle_reply(_sender, make_matched_reply(0, 1000));
+ cond.handle_reply(_sender, make_tombstone_reply(1, 1001));
+ }, [&](auto& outcome) {
+ EXPECT_FALSE(outcome.matched_condition());
+ EXPECT_TRUE(outcome.not_found());
+ EXPECT_FALSE(outcome.failed());
+ });
+}
+
+TEST_F(CheckConditionTest, failed_gets_completes_check_with_error_outcome) {
+ test_cond_with_2_gets_sent([&](auto& cond) {
+ cond.handle_reply(_sender, make_matched_reply(0));
+ cond.handle_reply(_sender, make_failed_reply(1));
+ }, [&](auto& outcome) {
+ EXPECT_FALSE(outcome.matched_condition());
+ EXPECT_FALSE(outcome.not_found());
+ EXPECT_TRUE(outcome.failed());
+ });
+}
+
+TEST_F(CheckConditionTest, check_fails_if_replica_set_changed_between_start_and_completion) {
+ test_cond_with_2_gets_sent([&](auto& cond) {
+ cond.handle_reply(_sender, make_matched_reply(0));
+ // Simulate node 0 going down, with new cluster state version push and implicit DB removal
+ enable_cluster_state("version:2 storage:1 distributor:1");
+ addNodesToBucketDB(_bucket_id, "1=10/20/30");
+ cond.handle_reply(_sender, make_matched_reply(1));
+ }, [&](auto& outcome) {
+ EXPECT_FALSE(outcome.matched_condition());
+ EXPECT_FALSE(outcome.not_found());
+ EXPECT_TRUE(outcome.failed());
+ EXPECT_EQ(outcome.error_code().getResult(), api::ReturnCode::BUCKET_NOT_FOUND);
+ });
+}
+
+}
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.cpp b/storage/src/tests/distributor/distributor_stripe_test_util.cpp
index 58902e2eb94..7a64eda28ff 100644
--- a/storage/src/tests/distributor/distributor_stripe_test_util.cpp
+++ b/storage/src/tests/distributor/distributor_stripe_test_util.cpp
@@ -568,4 +568,18 @@ DistributorStripeTestUtil::setSystemState(const lib::ClusterState& systemState)
_stripe->enableClusterStateBundle(lib::ClusterStateBundle(systemState));
}
+void
+DistributorStripeTestUtil::config_enable_condition_probing(bool enable) {
+ auto cfg = make_config();
+ cfg->set_enable_condition_probing(enable);
+ configure_stripe(cfg);
+}
+
+void
+DistributorStripeTestUtil::tag_content_node_supports_condition_probing(uint16_t index, bool supported) {
+ NodeSupportedFeatures features;
+ features.document_condition_probe = supported;
+ set_node_supported_features(index, features);
+}
+
}
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.h b/storage/src/tests/distributor/distributor_stripe_test_util.h
index 4b489d41f7d..707418512f9 100644
--- a/storage/src/tests/distributor/distributor_stripe_test_util.h
+++ b/storage/src/tests/distributor/distributor_stripe_test_util.h
@@ -40,7 +40,7 @@ class DistributorStripeTestUtil : public DoneInitializeHandler,
public StripeHostInfoNotifier {
public:
DistributorStripeTestUtil();
- ~DistributorStripeTestUtil();
+ ~DistributorStripeTestUtil() override;
/**
* Sets up the storage link chain.
@@ -211,6 +211,18 @@ public:
void simulate_set_pending_cluster_state(const vespalib::string& state_str);
void clear_pending_cluster_state_bundle();
+ template <typename CmdType>
+ requires std::is_base_of_v<api::StorageCommand, CmdType>
+ [[nodiscard]] std::shared_ptr<CmdType> sent_command(size_t idx) {
+ assert(idx < _sender.commands().size());
+ auto cmd = std::dynamic_pointer_cast<CmdType>(_sender.command(idx));
+ assert(cmd != nullptr);
+ return cmd;
+ }
+
+ void config_enable_condition_probing(bool enable);
+ void tag_content_node_supports_condition_probing(uint16_t index, bool supported);
+
protected:
vdstestlib::DirConfig _config;
std::unique_ptr<TestDistributorApp> _node;
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index 2a3f06b1e8c..a69cf8ec54c 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -63,7 +63,7 @@ public:
}
std::shared_ptr<api::StorageCommand> msg = _sender.command(idx);
- api::StorageReply::SP reply(msg->makeReply().release());
+ api::StorageReply::SP reply(msg->makeReply());
dynamic_cast<api::BucketInfoReply&>(*reply).setBucketInfo(info);
reply->setResult(result);
@@ -96,6 +96,30 @@ public:
}
void set_up_3_nodes_and_send_put_with_create_bucket_acks();
+
+ std::shared_ptr<api::GetCommand> sent_get_command(size_t idx) {
+ return sent_command<api::GetCommand>(idx);
+ }
+
+ std::shared_ptr<api::PutCommand> sent_put_command(size_t idx) {
+ return sent_command<api::PutCommand>(idx);
+ }
+
+ static std::shared_ptr<api::GetReply> make_get_reply(const api::GetCommand& cmd, api::Timestamp ts,
+ bool is_tombstone, bool condition_matched)
+ {
+ return std::make_shared<api::GetReply>(cmd, std::shared_ptr<document::Document>(), ts,
+ false, is_tombstone, condition_matched);
+ }
+
+ std::shared_ptr<api::GetReply> make_failed_get_reply(size_t cmd_idx) {
+ auto reply = make_get_reply(*sent_get_command(cmd_idx), 0, false, false);
+ reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, "did a bork"));
+ return reply;
+ }
+
+ void set_up_tas_put_with_2_inconsistent_replica_nodes();
+
};
PutOperationTest::~PutOperationTest() = default;
@@ -650,4 +674,84 @@ TEST_F(PutOperationTest, minority_failure_override_not_in_effect_for_non_tas_err
_sender.getLastReply());
}
+void PutOperationTest::set_up_tas_put_with_2_inconsistent_replica_nodes() {
+ setup_stripe(Redundancy(2), NodeCount(2), "version:1 storage:2 distributor:1");
+ config_enable_condition_probing(true);
+ tag_content_node_supports_condition_probing(0, true);
+ tag_content_node_supports_condition_probing(1, true);
+
+ auto doc = createDummyDocument("test", "test");
+ auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId());
+ addNodesToBucketDB(bucket, "1=10/20/30,0=20/30/40");
+
+ auto put = createPut(doc);
+ put->setCondition(TestAndSetCondition("test.foo"));
+ sendPut(std::move(put));
+ ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+}
+
+TEST_F(PutOperationTest, matching_condition_probe_sends_unconditional_puts_to_all_nodes) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes());
+
+ op->receive(_sender, make_get_reply(*sent_get_command(0), 50, false, true));
+ op->receive(_sender, make_get_reply(*sent_get_command(1), 50, false, true));
+
+ ASSERT_EQ("Get => 1,Get => 0,Put => 1,Put => 0", _sender.getCommands(true)); // Note: cumulative message list
+
+ auto put_n1 = sent_put_command(2);
+ EXPECT_FALSE(put_n1->hasTestAndSetCondition());
+ auto put_n0 = sent_put_command(3);
+ EXPECT_FALSE(put_n0->hasTestAndSetCondition());
+
+ // Ensure replies are no longer routed to condition checker
+ ASSERT_TRUE(_sender.replies().empty());
+ sendReply(2); // put to node 1
+ sendReply(3); // put to node 0
+ EXPECT_EQ("PutReply(id:test:testdoctype1::test, "
+ "BucketId(0x0000000000000000), "
+ "timestamp 100) ReturnCode(NONE)",
+ _sender.getLastReply());
+}
+
+TEST_F(PutOperationTest, mismatching_condition_probe_fails_op_with_tas_error) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes());
+
+ op->receive(_sender, make_get_reply(*sent_get_command(0), 50, false, false));
+ op->receive(_sender, make_get_reply(*sent_get_command(1), 50, false, false));
+
+ ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+ ASSERT_EQ("PutReply(id:test:testdoctype1::test, "
+ "BucketId(0x0000000000000000), timestamp 100) "
+ "ReturnCode(TEST_AND_SET_CONDITION_FAILED, Condition did not match document)",
+ _sender.getLastReply());
+}
+
+// TODO change semantics for Not Found...
+TEST_F(PutOperationTest, not_found_condition_probe_fails_op_with_tas_error) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes());
+
+ op->receive(_sender, make_get_reply(*sent_get_command(0), 0, false, false));
+ op->receive(_sender, make_get_reply(*sent_get_command(1), 0, false, false));
+
+ ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+ ASSERT_EQ("PutReply(id:test:testdoctype1::test, "
+ "BucketId(0x0000000000000000), timestamp 100) "
+ "ReturnCode(TEST_AND_SET_CONDITION_FAILED, Document does not exist)",
+ _sender.getLastReply());
+}
+
+TEST_F(PutOperationTest, failed_condition_probe_fails_op_with_returned_error) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes());
+
+ op->receive(_sender, make_get_reply(*sent_get_command(0), 0, false, false));
+ op->receive(_sender, make_failed_get_reply(1));
+
+ ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+ ASSERT_EQ("PutReply(id:test:testdoctype1::test, "
+ "BucketId(0x0000000000000000), timestamp 100) "
+ "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: "
+ "One or more replicas failed during test-and-set condition evaluation)",
+ _sender.getLastReply());
+}
+
}