diff options
25 files changed, 990 insertions, 130 deletions
diff --git a/storage/src/tests/distributor/CMakeLists.txt b/storage/src/tests/distributor/CMakeLists.txt index 11bbf2c241a..88ffa4e764a 100644 --- a/storage/src/tests/distributor/CMakeLists.txt +++ b/storage/src/tests/distributor/CMakeLists.txt @@ -9,6 +9,7 @@ vespa_add_executable(storage_distributor_gtest_runner_app TEST bucketdbmetricupdatertest.cpp bucketgctimecalculatortest.cpp bucketstateoperationtest.cpp + check_condition_test.cpp distributor_bucket_space_repo_test.cpp distributor_bucket_space_test.cpp distributor_host_info_reporter_test.cpp diff --git a/storage/src/tests/distributor/check_condition_test.cpp b/storage/src/tests/distributor/check_condition_test.cpp new file mode 100644 index 00000000000..d4b7d7019d7 --- /dev/null +++ b/storage/src/tests/distributor/check_condition_test.cpp @@ -0,0 +1,235 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include <vespa/document/bucket/bucket.h> +#include <vespa/document/bucket/fixed_bucket_spaces.h> +#include <vespa/document/base/documentid.h> +#include <vespa/document/fieldset/fieldsets.h> +#include <vespa/documentapi/messagebus/messages/testandsetcondition.h> +#include <vespa/storage/distributor/node_supported_features.h> +#include <vespa/storage/distributor/operations/external/check_condition.h> +#include <vespa/storage/distributor/persistence_operation_metric_set.h> +#include <vespa/storageapi/message/persistence.h> +#include <tests/distributor/distributor_stripe_test_util.h> +#include <vespa/vespalib/gtest/gtest.h> + +using namespace ::testing; + +namespace storage::distributor { + +using namespace document; +using documentapi::TestAndSetCondition; + +class CheckConditionTest + : public Test, + public DistributorStripeTestUtil +{ +public: + DocumentId _doc_id{"id:foo:testdoctype1:n=1234:bar"}; + BucketId _bucket_id{16, 1234}; + TestAndSetCondition _tas_cond{"foo or bar"}; + PersistenceOperationMetricSet _metrics{"dummy_metrics", nullptr}; + + CheckConditionTest(); + ~CheckConditionTest() override; + + void SetUp() override { + createLinks(); + // By default, set up 2 nodes {0, 1} with mutually out of sync replica state + // and with both reporting that they support condition probing. + setup_stripe(2, 2, "version:1 storage:2 distributor:1"); + config_enable_condition_probing(true); + tag_content_node_supports_condition_probing(0, true); + tag_content_node_supports_condition_probing(1, true); + addNodesToBucketDB(_bucket_id, "0=10/20/30/t,1=40/50/60"); + }; + + void TearDown() override { + close(); + } + + std::shared_ptr<CheckCondition> create_check_condition() { + auto& bucket_space = getDistributorBucketSpace(); + auto doc_bucket = BucketIdFactory{}.getBucketId(_doc_id); + auto bucket = Bucket(FixedBucketSpaces::default_space(), _bucket_id); + assert(_bucket_id.contains(doc_bucket)); + return CheckCondition::create_if_inconsistent_replicas(bucket, bucket_space, _doc_id, _tas_cond, + node_context(), operation_context(), _metrics); + } + + std::shared_ptr<api::GetCommand> sent_get_command(size_t idx) { + return sent_command<api::GetCommand>(idx); + } + + std::shared_ptr<api::PutCommand> sent_put_command(size_t idx) { + return sent_command<api::PutCommand>(idx); + } + + static std::shared_ptr<api::GetReply> make_reply(const api::GetCommand& cmd, api::Timestamp ts, + bool is_tombstone, bool condition_matched) + { + return std::make_shared<api::GetReply>(cmd, std::shared_ptr<document::Document>(), ts, + false, is_tombstone, condition_matched); + } + + std::shared_ptr<api::GetReply> make_matched_reply(size_t cmd_idx, api::Timestamp ts = 1000) { + return make_reply(*sent_get_command(cmd_idx), ts, false, true); + } + + std::shared_ptr<api::GetReply> make_mismatched_reply(size_t cmd_idx, api::Timestamp ts = 1000) { + return make_reply(*sent_get_command(cmd_idx), ts, false, false); + } + + std::shared_ptr<api::GetReply> make_not_found_non_tombstone_reply(size_t cmd_idx) { + return make_reply(*sent_get_command(cmd_idx), 0, false, false); + } + + std::shared_ptr<api::GetReply> make_tombstone_reply(size_t cmd_idx, api::Timestamp ts = 1000) { + return make_reply(*sent_get_command(cmd_idx), ts, true, false); + } + + std::shared_ptr<api::GetReply> make_failed_reply(size_t cmd_idx) { + auto reply = make_reply(*sent_get_command(cmd_idx), 0, false, false); + reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "did a bork")); + return reply; + } + + void test_cond_with_2_gets_sent(const std::function<void(CheckCondition&)>& reply_invoker, + const std::function<void(const CheckCondition::Outcome&)>& outcome_checker) + { + auto cond = create_check_condition(); + ASSERT_TRUE(cond); + cond->start_and_send(_sender); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + reply_invoker(*cond); + auto& outcome = cond->maybe_outcome(); + ASSERT_TRUE(outcome); + outcome_checker(*outcome); + } +}; + +CheckConditionTest::CheckConditionTest() = default; +CheckConditionTest::~CheckConditionTest() = default; + +TEST_F(CheckConditionTest, no_checker_returned_when_config_disabled) { + config_enable_condition_probing(false); + auto cond = create_check_condition(); + EXPECT_FALSE(cond); +} + +TEST_F(CheckConditionTest, no_checker_returned_when_probing_not_supported_on_at_least_one_node) { + tag_content_node_supports_condition_probing(1, false); + auto cond = create_check_condition(); + EXPECT_FALSE(cond); +} + +TEST_F(CheckConditionTest, no_checker_returned_when_bucket_replicas_are_consistent) { + addNodesToBucketDB(_bucket_id, "0=10/20/30/t,1=10/20/30"); + auto cond = create_check_condition(); + EXPECT_FALSE(cond); +} + +TEST_F(CheckConditionTest, no_checker_returned_when_empty_replica_set) { + removeFromBucketDB(_bucket_id); + auto cond = create_check_condition(); + EXPECT_FALSE(cond); +} + +TEST_F(CheckConditionTest, starting_sends_condition_probe_gets) { + auto cond = create_check_condition(); + ASSERT_TRUE(cond); + EXPECT_FALSE(cond->maybe_outcome()); + // Nothing should be sent prior to start_and_send() + ASSERT_EQ("", _sender.getCommands(true)); + // We don't test too much of the Get functionality, as that's already covered by GetOperation tests. + // But we test the main binding glue between the two components. + cond->start_and_send(_sender); + EXPECT_FALSE(cond->maybe_outcome()); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + auto cmd = sent_get_command(0); + EXPECT_EQ(cmd->getDocumentId(), _doc_id); + EXPECT_EQ(cmd->condition(), _tas_cond); + EXPECT_EQ(cmd->getFieldSet(), NoFields::NAME); + EXPECT_EQ(cmd->internal_read_consistency(), api::InternalReadConsistency::Strong); +} + +TEST_F(CheckConditionTest, condition_matching_completes_check_with_match_outcome) { + test_cond_with_2_gets_sent([&](auto& cond) { + cond.handle_reply(_sender, make_matched_reply(0)); + cond.handle_reply(_sender, make_matched_reply(1)); + }, [&](auto& outcome) { + EXPECT_TRUE(outcome.matched_condition()); + EXPECT_FALSE(outcome.not_found()); + EXPECT_FALSE(outcome.failed()); + }); +} + +TEST_F(CheckConditionTest, newest_document_version_is_authoritative_for_condition_match) { + test_cond_with_2_gets_sent([&](auto& cond) { + cond.handle_reply(_sender, make_matched_reply(0, 1001)); + cond.handle_reply(_sender, make_mismatched_reply(1, 1000)); + }, [&](auto& outcome) { + EXPECT_TRUE(outcome.matched_condition()); + EXPECT_FALSE(outcome.not_found()); + EXPECT_FALSE(outcome.failed()); + }); +} + +TEST_F(CheckConditionTest, condition_mismatching_completes_check_with_mismatch_outcome) { + test_cond_with_2_gets_sent([&](auto& cond) { + cond.handle_reply(_sender, make_matched_reply(0, 1000)); + cond.handle_reply(_sender, make_mismatched_reply(1, 1001)); + }, [&](auto& outcome) { + EXPECT_FALSE(outcome.matched_condition()); + EXPECT_FALSE(outcome.not_found()); + EXPECT_FALSE(outcome.failed()); + }); +} + +TEST_F(CheckConditionTest, not_found_non_tombstone_completes_check_with_not_found_outcome) { + test_cond_with_2_gets_sent([&](auto& cond) { + cond.handle_reply(_sender, make_not_found_non_tombstone_reply(0)); + cond.handle_reply(_sender, make_not_found_non_tombstone_reply(1)); + }, [&](auto& outcome) { + EXPECT_FALSE(outcome.matched_condition()); + EXPECT_TRUE(outcome.not_found()); + EXPECT_FALSE(outcome.failed()); + }); +} + +TEST_F(CheckConditionTest, not_found_with_tombstone_completes_check_with_not_found_outcome) { + test_cond_with_2_gets_sent([&](auto& cond) { + cond.handle_reply(_sender, make_matched_reply(0, 1000)); + cond.handle_reply(_sender, make_tombstone_reply(1, 1001)); + }, [&](auto& outcome) { + EXPECT_FALSE(outcome.matched_condition()); + EXPECT_TRUE(outcome.not_found()); + EXPECT_FALSE(outcome.failed()); + }); +} + +TEST_F(CheckConditionTest, failed_gets_completes_check_with_error_outcome) { + test_cond_with_2_gets_sent([&](auto& cond) { + cond.handle_reply(_sender, make_matched_reply(0)); + cond.handle_reply(_sender, make_failed_reply(1)); + }, [&](auto& outcome) { + EXPECT_FALSE(outcome.matched_condition()); + EXPECT_FALSE(outcome.not_found()); + EXPECT_TRUE(outcome.failed()); + }); +} + +TEST_F(CheckConditionTest, check_fails_if_replica_set_changed_between_start_and_completion) { + test_cond_with_2_gets_sent([&](auto& cond) { + cond.handle_reply(_sender, make_matched_reply(0)); + // Simulate node 0 going down, with new cluster state version push and implicit DB removal + enable_cluster_state("version:2 storage:1 distributor:1"); + addNodesToBucketDB(_bucket_id, "1=10/20/30"); + cond.handle_reply(_sender, make_matched_reply(1)); + }, [&](auto& outcome) { + EXPECT_FALSE(outcome.matched_condition()); + EXPECT_FALSE(outcome.not_found()); + EXPECT_TRUE(outcome.failed()); + EXPECT_EQ(outcome.error_code().getResult(), api::ReturnCode::BUCKET_NOT_FOUND); + }); +} + +} diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.cpp b/storage/src/tests/distributor/distributor_stripe_test_util.cpp index 58902e2eb94..7a64eda28ff 100644 --- a/storage/src/tests/distributor/distributor_stripe_test_util.cpp +++ b/storage/src/tests/distributor/distributor_stripe_test_util.cpp @@ -568,4 +568,18 @@ DistributorStripeTestUtil::setSystemState(const lib::ClusterState& systemState) _stripe->enableClusterStateBundle(lib::ClusterStateBundle(systemState)); } +void +DistributorStripeTestUtil::config_enable_condition_probing(bool enable) { + auto cfg = make_config(); + cfg->set_enable_condition_probing(enable); + configure_stripe(cfg); +} + +void +DistributorStripeTestUtil::tag_content_node_supports_condition_probing(uint16_t index, bool supported) { + NodeSupportedFeatures features; + features.document_condition_probe = supported; + set_node_supported_features(index, features); +} + } diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.h b/storage/src/tests/distributor/distributor_stripe_test_util.h index 4b489d41f7d..707418512f9 100644 --- a/storage/src/tests/distributor/distributor_stripe_test_util.h +++ b/storage/src/tests/distributor/distributor_stripe_test_util.h @@ -40,7 +40,7 @@ class DistributorStripeTestUtil : public DoneInitializeHandler, public StripeHostInfoNotifier { public: DistributorStripeTestUtil(); - ~DistributorStripeTestUtil(); + ~DistributorStripeTestUtil() override; /** * Sets up the storage link chain. @@ -211,6 +211,18 @@ public: void simulate_set_pending_cluster_state(const vespalib::string& state_str); void clear_pending_cluster_state_bundle(); + template <typename CmdType> + requires std::is_base_of_v<api::StorageCommand, CmdType> + [[nodiscard]] std::shared_ptr<CmdType> sent_command(size_t idx) { + assert(idx < _sender.commands().size()); + auto cmd = std::dynamic_pointer_cast<CmdType>(_sender.command(idx)); + assert(cmd != nullptr); + return cmd; + } + + void config_enable_condition_probing(bool enable); + void tag_content_node_supports_condition_probing(uint16_t index, bool supported); + protected: vdstestlib::DirConfig _config; std::unique_ptr<TestDistributorApp> _node; diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp index 2a3f06b1e8c..bc0118b6d5e 100644 --- a/storage/src/tests/distributor/putoperationtest.cpp +++ b/storage/src/tests/distributor/putoperationtest.cpp @@ -63,7 +63,7 @@ public: } std::shared_ptr<api::StorageCommand> msg = _sender.command(idx); - api::StorageReply::SP reply(msg->makeReply().release()); + api::StorageReply::SP reply(msg->makeReply()); dynamic_cast<api::BucketInfoReply&>(*reply).setBucketInfo(info); reply->setResult(result); @@ -96,6 +96,30 @@ public: } void set_up_3_nodes_and_send_put_with_create_bucket_acks(); + + std::shared_ptr<api::GetCommand> sent_get_command(size_t idx) { + return sent_command<api::GetCommand>(idx); + } + + std::shared_ptr<api::PutCommand> sent_put_command(size_t idx) { + return sent_command<api::PutCommand>(idx); + } + + static std::shared_ptr<api::GetReply> make_get_reply(const api::GetCommand& cmd, api::Timestamp ts, + bool is_tombstone, bool condition_matched) + { + return std::make_shared<api::GetReply>(cmd, std::shared_ptr<document::Document>(), ts, + false, is_tombstone, condition_matched); + } + + std::shared_ptr<api::GetReply> make_failed_get_reply(size_t cmd_idx) { + auto reply = make_get_reply(*sent_get_command(cmd_idx), 0, false, false); + reply->setResult(api::ReturnCode(api::ReturnCode::INTERNAL_FAILURE, "did a bork")); + return reply; + } + + void set_up_tas_put_with_2_inconsistent_replica_nodes(); + }; PutOperationTest::~PutOperationTest() = default; @@ -650,4 +674,86 @@ TEST_F(PutOperationTest, minority_failure_override_not_in_effect_for_non_tas_err _sender.getLastReply()); } +void PutOperationTest::set_up_tas_put_with_2_inconsistent_replica_nodes() { + setup_stripe(Redundancy(2), NodeCount(2), "version:1 storage:2 distributor:1"); + config_enable_condition_probing(true); + tag_content_node_supports_condition_probing(0, true); + tag_content_node_supports_condition_probing(1, true); + + auto doc = createDummyDocument("test", "test"); + auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId()); + addNodesToBucketDB(bucket, "1=10/20/30,0=20/30/40"); + + auto put = createPut(doc); + put->setCondition(TestAndSetCondition("test.foo")); + sendPut(std::move(put)); + ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true)); +} + +TEST_F(PutOperationTest, matching_condition_probe_sends_unconditional_puts_to_all_nodes) { + ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes()); + + op->receive(_sender, make_get_reply(*sent_get_command(0), 50, false, true)); + op->receive(_sender, make_get_reply(*sent_get_command(1), 50, false, true)); + + ASSERT_EQ("Get => 1,Get => 0,Put => 1,Put => 0", _sender.getCommands(true)); // Note: cumulative message list + + auto put_n1 = sent_put_command(2); + EXPECT_FALSE(put_n1->hasTestAndSetCondition()); + auto put_n0 = sent_put_command(3); + EXPECT_FALSE(put_n0->hasTestAndSetCondition()); + + // Ensure replies are no longer routed to condition checker + ASSERT_TRUE(_sender.replies().empty()); + sendReply(2); // put to node 1 + ASSERT_TRUE(_sender.replies().empty()); + sendReply(3); // put to node 0 + ASSERT_EQ(_sender.replies().size(), 1); + EXPECT_EQ("PutReply(id:test:testdoctype1::test, " + "BucketId(0x0000000000000000), " + "timestamp 100) ReturnCode(NONE)", + _sender.getLastReply()); +} + +TEST_F(PutOperationTest, mismatching_condition_probe_fails_op_with_tas_error) { + ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes()); + + op->receive(_sender, make_get_reply(*sent_get_command(0), 50, false, false)); + op->receive(_sender, make_get_reply(*sent_get_command(1), 50, false, false)); + + ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true)); + ASSERT_EQ("PutReply(id:test:testdoctype1::test, " + "BucketId(0x0000000000000000), timestamp 100) " + "ReturnCode(TEST_AND_SET_CONDITION_FAILED, Condition did not match document)", + _sender.getLastReply()); +} + +// TODO change semantics for Not Found... +TEST_F(PutOperationTest, not_found_condition_probe_fails_op_with_tas_error) { + ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes()); + + op->receive(_sender, make_get_reply(*sent_get_command(0), 0, false, false)); + op->receive(_sender, make_get_reply(*sent_get_command(1), 0, false, false)); + + ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true)); + ASSERT_EQ("PutReply(id:test:testdoctype1::test, " + "BucketId(0x0000000000000000), timestamp 100) " + "ReturnCode(TEST_AND_SET_CONDITION_FAILED, Document does not exist)", + _sender.getLastReply()); +} + +TEST_F(PutOperationTest, failed_condition_probe_fails_op_with_returned_error) { + ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes()); + + op->receive(_sender, make_get_reply(*sent_get_command(0), 0, false, false)); + op->receive(_sender, make_failed_get_reply(1)); + + ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true)); + ASSERT_EQ("PutReply(id:test:testdoctype1::test, " + "BucketId(0x0000000000000000), timestamp 100) " + "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: " + "One or more replicas failed during test-and-set condition evaluation)", + _sender.getLastReply()); +} + } diff --git a/storage/src/vespa/storage/config/distributorconfiguration.cpp b/storage/src/vespa/storage/config/distributorconfiguration.cpp index c83c5cdd245..c6e3ebb7087 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.cpp +++ b/storage/src/vespa/storage/config/distributorconfiguration.cpp @@ -53,6 +53,7 @@ DistributorConfiguration::DistributorConfiguration(StorageComponent& component) _use_unordered_merge_chaining(false), _inhibit_default_merges_when_global_merges_pending(false), _enable_two_phase_garbage_collection(false), + _enable_condition_probing(false), _minimumReplicaCountingMode(ReplicaCountingMode::TRUSTED) { } diff --git a/storage/src/vespa/storage/config/distributorconfiguration.h b/storage/src/vespa/storage/config/distributorconfiguration.h index a18770ac69f..e3664276518 100644 --- a/storage/src/vespa/storage/config/distributorconfiguration.h +++ b/storage/src/vespa/storage/config/distributorconfiguration.h @@ -287,6 +287,12 @@ public: [[nodiscard]] bool enable_two_phase_garbage_collection() const noexcept { return _enable_two_phase_garbage_collection; } + void set_enable_condition_probing(bool enable) noexcept { + _enable_condition_probing = enable; + } + [[nodiscard]] bool enable_condition_probing() const noexcept { + return _enable_condition_probing; + } uint32_t num_distributor_stripes() const noexcept { return _num_distributor_stripes; } @@ -347,6 +353,7 @@ private: bool _use_unordered_merge_chaining; bool _inhibit_default_merges_when_global_merges_pending; bool _enable_two_phase_garbage_collection; + bool _enable_condition_probing; DistrConfig::MinimumReplicaCountingMode _minimumReplicaCountingMode; diff --git a/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt b/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt index 8556815b241..f6f079d8d5f 100644 --- a/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt +++ b/storage/src/vespa/storage/distributor/operations/external/CMakeLists.txt @@ -1,7 +1,9 @@ # Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. vespa_add_library(storage_distributoroperationexternal OBJECT SOURCES + check_condition.cpp getoperation.cpp + intermediate_message_sender.cpp newest_replica.cpp putoperation.cpp read_for_write_visitor_operation.cpp diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp new file mode 100644 index 00000000000..7f7e6b08c1f --- /dev/null +++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp @@ -0,0 +1,216 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "check_condition.h" +#include "getoperation.h" +#include "intermediate_message_sender.h" +#include <vespa/document/fieldset/fieldsets.h> +#include <vespa/storage/distributor/distributor_bucket_space.h> +#include <vespa/storage/distributor/distributor_node_context.h> +#include <vespa/storage/distributor/distributor_stripe_operation_context.h> +#include <vespa/storage/distributor/node_supported_features_repo.h> +#include <vespa/vdslib/state/clusterstate.h> +#include <vespa/storageapi/message/persistence.h> +#include <cassert> + +#include <vespa/log/log.h> +LOG_SETUP(".distributor.operations.external.check_condition"); + +namespace storage::distributor { + +CheckCondition::CheckCondition(Outcome known_outcome, + const DistributorBucketSpace& bucket_space, + const DistributorNodeContext& node_ctx, + private_ctor_tag) + : _doc_id_bucket(), + _bucket_space(bucket_space), + _node_ctx(node_ctx), + _cluster_state_version_at_creation_time(_bucket_space.getClusterState().getVersion()), + _cond_get_op(), + _sent_message_map(), + _outcome(known_outcome) +{ +} + +CheckCondition::CheckCondition(const document::Bucket& bucket, + const document::DocumentId& doc_id, + const documentapi::TestAndSetCondition& tas_condition, + const DistributorBucketSpace& bucket_space, + const DistributorNodeContext& node_ctx, + PersistenceOperationMetricSet& metric, + private_ctor_tag) + : _doc_id_bucket(bucket), + _bucket_space(bucket_space), + _node_ctx(node_ctx), + _cluster_state_version_at_creation_time(_bucket_space.getClusterState().getVersion()), + _cond_get_op(), + _sent_message_map() +{ + // Condition checks only return metadata back to the distributor and thus have an empty fieldset. + // Side note: the BucketId provided to the GetCommand is ignored; GetOperation computes explicitly from the doc ID. + auto get_cmd = std::make_shared<api::GetCommand>(_doc_id_bucket, doc_id, document::NoFields::NAME); + get_cmd->set_condition(tas_condition); + _cond_get_op = std::make_shared<GetOperation>(_node_ctx, _bucket_space, + _bucket_space.getBucketDatabase().acquire_read_guard(), + std::move(get_cmd), + metric, api::InternalReadConsistency::Strong); +} + +CheckCondition::~CheckCondition() = default; + +void CheckCondition::start_and_send(DistributorStripeMessageSender& sender) { + IntermediateMessageSender proxy_sender(_sent_message_map, _cond_get_op, sender); + _cond_get_op->start(proxy_sender); + if (proxy_sender._reply) { + // Could not send any Get ops at all; bail out immediately + handle_internal_get_operation_reply(std::move(proxy_sender._reply)); + } +} + +void +CheckCondition::handle_reply(DistributorStripeMessageSender& sender, + const std::shared_ptr<api::StorageReply>& reply) +{ + auto op = _sent_message_map.pop(reply->getMsgId()); + assert(op == _cond_get_op); // We only wrap a single operation + IntermediateMessageSender proxy_sender(_sent_message_map, _cond_get_op, sender); + _cond_get_op->onReceive(proxy_sender, reply); + if (proxy_sender._reply) { + handle_internal_get_operation_reply(std::move(proxy_sender._reply)); + } +} + +void CheckCondition::cancel(DistributorStripeMessageSender& sender) { + IntermediateMessageSender proxy_sender(_sent_message_map, _cond_get_op, sender); + _cond_get_op->onClose(proxy_sender); + // We don't propagate any generated reply from the GetOperation, as its existence + // is an implementation detail. +} + +// FIXME this is a (logic-inverted) duplicate of TwoPhaseUpdateOperation and partially of +// GetOperation, but all can be removed entirely once we redesign how operations are aborted +// across cluster state edges...! +bool CheckCondition::replica_set_changed_after_get_operation() const { + auto entries = get_bucket_database_entries(_bucket_space, _doc_id_bucket.getBucketId()); + + std::vector<std::pair<document::BucketId, uint16_t>> replicas_in_db_now; + for (const auto & e : entries) { + for (uint32_t i = 0; i < e->getNodeCount(); i++) { + const auto& copy = e->getNodeRef(i); + replicas_in_db_now.emplace_back(e.getBucketId(), copy.getNode()); + } + } + return (replicas_in_db_now != _cond_get_op->replicas_in_db()); +} + +CheckCondition::Outcome::Result +CheckCondition::newest_replica_to_outcome(const std::optional<NewestReplica>& newest) noexcept { + if (!newest) { + // Did not find any replicas to send to; implicitly Not Found + return Outcome::Result::NotFound; + } + if (newest->condition_matched) { + return Outcome::Result::MatchedCondition; + } else if (newest->is_tombstone || newest->timestamp == 0) { + return Outcome::Result::NotFound; + } else { + return Outcome::Result::DidNotMatchCondition; + } +} + +std::vector<BucketDatabase::Entry> +CheckCondition::get_bucket_database_entries(const DistributorBucketSpace& bucket_space, + const document::BucketId& bucket_id) +{ + std::vector<BucketDatabase::Entry> entries; + bucket_space.getBucketDatabase().getParents(bucket_id, entries); + return entries; +} + +void CheckCondition::handle_internal_get_operation_reply(std::shared_ptr<api::StorageReply> reply) { + if (reply->getResult().success()) { + if (_cond_get_op->any_replicas_failed()) { + _outcome.emplace(api::ReturnCode(api::ReturnCode::ABORTED, + "One or more replicas failed during test-and-set condition evaluation")); + return; + } + const auto state_version_now = _bucket_space.getClusterState().getVersion(); + if ((state_version_now != _cluster_state_version_at_creation_time) + && replica_set_changed_after_get_operation()) + { + // BUCKET_NOT_FOUND is semantically (usually) inaccurate here, but it's what we use for this purpose + // in existing operations. Checking the replica set will implicitly check for ownership changes, + // as it will be empty if the distributor no longer owns the bucket. + // FIXME but it doesn't handle ABA-cases, so we still want to redesign operation aborting to be + // explicitly edge-handled...! + _outcome.emplace(api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND, + "Bucket ownership or replica set changed between condition " + "read and operation write phases")); + } else { + auto maybe_newest = _cond_get_op->newest_replica(); + _outcome.emplace(newest_replica_to_outcome(maybe_newest)); + } + } else { + _outcome.emplace(reply->getResult()); + } +} + +bool CheckCondition::bucket_has_consistent_replicas(std::span<const BucketDatabase::Entry> entries) { + // Fast path iff bucket exists AND is consistent (split and copies). Same as TwoPhaseUpdateOperation. + // TODO consolidate logic + if (entries.size() != 1) { + return false; + } + return entries[0]->validAndConsistent(); +} +bool +CheckCondition::all_nodes_support_document_condition_probe(std::span<const BucketDatabase::Entry> entries, + const DistributorStripeOperationContext& op_ctx) +{ + // TODO move node set feature checking to repo itself + const auto& features_repo = op_ctx.node_supported_features_repo(); + for (const auto& entry : entries) { + for (uint32_t i = 0; i < entry->getNodeCount(); ++i) { + if (!features_repo.node_supported_features(entry->getNodeRef(i).getNode()).document_condition_probe) { + return false; + } + } + } + + return true; +} + +std::shared_ptr<CheckCondition> +CheckCondition::create_not_found(const DistributorBucketSpace& bucket_space, + const DistributorNodeContext& node_ctx) +{ + return std::make_shared<CheckCondition>(Outcome(Outcome::Result::NotFound), + bucket_space, node_ctx, private_ctor_tag{}); +} + +std::shared_ptr<CheckCondition> +CheckCondition::create_if_inconsistent_replicas(const document::Bucket& bucket, + const DistributorBucketSpace& bucket_space, + const document::DocumentId& doc_id, + const documentapi::TestAndSetCondition& tas_condition, + const DistributorNodeContext& node_ctx, + const DistributorStripeOperationContext& op_ctx, + PersistenceOperationMetricSet& metric) +{ + // TODO move this check to the caller? + if (!op_ctx.distributor_config().enable_condition_probing()) { + return {}; + } + auto entries = get_bucket_database_entries(bucket_space, bucket.getBucketId()); + if (entries.empty()) { + return {}; // Not found + } + if (bucket_has_consistent_replicas(entries)) { + return {}; // Replicas are consistent; no need for write-repair + } + if (!all_nodes_support_document_condition_probe(entries, op_ctx)) { + return {}; // Want write-repair, but one or more nodes are too old to use the feature + } + return std::make_shared<CheckCondition>(bucket, doc_id, tas_condition, bucket_space, + node_ctx, metric, private_ctor_tag{}); +} + +} diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.h b/storage/src/vespa/storage/distributor/operations/external/check_condition.h new file mode 100644 index 00000000000..ea3bfff2e3e --- /dev/null +++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.h @@ -0,0 +1,160 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include "newest_replica.h" +#include <vespa/document/bucket/bucket.h> +#include <vespa/storage/bucketdb/bucketdatabase.h> +#include <vespa/storage/distributor/sentmessagemap.h> +#include <vespa/storageapi/messageapi/returncode.h> +#include <memory> +#include <optional> +#include <span> +#include <vector> + +namespace document { class DocumentId; } +namespace documentapi { class TestAndSetCondition; } +namespace storage::api { class StorageReply; } + +namespace storage::distributor { + +class DistributorBucketSpace; +class DistributorNodeContext; +class DistributorStripeMessageSender; +class DistributorStripeOperationContext; +class GetOperation; +class PersistenceOperationMetricSet; + +/** + * Encapsulates all logic required to evaluate a test-and-set condition for a + * particular document ID across a set of bucket replicas. + * + * Usage and lifecycle: + * + * 1. Invoke start_and_send() once. + * 2. Check if maybe_outcome() indicates that a result is ready. If so, the + * instance can be safely discarded after the outcome has been handled. + * 3. Wait for responses to messages sent. + * 4. When a GetReply is received, invoke handle_reply() with it. Note that + * this may transparently send new requests in case of transient failure + * responses. + * 5. Check if maybe_outcome() is ready, c.f. #2. + * 6. Go to 3. + * + * Although this class appears very similar to an Operation in that it sends + * requests and accepts responses, it is not an actual Operation itself. + * It is instead intended to be directly used _by_ other phased operations that + * require distributed condition checking as part of their write repair logic. + * One major distinction between a CheckCondition and an Operation is that this + * class does _not_ generate a Reply to communicate its result. Instead, the + * caller must check maybe_outcome() after every operation on the instance to + * poll if the condition evaluation is done (or has failed). + */ +class CheckCondition { +public: + class Outcome { + public: + enum class Result { + HasError, + MatchedCondition, + DidNotMatchCondition, + NotFound, + }; + + explicit Outcome(api::ReturnCode error_code) noexcept + : _error_code(std::move(error_code)), + _result(Result::HasError) + { + } + + explicit Outcome(Result result) noexcept + : _error_code(), + _result(result) + { + } + + [[nodiscard]] bool failed() const noexcept { + return _error_code.failed(); + } + + const api::ReturnCode& error_code() const noexcept { + return _error_code; + } + + [[nodiscard]] bool matched_condition() const noexcept { + return _result == Result::MatchedCondition; + } + + [[nodiscard]] bool not_found() const noexcept { + return _result == Result::NotFound; + } + + private: + api::ReturnCode _error_code; + Result _result; + }; +private: + const document::Bucket _doc_id_bucket; + const DistributorBucketSpace& _bucket_space; + const DistributorNodeContext& _node_ctx; + const uint32_t _cluster_state_version_at_creation_time; // TODO encapsulate this better + std::shared_ptr<GetOperation> _cond_get_op; + SentMessageMap _sent_message_map; + std::optional<Outcome> _outcome; + + struct private_ctor_tag {}; +public: + CheckCondition(Outcome known_outcome, + const DistributorBucketSpace& bucket_space, + const DistributorNodeContext& node_ctx, + private_ctor_tag); + CheckCondition(const document::Bucket& bucket, + const document::DocumentId& doc_id, + const documentapi::TestAndSetCondition& tas_condition, + const DistributorBucketSpace& bucket_space, + const DistributorNodeContext& node_ctx, + PersistenceOperationMetricSet& metric, + private_ctor_tag); + ~CheckCondition(); + + void start_and_send(DistributorStripeMessageSender& sender); + void handle_reply(DistributorStripeMessageSender& sender, + const std::shared_ptr<api::StorageReply>& reply); + void cancel(DistributorStripeMessageSender& sender); + + [[nodiscard]] const std::optional<Outcome>& maybe_outcome() const noexcept { + return _outcome; + } + + [[nodiscard]] static std::shared_ptr<CheckCondition> create_if_inconsistent_replicas( + const document::Bucket& bucket, + const DistributorBucketSpace& bucket_space, + const document::DocumentId& doc_id, + const documentapi::TestAndSetCondition& tas_condition, + const DistributorNodeContext& node_ctx, + const DistributorStripeOperationContext& op_ctx, + PersistenceOperationMetricSet& metric); +private: + [[nodiscard]] bool replica_set_changed_after_get_operation() const; + + void handle_internal_get_operation_reply(std::shared_ptr<api::StorageReply> reply); + + [[nodiscard]] static Outcome::Result newest_replica_to_outcome( + const std::optional<NewestReplica>& newest) noexcept; + + [[nodiscard]] static bool bucket_has_consistent_replicas( + std::span<const BucketDatabase::Entry> entries); + + [[nodiscard]] static bool all_nodes_support_document_condition_probe( + std::span<const BucketDatabase::Entry> entries, + const DistributorStripeOperationContext& op_ctx); + + [[nodiscard]] static std::vector<BucketDatabase::Entry> get_bucket_database_entries( + const DistributorBucketSpace& bucket_space, + const document::BucketId& bucket_id); + + [[nodiscard]] static std::shared_ptr<CheckCondition> create_not_found( + const DistributorBucketSpace& bucket_space, + const DistributorNodeContext& node_ctx); +}; + +} diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp index 44148bdbf25..72943eaf713 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp @@ -12,7 +12,7 @@ #include <cassert> #include <vespa/log/log.h> -LOG_SETUP(".distributor.callback.doc.get"); +LOG_SETUP(".distributor.operations.external.get"); using document::BucketSpace; diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h index 96b5d970904..9d6b9f1986d 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h @@ -24,8 +24,8 @@ class GetOperation : public Operation { public: GetOperation(const DistributorNodeContext& node_ctx, - const DistributorBucketSpace &bucketSpace, - const std::shared_ptr<BucketDatabase::ReadGuard> & read_guard, + const DistributorBucketSpace& bucketSpace, + const std::shared_ptr<BucketDatabase::ReadGuard>& read_guard, std::shared_ptr<api::GetCommand> msg, PersistenceOperationMetricSet& metric, api::InternalReadConsistency desired_read_consistency = api::InternalReadConsistency::Strong); diff --git a/storage/src/vespa/storage/distributor/operations/external/intermediate_message_sender.cpp b/storage/src/vespa/storage/distributor/operations/external/intermediate_message_sender.cpp new file mode 100644 index 00000000000..a7a008e2e09 --- /dev/null +++ b/storage/src/vespa/storage/distributor/operations/external/intermediate_message_sender.cpp @@ -0,0 +1,52 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "intermediate_message_sender.h" +#include <vespa/storageapi/messageapi/storagecommand.h> +#include <vespa/storageapi/messageapi/storagereply.h> + +namespace storage::distributor { + +IntermediateMessageSender::IntermediateMessageSender(SentMessageMap& mm, + std::shared_ptr<Operation> cb, + DistributorStripeMessageSender& fwd) noexcept + : msgMap(mm), + callback(std::move(cb)), + forward(fwd) +{ +} + +IntermediateMessageSender::~IntermediateMessageSender() = default; + +void IntermediateMessageSender::sendCommand(const std::shared_ptr<api::StorageCommand>& cmd) { + msgMap.insert(cmd->getMsgId(), callback); + forward.sendCommand(cmd); +}; + +void IntermediateMessageSender::sendReply(const std::shared_ptr<api::StorageReply>& reply) { + _reply = reply; +} + +int IntermediateMessageSender::getDistributorIndex() const { + return forward.getDistributorIndex(); +} + +const ClusterContext& IntermediateMessageSender::cluster_context() const { + return forward.cluster_context(); +} + +PendingMessageTracker& IntermediateMessageSender::getPendingMessageTracker() { + return forward.getPendingMessageTracker(); +} + +const PendingMessageTracker& IntermediateMessageSender::getPendingMessageTracker() const { + return forward.getPendingMessageTracker(); +} + +const OperationSequencer& IntermediateMessageSender::operation_sequencer() const noexcept { + return forward.operation_sequencer(); +} + +OperationSequencer& IntermediateMessageSender::operation_sequencer() noexcept { + return forward.operation_sequencer(); +} + +} diff --git a/storage/src/vespa/storage/distributor/operations/external/intermediate_message_sender.h b/storage/src/vespa/storage/distributor/operations/external/intermediate_message_sender.h new file mode 100644 index 00000000000..6d7ac5b1860 --- /dev/null +++ b/storage/src/vespa/storage/distributor/operations/external/intermediate_message_sender.h @@ -0,0 +1,33 @@ +// Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#pragma once + +#include <vespa/storage/distributor/distributormessagesender.h> +#include <vespa/storage/distributor/sentmessagemap.h> +#include <memory> + +namespace storage::api { class StorageReply; } + +namespace storage::distributor { + +struct IntermediateMessageSender final : DistributorStripeMessageSender { + SentMessageMap& msgMap; + std::shared_ptr<Operation> callback; + DistributorStripeMessageSender& forward; + std::shared_ptr<api::StorageReply> _reply; + + IntermediateMessageSender(SentMessageMap& mm, + std::shared_ptr<Operation> cb, + DistributorStripeMessageSender& fwd) noexcept; + ~IntermediateMessageSender() override; + + void sendCommand(const std::shared_ptr<api::StorageCommand>& cmd) override; + void sendReply(const std::shared_ptr<api::StorageReply>& reply) override; + int getDistributorIndex() const override; + const ClusterContext& cluster_context() const override; + PendingMessageTracker& getPendingMessageTracker() override; + const PendingMessageTracker& getPendingMessageTracker() const override; + const OperationSequencer& operation_sequencer() const noexcept override; + OperationSequencer& operation_sequencer() noexcept override; +}; + +} diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp index aa5ad217ae9..2e2eacd88c1 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp @@ -1,7 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. +#include "check_condition.h" #include "putoperation.h" - #include <vespa/document/fieldvalue/document.h> #include <vespa/storage/distributor/activecopy.h> #include <vespa/storage/distributor/distributor_bucket_space.h> @@ -23,15 +23,19 @@ namespace storage::distributor { PutOperation::PutOperation(const DistributorNodeContext& node_ctx, DistributorStripeOperationContext& op_ctx, - DistributorBucketSpace& bucketSpace, + DistributorBucketSpace& bucket_space, std::shared_ptr<api::PutCommand> msg, - PersistenceOperationMetricSet& metric, SequencingHandle sequencingHandle) - : SequencedOperation(std::move(sequencingHandle)), - _trackerInstance(metric, std::make_shared<api::PutReply>(*msg), node_ctx, op_ctx, msg->getTimestamp()), - _tracker(_trackerInstance), + PersistenceOperationMetricSet& metric, + SequencingHandle sequencing_handle) + : SequencedOperation(std::move(sequencing_handle)), + _tracker_instance(metric, std::make_shared<api::PutReply>(*msg), node_ctx, op_ctx, msg->getTimestamp()), + _tracker(_tracker_instance), _msg(std::move(msg)), + _doc_id_bucket_id(document::BucketIdFactory{}.getBucketId(_msg->getDocumentId())), + _node_ctx(node_ctx), _op_ctx(op_ctx), - _bucketSpace(bucketSpace) + _temp_metric(metric), // TODO + _bucket_space(bucket_space) { } @@ -61,10 +65,10 @@ PutOperation::insertDatabaseEntryAndScheduleCreateBucket(const OperationTargetLi if (setOneActive) { assert(!multipleBuckets); (void) multipleBuckets; - BucketDatabase::Entry entry(_bucketSpace.getBucketDatabase().get(lastBucket)); + BucketDatabase::Entry entry(_bucket_space.getBucketDatabase().get(lastBucket)); std::vector<uint16_t> idealState( - _bucketSpace.get_ideal_service_layer_nodes_bundle(lastBucket).get_available_nodes()); - active = ActiveCopy::calculate(idealState, _bucketSpace.getDistribution(), entry, + _bucket_space.get_ideal_service_layer_nodes_bundle(lastBucket).get_available_nodes()); + active = ActiveCopy::calculate(idealState, _bucket_space.getDistribution(), entry, _op_ctx.distributor_config().max_activation_inhibited_out_of_sync_groups()); LOG(debug, "Active copies for bucket %s: %s", entry.getBucketId().toString().c_str(), active.toString().c_str()); for (uint32_t i=0; i<active.size(); ++i) { @@ -72,7 +76,7 @@ PutOperation::insertDatabaseEntryAndScheduleCreateBucket(const OperationTargetLi copy.setActive(true); entry->updateNode(copy); } - _bucketSpace.getBucketDatabase().update(entry); + _bucket_space.getBucketDatabase().update(entry); } for (uint32_t i=0, n=copies.size(); i<n; ++i) { if (!copies[i].isNewCopy()) continue; @@ -91,7 +95,7 @@ PutOperation::insertDatabaseEntryAndScheduleCreateBucket(const OperationTargetLi void PutOperation::sendPutToBucketOnNode(document::BucketSpace bucketSpace, const document::BucketId& bucketId, - const uint16_t node, std::vector<PersistenceMessageTracker::ToSend>& putBatch) + uint16_t node, std::vector<PersistenceMessageTracker::ToSend>& putBatch) { document::Bucket bucket(bucketSpace, bucketId); auto command = std::make_shared<api::PutCommand>(bucket, _msg->getDocument(), _msg->getTimestamp()); @@ -105,6 +109,7 @@ PutOperation::sendPutToBucketOnNode(document::BucketSpace bucketSpace, const doc } bool PutOperation::has_unavailable_targets_in_pending_state(const OperationTargetList& targets) const { + // TODO handle this explicitly as part of operation abort/cancel edge auto* pending_state = _op_ctx.pending_cluster_state_or_null(_msg->getBucket().getBucketSpace()); if (!pending_state) { return false; @@ -116,7 +121,7 @@ bool PutOperation::has_unavailable_targets_in_pending_state(const OperationTarge } bool PutOperation::at_least_one_storage_node_is_available() const { - const lib::ClusterState& cluster_state = _bucketSpace.getClusterState(); + const lib::ClusterState& cluster_state = _bucket_space.getClusterState(); const uint16_t storage_node_index_ubound = cluster_state.getNodeCount(lib::NodeType::STORAGE); for (uint16_t i = 0; i < storage_node_index_ubound; i++) { @@ -129,29 +134,59 @@ bool PutOperation::at_least_one_storage_node_is_available() const { return false; } +bool PutOperation::has_condition() const noexcept { + return _msg->hasTestAndSetCondition(); +} + void PutOperation::onStart(DistributorStripeMessageSender& sender) { - document::BucketIdFactory bucketIdFactory; - document::BucketId bid = bucketIdFactory.getBucketId(_msg->getDocumentId()); + LOG(debug, "Received Put %s for bucket %s", + _msg->getDocumentId().toString().c_str(), _doc_id_bucket_id.toString().c_str()); + + if (!has_condition()) { + start_direct_put_dispatch(sender); + } else { + start_conditional_put(sender); + } +} + +void PutOperation::start_conditional_put(DistributorStripeMessageSender& sender) { + document::Bucket bucket(_msg->getBucket().getBucketSpace(), _doc_id_bucket_id); + _check_condition = CheckCondition::create_if_inconsistent_replicas(bucket, _bucket_space, _msg->getDocumentId(), + _msg->getCondition(), _node_ctx, _op_ctx, + _temp_metric); + if (!_check_condition) { + start_direct_put_dispatch(sender); + } else { + // Inconsistent replicas; need write repair + _check_condition->start_and_send(sender); + const auto& outcome = _check_condition->maybe_outcome(); // Might be done immediately + if (outcome) { + on_completed_check_condition(*outcome, sender); + } + } +} - LOG(debug, "Received PUT %s for bucket %s", _msg->getDocumentId().toString().c_str(), bid.toString().c_str()); +void PutOperation::start_direct_put_dispatch(DistributorStripeMessageSender& sender) { + LOG(debug, "Starting fast path Put %s for bucket %s", + _msg->getDocumentId().toString().c_str(), _doc_id_bucket_id.toString().c_str()); if (at_least_one_storage_node_is_available()) { std::vector<document::BucketId> bucketsToCheckForSplit; - OperationTargetResolverImpl targetResolver(_bucketSpace, _bucketSpace.getBucketDatabase(), - _op_ctx.distributor_config().getMinimalBucketSplit(), - _bucketSpace.getDistribution().getRedundancy(), - _msg->getBucket().getBucketSpace()); - OperationTargetList targets(targetResolver.getTargets(OperationTargetResolver::PUT, bid)); + OperationTargetResolverImpl targetResolver(_bucket_space, _bucket_space.getBucketDatabase(), + _op_ctx.distributor_config().getMinimalBucketSplit(), + _bucket_space.getDistribution().getRedundancy(), + _msg->getBucket().getBucketSpace()); + OperationTargetList targets(targetResolver.getTargets(OperationTargetResolver::PUT, _doc_id_bucket_id)); for (const auto& target : targets) { if (_op_ctx.has_pending_message(target.getNode().getIndex(), target.getBucket(), api::MessageType::DELETEBUCKET_ID)) { _tracker.fail(sender, api::ReturnCode(api::ReturnCode::BUCKET_DELETED, - "Bucket was being deleted while we got a PUT, failing operation to be safe")); + "Bucket was being deleted while we got a PUT, failing operation to be safe")); return; } } @@ -165,7 +200,7 @@ PutOperation::onStart(DistributorStripeMessageSender& sender) // Mark any entries we're not feeding to as not trusted. std::vector<BucketDatabase::Entry> entries; - _bucketSpace.getBucketDatabase().getParents(bid, entries); + _bucket_space.getBucketDatabase().getParents(_doc_id_bucket_id, entries); std::vector<PersistenceMessageTracker::ToSend> createBucketBatch; if (targets.hasAnyNewCopies()) { @@ -214,7 +249,7 @@ PutOperation::onStart(DistributorStripeMessageSender& sender) bool PutOperation::shouldImplicitlyActivateReplica(const OperationTargetList& targets) const { - const auto& config(_op_ctx.distributor_config()); + const auto& config = _op_ctx.distributor_config(); if (config.isBucketActivationDisabled()) { return false; } @@ -225,15 +260,48 @@ void PutOperation::onReceive(DistributorStripeMessageSender& sender, const std::shared_ptr<api::StorageReply>& msg) { LOG(debug, "Received %s", msg->toString(true).c_str()); - _tracker.receiveReply(sender, static_cast<api::BucketInfoReply&>(*msg)); + if (!_check_condition) { + _tracker.receiveReply(sender, dynamic_cast<api::BucketInfoReply&>(*msg)); + } else { + _check_condition->handle_reply(sender, msg); + const auto& outcome = _check_condition->maybe_outcome(); + if (!outcome) { + return; // Condition check not done yet + } + on_completed_check_condition(*outcome, sender); + } +} + +void +PutOperation::on_completed_check_condition(const CheckCondition::Outcome& outcome, + DistributorStripeMessageSender& sender) +{ + if (outcome.matched_condition()) { + _msg->clear_condition(); // Transform to unconditional Put + start_direct_put_dispatch(sender); + } else if (outcome.not_found()) { + // TODO create:true combined with not found + // TODO "not found" not a TaS error... + _tracker.fail(sender, api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED, + "Document does not exist")); + } else if (outcome.failed()) { + api::ReturnCode wrapped_error(outcome.error_code().getResult(), + "Failed during write repair condition probe step. Reason: " + outcome.error_code().getMessage()); + _tracker.fail(sender, wrapped_error); + } else { + _tracker.fail(sender, api::ReturnCode(api::ReturnCode::TEST_AND_SET_CONDITION_FAILED, + "Condition did not match document")); + } + _check_condition.reset(); } void PutOperation::onClose(DistributorStripeMessageSender& sender) { - const char* error = "Process is shutting down"; - LOG(debug, "%s", error); - _tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, error)); + if (_check_condition) { + _check_condition->cancel(sender); + } + _tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down")); } } diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h index 283395f1406..d80e9cc00d7 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h @@ -2,6 +2,7 @@ #pragma once +#include "check_condition.h" #include <vespa/storage/distributor/operations/sequenced_operation.h> #include <vespa/storage/distributor/persistencemessagetracker.h> @@ -37,24 +38,32 @@ public: void onClose(DistributorStripeMessageSender& sender) override; private: - PersistenceMessageTrackerImpl _trackerInstance; - PersistenceMessageTracker& _tracker; - + PersistenceMessageTrackerImpl _tracker_instance; + PersistenceMessageTracker& _tracker; + std::shared_ptr<api::PutCommand> _msg; + document::BucketId _doc_id_bucket_id; + const DistributorNodeContext& _node_ctx; + DistributorStripeOperationContext& _op_ctx; + PersistenceOperationMetricSet& _temp_metric; + DistributorBucketSpace& _bucket_space; + std::shared_ptr<CheckCondition> _check_condition; + + void start_direct_put_dispatch(DistributorStripeMessageSender& sender); + void start_conditional_put(DistributorStripeMessageSender& sender); + void on_completed_check_condition(const CheckCondition::Outcome& outcome, + DistributorStripeMessageSender& sender); void insertDatabaseEntryAndScheduleCreateBucket(const OperationTargetList& copies, bool setOneActive, const api::StorageCommand& originalCommand, std::vector<MessageTracker::ToSend>& messagesToSend); void sendPutToBucketOnNode(document::BucketSpace bucketSpace, const document::BucketId& bucketId, - const uint16_t node, std::vector<PersistenceMessageTracker::ToSend>& putBatch); + uint16_t node, std::vector<PersistenceMessageTracker::ToSend>& putBatch); [[nodiscard]] bool shouldImplicitlyActivateReplica(const OperationTargetList& targets) const; [[nodiscard]] bool has_unavailable_targets_in_pending_state(const OperationTargetList& targets) const; [[nodiscard]] bool at_least_one_storage_node_is_available() const; - - std::shared_ptr<api::PutCommand> _msg; - DistributorStripeOperationContext& _op_ctx; - DistributorBucketSpace &_bucketSpace; + [[nodiscard]] bool has_condition() const noexcept; }; } diff --git a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp index 5fd42021a63..dd6e1e93791 100644 --- a/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/removelocationoperation.cpp @@ -8,15 +8,13 @@ #include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/vdslib/state/clusterstate.h> - #include <vespa/log/log.h> -LOG_SETUP(".distributor.callback.doc.removelocation"); - +LOG_SETUP(".distributor.operations.external.remove_location"); -using namespace storage::distributor; -using namespace storage; using document::BucketSpace; +namespace storage::distributor { + RemoveLocationOperation::RemoveLocationOperation( const DistributorNodeContext& node_ctx, DistributorStripeOperationContext& op_ctx, @@ -120,3 +118,5 @@ RemoveLocationOperation::onClose(DistributorStripeMessageSender& sender) _tracker.fail(sender, api::ReturnCode(api::ReturnCode::ABORTED, "Process is shutting down")); } + +} diff --git a/storage/src/vespa/storage/distributor/operations/external/statbucketlistoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/statbucketlistoperation.cpp index 6b2550ab547..0b250a36b27 100644 --- a/storage/src/vespa/storage/distributor/operations/external/statbucketlistoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/statbucketlistoperation.cpp @@ -4,8 +4,7 @@ #include <vespa/storage/distributor/maintenance/maintenanceoperationgenerator.h> #include <sstream> -namespace storage { -namespace distributor { +namespace storage::distributor { StatBucketListOperation::StatBucketListOperation( const BucketDatabase& bucketDb, @@ -61,4 +60,3 @@ StatBucketListOperation::onStart(DistributorStripeMessageSender& sender) } } -} diff --git a/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp index 85f36e7a963..a0b3f12f76b 100644 --- a/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/statbucketoperation.cpp @@ -6,7 +6,7 @@ #include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/log/log.h> -LOG_SETUP(".distributor.callback.statbucket"); +LOG_SETUP(".distributor.operations.external.stat_bucket"); namespace storage::distributor { diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp index 515b72520ec..0cb4b223c11 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp @@ -1,6 +1,7 @@ // Copyright Yahoo. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. #include "getoperation.h" +#include "intermediate_message_sender.h" #include "putoperation.h" #include "twophaseupdateoperation.h" #include "updateoperation.h" @@ -15,7 +16,7 @@ #include <cinttypes> #include <vespa/log/log.h> -LOG_SETUP(".distributor.callback.twophaseupdate"); +LOG_SETUP(".distributor.operations.external.two_phase_update"); using namespace std::literals::string_literals; using document::BucketSpace; @@ -56,63 +57,6 @@ TwoPhaseUpdateOperation::TwoPhaseUpdateOperation( TwoPhaseUpdateOperation::~TwoPhaseUpdateOperation() = default; -namespace { - -struct IntermediateMessageSender : DistributorStripeMessageSender { - SentMessageMap& msgMap; - std::shared_ptr<Operation> callback; - DistributorStripeMessageSender& forward; - std::shared_ptr<api::StorageReply> _reply; - - IntermediateMessageSender(SentMessageMap& mm, std::shared_ptr<Operation> cb, DistributorStripeMessageSender & fwd); - ~IntermediateMessageSender() override; - - void sendCommand(const std::shared_ptr<api::StorageCommand>& cmd) override { - msgMap.insert(cmd->getMsgId(), callback); - forward.sendCommand(cmd); - }; - - void sendReply(const std::shared_ptr<api::StorageReply>& reply) override { - _reply = reply; - } - - int getDistributorIndex() const override { - return forward.getDistributorIndex(); - } - - const ClusterContext & cluster_context() const override { - return forward.cluster_context(); - } - - PendingMessageTracker& getPendingMessageTracker() override { - return forward.getPendingMessageTracker(); - } - - const PendingMessageTracker& getPendingMessageTracker() const override { - return forward.getPendingMessageTracker(); - } - - const OperationSequencer& operation_sequencer() const noexcept override { - return forward.operation_sequencer(); - } - - OperationSequencer& operation_sequencer() noexcept override { - return forward.operation_sequencer(); - } -}; - -IntermediateMessageSender::IntermediateMessageSender(SentMessageMap& mm, - std::shared_ptr<Operation> cb, - DistributorStripeMessageSender & fwd) - : msgMap(mm), - callback(std::move(cb)), - forward(fwd) -{ } - -IntermediateMessageSender::~IntermediateMessageSender() = default; - -} - const char* TwoPhaseUpdateOperation::stateToString(SendState state) noexcept { diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h index 7c1b6b3eed6..486ed766510 100644 --- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h @@ -135,26 +135,28 @@ private: // Precondition: reply has not yet been sent. vespalib::string update_doc_id() const; - UpdateMetricSet& _updateMetric; - PersistenceOperationMetricSet& _putMetric; - PersistenceOperationMetricSet& _getMetric; - PersistenceOperationMetricSet& _metadata_get_metrics; + using ReplicaState = std::vector<std::pair<document::BucketId, uint16_t>>; + + UpdateMetricSet& _updateMetric; + PersistenceOperationMetricSet& _putMetric; + PersistenceOperationMetricSet& _getMetric; + PersistenceOperationMetricSet& _metadata_get_metrics; std::shared_ptr<api::UpdateCommand> _updateCmd; - std::shared_ptr<api::UpdateReply> _updateReply; - const DistributorNodeContext& _node_ctx; - DistributorStripeOperationContext& _op_ctx; - const DocumentSelectionParser& _parser; - DistributorBucketSpace &_bucketSpace; - SentMessageMap _sentMessageMap; - SendState _sendState; - Mode _mode; - mbus::Trace _trace; - document::BucketId _updateDocBucketId; - std::vector<std::pair<document::BucketId, uint16_t>> _replicas_at_get_send_time; + std::shared_ptr<api::UpdateReply> _updateReply; + const DistributorNodeContext& _node_ctx; + DistributorStripeOperationContext& _op_ctx; + const DocumentSelectionParser& _parser; + DistributorBucketSpace& _bucketSpace; + SentMessageMap _sentMessageMap; + SendState _sendState; + Mode _mode; + mbus::Trace _trace; + document::BucketId _updateDocBucketId; + ReplicaState _replicas_at_get_send_time; std::optional<framework::MilliSecTimer> _single_get_latency_timer; - uint16_t _fast_path_repair_source_node; - bool _use_initial_cheap_metadata_fetch_phase; - bool _replySent; + uint16_t _fast_path_repair_source_node; + bool _use_initial_cheap_metadata_fetch_phase; + bool _replySent; }; } diff --git a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp index f3ce71b08d6..8988f2589ce 100644 --- a/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/updateoperation.cpp @@ -12,7 +12,7 @@ #include <vespa/log/log.h> -LOG_SETUP(".distributor.callback.doc.update"); +LOG_SETUP(".distributor.operations.external.update"); using document::BucketSpace; diff --git a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp index ca47ab7478c..d34b9da0013 100644 --- a/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/visitoroperation.cpp @@ -15,7 +15,7 @@ #include <optional> #include <vespa/log/log.h> -LOG_SETUP(".visitoroperation"); +LOG_SETUP(".distributor.operations.external.visitor"); namespace storage::distributor { diff --git a/storage/src/vespa/storage/distributor/sentmessagemap.h b/storage/src/vespa/storage/distributor/sentmessagemap.h index 1ed7a4cd8c2..70bee311f78 100644 --- a/storage/src/vespa/storage/distributor/sentmessagemap.h +++ b/storage/src/vespa/storage/distributor/sentmessagemap.h @@ -8,8 +8,7 @@ namespace storage::distributor { class Operation; -class SentMessageMap -{ +class SentMessageMap { public: SentMessageMap(); ~SentMessageMap(); diff --git a/storage/src/vespa/storageapi/message/persistence.h b/storage/src/vespa/storageapi/message/persistence.h index e6f2023fa48..0607f3792f3 100644 --- a/storage/src/vespa/storageapi/message/persistence.h +++ b/storage/src/vespa/storageapi/message/persistence.h @@ -27,6 +27,7 @@ public: ~TestAndSetCommand() override; void setCondition(const TestAndSetCondition & condition) { _condition = condition; } + void clear_condition() { _condition = TestAndSetCondition(); } const TestAndSetCondition & getCondition() const { return _condition; } bool hasTestAndSetCondition() const noexcept override { return _condition.isPresent(); } |