summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
author Tor Brede Vekterli <vekterli@yahooinc.com> 2023-04-25 11:38:51 +0000
committer Tor Brede Vekterli <vekterli@yahooinc.com> 2023-05-04 11:58:02 +0000
commit 36c29c6a56ba24a5ec0e3b6de565a19ebd3b2b03 (patch)
tree 58bcca93297ff7b5ac5039b04dec308c9522ad1c /storage
parent f4442c9332f73e21be58cc538fca9851120b7553 (diff)
Support write-repair for conditional Put operations
This adds a generic `CheckCondition` utility class that can be explicitly invoked by operations requiring a distinct condition evaluation phase. For now, it is only used by `PutOperation`. Distributed condition checking is only used _iff_ the target bucket's replica set is inconsistent and all involved content nodes support condition probing. Write-repair is currently disabled by default, since no config that enables it has been wired in yet (this is coming as part of a follow-up). Proper metric wiring and MBus reply trace propagation for condition-checking sub-operations are also still missing.
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/tests/distributor/CMakeLists.txt | 1
-rw-r--r-- storage/src/tests/distributor/check_condition_test.cpp | 222
-rw-r--r-- storage/src/tests/distributor/distributor_stripe_test_util.cpp | 14
-rw-r--r-- storage/src/tests/distributor/distributor_stripe_test_util.h | 14
-rw-r--r-- storage/src/tests/distributor/putoperationtest.cpp | 106
-rw-r--r-- storage/src/vespa/storage/config/distributorconfiguration.cpp | 1
-rw-r--r-- storage/src/vespa/storage/config/distributorconfiguration.h | 7
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt | 2
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/check_condition.cpp | 216
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/check_condition.h | 160
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/getoperation.cpp | 2
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/getoperation.h | 4
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/intermediate_message_sender.cpp | 52
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/intermediate_message_sender.h | 33
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/putoperation.cpp | 124
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/putoperation.h | 25
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp | 10
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/statbucketlistoperation.cpp | 4
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp | 2
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp | 60
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h | 38
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp | 2
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp | 2
-rw-r--r-- storage/src/vespa/storage/distributor/sentmessagemap.h | 3
-rw-r--r-- storage/src/vespa/storageapi/message/persistence.h | 1
25 files changed, 975 insertions, 130 deletions
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt
index 11bbf2c241a..88ffa4e764a 100644
--- a/storage/src/tests/distributor/CMakeLists.txt
+++ b/storage/src/tests/distributor/CMakeLists.txt
@@ -9,6 +9,7 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST
bucketdbmetricupdatertest.cpp
bucketgctimecalculatortest.cpp
bucketstateoperationtest.cpp
+ check_condition_test.cpp
distributor_bucket_space_repo_test.cpp
distributor_bucket_space_test.cpp
distributor_host_info_reporter_test.cpp
diff --git a/storage/src/tests/distributor/check_condition_test.cpp b/storage/src/tests/distributor/check_condition_test.cpp
new file mode 100644
index 00000000000..7e42236f660
--- /dev/null
+++ b/storage/src/tests/distributor/check_condition_test.cpp
@@ -0,0 +1,222 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include <vespa/document/bucket/bucket.h>
+#include <vespa/document/bucket/fixed_bucket_spaces.h>
+#include <vespa/document/base/documentid.h>
+#include <vespa/document/fieldset/fieldsets.h>
+#include <vespa/documentapi/messagebus/messages/testandsetcondition.h>
+#include <vespa/storage/distributor/node_supported_features.h>
+#include <vespa/storage/distributor/operations/external/check_condition.h>
+#include <vespa/storage/distributor/persistence_operation_metric_set.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <tests/distributor/distributor_stripe_test_util.h>
+#include <vespa/vespalib/gtest/gtest.h>
+
+using namespace ::testing;
+
+namespace storage::distributor {
+
+using namespace document;
+using documentapi::TestAndSetCondition;
+
+/**
+ * Fixture for exercising CheckCondition on top of the distributor stripe test utilities.
+ *
+ * SetUp() configures 2 content nodes {0, 1} whose replicas of _bucket_id are mutually out
+ * of sync, enables condition probing in config and tags both nodes as supporting it.
+ */
+class CheckConditionTest
+    : public Test,
+      public DistributorStripeTestUtil
+{
+public:
+    DocumentId _doc_id{"id:foo:testdoctype1:n=1234:bar"};
+    BucketId _bucket_id{16, 1234};
+    TestAndSetCondition _tas_cond{"foo or bar"};
+    PersistenceOperationMetricSet _metrics{"dummy_metrics", nullptr};
+
+    CheckConditionTest();
+    ~CheckConditionTest() override;
+
+    void SetUp() override {
+        createLinks();
+        // By default, set up 2 nodes {0, 1} with mutually out of sync replica state
+        // and with both reporting that they support condition probing.
+        setup_stripe(2, 2, "version:1 storage:2 distributor:1");
+        config_enable_condition_probing(true);
+        tag_content_node_supports_condition_probing(0, true);
+        tag_content_node_supports_condition_probing(1, true);
+        addNodesToBucketDB(_bucket_id, "0=10/20/30/t,1=40/50/60");
+    }
+
+    void TearDown() override {
+        close();
+    }
+
+    // Creates a CheckCondition for _doc_id/_tas_cond in the default bucket space via the
+    // create_if_inconsistent_replicas() factory. The result may be an empty pointer.
+    std::shared_ptr<CheckCondition> create_check_condition() {
+        auto& bucket_space = getDistributorBucketSpace();
+        auto doc_bucket = BucketIdFactory{}.getBucketId(_doc_id);
+        auto bucket = Bucket(FixedBucketSpaces::default_space(), _bucket_id);
+        // Sanity check that the fixture's document ID maps into the fixture's bucket.
+        assert(_bucket_id.contains(doc_bucket));
+        return CheckCondition::create_if_inconsistent_replicas(bucket, bucket_space, _doc_id, _tas_cond,
+                                                               node_context(), operation_context(), _metrics);
+    }
+
+    // Returns the idx'th sent command, which must be a GetCommand.
+    std::shared_ptr<api::GetCommand> sent_get_command(size_t idx) {
+        return sent_command<api::GetCommand>(idx);
+    }
+
+    // Returns the idx'th sent command, which must be a PutCommand.
+    std::shared_ptr<api::PutCommand> sent_put_command(size_t idx) {
+        return sent_command<api::PutCommand>(idx);
+    }
+
+    // Builds a GetReply for `cmd` that carries no document, only the given timestamp,
+    // tombstone flag and condition match flag.
+    static std::shared_ptr<api::GetReply> make_reply(const api::GetCommand& cmd, api::Timestamp ts,
+                                                     bool is_tombstone, bool condition_matched)
+    {
+        return std::make_shared<api::GetReply>(cmd, std::shared_ptr<document::Document>(), ts,
+                                               false, is_tombstone, condition_matched);
+    }
+
+    // Reply to the cmd_idx'th sent Get: non-tombstone, condition matched.
+    std::shared_ptr<api::GetReply> make_matched_reply(size_t cmd_idx, api::Timestamp ts = 1000) {
+        return make_reply(*sent_get_command(cmd_idx), ts, false, true);
+    }
+
+    // Reply to the cmd_idx'th sent Get: non-tombstone, condition not matched.
+    std::shared_ptr<api::GetReply> make_mismatched_reply(size_t cmd_idx, api::Timestamp ts = 1000) {
+        return make_reply(*sent_get_command(cmd_idx), ts, false, false);
+    }
+
+    // Reply to the cmd_idx'th sent Get: timestamp 0, non-tombstone, condition not matched.
+    std::shared_ptr<api::GetReply> make_not_found_non_tombstone_reply(size_t cmd_idx) {
+        return make_reply(*sent_get_command(cmd_idx), 0, false, false);
+    }
+
+    // Reply to the cmd_idx'th sent Get: tombstone, condition not matched.
+    std::shared_ptr<api::GetReply> make_tombstone_reply(size_t cmd_idx, api::Timestamp ts = 1000) {
+        return make_reply(*sent_get_command(cmd_idx), ts, true, false);
+    }
+
+    // Reply to the cmd_idx'th sent Get with an ABORTED result code.
+    std::shared_ptr<api::GetReply> make_failed_reply(size_t cmd_idx) {
+        auto reply = make_reply(*sent_get_command(cmd_idx), 0, false, false);
+        reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "did a bork"));
+        return reply;
+    }
+
+    // Starts a condition check, verifies that a Get was sent to each of nodes 0 and 1,
+    // lets `reply_invoker` feed replies into the checker, and then passes the resulting
+    // outcome (which must be present) to `outcome_checker`.
+    // Note: the ASSERT_* macros used here only return from this helper on failure.
+    void test_cond_with_2_gets_sent(const std::function<void(CheckCondition&)>& reply_invoker,
+                                    const std::function<void(const CheckCondition::Outcome&)>& outcome_checker)
+    {
+        auto cond = create_check_condition();
+        ASSERT_TRUE(cond);
+        cond->start_and_send(_sender);
+        ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+        reply_invoker(*cond);
+        auto& outcome = cond->maybe_outcome();
+        ASSERT_TRUE(outcome);
+        outcome_checker(*outcome);
+    }
+};
+
+// Out-of-line defaulted constructor and destructor for the fixture.
+CheckConditionTest::CheckConditionTest() = default;
+CheckConditionTest::~CheckConditionTest() = default;
+
+// With condition probing turned off in config, the factory returns an empty pointer even
+// though the fixture's default replica state is inconsistent.
+TEST_F(CheckConditionTest, no_checker_returned_when_config_disabled) {
+    config_enable_condition_probing(false);
+    auto cond = create_check_condition();
+    EXPECT_FALSE(cond);
+}
+
+// A single replica node (here node 1) lacking probe support is enough to disable checking.
+TEST_F(CheckConditionTest, no_checker_returned_when_probing_not_supported_on_at_least_one_node) {
+    tag_content_node_supports_condition_probing(1, false);
+    auto cond = create_check_condition();
+    EXPECT_FALSE(cond);
+}
+
+// Overwrites the bucket DB entry so both replicas report identical info; no checker is needed.
+TEST_F(CheckConditionTest, no_checker_returned_when_bucket_replicas_are_consistent) {
+    addNodesToBucketDB(_bucket_id, "0=10/20/30/t,1=10/20/30");
+    auto cond = create_check_condition();
+    EXPECT_FALSE(cond);
+}
+
+// With the bucket removed from the DB there are no replicas to probe; no checker is returned.
+TEST_F(CheckConditionTest, no_checker_returned_when_empty_replica_set) {
+    removeFromBucketDB(_bucket_id);
+    auto cond = create_check_condition();
+    EXPECT_FALSE(cond);
+}
+
+// Starting the checker sends one Get per replica node. The first Get is inspected to verify
+// that it carries the document ID, the condition, an empty field set and strong read consistency.
+// No outcome is available before or right after starting.
+TEST_F(CheckConditionTest, starting_sends_condition_probe_gets) {
+    auto cond = create_check_condition();
+    ASSERT_TRUE(cond);
+    EXPECT_FALSE(cond->maybe_outcome());
+    // We don't test too much of the Get functionality, as that's already covered by GetOperation tests.
+    // But we test the main binding glue between the two components.
+    cond->start_and_send(_sender);
+    EXPECT_FALSE(cond->maybe_outcome());
+    ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+    auto cmd = sent_get_command(0);
+    EXPECT_EQ(cmd->getDocumentId(), _doc_id);
+    EXPECT_EQ(cmd->condition(), _tas_cond);
+    EXPECT_EQ(cmd->getFieldSet(), NoFields::NAME);
+    EXPECT_EQ(cmd->internal_read_consistency(), api::InternalReadConsistency::Strong);
+}
+
+// Both replicas report a condition match => outcome is "matched" (neither not-found nor failed).
+TEST_F(CheckConditionTest, condition_matching_completes_check_with_match_outcome) {
+    test_cond_with_2_gets_sent([&](auto& cond) {
+        cond.handle_reply(_sender, make_matched_reply(0));
+        cond.handle_reply(_sender, make_matched_reply(1));
+    }, [&](auto& outcome) {
+        EXPECT_TRUE(outcome.matched_condition());
+        EXPECT_FALSE(outcome.not_found());
+        EXPECT_FALSE(outcome.failed());
+    });
+}
+
+// Node 0 matches at timestamp 1000 but node 1 mismatches at the higher timestamp 1001;
+// the expected outcome is a mismatch.
+TEST_F(CheckConditionTest, condition_mismatching_completes_check_with_mismatch_outcome) {
+    test_cond_with_2_gets_sent([&](auto& cond) {
+        cond.handle_reply(_sender, make_matched_reply(0, 1000));
+        cond.handle_reply(_sender, make_mismatched_reply(1, 1001));
+    }, [&](auto& outcome) {
+        EXPECT_FALSE(outcome.matched_condition());
+        EXPECT_FALSE(outcome.not_found());
+        EXPECT_FALSE(outcome.failed());
+    });
+}
+
+// Both replicas reply with timestamp 0 and no tombstone => outcome is "not found".
+TEST_F(CheckConditionTest, not_found_non_tombstone_completes_check_with_not_found_outcome) {
+    test_cond_with_2_gets_sent([&](auto& cond) {
+        cond.handle_reply(_sender, make_not_found_non_tombstone_reply(0));
+        cond.handle_reply(_sender, make_not_found_non_tombstone_reply(1));
+    }, [&](auto& outcome) {
+        EXPECT_FALSE(outcome.matched_condition());
+        EXPECT_TRUE(outcome.not_found());
+        EXPECT_FALSE(outcome.failed());
+    });
+}
+
+// Node 0 matches at timestamp 1000 but node 1 returns a tombstone at the higher timestamp 1001;
+// the expected outcome is "not found".
+TEST_F(CheckConditionTest, not_found_with_tombstone_completes_check_with_not_found_outcome) {
+    test_cond_with_2_gets_sent([&](auto& cond) {
+        cond.handle_reply(_sender, make_matched_reply(0, 1000));
+        cond.handle_reply(_sender, make_tombstone_reply(1, 1001));
+    }, [&](auto& outcome) {
+        EXPECT_FALSE(outcome.matched_condition());
+        EXPECT_TRUE(outcome.not_found());
+        EXPECT_FALSE(outcome.failed());
+    });
+}
+
+// One matching reply plus one failed (ABORTED) reply => the overall outcome is a failure.
+TEST_F(CheckConditionTest, failed_gets_completes_check_with_error_outcome) {
+    test_cond_with_2_gets_sent([&](auto& cond) {
+        cond.handle_reply(_sender, make_matched_reply(0));
+        cond.handle_reply(_sender, make_failed_reply(1));
+    }, [&](auto& outcome) {
+        EXPECT_FALSE(outcome.matched_condition());
+        EXPECT_FALSE(outcome.not_found());
+        EXPECT_TRUE(outcome.failed());
+    });
+}
+
+// The cluster state version and the bucket's replica set both change between the two replies.
+// Even though both replies match, the check must fail with BUCKET_NOT_FOUND.
+TEST_F(CheckConditionTest, check_fails_if_replica_set_changed_between_start_and_completion) {
+    test_cond_with_2_gets_sent([&](auto& cond) {
+        cond.handle_reply(_sender, make_matched_reply(0));
+        // Simulate node 0 going down, with new cluster state version push and implicit DB removal
+        enable_cluster_state("version:2 storage:1 distributor:1");
+        addNodesToBucketDB(_bucket_id, "1=10/20/30");
+        cond.handle_reply(_sender, make_matched_reply(1));
+    }, [&](auto& outcome) {
+        EXPECT_FALSE(outcome.matched_condition());
+        EXPECT_FALSE(outcome.not_found());
+        EXPECT_TRUE(outcome.failed());
+        EXPECT_EQ(outcome.error_code().getResult(), api::ReturnCode::BUCKET_NOT_FOUND);
+    });
+}
+
+}
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.cpp b/storage/src/tests/distributor/distributor_stripe_test_util.cpp
index 58902e2eb94..7a64eda28ff 100644
--- a/storage/src/tests/distributor/distributor_stripe_test_util.cpp
+++ b/storage/src/tests/distributor/distributor_stripe_test_util.cpp
@@ -568,4 +568,18 @@ DistributorStripeTestUtil::setSystemState(const lib::ClusterState& systemState)
_stripe->enableClusterStateBundle(lib::ClusterStateBundle(systemState));
}
+// Reconfigures the stripe with a freshly made config in which condition probing
+// is switched on or off according to `enable`.
+void
+DistributorStripeTestUtil::config_enable_condition_probing(bool enable) {
+    auto probing_config = make_config();
+    probing_config->set_enable_condition_probing(enable);
+    configure_stripe(probing_config);
+}
+
+// Registers a feature set for content node `index` whose document_condition_probe flag
+// equals `supported`; all other feature fields keep their default-constructed values.
+void
+DistributorStripeTestUtil::tag_content_node_supports_condition_probing(uint16_t index, bool supported) {
+    NodeSupportedFeatures node_features;
+    node_features.document_condition_probe = supported;
+    set_node_supported_features(index, node_features);
+}
+
}
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.h b/storage/src/tests/distributor/distributor_stripe_test_util.h
index 4b489d41f7d..707418512f9 100644
--- a/storage/src/tests/distributor/distributor_stripe_test_util.h
+++ b/storage/src/tests/distributor/distributor_stripe_test_util.h
@@ -40,7 +40,7 @@ class DistributorStripeTestUtil : public DoneInitializeHandler,
public StripeHostInfoNotifier {
public:
DistributorStripeTestUtil();
- ~DistributorStripeTestUtil();
+ ~DistributorStripeTestUtil() override;
/**
* Sets up the storage link chain.
@@ -211,6 +211,18 @@ public:
void simulate_set_pending_cluster_state(const vespalib::string& state_str);
void clear_pending_cluster_state_bundle();
+    // Returns the idx'th command recorded by _sender, downcast to CmdType.
+    // Asserts that idx is in range and that the command actually is of type CmdType.
+    template <typename CmdType>
+    requires std::is_base_of_v<api::StorageCommand, CmdType>
+    [[nodiscard]] std::shared_ptr<CmdType> sent_command(size_t idx) {
+        assert(idx < _sender.commands().size());
+        auto cmd = std::dynamic_pointer_cast<CmdType>(_sender.command(idx));
+        assert(cmd != nullptr);
+        return cmd;
+    }
+
+    // Reconfigures the stripe with condition probing enabled or disabled.
+    void config_enable_condition_probing(bool enable);
+    // Marks content node `index` as supporting (or not supporting) document condition probing.
+    void tag_content_node_supports_condition_probing(uint16_t index, bool supported);
+
protected:
vdstestlib::DirConfig _config;
std::unique_ptr<TestDistributorApp> _node;
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index 2a3f06b1e8c..a69cf8ec54c 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -63,7 +63,7 @@ public:
}
std::shared_ptr<api::StorageCommand> msg = _sender.command(idx);
- api::StorageReply::SP reply(msg->makeReply().release());
+ api::StorageReply::SP reply(msg->makeReply());
dynamic_cast<api::BucketInfoReply&>(*reply).setBucketInfo(info);
reply->setResult(result);
@@ -96,6 +96,30 @@ public:
}
void set_up_3_nodes_and_send_put_with_create_bucket_acks();
+
+    // Returns the idx'th sent command, which must be a GetCommand.
+    std::shared_ptr<api::GetCommand> sent_get_command(size_t idx) {
+        return sent_command<api::GetCommand>(idx);
+    }
+
+    // Returns the idx'th sent command, which must be a PutCommand.
+    std::shared_ptr<api::PutCommand> sent_put_command(size_t idx) {
+        return sent_command<api::PutCommand>(idx);
+    }
+
+    // Builds a GetReply for `cmd` that carries no document, only the given timestamp,
+    // tombstone flag and condition match flag.
+    static std::shared_ptr<api::GetReply> make_get_reply(const api::GetCommand& cmd, api::Timestamp ts,
+                                                         bool is_tombstone, bool condition_matched)
+    {
+        return std::make_shared<api::GetReply>(cmd, std::shared_ptr<document::Document>(), ts,
+                                               false, is_tombstone, condition_matched);
+    }
+
+    // Produces a reply to the cmd_idx'th sent Get whose result is INTERNAL_FAILURE.
+    std::shared_ptr<api::GetReply> make_failed_get_reply(size_t cmd_idx) {
+        const auto get_cmd = sent_get_command(cmd_idx);
+        auto failed_reply = make_get_reply(*get_cmd, 0, false, false);
+        failed_reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, "did a bork"));
+        return failed_reply;
+    }
+
+    // Shared setup: sends a conditional Put towards a bucket with 2 mutually inconsistent replicas.
+    void set_up_tas_put_with_2_inconsistent_replica_nodes();
+
};
PutOperationTest::~PutOperationTest() = default;
@@ -650,4 +674,84 @@ TEST_F(PutOperationTest, minority_failure_override_not_in_effect_for_non_tas_err
_sender.getLastReply());
}
+// Sets up 2 probe-capable nodes with condition probing enabled, adds a bucket whose two
+// replicas have differing info, and sends a Put with a test-and-set condition.
+// Verifies that this results in condition probe Gets (to node 1, then node 0) rather than Puts.
+void PutOperationTest::set_up_tas_put_with_2_inconsistent_replica_nodes() {
+    setup_stripe(Redundancy(2), NodeCount(2), "version:1 storage:2 distributor:1");
+    config_enable_condition_probing(true);
+    tag_content_node_supports_condition_probing(0, true);
+    tag_content_node_supports_condition_probing(1, true);
+
+    auto doc = createDummyDocument("test", "test");
+    auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId());
+    addNodesToBucketDB(bucket, "1=10/20/30,0=20/30/40");
+
+    auto put = createPut(doc);
+    put->setCondition(TestAndSetCondition("test.foo"));
+    sendPut(std::move(put));
+    ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+}
+
+// When both probe Gets report a condition match, the operation proceeds by sending Puts
+// _without_ a condition to both nodes, and replies OK once both Puts have been acked.
+TEST_F(PutOperationTest, matching_condition_probe_sends_unconditional_puts_to_all_nodes) {
+    ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes());
+
+    op->receive(_sender, make_get_reply(*sent_get_command(0), 50, false, true));
+    op->receive(_sender, make_get_reply(*sent_get_command(1), 50, false, true));
+
+    ASSERT_EQ("Get => 1,Get => 0,Put => 1,Put => 0", _sender.getCommands(true)); // Note: cumulative message list
+
+    auto put_n1 = sent_put_command(2);
+    EXPECT_FALSE(put_n1->hasTestAndSetCondition());
+    auto put_n0 = sent_put_command(3);
+    EXPECT_FALSE(put_n0->hasTestAndSetCondition());
+
+    // Ensure replies are no longer routed to condition checker
+    ASSERT_TRUE(_sender.replies().empty());
+    sendReply(2); // put to node 1
+    sendReply(3); // put to node 0
+    EXPECT_EQ("PutReply(id:test:testdoctype1::test, "
+              "BucketId(0x0000000000000000), "
+              "timestamp 100) ReturnCode(NONE)",
+              _sender.getLastReply());
+}
+
+// When the probe Gets find the document but the condition does not match, no Puts are sent
+// and the operation fails with TEST_AND_SET_CONDITION_FAILED.
+TEST_F(PutOperationTest, mismatching_condition_probe_fails_op_with_tas_error) {
+    ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes());
+
+    op->receive(_sender, make_get_reply(*sent_get_command(0), 50, false, false));
+    op->receive(_sender, make_get_reply(*sent_get_command(1), 50, false, false));
+
+    ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+    ASSERT_EQ("PutReply(id:test:testdoctype1::test, "
+              "BucketId(0x0000000000000000), timestamp 100) "
+              "ReturnCode(TEST_AND_SET_CONDITION_FAILED, Condition did not match document)",
+              _sender.getLastReply());
+}
+
+// When both probe Gets reply with timestamp 0 (document not found), no Puts are sent and the
+// operation fails with TEST_AND_SET_CONDITION_FAILED and a "does not exist" message.
+// TODO change semantics for Not Found...
+TEST_F(PutOperationTest, not_found_condition_probe_fails_op_with_tas_error) {
+    ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes());
+
+    op->receive(_sender, make_get_reply(*sent_get_command(0), 0, false, false));
+    op->receive(_sender, make_get_reply(*sent_get_command(1), 0, false, false));
+
+    ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+    ASSERT_EQ("PutReply(id:test:testdoctype1::test, "
+              "BucketId(0x0000000000000000), timestamp 100) "
+              "ReturnCode(TEST_AND_SET_CONDITION_FAILED, Document does not exist)",
+              _sender.getLastReply());
+}
+
+// When one of the probe Gets fails, no Puts are sent and the operation is failed.
+// Note that the reply carries ABORTED with a write-repair specific message, not the
+// INTERNAL_FAILURE code that the failed Get reply itself was tagged with.
+TEST_F(PutOperationTest, failed_condition_probe_fails_op_with_returned_error) {
+    ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes());
+
+    op->receive(_sender, make_get_reply(*sent_get_command(0), 0, false, false));
+    op->receive(_sender, make_failed_get_reply(1));
+
+    ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+    ASSERT_EQ("PutReply(id:test:testdoctype1::test, "
+              "BucketId(0x0000000000000000), timestamp 100) "
+              "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: "
+              "One or more replicas failed during test-and-set condition evaluation)",
+              _sender.getLastReply());
+}
+
}
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp
index c83c5cdd245..c6e3ebb7087 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.cpp
+++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp
@@ -53,6 +53,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component)
_use_unordered_merge_chaining(false),
_inhibit_default_merges_when_global_merges_pending(false),
_enable_two_phase_garbage_collection(false),
+ _enable_condition_probing(false),
_minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED)
{
}
diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h
index a18770ac69f..e3664276518 100644
--- a/storage/src/vespa/storage/config/distributorconfiguration.h
+++ b/storage/src/vespa/storage/config/distributorconfiguration.h
@@ -287,6 +287,12 @@ public:
[[nodiscard]] bool enable_two_phase_garbage_collection() const noexcept {
return _enable_two_phase_garbage_collection;
}
+    // Sets whether condition probing is enabled.
+    void set_enable_condition_probing(bool enable) noexcept {
+        _enable_condition_probing = enable;
+    }
+    // Returns whether condition probing is enabled.
+    [[nodiscard]] bool enable_condition_probing() const noexcept {
+        return _enable_condition_probing;
+    }
uint32_t num_distributor_stripes() const noexcept { return _num_distributor_stripes; }
@@ -347,6 +353,7 @@ private:
bool _use_unordered_merge_chaining;
bool _inhibit_default_merges_when_global_merges_pending;
bool _enable_two_phase_garbage_collection;
+ bool _enable_condition_probing;
DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode;
diff --git a/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt b/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt
index 8556815b241..f6f079d8d5f 100644
--- a/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt
+++ b/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt
@@ -1,7 +1,9 @@
# Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
vespa_add_library(storage_distributoroperationexternal OBJECT
SOURCES
+ check_condition.cpp
getoperation.cpp
+ intermediate_message_sender.cpp
newest_replica.cpp
putoperation.cpp
read_for_write_visitor_operation.cpp
diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
new file mode 100644
index 00000000000..7f7e6b08c1f
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
@@ -0,0 +1,216 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "check_condition.h"
+#include "getoperation.h"
+#include "intermediate_message_sender.h"
+#include <vespa/document/fieldset/fieldsets.h>
+#include <vespa/storage/distributor/distributor_bucket_space.h>
+#include <vespa/storage/distributor/distributor_node_context.h>
+#include <vespa/storage/distributor/distributor_stripe_operation_context.h>
+#include <vespa/storage/distributor/node_supported_features_repo.h>
+#include <vespa/vdslib/state/clusterstate.h>
+#include <vespa/storageapi/message/persistence.h>
+#include <cassert>
+
+#include <vespa/log/log.h>
+LOG_SETUP(".distributor.operations.external.check_condition");
+
+namespace storage::distributor {
+
+// Constructs an instance whose outcome is known up front: _outcome is initialized from
+// `known_outcome` and no GetOperation is created (_cond_get_op is left empty).
+// The cluster state version of the bucket space is still captured at creation time.
+CheckCondition::CheckCondition(Outcome known_outcome,
+                               const DistributorBucketSpace& bucket_space,
+                               const DistributorNodeContext& node_ctx,
+                               private_ctor_tag)
+    : _doc_id_bucket(),
+      _bucket_space(bucket_space),
+      _node_ctx(node_ctx),
+      _cluster_state_version_at_creation_time(_bucket_space.getClusterState().getVersion()),
+      _cond_get_op(),
+      _sent_message_map(),
+      _outcome(known_outcome)
+{
+}
+
+// Constructs an instance that evaluates `tas_condition` for `doc_id` by wrapping a
+// GetOperation. The Get carries the condition, requests no fields and uses strong internal
+// read consistency. The cluster state version is captured so that state changes happening
+// while the Get is in flight can be detected when its reply is handled.
+CheckCondition::CheckCondition(const document::Bucket& bucket,
+                               const document::DocumentId& doc_id,
+                               const documentapi::TestAndSetCondition& tas_condition,
+                               const DistributorBucketSpace& bucket_space,
+                               const DistributorNodeContext& node_ctx,
+                               PersistenceOperationMetricSet& metric,
+                               private_ctor_tag)
+    : _doc_id_bucket(bucket),
+      _bucket_space(bucket_space),
+      _node_ctx(node_ctx),
+      _cluster_state_version_at_creation_time(_bucket_space.getClusterState().getVersion()),
+      _cond_get_op(),
+      _sent_message_map()
+{
+    // Condition checks only return metadata back to the distributor and thus have an empty fieldset.
+    // Side note: the BucketId provided to the GetCommand is ignored; GetOperation computes explicitly from the doc ID.
+    auto get_cmd = std::make_shared<api::GetCommand>(_doc_id_bucket, doc_id, document::NoFields::NAME);
+    get_cmd->set_condition(tas_condition);
+    _cond_get_op = std::make_shared<GetOperation>(_node_ctx, _bucket_space,
+                                                  _bucket_space.getBucketDatabase().acquire_read_guard(),
+                                                  std::move(get_cmd),
+                                                  metric, api::InternalReadConsistency::Strong);
+}
+
+// Out-of-line defaulted destructor.
+CheckCondition::~CheckCondition() = default;
+
+// Starts the wrapped GetOperation, sending its Get requests via `sender`.
+//
+// Messages are sent through an intermediate sender so that any reply generated by the
+// GetOperation itself is intercepted here instead of being passed on to `sender`.
+// If such a reply is produced already during start, it is translated to an outcome immediately.
+//
+// For instances created with an already known outcome there is no wrapped GetOperation;
+// starting such an instance is a no-op and the known outcome is left untouched.
+void CheckCondition::start_and_send(DistributorStripeMessageSender& sender) {
+    if (!_cond_get_op) {
+        return; // Known-outcome instance; nothing to send and nothing to dereference
+    }
+    IntermediateMessageSender proxy_sender(_sent_message_map, _cond_get_op, sender);
+    _cond_get_op->start(proxy_sender);
+    if (proxy_sender._reply) {
+        // Could not send any Get ops at all; bail out immediately
+        handle_internal_get_operation_reply(std::move(proxy_sender._reply));
+    }
+}
+
+// Feeds `reply` (a response to a message previously sent by this instance) to the wrapped
+// GetOperation. If the GetOperation completes as a result, its own reply is intercepted and
+// converted to an outcome rather than being forwarded to `sender`.
+void
+CheckCondition::handle_reply(DistributorStripeMessageSender& sender,
+                             const std::shared_ptr<api::StorageReply>& reply)
+{
+    // The reply must belong to the one and only operation we have sent messages on behalf of.
+    auto op = _sent_message_map.pop(reply->getMsgId());
+    assert(op == _cond_get_op); // We only wrap a single operation
+
+    IntermediateMessageSender intercepting_sender(_sent_message_map, _cond_get_op, sender);
+    _cond_get_op->onReceive(intercepting_sender, reply);
+    if (!intercepting_sender._reply) {
+        return; // Get operation is still waiting for more replies
+    }
+    handle_internal_get_operation_reply(std::move(intercepting_sender._reply));
+}
+
+// Closes the wrapped GetOperation. Any reply the GetOperation generates while closing is
+// captured by the intermediate sender and deliberately dropped.
+//
+// For instances created with an already known outcome there is no wrapped GetOperation,
+// so cancelling is a no-op.
+void CheckCondition::cancel(DistributorStripeMessageSender& sender) {
+    if (!_cond_get_op) {
+        return; // Known-outcome instance; nothing in flight to cancel
+    }
+    IntermediateMessageSender proxy_sender(_sent_message_map, _cond_get_op, sender);
+    _cond_get_op->onClose(proxy_sender);
+    // We don't propagate any generated reply from the GetOperation, as its existence
+    // is an implementation detail.
+}
+
+// Returns true iff the (bucket, node) replica pairs currently present in the bucket database
+// differ from the ones the GetOperation recorded.
+//
+// FIXME this is a (logic-inverted) duplicate of TwoPhaseUpdateOperation and partially of
+// GetOperation, but all can be removed entirely once we redesign how operations are aborted
+// across cluster state edges...!
+bool CheckCondition::replica_set_changed_after_get_operation() const {
+    const auto db_entries = get_bucket_database_entries(_bucket_space, _doc_id_bucket.getBucketId());
+
+    // Flatten the current DB state into the same representation the GetOperation exposes.
+    std::vector<std::pair<document::BucketId, uint16_t>> current_replicas;
+    for (const auto& db_entry : db_entries) {
+        const uint32_t replica_count = db_entry->getNodeCount();
+        for (uint32_t idx = 0; idx < replica_count; ++idx) {
+            current_replicas.emplace_back(db_entry.getBucketId(), db_entry->getNodeRef(idx).getNode());
+        }
+    }
+    return (current_replicas != _cond_get_op->replicas_in_db());
+}
+
+// Maps the newest replica observed by the GetOperation (if any) to an outcome result:
+//  - no replica at all                  => NotFound (there were no replicas to send to)
+//  - condition matched                  => MatchedCondition
+//  - tombstone or zero timestamp        => NotFound
+//  - anything else                      => DidNotMatchCondition
+CheckCondition::Outcome::Result
+CheckCondition::newest_replica_to_outcome(const std::optional<NewestReplica>& newest) noexcept {
+    if (!newest.has_value()) {
+        return Outcome::Result::NotFound;
+    }
+    if (newest->condition_matched) {
+        return Outcome::Result::MatchedCondition;
+    }
+    const bool doc_absent = (newest->is_tombstone || (newest->timestamp == 0));
+    return doc_absent ? Outcome::Result::NotFound
+                      : Outcome::Result::DidNotMatchCondition;
+}
+
+// Returns the entries that the bucket database's getParents() lookup yields for `bucket_id`.
+std::vector<BucketDatabase::Entry>
+CheckCondition::get_bucket_database_entries(const DistributorBucketSpace& bucket_space,
+                                            const document::BucketId& bucket_id)
+{
+    std::vector<BucketDatabase::Entry> parents;
+    const auto& bucket_db = bucket_space.getBucketDatabase();
+    bucket_db.getParents(bucket_id, parents);
+    return parents;
+}
+
+// Translates the reply produced by the wrapped GetOperation into an outcome. In order:
+//  1. A failed Get reply is propagated verbatim as the outcome's error.
+//  2. If any individual replica failed, the outcome is ABORTED.
+//  3. If the cluster state version differs from creation time AND the replica set has changed,
+//     the outcome is BUCKET_NOT_FOUND.
+//  4. Otherwise, the outcome is derived from the newest replica observed by the Get.
+void CheckCondition::handle_internal_get_operation_reply(std::shared_ptr<api::StorageReply> reply) {
+    if (!reply->getResult().success()) {
+        _outcome.emplace(reply->getResult());
+        return;
+    }
+    if (_cond_get_op->any_replicas_failed()) {
+        _outcome.emplace(api::ReturnCode(api::ReturnCode::ABORTED,
+                                         "One or more replicas failed during test-and-set condition evaluation"));
+        return;
+    }
+    const bool state_version_changed =
+            (_bucket_space.getClusterState().getVersion() != _cluster_state_version_at_creation_time);
+    if (state_version_changed && replica_set_changed_after_get_operation()) {
+        // BUCKET_NOT_FOUND is semantically (usually) inaccurate here, but it's what we use for this purpose
+        // in existing operations. Checking the replica set will implicitly check for ownership changes,
+        // as it will be empty if the distributor no longer owns the bucket.
+        // FIXME but it doesn't handle ABA-cases, so we still want to redesign operation aborting to be
+        // explicitly edge-handled...!
+        _outcome.emplace(api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND,
+                                         "Bucket ownership or replica set changed between condition "
+                                         "read and operation write phases"));
+        return;
+    }
+    _outcome.emplace(newest_replica_to_outcome(_cond_get_op->newest_replica()));
+}
+
+// Returns true iff there is exactly one database entry and that entry reports itself as
+// valid and consistent. Same fast-path criteria as TwoPhaseUpdateOperation.
+// TODO consolidate logic
+bool CheckCondition::bucket_has_consistent_replicas(std::span<const BucketDatabase::Entry> entries) {
+    const bool single_entry = (entries.size() == 1);
+    return single_entry && entries[0]->validAndConsistent();
+}
+// Returns true iff every node holding a replica in any of `entries` is registered in the
+// supported-features repo as supporting document condition probing.
+// Trivially true if there are no replicas at all.
+bool
+CheckCondition::all_nodes_support_document_condition_probe(std::span<const BucketDatabase::Entry> entries,
+                                                           const DistributorStripeOperationContext& op_ctx)
+{
+    // TODO move node set feature checking to repo itself
+    const auto& repo = op_ctx.node_supported_features_repo();
+    for (const auto& db_entry : entries) {
+        const uint32_t replica_count = db_entry->getNodeCount();
+        for (uint32_t idx = 0; idx < replica_count; ++idx) {
+            const auto node_index = db_entry->getNodeRef(idx).getNode();
+            const auto& node_features = repo.node_supported_features(node_index);
+            if (!node_features.document_condition_probe) {
+                return false;
+            }
+        }
+    }
+    return true;
+}
+
+// Creates an instance whose outcome is already known to be NotFound.
+// Such an instance has no wrapped GetOperation.
+std::shared_ptr<CheckCondition>
+CheckCondition::create_not_found(const DistributorBucketSpace& bucket_space,
+                                 const DistributorNodeContext& node_ctx)
+{
+    return std::make_shared<CheckCondition>(Outcome(Outcome::Result::NotFound),
+                                            bucket_space, node_ctx, private_ctor_tag{});
+}
+
+// Creates a condition checker iff distributed condition checking is both wanted and possible.
+// Returns an empty pointer when any of the following holds (checked in this order):
+//  - condition probing is disabled in config
+//  - the bucket has no entries in the bucket database (not found)
+//  - the bucket's replicas are consistent (no need for write-repair)
+//  - one or more replica nodes are too old to support condition probing
+std::shared_ptr<CheckCondition>
+CheckCondition::create_if_inconsistent_replicas(const document::Bucket& bucket,
+                                                const DistributorBucketSpace& bucket_space,
+                                                const document::DocumentId& doc_id,
+                                                const documentapi::TestAndSetCondition& tas_condition,
+                                                const DistributorNodeContext& node_ctx,
+                                                const DistributorStripeOperationContext& op_ctx,
+                                                PersistenceOperationMetricSet& metric)
+{
+    // TODO move this check to the caller?
+    if (!op_ctx.distributor_config().enable_condition_probing()) {
+        return nullptr;
+    }
+    const auto db_entries = get_bucket_database_entries(bucket_space, bucket.getBucketId());
+    // Write-repair is only relevant for buckets that exist and have diverging replicas.
+    const bool want_write_repair = (!db_entries.empty() && !bucket_has_consistent_replicas(db_entries));
+    if (!want_write_repair || !all_nodes_support_document_condition_probe(db_entries, op_ctx)) {
+        return nullptr;
+    }
+    return std::make_shared<CheckCondition>(bucket, doc_id, tas_condition, bucket_space,
+                                            node_ctx, metric, private_ctor_tag{});
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.h b/storage/src/vespa/storage/distributor/operations/external/check_condition.h
new file mode 100644
index 00000000000..ea3bfff2e3e
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.h
@@ -0,0 +1,160 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include "newest_replica.h"
+#include <vespa/document/bucket/bucket.h>
+#include <vespa/storage/bucketdb/bucketdatabase.h>
+#include <vespa/storage/distributor/sentmessagemap.h>
+#include <vespa/storageapi/messageapi/returncode.h>
+#include <memory>
+#include <optional>
+#include <span>
+#include <vector>
+
+namespace document { class DocumentId; }
+namespace documentapi { class TestAndSetCondition; }
+namespace storage::api { class StorageReply; }
+
+namespace storage::distributor {
+
+class DistributorBucketSpace;
+class DistributorNodeContext;
+class DistributorStripeMessageSender;
+class DistributorStripeOperationContext;
+class GetOperation;
+class PersistenceOperationMetricSet;
+
+/**
+ * Encapsulates all logic required to evaluate a test-and-set condition for a
+ * particular document ID across a set of bucket replicas.
+ *
+ * Usage and lifecycle:
+ *
+ * 1. Invoke start_and_send() once.
+ * 2. Check if maybe_outcome() indicates that a result is ready. If so, the
+ * instance can be safely discarded after the outcome has been handled.
+ * 3. Wait for responses to messages sent.
+ * 4. When a GetReply is received, invoke handle_reply() with it. Note that
+ * this may transparently send new requests in case of transient failure
+ * responses.
+ * 5. Check if maybe_outcome() is ready, cf. step 2.
+ * 6. Go to 3.
+ *
+ * Although this class appears very similar to an Operation in that it sends
+ * requests and accepts responses, it is not an actual Operation itself.
+ * It is instead intended to be directly used _by_ other phased operations that
+ * require distributed condition checking as part of their write repair logic.
+ * One major distinction between a CheckCondition and an Operation is that this
+ * class does _not_ generate a Reply to communicate its result. Instead, the
+ * caller must check maybe_outcome() after every operation on the instance to
+ * poll if the condition evaluation is done (or has failed).
+ */
+class CheckCondition {
+public:
+ class Outcome { // Result of a condition evaluation: an error code plus a Result classification
+ public:
+ enum class Result {
+ HasError, // Set by the ReturnCode-taking constructor below
+ MatchedCondition,
+ DidNotMatchCondition,
+ NotFound,
+ };
+
+ explicit Outcome(api::ReturnCode error_code) noexcept // Wraps an error code; the result is always HasError
+ : _error_code(std::move(error_code)),
+ _result(Result::HasError)
+ {
+ }
+
+ explicit Outcome(Result result) noexcept // Stores the given result; the error code is default-constructed
+ : _error_code(),
+ _result(result)
+ {
+ }
+
+ [[nodiscard]] bool failed() const noexcept { // Derived from the error code only, not from _result
+ return _error_code.failed();
+ }
+
+ const api::ReturnCode& error_code() const noexcept {
+ return _error_code;
+ }
+
+ [[nodiscard]] bool matched_condition() const noexcept {
+ return _result == Result::MatchedCondition;
+ }
+
+ [[nodiscard]] bool not_found() const noexcept {
+ return _result == Result::NotFound;
+ }
+
+ private:
+ api::ReturnCode _error_code;
+ Result _result;
+ };
+private:
+ const document::Bucket _doc_id_bucket;
+ const DistributorBucketSpace& _bucket_space;
+ const DistributorNodeContext& _node_ctx;
+ const uint32_t _cluster_state_version_at_creation_time; // TODO encapsulate this better
+ std::shared_ptr<GetOperation> _cond_get_op;
+ SentMessageMap _sent_message_map;
+ std::optional<Outcome> _outcome; // Exposed to callers through maybe_outcome()
+
+ struct private_ctor_tag {}; // Private tag type that the public constructors below take as their last argument
+public:
+ CheckCondition(Outcome known_outcome, // Variant taking an already known outcome
+ const DistributorBucketSpace& bucket_space,
+ const DistributorNodeContext& node_ctx,
+ private_ctor_tag);
+ CheckCondition(const document::Bucket& bucket, // Variant taking the document and condition to evaluate
+ const document::DocumentId& doc_id,
+ const documentapi::TestAndSetCondition& tas_condition,
+ const DistributorBucketSpace& bucket_space,
+ const DistributorNodeContext& node_ctx,
+ PersistenceOperationMetricSet& metric,
+ private_ctor_tag);
+ ~CheckCondition();
+
+ void start_and_send(DistributorStripeMessageSender& sender);
+ void handle_reply(DistributorStripeMessageSender& sender,
+ const std::shared_ptr<api::StorageReply>& reply);
+ void cancel(DistributorStripeMessageSender& sender);
+
+ [[nodiscard]] const std::optional<Outcome>& maybe_outcome() const noexcept { // Returns the stored optional as-is
+ return _outcome;
+ }
+
+ [[nodiscard]] static std::shared_ptr<CheckCondition> create_if_inconsistent_replicas( // May return a null pointer
+ const document::Bucket& bucket,
+ const DistributorBucketSpace& bucket_space,
+ const document::DocumentId& doc_id,
+ const documentapi::TestAndSetCondition& tas_condition,
+ const DistributorNodeContext& node_ctx,
+ const DistributorStripeOperationContext& op_ctx,
+ PersistenceOperationMetricSet& metric);
+private:
+ [[nodiscard]] bool replica_set_changed_after_get_operation() const;
+
+ void handle_internal_get_operation_reply(std::shared_ptr<api::StorageReply> reply);
+
+ [[nodiscard]] static Outcome::Result newest_replica_to_outcome(
+ const std::optional<NewestReplica>& newest) noexcept;
+
+ [[nodiscard]] static bool bucket_has_consistent_replicas(
+ std::span<const BucketDatabase::Entry> entries);
+
+ [[nodiscard]] static bool all_nodes_support_document_condition_probe( // False if any replica node lacks the document_condition_probe feature
+ std::span<const BucketDatabase::Entry> entries,
+ const DistributorStripeOperationContext& op_ctx);
+
+ [[nodiscard]] static std::vector<BucketDatabase::Entry> get_bucket_database_entries(
+ const DistributorBucketSpace& bucket_space,
+ const document::BucketId& bucket_id);
+
+ [[nodiscard]] static std::shared_ptr<CheckCondition> create_not_found( // Instance with outcome preset to Result::NotFound
+ const DistributorBucketSpace& bucket_space,
+ const DistributorNodeContext& node_ctx);
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
index 44148bdbf25..72943eaf713 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
@@ -12,7 +12,7 @@
#include <cassert>
#include <vespa/log/log.h>
-LOG_SETUP(".distributor.callback.doc.get");
+LOG_SETUP(".distributor.operations.external.get");
using document::BucketSpace;
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
index 96b5d970904..9d6b9f1986d 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
@@ -24,8 +24,8 @@ class GetOperation : public Operation
{
public:
GetOperation(const DistributorNodeContext& node_ctx,
- const DistributorBucketSpace &bucketSpace,
- const std::shared_ptr<BucketDatabase::ReadGuard> & read_guard,
+ const DistributorBucketSpace& bucketSpace,
+ const std::shared_ptr<BucketDatabase::ReadGuard>& read_guard,
std::shared_ptr<api::GetCommand> msg,
PersistenceOperationMetricSet& metric,
api::InternalReadConsistency desired_read_consistency = api::InternalReadConsistency::Strong);
diff --git a/storage/src/vespa/storage/distributor/operations/external/intermediate_message_sender.cpp b/storage/src/vespa/storage/distributor/operations/external/intermediate_message_sender.cpp
new file mode 100644
index 00000000000..a7a008e2e09
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/operations/external/intermediate_message_sender.cpp
@@ -0,0 +1,52 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "intermediate_message_sender.h"
+#include <vespa/storageapi/messageapi/storagecommand.h>
+#include <vespa/storageapi/messageapi/storagereply.h>
+
+namespace storage::distributor {
+
+IntermediateMessageSender::IntermediateMessageSender(SentMessageMap& mm,
+ std::shared_ptr<Operation> cb,
+ DistributorStripeMessageSender& fwd) noexcept
+ : msgMap(mm), // Bound by reference; the map itself lives with the caller
+ callback(std::move(cb)), // Takes over the shared_ptr passed by value
+ forward(fwd) // Bound by reference; the wrapped sender lives with the caller
+{
+}
+
+IntermediateMessageSender::~IntermediateMessageSender() = default;
+
+void IntermediateMessageSender::sendCommand(const std::shared_ptr<api::StorageCommand>& cmd) {
+    msgMap.insert(cmd->getMsgId(), callback); // Associate the command's message ID with the callback operation first
+    forward.sendCommand(cmd);                 // ...then hand the command to the wrapped sender
+} // No trailing ';' here: it would be a stray empty declaration flagged by -Wextra-semi
+
+void IntermediateMessageSender::sendReply(const std::shared_ptr<api::StorageReply>& reply) {
+ _reply = reply; // Kept in _reply instead of being passed on to `forward`; overwrites any earlier stored reply
+}
+
+int IntermediateMessageSender::getDistributorIndex() const { // This and the accessors below delegate directly to `forward`
+ return forward.getDistributorIndex();
+}
+
+const ClusterContext& IntermediateMessageSender::cluster_context() const {
+ return forward.cluster_context();
+}
+
+PendingMessageTracker& IntermediateMessageSender::getPendingMessageTracker() {
+ return forward.getPendingMessageTracker();
+}
+
+const PendingMessageTracker& IntermediateMessageSender::getPendingMessageTracker() const {
+ return forward.getPendingMessageTracker();
+}
+
+const OperationSequencer& IntermediateMessageSender::operation_sequencer() const noexcept {
+ return forward.operation_sequencer();
+}
+
+OperationSequencer& IntermediateMessageSender::operation_sequencer() noexcept {
+ return forward.operation_sequencer();
+}
+
+}
diff --git a/storage/src/vespa/storage/distributor/operations/external/intermediate_message_sender.h b/storage/src/vespa/storage/distributor/operations/external/intermediate_message_sender.h
new file mode 100644
index 00000000000..6d7ac5b1860
--- /dev/null
+++ b/storage/src/vespa/storage/distributor/operations/external/intermediate_message_sender.h
@@ -0,0 +1,33 @@
+// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#pragma once
+
+#include <vespa/storage/distributor/distributormessagesender.h>
+#include <vespa/storage/distributor/sentmessagemap.h>
+#include <memory>
+
+namespace storage::api { class StorageReply; }
+
+namespace storage::distributor {
+
+struct IntermediateMessageSender final : DistributorStripeMessageSender { // Sender wrapper: tracks sent commands, captures replies
+ SentMessageMap& msgMap; // sendCommand() inserts (message ID -> callback) here
+ std::shared_ptr<Operation> callback; // Operation registered in msgMap for every command sent
+ DistributorStripeMessageSender& forward; // Wrapped sender that commands and context queries are passed to
+ std::shared_ptr<api::StorageReply> _reply; // Reply most recently given to sendReply(); not forwarded
+
+ IntermediateMessageSender(SentMessageMap& mm,
+ std::shared_ptr<Operation> cb,
+ DistributorStripeMessageSender& fwd) noexcept;
+ ~IntermediateMessageSender() override;
+
+ void sendCommand(const std::shared_ptr<api::StorageCommand>& cmd) override;
+ void sendReply(const std::shared_ptr<api::StorageReply>& reply) override;
+ int getDistributorIndex() const override;
+ const ClusterContext& cluster_context() const override;
+ PendingMessageTracker& getPendingMessageTracker() override;
+ const PendingMessageTracker& getPendingMessageTracker() const override;
+ const OperationSequencer& operation_sequencer() const noexcept override;
+ OperationSequencer& operation_sequencer() noexcept override;
+};
+
+}
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
index aa5ad217ae9..86b92336fb8 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
@@ -1,7 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
+#include "check_condition.h"
#include "putoperation.h"
-
#include <vespa/document/fieldvalue/document.h>
#include <vespa/storage/distributor/activecopy.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
@@ -23,15 +23,19 @@ namespace storage::distributor {
PutOperation::PutOperation(const DistributorNodeContext& node_ctx,
DistributorStripeOperationContext& op_ctx,
- DistributorBucketSpace& bucketSpace,
+ DistributorBucketSpace& bucket_space,
std::shared_ptr<api::PutCommand> msg,
- PersistenceOperationMetricSet& metric, SequencingHandle sequencingHandle)
- : SequencedOperation(std::move(sequencingHandle)),
- _trackerInstance(metric, std::make_shared<api::PutReply>(*msg), node_ctx, op_ctx, msg->getTimestamp()),
- _tracker(_trackerInstance),
+ PersistenceOperationMetricSet& metric,
+ SequencingHandle sequencing_handle)
+ : SequencedOperation(std::move(sequencing_handle)),
+ _tracker_instance(metric, std::make_shared<api::PutReply>(*msg), node_ctx, op_ctx, msg->getTimestamp()),
+ _tracker(_tracker_instance),
_msg(std::move(msg)),
+ _doc_id_bucket_id(document::BucketIdFactory{}.getBucketId(_msg->getDocumentId())),
+ _node_ctx(node_ctx),
_op_ctx(op_ctx),
- _bucketSpace(bucketSpace)
+ _temp_metric(metric), // TODO
+ _bucket_space(bucket_space)
{
}
@@ -61,10 +65,10 @@ PutOperation::insertDatabaseEntryAndScheduleCreateBucket(const OperationTargetLi
if (setOneActive) {
assert(!multipleBuckets);
(void) multipleBuckets;
- BucketDatabase::Entry entry(_bucketSpace.getBucketDatabase().get(lastBucket));
+ BucketDatabase::Entry entry(_bucket_space.getBucketDatabase().get(lastBucket));
std::vector<uint16_t> idealState(
- _bucketSpace.get_ideal_service_layer_nodes_bundle(lastBucket).get_available_nodes());
- active = ActiveCopy::calculate(idealState, _bucketSpace.getDistribution(), entry,
+ _bucket_space.get_ideal_service_layer_nodes_bundle(lastBucket).get_available_nodes());
+ active = ActiveCopy::calculate(idealState, _bucket_space.getDistribution(), entry,
_op_ctx.distributor_config().max_activation_inhibited_out_of_sync_groups());
LOG(debug, "Active copies for bucket %s: %s", entry.getBucketId().toString().c_str(), active.toString().c_str());
for (uint32_t i=0; i<active.size(); ++i) {
@@ -72,7 +76,7 @@ PutOperation::insertDatabaseEntryAndScheduleCreateBucket(const OperationTargetLi
copy.setActive(true);
entry->updateNode(copy);
}
- _bucketSpace.getBucketDatabase().update(entry);
+ _bucket_space.getBucketDatabase().update(entry);
}
for (uint32_t i=0, n=copies.size(); i<n; ++i) {
if (!copies[i].isNewCopy()) continue;
@@ -91,7 +95,7 @@ PutOperation::insertDatabaseEntryAndScheduleCreateBucket(const OperationTargetLi
void
PutOperation::sendPutToBucketOnNode(document::BucketSpace bucketSpace, const document::BucketId& bucketId,
- const uint16_t node, std::vector<PersistenceMessageTracker::ToSend>& putBatch)
+ uint16_t node, std::vector<PersistenceMessageTracker::ToSend>& putBatch)
{
document::Bucket bucket(bucketSpace, bucketId);
auto command = std::make_shared<api::PutCommand>(bucket, _msg->getDocument(), _msg->getTimestamp());
@@ -105,6 +109,7 @@ PutOperation::sendPutToBucketOnNode(document::BucketSpace bucketSpace, const doc
}
bool PutOperation::has_unavailable_targets_in_pending_state(const OperationTargetList& targets) const {
+ // TODO handle this explicitly as part of operation abort/cancel edge
auto* pending_state = _op_ctx.pending_cluster_state_or_null(_msg->getBucket().getBucketSpace());
if (!pending_state) {
return false;
@@ -116,7 +121,7 @@ bool PutOperation::has_unavailable_targets_in_pending_state(const OperationTarge
}
bool PutOperation::at_least_one_storage_node_is_available() const {
- const lib::ClusterState& cluster_state = _bucketSpace.getClusterState();
+ const lib::ClusterState& cluster_state = _bucket_space.getClusterState();
const uint16_t storage_node_index_ubound = cluster_state.getNodeCount(lib::NodeType::STORAGE);
for (uint16_t i = 0; i < storage_node_index_ubound; i++) {
@@ -129,29 +134,59 @@ bool PutOperation::at_least_one_storage_node_is_available() const {
return false;
}
+bool PutOperation::has_condition() const noexcept { // True when the Put command carries a test-and-set condition
+ return _msg->hasTestAndSetCondition();
+}
+
void
PutOperation::onStart(DistributorStripeMessageSender& sender)
{
- document::BucketIdFactory bucketIdFactory;
- document::BucketId bid = bucketIdFactory.getBucketId(_msg->getDocumentId());
+ LOG(debug, "Received Put %s for bucket %s",
+ _msg->getDocumentId().toString().c_str(), _doc_id_bucket_id.toString().c_str());
+
+ if (!has_condition()) {
+ start_direct_put_dispatch(sender);
+ } else {
+ start_conditional_put(sender);
+ }
+}
+
+// Entry point for Puts that carry a test-and-set condition. Either dispatches the
+// Put directly or first starts a distributed condition check.
+void PutOperation::start_conditional_put(DistributorStripeMessageSender& sender) {
+    const document::Bucket doc_bucket(_msg->getBucket().getBucketSpace(), _doc_id_bucket_id);
+    _check_condition = CheckCondition::create_if_inconsistent_replicas(doc_bucket, _bucket_space,
+                                                                       _msg->getDocumentId(), _msg->getCondition(),
+                                                                       _node_ctx, _op_ctx, _temp_metric);
+    if (!_check_condition) {
+        // No condition checker was created; send the Put along the regular path.
+        start_direct_put_dispatch(sender);
+        return;
+    }
+    // Inconsistent or non-existent replicas; might need write repair
+    _check_condition->start_and_send(sender);
+    // Might be done immediately ("no replicas"-case)
+    if (const auto& outcome = _check_condition->maybe_outcome(); outcome) {
+        on_completed_check_condition(*outcome, sender);
+    }
+}
- LOG(debug, "Received PUT %s for bucket %s", _msg->getDocumentId().toString().c_str(), bid.toString().c_str());
+void PutOperation::start_direct_put_dispatch(DistributorStripeMessageSender& sender) {
+ LOG(debug, "Starting fast path Put %s for bucket %s",
+ _msg->getDocumentId().toString().c_str(), _doc_id_bucket_id.toString().c_str());
if (at_least_one_storage_node_is_available()) {
std::vector<document::BucketId> bucketsToCheckForSplit;
- OperationTargetResolverImpl targetResolver(_bucketSpace, _bucketSpace.getBucketDatabase(),
- _op_ctx.distributor_config().getMinimalBucketSplit(),
- _bucketSpace.getDistribution().getRedundancy(),
- _msg->getBucket().getBucketSpace());
- OperationTargetList targets(targetResolver.getTargets(OperationTargetResolver::PUT, bid));
+ OperationTargetResolverImpl targetResolver(_bucket_space, _bucket_space.getBucketDatabase(),
+ _op_ctx.distributor_config().getMinimalBucketSplit(),
+ _bucket_space.getDistribution().getRedundancy(),
+ _msg->getBucket().getBucketSpace());
+ OperationTargetList targets(targetResolver.getTargets(OperationTargetResolver::PUT, _doc_id_bucket_id));
for (const auto& target : targets) {
if (_op_ctx.has_pending_message(target.getNode().getIndex(), target.getBucket(),
api::MessageType::DELETEBUCKET_ID))
{
_tracker.fail(sender, api::ReturnCode(api::ReturnCode::BUCKET_DELETED,
- "Bucket was being deleted while we got a PUT, failing operation to be safe"));
+ "Bucket was being deleted while we got a PUT, failing operation to be safe"));
return;
}
}
@@ -165,7 +200,7 @@ PutOperation::onStart(DistributorStripeMessageSender& sender)
// Mark any entries we're not feeding to as not trusted.
std::vector<BucketDatabase::Entry> entries;
- _bucketSpace.getBucketDatabase().getParents(bid, entries);
+ _bucket_space.getBucketDatabase().getParents(_doc_id_bucket_id, entries);
std::vector<PersistenceMessageTracker::ToSend> createBucketBatch;
if (targets.hasAnyNewCopies()) {
@@ -214,7 +249,7 @@ PutOperation::onStart(DistributorStripeMessageSender& sender)
bool
PutOperation::shouldImplicitlyActivateReplica(const OperationTargetList& targets) const
{
- const auto& config(_op_ctx.distributor_config());
+ const auto& config = _op_ctx.distributor_config();
if (config.isBucketActivationDisabled()) {
return false;
}
@@ -225,15 +260,48 @@ void
PutOperation::onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply>& msg)
{
LOG(debug, "Received %s", msg->toString(true).c_str());
- _tracker.receiveReply(sender, static_cast<api::BucketInfoReply&>(*msg));
+ if (!_check_condition) {
+ _tracker.receiveReply(sender, dynamic_cast<api::BucketInfoReply&>(*msg));
+ } else {
+ _check_condition->handle_reply(sender, msg);
+ const auto& outcome = _check_condition->maybe_outcome();
+ if (!outcome) {
+ return; // Condition check not done yet
+ }
+ on_completed_check_condition(*outcome, sender);
+ }
+}
+
+// Acts on a finished condition check: a matched condition turns the Put into an
+// unconditional one and dispatches it; any other outcome fails the operation.
+// The condition checker is released in all cases.
+void
+PutOperation::on_completed_check_condition(const CheckCondition::Outcome& outcome,
+                                           DistributorStripeMessageSender& sender)
+{
+    if (outcome.matched_condition()) {
+        _msg->clear_condition(); // Transform to unconditional Put
+        start_direct_put_dispatch(sender);
+        _check_condition.reset();
+        return;
+    }
+    if (outcome.not_found()) {
+        // TODO create:true combined with not found
+        // TODO "not found" not a TaS error...
+        _tracker.fail(sender, api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED,
+                                              "Document does not exist"));
+    } else if (outcome.failed()) {
+        const auto& probe_error = outcome.error_code();
+        _tracker.fail(sender, api::ReturnCode(probe_error.getResult(),
+                "Failed during write repair condition probe step. Reason: " + probe_error.getMessage()));
+    } else {
+        _tracker.fail(sender, api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED,
+                                              "Condition did not match document"));
+    }
+    // `outcome` is not touched past this point.
+    _check_condition.reset();
+}
void
PutOperation::onClose(DistributorStripeMessageSender& sender)
{
- const char* error = "Process is shutting down";
- LOG(debug, "%s", error);
- _tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, error));
+ if (_check_condition) {
+ _check_condition->cancel(sender);
+ }
+ _tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down"));
}
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
index 283395f1406..d80e9cc00d7 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
@@ -2,6 +2,7 @@
#pragma once
+#include "check_condition.h"
#include <vespa/storage/distributor/operations/sequenced_operation.h>
#include <vespa/storage/distributor/persistencemessagetracker.h>
@@ -37,24 +38,32 @@ public:
void onClose(DistributorStripeMessageSender& sender) override;
private:
- PersistenceMessageTrackerImpl _trackerInstance;
- PersistenceMessageTracker& _tracker;
-
+ PersistenceMessageTrackerImpl _tracker_instance;
+ PersistenceMessageTracker& _tracker;
+ std::shared_ptr<api::PutCommand> _msg;
+ document::BucketId _doc_id_bucket_id;
+ const DistributorNodeContext& _node_ctx;
+ DistributorStripeOperationContext& _op_ctx;
+ PersistenceOperationMetricSet& _temp_metric;
+ DistributorBucketSpace& _bucket_space;
+ std::shared_ptr<CheckCondition> _check_condition;
+
+ void start_direct_put_dispatch(DistributorStripeMessageSender& sender);
+ void start_conditional_put(DistributorStripeMessageSender& sender);
+ void on_completed_check_condition(const CheckCondition::Outcome& outcome,
+ DistributorStripeMessageSender& sender);
void insertDatabaseEntryAndScheduleCreateBucket(const OperationTargetList& copies, bool setOneActive,
const api::StorageCommand& originalCommand,
std::vector<MessageTracker::ToSend>& messagesToSend);
void sendPutToBucketOnNode(document::BucketSpace bucketSpace, const document::BucketId& bucketId,
- const uint16_t node, std::vector<PersistenceMessageTracker::ToSend>& putBatch);
+ uint16_t node, std::vector<PersistenceMessageTracker::ToSend>& putBatch);
[[nodiscard]] bool shouldImplicitlyActivateReplica(const OperationTargetList& targets) const;
[[nodiscard]] bool has_unavailable_targets_in_pending_state(const OperationTargetList& targets) const;
[[nodiscard]] bool at_least_one_storage_node_is_available() const;
-
- std::shared_ptr<api::PutCommand> _msg;
- DistributorStripeOperationContext& _op_ctx;
- DistributorBucketSpace &_bucketSpace;
+ [[nodiscard]] bool has_condition() const noexcept;
};
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp
index 5fd42021a63..dd6e1e93791 100644
--- a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp
@@ -8,15 +8,13 @@
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/vdslib/state/clusterstate.h>
-
#include <vespa/log/log.h>
-LOG_SETUP(".distributor.callback.doc.removelocation");
-
+LOG_SETUP(".distributor.operations.external.remove_location");
-using namespace storage::distributor;
-using namespace storage;
using document::BucketSpace;
+namespace storage::distributor {
+
RemoveLocationOperation::RemoveLocationOperation(
const DistributorNodeContext& node_ctx,
DistributorStripeOperationContext& op_ctx,
@@ -120,3 +118,5 @@ RemoveLocationOperation::onClose(DistributorStripeMessageSender& sender)
_tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED,
"Process is shutting down"));
}
+
+}
diff --git a/storage/src/vespa/storage/distributor/operations/external/statbucketlistoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/statbucketlistoperation.cpp
index 6b2550ab547..0b250a36b27 100644
--- a/storage/src/vespa/storage/distributor/operations/external/statbucketlistoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/statbucketlistoperation.cpp
@@ -4,8 +4,7 @@
#include <vespa/storage/distributor/maintenance/maintenanceoperationgenerator.h>
#include <sstream>
-namespace storage {
-namespace distributor {
+namespace storage::distributor {
StatBucketListOperation::StatBucketListOperation(
const BucketDatabase& bucketDb,
@@ -61,4 +60,3 @@ StatBucketListOperation::onStart(DistributorStripeMessageSender& sender)
}
}
-}
diff --git a/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp
index 85f36e7a963..a0b3f12f76b 100644
--- a/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp
@@ -6,7 +6,7 @@
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/log/log.h>
-LOG_SETUP(".distributor.callback.statbucket");
+LOG_SETUP(".distributor.operations.external.stat_bucket");
namespace storage::distributor {
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index 515b72520ec..0cb4b223c11 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -1,6 +1,7 @@
// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
#include "getoperation.h"
+#include "intermediate_message_sender.h"
#include "putoperation.h"
#include "twophaseupdateoperation.h"
#include "updateoperation.h"
@@ -15,7 +16,7 @@
#include <cinttypes>
#include <vespa/log/log.h>
-LOG_SETUP(".distributor.callback.twophaseupdate");
+LOG_SETUP(".distributor.operations.external.two_phase_update");
using namespace std::literals::string_literals;
using document::BucketSpace;
@@ -56,63 +57,6 @@ TwoPhaseUpdateOperation::TwoPhaseUpdateOperation(
TwoPhaseUpdateOperation::~TwoPhaseUpdateOperation() = default;
-namespace {
-
-struct IntermediateMessageSender : DistributorStripeMessageSender {
- SentMessageMap& msgMap;
- std::shared_ptr<Operation> callback;
- DistributorStripeMessageSender& forward;
- std::shared_ptr<api::StorageReply> _reply;
-
- IntermediateMessageSender(SentMessageMap& mm, std::shared_ptr<Operation> cb, DistributorStripeMessageSender & fwd);
- ~IntermediateMessageSender() override;
-
- void sendCommand(const std::shared_ptr<api::StorageCommand>& cmd) override {
- msgMap.insert(cmd->getMsgId(), callback);
- forward.sendCommand(cmd);
- };
-
- void sendReply(const std::shared_ptr<api::StorageReply>& reply) override {
- _reply = reply;
- }
-
- int getDistributorIndex() const override {
- return forward.getDistributorIndex();
- }
-
- const ClusterContext & cluster_context() const override {
- return forward.cluster_context();
- }
-
- PendingMessageTracker& getPendingMessageTracker() override {
- return forward.getPendingMessageTracker();
- }
-
- const PendingMessageTracker& getPendingMessageTracker() const override {
- return forward.getPendingMessageTracker();
- }
-
- const OperationSequencer& operation_sequencer() const noexcept override {
- return forward.operation_sequencer();
- }
-
- OperationSequencer& operation_sequencer() noexcept override {
- return forward.operation_sequencer();
- }
-};
-
-IntermediateMessageSender::IntermediateMessageSender(SentMessageMap& mm,
- std::shared_ptr<Operation> cb,
- DistributorStripeMessageSender & fwd)
- : msgMap(mm),
- callback(std::move(cb)),
- forward(fwd)
-{ }
-
-IntermediateMessageSender::~IntermediateMessageSender() = default;
-
-}
-
const char*
TwoPhaseUpdateOperation::stateToString(SendState state) noexcept
{
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
index 7c1b6b3eed6..486ed766510 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
@@ -135,26 +135,28 @@ private:
// Precondition: reply has not yet been sent.
vespalib::string update_doc_id() const;
- UpdateMetricSet& _updateMetric;
- PersistenceOperationMetricSet& _putMetric;
- PersistenceOperationMetricSet& _getMetric;
- PersistenceOperationMetricSet& _metadata_get_metrics;
+ using ReplicaState = std::vector<std::pair<document::BucketId, uint16_t>>;
+
+ UpdateMetricSet& _updateMetric;
+ PersistenceOperationMetricSet& _putMetric;
+ PersistenceOperationMetricSet& _getMetric;
+ PersistenceOperationMetricSet& _metadata_get_metrics;
std::shared_ptr<api::UpdateCommand> _updateCmd;
- std::shared_ptr<api::UpdateReply> _updateReply;
- const DistributorNodeContext& _node_ctx;
- DistributorStripeOperationContext& _op_ctx;
- const DocumentSelectionParser& _parser;
- DistributorBucketSpace &_bucketSpace;
- SentMessageMap _sentMessageMap;
- SendState _sendState;
- Mode _mode;
- mbus::Trace _trace;
- document::BucketId _updateDocBucketId;
- std::vector<std::pair<document::BucketId, uint16_t>> _replicas_at_get_send_time;
+ std::shared_ptr<api::UpdateReply> _updateReply;
+ const DistributorNodeContext& _node_ctx;
+ DistributorStripeOperationContext& _op_ctx;
+ const DocumentSelectionParser& _parser;
+ DistributorBucketSpace& _bucketSpace;
+ SentMessageMap _sentMessageMap;
+ SendState _sendState;
+ Mode _mode;
+ mbus::Trace _trace;
+ document::BucketId _updateDocBucketId;
+ ReplicaState _replicas_at_get_send_time;
std::optional<framework::MilliSecTimer> _single_get_latency_timer;
- uint16_t _fast_path_repair_source_node;
- bool _use_initial_cheap_metadata_fetch_phase;
- bool _replySent;
+ uint16_t _fast_path_repair_source_node;
+ bool _use_initial_cheap_metadata_fetch_phase;
+ bool _replySent;
};
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
index f3ce71b08d6..8988f2589ce 100644
--- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp
@@ -12,7 +12,7 @@
#include <vespa/log/log.h>
-LOG_SETUP(".distributor.callback.doc.update");
+LOG_SETUP(".distributor.operations.external.update");
using document::BucketSpace;
diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
index ca47ab7478c..d34b9da0013 100644
--- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp
@@ -15,7 +15,7 @@
#include <optional>
#include <vespa/log/log.h>
-LOG_SETUP(".visitoroperation");
+LOG_SETUP(".distributor.operations.external.visitor");
namespace storage::distributor {
diff --git a/storage/src/vespa/storage/distributor/sentmessagemap.h b/storage/src/vespa/storage/distributor/sentmessagemap.h
index 1ed7a4cd8c2..70bee311f78 100644
--- a/storage/src/vespa/storage/distributor/sentmessagemap.h
+++ b/storage/src/vespa/storage/distributor/sentmessagemap.h
@@ -8,8 +8,7 @@ namespace storage::distributor {
class Operation;
-class SentMessageMap
-{
+class SentMessageMap {
public:
SentMessageMap();
~SentMessageMap();
diff --git a/storage/src/vespa/storageapi/message/persistence.h b/storage/src/vespa/storageapi/message/persistence.h
index e6f2023fa48..0607f3792f3 100644
--- a/storage/src/vespa/storageapi/message/persistence.h
+++ b/storage/src/vespa/storageapi/message/persistence.h
@@ -27,6 +27,7 @@ public:
~TestAndSetCommand() override;
void setCondition(const TestAndSetCondition & condition) { _condition = condition; }
+ void clear_condition() { _condition = TestAndSetCondition(); }
const TestAndSetCondition & getCondition() const { return _condition; }
bool hasTestAndSetCondition() const noexcept override { return _condition.isPresent(); }