diff options
author | Henning Baldersheim <balder@yahoo-inc.com> | 2023-05-12 18:09:52 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-12 18:09:52 +0200 |
commit | 9ea2404553422d5029d96bba84f8916bc4f06343 (patch) | |
tree | 0c23684663cee7a04c729840bb9904069f95948f | |
parent | aff2a48b41d3014883f450ddbcafe5df58f8cbe4 (diff) | |
parent | 47a72589eed7fd6b538345962127da87e3eb71c3 (diff) |
Merge pull request #27092 from vespa-engine/vekterli/expose-reply-traces-from-get-ops-and-condition-probes
Wire MessageBus reply traces through conditional Put pipeline
12 files changed, 206 insertions, 58 deletions
diff --git a/storage/src/tests/distributor/check_condition_test.cpp b/storage/src/tests/distributor/check_condition_test.cpp index d4b7d7019d7..1b5cede8af6 100644 --- a/storage/src/tests/distributor/check_condition_test.cpp +++ b/storage/src/tests/distributor/check_condition_test.cpp @@ -9,7 +9,8 @@ #include <vespa/storage/distributor/persistence_operation_metric_set.h> #include <vespa/storageapi/message/persistence.h> #include <tests/distributor/distributor_stripe_test_util.h> -#include <vespa/vespalib/gtest/gtest.h> +#include <gtest/gtest.h> +#include <gmock/gmock.h> using namespace ::testing; @@ -27,6 +28,7 @@ public: BucketId _bucket_id{16, 1234}; TestAndSetCondition _tas_cond{"foo or bar"}; PersistenceOperationMetricSet _metrics{"dummy_metrics", nullptr}; + uint32_t _trace_level{5}; CheckConditionTest(); ~CheckConditionTest() override; @@ -52,7 +54,8 @@ public: auto bucket = Bucket(FixedBucketSpaces::default_space(), _bucket_id); assert(_bucket_id.contains(doc_bucket)); return CheckCondition::create_if_inconsistent_replicas(bucket, bucket_space, _doc_id, _tas_cond, - node_context(), operation_context(), _metrics); + node_context(), operation_context(), _metrics, + _trace_level); } std::shared_ptr<api::GetCommand> sent_get_command(size_t idx) { @@ -86,6 +89,12 @@ public: return make_reply(*sent_get_command(cmd_idx), ts, true, false); } + std::shared_ptr<api::GetReply> make_trace_reply(size_t cmd_idx, api::Timestamp ts, std::string trace_message) { + auto reply = make_reply(*sent_get_command(cmd_idx), ts, true, false); + MBUS_TRACE(reply->getTrace(), _trace_level, trace_message); + return reply; + } + std::shared_ptr<api::GetReply> make_failed_reply(size_t cmd_idx) { auto reply = make_reply(*sent_get_command(cmd_idx), 0, false, false); reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "did a bork")); @@ -149,6 +158,7 @@ TEST_F(CheckConditionTest, starting_sends_condition_probe_gets) { EXPECT_EQ(cmd->condition(), _tas_cond); EXPECT_EQ(cmd->getFieldSet(), NoFields::NAME); EXPECT_EQ(cmd->internal_read_consistency(), api::InternalReadConsistency::Strong); + EXPECT_EQ(cmd->getTrace().getLevel(), _trace_level); } TEST_F(CheckConditionTest, condition_matching_completes_check_with_match_outcome) { @@ -232,4 +242,15 @@ TEST_F(CheckConditionTest, check_fails_if_replica_set_changed_between_start_and_ }); } +TEST_F(CheckConditionTest, nested_get_traces_are_propagated_to_outcome) { + test_cond_with_2_gets_sent([&](auto& cond) { + cond.handle_reply(_sender, make_trace_reply(0, 100, "hello")); + cond.handle_reply(_sender, make_trace_reply(1, 200, "world")); + }, [&](auto& outcome) { + auto trace_str = outcome.trace().toString(); + EXPECT_THAT(trace_str, HasSubstr("hello")); + EXPECT_THAT(trace_str, HasSubstr("world")); + }); +} + } diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.h b/storage/src/tests/distributor/distributor_stripe_test_util.h index 707418512f9..9963b2c96b4 100644 --- a/storage/src/tests/distributor/distributor_stripe_test_util.h +++ b/storage/src/tests/distributor/distributor_stripe_test_util.h @@ -220,6 +220,15 @@ public: return cmd; } + template <typename ReplyType> + requires std::is_base_of_v<api::StorageReply, ReplyType> + [[nodiscard]] std::shared_ptr<ReplyType> sent_reply(size_t idx) { + assert(idx < _sender.replies().size()); + auto reply = std::dynamic_pointer_cast<ReplyType>(_sender.reply(idx)); + assert(reply != nullptr); + return reply; + } + void config_enable_condition_probing(bool enable); void tag_content_node_supports_condition_probing(uint16_t index, bool supported); diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp index 6601fcabbeb..600a4de462e 100644 --- a/storage/src/tests/distributor/getoperationtest.cpp +++ b/storage/src/tests/distributor/getoperationtest.cpp @@ -16,8 +16,9 @@ #include <vespa/storage/distributor/externaloperationhandler.h> #include <vespa/storage/distributor/operations/external/getoperation.h> #include <vespa/storageapi/message/persistence.h> -#include <vespa/vespalib/gtest/gtest.h> #include <iomanip> +#include <gtest/gtest.h> +#include <gmock/gmock.h> using config::ConfigGetter; using config::FileSpec; @@ -75,7 +76,8 @@ struct GetOperationTest : Test, DistributorStripeTestUtil { std::string authorVal, uint32_t timestamp, bool is_tombstone = false, - bool condition_matched = false) + bool condition_matched = false, + std::string trace_msg = "") { if (idx == LastCommand) { idx = _sender.commands().size() - 1; @@ -98,6 +100,9 @@ struct GetOperationTest : Test, DistributorStripeTestUtil { auto reply = std::make_shared<api::GetReply>(*tmp, doc, timestamp, false, is_tombstone, condition_matched); reply->setResult(result); + if (!trace_msg.empty()) { + MBUS_TRACE(reply->getTrace(), 1, trace_msg); + } op->receive(_sender, reply); } @@ -110,6 +115,10 @@ struct GetOperationTest : Test, DistributorStripeTestUtil { sendReply(idx, api::ReturnCode::OK, "", timestamp, false, condition_match); } + void reply_with_trace(uint32_t idx, uint32_t timestamp, std::string trace_message) { + sendReply(idx, api::ReturnCode::OK, "", timestamp, false, true, std::move(trace_message)); + } + void replyWithFailure() { sendReply(LastCommand, api::ReturnCode::IO_FAILURE, "", 0); } @@ -682,6 +691,7 @@ void GetOperationTest::set_up_condition_match_get_operation() { TestAndSetCondition my_cond("my_cool_condition"); auto msg = std::make_shared<api::GetCommand>(makeDocumentBucket(BucketId(0)), docId, document::NoFields::NAME); msg->set_condition(my_cond); + msg->getTrace().setLevel(9); // FIXME a very tiny bit dirty to set this here >_> start_operation(std::move(msg), api::InternalReadConsistency::Strong); ASSERT_EQ("Get => 0,Get => 2,Get => 1", _sender.getCommands(true)); @@ -742,4 +752,21 @@ TEST_F(GetOperationTest, condition_match_result_is_aggregated_for_newest_replica EXPECT_EQ(replica_of(api::Timestamp(500), bucketId, 2, true, false), *op->newest_replica()); } +TEST_F(GetOperationTest, trace_is_aggregated_from_all_sub_replies_and_propagated_to_operation_reply) { + ASSERT_NO_FATAL_FAILURE(set_up_condition_match_get_operation()); + + ASSERT_NO_FATAL_FAILURE(reply_with_trace(0, 400, "foo")); + ASSERT_NO_FATAL_FAILURE(reply_with_trace(1, 500, "bar")); + ASSERT_NO_FATAL_FAILURE(reply_with_trace(2, 300, "baz")); + + ASSERT_EQ(_sender.replies().size(), 1); + auto get_reply = sent_reply<api::GetReply>(0); + + auto trace_str = get_reply->getTrace().toString(); + EXPECT_THAT(trace_str, HasSubstr("foo")); + EXPECT_THAT(trace_str, HasSubstr("bar")); + EXPECT_THAT(trace_str, HasSubstr("baz")); +} + + } diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp index 405f25f9359..ff375e5b902 100644 --- a/storage/src/tests/distributor/putoperationtest.cpp +++ b/storage/src/tests/distributor/putoperationtest.cpp @@ -10,20 +10,18 @@ #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/state.h> -#include <vespa/vespalib/gtest/gtest.h> #include <vespa/vespalib/text/stringtokenizer.h> #include <vespa/config/helper/configgetter.h> +#include <gtest/gtest.h> +#include <gmock/gmock.h> -using std::shared_ptr; using config::ConfigGetter; using config::FileSpec; using vespalib::string; using namespace document; -using namespace storage; -using namespace storage::api; -using namespace storage::lib; using namespace std::literals::string_literals; using document::test::makeDocumentBucket; +using storage::api::TestAndSetCondition; using namespace ::testing; @@ -493,7 +491,8 @@ TEST_F(PutOperationTest, update_correct_bucket_on_remapped_put) { { std::shared_ptr<api::StorageCommand> msg2 = _sender.command(0); std::shared_ptr<api::StorageReply> reply(msg2->makeReply().release()); - PutReply* sreply = (PutReply*)reply.get(); + auto* sreply = dynamic_cast<api::PutReply*>(reply.get()); + ASSERT_TRUE(sreply); sreply->remapBucketId(document::BucketId(17, 13)); sreply->setBucketInfo(api::BucketInfo(1,2,3,4,5)); op->receive(_sender, reply); @@ -690,6 +689,7 @@ void PutOperationTest::set_up_tas_put_with_2_inconsistent_replica_nodes(bool cre auto put = createPut(doc); put->setCondition(TestAndSetCondition("test.foo")); put->set_create_if_non_existent(create); + put->getTrace().setLevel(9); sendPut(std::move(put)); ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true)); } @@ -821,4 +821,41 @@ TEST_F(PutOperationTest, conditional_put_no_replicas_case_with_create_set_acts_a EXPECT_TRUE(put_n1->get_create_if_non_existent()); } +TEST_F(PutOperationTest, trace_is_propagated_from_condition_probe_gets_ok_probe_case) { + ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes()); + + ASSERT_EQ(sent_get_command(0)->getTrace().getLevel(), 9); + auto get_reply = make_get_reply(*sent_get_command(0), 50, false, true); + MBUS_TRACE(get_reply->getTrace(), 1, "a foo walks into a bar"); + + op->receive(_sender, get_reply); + op->receive(_sender, make_get_reply(*sent_get_command(1), 50, false, true)); + + ASSERT_EQ("Get => 1,Get => 0,Put => 1,Put => 0", _sender.getCommands(true)); + sendReply(2); + sendReply(3); + ASSERT_EQ(_sender.replies().size(), 1); + auto put_reply = sent_reply<api::PutReply>(0); + + auto trace_str = put_reply->getTrace().toString(); + EXPECT_THAT(trace_str, HasSubstr("a foo walks into a bar")); +} + +TEST_F(PutOperationTest, trace_is_propagated_from_condition_probe_gets_failed_probe_case) { + ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes()); + + auto get_reply = make_get_reply(*sent_get_command(0), 50, false, false); + MBUS_TRACE(get_reply->getTrace(), 1, "a foo walks into a zoo"); + + op->receive(_sender, get_reply); + op->receive(_sender, make_get_reply(*sent_get_command(1), 50, false, false)); + + ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true)); + ASSERT_EQ(_sender.replies().size(), 1); + auto put_reply = sent_reply<api::PutReply>(0); + + auto trace_str = put_reply->getTrace().toString(); + EXPECT_THAT(trace_str, HasSubstr("a foo walks into a zoo")); +} + } diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp index 7f7e6b08c1f..9f7dbcaa132 100644 --- a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp @@ -16,6 +16,29 @@ LOG_SETUP(".distributor.operations.external.check_condition"); namespace storage::distributor { +CheckCondition::Outcome::Outcome(api::ReturnCode error_code, vespalib::Trace trace) noexcept + : _error_code(std::move(error_code)), + _result(Result::HasError), + _trace(std::move(trace)) +{ +} + +CheckCondition::Outcome::Outcome(Result result, vespalib::Trace trace) noexcept + : _error_code(), + _result(result), + _trace(std::move(trace)) +{ +} + +CheckCondition::Outcome::Outcome(Result result) noexcept + : _error_code(), + _result(result), + _trace() +{ +} + +CheckCondition::Outcome::~Outcome() = default; + CheckCondition::CheckCondition(Outcome known_outcome, const DistributorBucketSpace& bucket_space, const DistributorNodeContext& node_ctx, @@ -36,6 +59,7 @@ CheckCondition::CheckCondition(const document::Bucket& bucket, const DistributorBucketSpace& bucket_space, const DistributorNodeContext& node_ctx, PersistenceOperationMetricSet& metric, + uint32_t trace_level, private_ctor_tag) : _doc_id_bucket(bucket), _bucket_space(bucket_space), @@ -48,6 +72,7 @@ CheckCondition::CheckCondition(const document::Bucket& bucket, // Side note: the BucketId provided to the GetCommand is ignored; GetOperation computes explicitly from the doc ID. auto get_cmd = std::make_shared<api::GetCommand>(_doc_id_bucket, doc_id, document::NoFields::NAME); get_cmd->set_condition(tas_condition); + get_cmd->getTrace().setLevel(trace_level); _cond_get_op = std::make_shared<GetOperation>(_node_ctx, _bucket_space, _bucket_space.getBucketDatabase().acquire_read_guard(), std::move(get_cmd), @@ -129,7 +154,8 @@ void CheckCondition::handle_internal_get_operation_reply(std::shared_ptr<api::St if (reply->getResult().success()) { if (_cond_get_op->any_replicas_failed()) { _outcome.emplace(api::ReturnCode(api::ReturnCode::ABORTED, - "One or more replicas failed during test-and-set condition evaluation")); + "One or more replicas failed during test-and-set condition evaluation"), + reply->steal_trace()); return; } const auto state_version_now = _bucket_space.getClusterState().getVersion(); @@ -143,13 +169,14 @@ void CheckCondition::handle_internal_get_operation_reply(std::shared_ptr<api::St // explicitly edge-handled...! _outcome.emplace(api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND, "Bucket ownership or replica set changed between condition " - "read and operation write phases")); + "read and operation write phases"), + reply->steal_trace()); } else { auto maybe_newest = _cond_get_op->newest_replica(); - _outcome.emplace(newest_replica_to_outcome(maybe_newest)); + _outcome.emplace(newest_replica_to_outcome(maybe_newest), reply->steal_trace()); } } else { - _outcome.emplace(reply->getResult()); + _outcome.emplace(reply->getResult(), reply->steal_trace()); } } @@ -193,7 +220,8 @@ CheckCondition::create_if_inconsistent_replicas(const document::Bucket& bucket, const documentapi::TestAndSetCondition& tas_condition, const DistributorNodeContext& node_ctx, const DistributorStripeOperationContext& op_ctx, - PersistenceOperationMetricSet& metric) + PersistenceOperationMetricSet& metric, + uint32_t trace_level) { // TODO move this check to the caller? if (!op_ctx.distributor_config().enable_condition_probing()) { @@ -210,7 +238,7 @@ CheckCondition::create_if_inconsistent_replicas(const document::Bucket& bucket, return {}; // Want write-repair, but one or more nodes are too old to use the feature } return std::make_shared<CheckCondition>(bucket, doc_id, tas_condition, bucket_space, - node_ctx, metric, private_ctor_tag{}); + node_ctx, metric, trace_level, private_ctor_tag{}); } } diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.h b/storage/src/vespa/storage/distributor/operations/external/check_condition.h index ea3bfff2e3e..2a659c55081 100644 --- a/storage/src/vespa/storage/distributor/operations/external/check_condition.h +++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.h @@ -60,17 +60,10 @@ public: NotFound, }; - explicit Outcome(api::ReturnCode error_code) noexcept - : _error_code(std::move(error_code)), - _result(Result::HasError) - { - } - - explicit Outcome(Result result) noexcept - : _error_code(), - _result(result) - { - } + Outcome(api::ReturnCode error_code, vespalib::Trace trace) noexcept; + Outcome(Result result, vespalib::Trace trace) noexcept; + explicit Outcome(Result result) noexcept; + ~Outcome(); [[nodiscard]] bool failed() const noexcept { return _error_code.failed(); @@ -88,9 +81,18 @@ public: return _result == Result::NotFound; } + [[nodiscard]] const vespalib::Trace& trace() const noexcept { + return _trace; + } + + [[nodiscard]] vespalib::Trace&& steal_trace() noexcept { + return std::move(_trace); + } + private: api::ReturnCode _error_code; Result _result; + vespalib::Trace _trace; }; private: const document::Bucket _doc_id_bucket; @@ -113,6 +115,7 @@ public: const DistributorBucketSpace& bucket_space, const DistributorNodeContext& node_ctx, PersistenceOperationMetricSet& metric, + uint32_t trace_level, private_ctor_tag); ~CheckCondition(); @@ -121,7 +124,7 @@ public: const std::shared_ptr<api::StorageReply>& reply); void cancel(DistributorStripeMessageSender& sender); - [[nodiscard]] const std::optional<Outcome>& maybe_outcome() const noexcept { + [[nodiscard]] std::optional<Outcome>& maybe_outcome() noexcept { return _outcome; } @@ -132,7 +135,8 @@ public: const documentapi::TestAndSetCondition& tas_condition, const DistributorNodeContext& node_ctx, const DistributorStripeOperationContext& op_ctx, - PersistenceOperationMetricSet& metric); + PersistenceOperationMetricSet& metric, + uint32_t trace_level = 0); // TODO remove default value private: [[nodiscard]] bool replica_set_changed_after_get_operation() const; diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp index 72943eaf713..a261a898283 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp @@ -63,6 +63,7 @@ GetOperation::GetOperation(const DistributorNodeContext& node_ctx, _newest_replica(), _metric(metric), _operationTimer(node_ctx.clock()), + _trace(_msg->getTrace().getLevel()), _desired_read_consistency(desired_read_consistency), _has_replica_inconsistency(false), _any_replicas_failed(false) @@ -156,7 +157,7 @@ GetOperation::onReceive(DistributorStripeMessageSender& sender, const std::share LOG(debug, "Received %s", msg->toString(true).c_str()); - _msg->getTrace().addChild(getreply->steal_trace()); + _trace.addChild(getreply->steal_trace()); // TODO sweet, sweet nectar of distributed tracing bool allDone = true; for (auto& response : _responses) { for (uint32_t i = 0; i < response.second.size(); i++) { @@ -247,6 +248,10 @@ GetOperation::sendReply(DistributorStripeMessageSender& sender) const auto timestamp = (newest.is_tombstone ? api::Timestamp(0) : newest.timestamp); auto repl = std::make_shared<api::GetReply>(*_msg, _doc, timestamp, !_has_replica_inconsistency); repl->setResult(_returnCode); + if (!_trace.isEmpty()) { + _trace.setStrict(false); + repl->getTrace().addChild(std::move(_trace)); + } update_internal_metrics(); sender.sendReply(repl); _msg.reset(); diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h index 9d6b9f1986d..a6de9370dcb 100644 --- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h @@ -88,28 +88,25 @@ private: }; using GroupVector = std::vector<BucketChecksumGroup>; + using DbReplicaState = std::vector<std::pair<document::BucketId, uint16_t>>; // Organize the different copies by bucket/checksum pairs. We should // try to request GETs from each bucket and each different checksum // within that bucket. - std::map<GroupId, GroupVector> _responses; - - const DistributorNodeContext& _node_ctx; - const DistributorBucketSpace &_bucketSpace; - - std::shared_ptr<api::GetCommand> _msg; - - api::ReturnCode _returnCode; + std::map<GroupId, GroupVector> _responses; + const DistributorNodeContext& _node_ctx; + const DistributorBucketSpace& _bucketSpace; + std::shared_ptr<api::GetCommand> _msg; + api::ReturnCode _returnCode; std::shared_ptr<document::Document> _doc; - - std::optional<NewestReplica> _newest_replica; - - PersistenceOperationMetricSet& _metric; - framework::MilliSecTimer _operationTimer; - std::vector<std::pair<document::BucketId, uint16_t>> _replicas_in_db; - api::InternalReadConsistency _desired_read_consistency; - bool _has_replica_inconsistency; - bool _any_replicas_failed; + std::optional<NewestReplica> _newest_replica; + PersistenceOperationMetricSet& _metric; + framework::MilliSecTimer _operationTimer; + DbReplicaState _replicas_in_db; + vespalib::Trace _trace; + api::InternalReadConsistency _desired_read_consistency; + bool _has_replica_inconsistency; + bool _any_replicas_failed; void sendReply(DistributorStripeMessageSender& sender); bool sendForChecksum(DistributorStripeMessageSender& sender, const document::BucketId& id, GroupVector& res); diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp index 96252c37844..952aeff0800 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp @@ -156,13 +156,13 @@ void PutOperation::start_conditional_put(DistributorStripeMessageSender& sender) document::Bucket bucket(_msg->getBucket().getBucketSpace(), _doc_id_bucket_id); _check_condition = CheckCondition::create_if_inconsistent_replicas(bucket, _bucket_space, _msg->getDocumentId(), _msg->getCondition(), _node_ctx, _op_ctx, - _temp_metric); + _temp_metric, _msg->getTrace().getLevel()); if (!_check_condition) { start_direct_put_dispatch(sender); } else { // Inconsistent replicas; need write repair _check_condition->start_and_send(sender); - const auto& outcome = _check_condition->maybe_outcome(); // Might be done immediately + auto& outcome = _check_condition->maybe_outcome(); // Might be done immediately if (outcome) { on_completed_check_condition(*outcome, sender); } @@ -265,7 +265,7 @@ PutOperation::onReceive(DistributorStripeMessageSender& sender, const std::share _tracker.receiveReply(sender, dynamic_cast<api::BucketInfoReply&>(*msg)); } else { _check_condition->handle_reply(sender, msg); - const auto& outcome = _check_condition->maybe_outcome(); + auto& outcome = _check_condition->maybe_outcome(); if (!outcome) { return; // Condition check not done yet } @@ -274,9 +274,12 @@ PutOperation::onReceive(DistributorStripeMessageSender& sender, const std::share } void -PutOperation::on_completed_check_condition(const CheckCondition::Outcome& outcome, +PutOperation::on_completed_check_condition(CheckCondition::Outcome& outcome, DistributorStripeMessageSender& sender) { + if (!outcome.trace().isEmpty()) { + _tracker.add_trace_tree_to_reply(outcome.steal_trace()); + } const bool effectively_matched = (outcome.matched_condition() || (outcome.not_found() && _msg->get_create_if_non_existent())); if (effectively_matched) { diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h index d80e9cc00d7..6befb8d3e38 100644 --- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h +++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h @@ -50,7 +50,7 @@ private: void start_direct_put_dispatch(DistributorStripeMessageSender& sender); void start_conditional_put(DistributorStripeMessageSender& sender); - void on_completed_check_condition(const CheckCondition::Outcome& outcome, + void on_completed_check_condition(CheckCondition::Outcome& outcome, DistributorStripeMessageSender& sender); void insertDatabaseEntryAndScheduleCreateBucket(const OperationTargetList& copies, bool setOneActive, const api::StorageCommand& originalCommand, diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp index 08b99cf89a8..a30663bde2f 100644 --- a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp +++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp @@ -58,6 +58,7 @@ PersistenceMessageTrackerImpl::fail(MessageSender& sender, const api::ReturnCode if (_reply.get()) { _reply->setResult(result); updateMetrics(); + transfer_trace_state_to_reply(); sender.sendReply(_reply); _reply.reset(); } @@ -222,10 +223,7 @@ PersistenceMessageTrackerImpl::sendReply(MessageSender& sender) } updateMetrics(); - if ( ! _trace.isEmpty()) { - _trace.setStrict(false); - _reply->getTrace().addChild(std::move(_trace)); - } + transfer_trace_state_to_reply(); sender.sendReply(_reply); _reply = std::shared_ptr<api::BucketInfoReply>(); @@ -288,6 +286,15 @@ PersistenceMessageTrackerImpl::handlePersistenceReply( } void +PersistenceMessageTrackerImpl::transfer_trace_state_to_reply() +{ + if (!_trace.isEmpty()) { + _trace.setStrict(false); + _reply->getTrace().addChild(std::move(_trace)); + } +} + +void PersistenceMessageTrackerImpl::updateFromReply( MessageSender& sender, api::BucketInfoReply& reply, @@ -318,4 +325,10 @@ PersistenceMessageTrackerImpl::updateFromReply( } } +void +PersistenceMessageTrackerImpl::add_trace_tree_to_reply(vespalib::Trace trace) +{ + _trace.addChild(std::move(trace)); +} + } diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.h b/storage/src/vespa/storage/distributor/persistencemessagetracker.h index ca330859259..923ecf45649 100644 --- a/storage/src/vespa/storage/distributor/persistencemessagetracker.h +++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.h @@ -23,10 +23,12 @@ struct PersistenceMessageTracker { virtual void queueCommand(api::BucketCommand::SP, uint16_t target) = 0; virtual void flushQueue(MessageSender&) = 0; virtual uint16_t handleReply(api::BucketReply& reply) = 0; + virtual void add_trace_tree_to_reply(vespalib::Trace trace) = 0; }; -class PersistenceMessageTrackerImpl : public PersistenceMessageTracker, - public MessageTracker +class PersistenceMessageTrackerImpl final + : public PersistenceMessageTracker, + public MessageTracker { private: using BucketInfoMap = std::map<document::Bucket, std::vector<BucketCopy>>; @@ -92,12 +94,14 @@ private: void updateFailureResult(const api::BucketInfoReply& reply); void handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node); void handlePersistenceReply(api::BucketInfoReply& reply, uint16_t node); + void transfer_trace_state_to_reply(); void queueCommand(std::shared_ptr<api::BucketCommand> msg, uint16_t target) override { MessageTracker::queueCommand(std::move(msg), target); } void flushQueue(MessageSender& s) override { MessageTracker::flushQueue(s); } uint16_t handleReply(api::BucketReply& r) override { return MessageTracker::handleReply(r); } + void add_trace_tree_to_reply(vespalib::Trace trace) override; }; } |