aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2023-05-11 15:59:40 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2023-05-12 11:08:13 +0000
commit47a72589eed7fd6b538345962127da87e3eb71c3 (patch)
treeb1ac3f1a20f16e8a0aa628f8368b3eafa80a2780
parentbef1950a75be8b256df07ca5ef6aacd1731c5ef9 (diff)
Wire MessageBus reply traces through conditional Put pipeline
`CheckCondition::Outcome` now exposes the resulting trace (if any) of the operations that were sent as part of the condition probe. The outcome-accessor is changed to return a non-const reference to allow for moving away the trace payload and into a higher-level reply. Turns out GetOperation did not aggregate reply traces as expected, and PersistenceMessageTrackerImpl did not transfer trace state upon failures (only success case). This makes this commit bigger than initially expected, but trace coverage should now be improved on a general basis, not just for conditional Put operations.
-rw-r--r--storage/src/tests/distributor/check_condition_test.cpp25
-rw-r--r--storage/src/tests/distributor/distributor_stripe_test_util.h9
-rw-r--r--storage/src/tests/distributor/getoperationtest.cpp31
-rw-r--r--storage/src/tests/distributor/putoperationtest.cpp49
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/check_condition.cpp40
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/check_condition.h30
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.cpp7
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.h31
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.cpp11
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.h2
-rw-r--r--storage/src/vespa/storage/distributor/persistencemessagetracker.cpp21
-rw-r--r--storage/src/vespa/storage/distributor/persistencemessagetracker.h8
12 files changed, 206 insertions, 58 deletions
diff --git a/storage/src/tests/distributor/check_condition_test.cpp b/storage/src/tests/distributor/check_condition_test.cpp
index d4b7d7019d7..1b5cede8af6 100644
--- a/storage/src/tests/distributor/check_condition_test.cpp
+++ b/storage/src/tests/distributor/check_condition_test.cpp
@@ -9,7 +9,8 @@
#include <vespa/storage/distributor/persistence_operation_metric_set.h>
#include <vespa/storageapi/message/persistence.h>
#include <tests/distributor/distributor_stripe_test_util.h>
-#include <vespa/vespalib/gtest/gtest.h>
+#include <gtest/gtest.h>
+#include <gmock/gmock.h>
using namespace ::testing;
@@ -27,6 +28,7 @@ public:
BucketId _bucket_id{16, 1234};
TestAndSetCondition _tas_cond{"foo or bar"};
PersistenceOperationMetricSet _metrics{"dummy_metrics", nullptr};
+ uint32_t _trace_level{5};
CheckConditionTest();
~CheckConditionTest() override;
@@ -52,7 +54,8 @@ public:
auto bucket = Bucket(FixedBucketSpaces::default_space(), _bucket_id);
assert(_bucket_id.contains(doc_bucket));
return CheckCondition::create_if_inconsistent_replicas(bucket, bucket_space, _doc_id, _tas_cond,
- node_context(), operation_context(), _metrics);
+ node_context(), operation_context(), _metrics,
+ _trace_level);
}
std::shared_ptr<api::GetCommand> sent_get_command(size_t idx) {
@@ -86,6 +89,12 @@ public:
return make_reply(*sent_get_command(cmd_idx), ts, true, false);
}
+ std::shared_ptr<api::GetReply> make_trace_reply(size_t cmd_idx, api::Timestamp ts, std::string trace_message) {
+ auto reply = make_reply(*sent_get_command(cmd_idx), ts, true, false);
+ MBUS_TRACE(reply->getTrace(), _trace_level, trace_message);
+ return reply;
+ }
+
std::shared_ptr<api::GetReply> make_failed_reply(size_t cmd_idx) {
auto reply = make_reply(*sent_get_command(cmd_idx), 0, false, false);
reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "did a bork"));
@@ -149,6 +158,7 @@ TEST_F(CheckConditionTest, starting_sends_condition_probe_gets) {
EXPECT_EQ(cmd->condition(), _tas_cond);
EXPECT_EQ(cmd->getFieldSet(), NoFields::NAME);
EXPECT_EQ(cmd->internal_read_consistency(), api::InternalReadConsistency::Strong);
+ EXPECT_EQ(cmd->getTrace().getLevel(), _trace_level);
}
TEST_F(CheckConditionTest, condition_matching_completes_check_with_match_outcome) {
@@ -232,4 +242,15 @@ TEST_F(CheckConditionTest, check_fails_if_replica_set_changed_between_start_and_
});
}
+TEST_F(CheckConditionTest, nested_get_traces_are_propagated_to_outcome) {
+ test_cond_with_2_gets_sent([&](auto& cond) {
+ cond.handle_reply(_sender, make_trace_reply(0, 100, "hello"));
+ cond.handle_reply(_sender, make_trace_reply(1, 200, "world"));
+ }, [&](auto& outcome) {
+ auto trace_str = outcome.trace().toString();
+ EXPECT_THAT(trace_str, HasSubstr("hello"));
+ EXPECT_THAT(trace_str, HasSubstr("world"));
+ });
+}
+
}
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.h b/storage/src/tests/distributor/distributor_stripe_test_util.h
index 707418512f9..9963b2c96b4 100644
--- a/storage/src/tests/distributor/distributor_stripe_test_util.h
+++ b/storage/src/tests/distributor/distributor_stripe_test_util.h
@@ -220,6 +220,15 @@ public:
return cmd;
}
+ template <typename ReplyType>
+ requires std::is_base_of_v<api::StorageReply, ReplyType>
+ [[nodiscard]] std::shared_ptr<ReplyType> sent_reply(size_t idx) {
+ assert(idx < _sender.replies().size());
+ auto reply = std::dynamic_pointer_cast<ReplyType>(_sender.reply(idx));
+ assert(reply != nullptr);
+ return reply;
+ }
+
void config_enable_condition_probing(bool enable);
void tag_content_node_supports_condition_probing(uint16_t index, bool supported);
diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp
index 6601fcabbeb..600a4de462e 100644
--- a/storage/src/tests/distributor/getoperationtest.cpp
+++ b/storage/src/tests/distributor/getoperationtest.cpp
@@ -16,8 +16,9 @@
#include <vespa/storage/distributor/externaloperationhandler.h>
#include <vespa/storage/distributor/operations/external/getoperation.h>
#include <vespa/storageapi/message/persistence.h>
-#include <vespa/vespalib/gtest/gtest.h>
#include <iomanip>
+#include <gtest/gtest.h>
+#include <gmock/gmock.h>
using config::ConfigGetter;
using config::FileSpec;
@@ -75,7 +76,8 @@ struct GetOperationTest : Test, DistributorStripeTestUtil {
std::string authorVal,
uint32_t timestamp,
bool is_tombstone = false,
- bool condition_matched = false)
+ bool condition_matched = false,
+ std::string trace_msg = "")
{
if (idx == LastCommand) {
idx = _sender.commands().size() - 1;
@@ -98,6 +100,9 @@ struct GetOperationTest : Test, DistributorStripeTestUtil {
auto reply = std::make_shared<api::GetReply>(*tmp, doc, timestamp, false, is_tombstone, condition_matched);
reply->setResult(result);
+ if (!trace_msg.empty()) {
+ MBUS_TRACE(reply->getTrace(), 1, trace_msg);
+ }
op->receive(_sender, reply);
}
@@ -110,6 +115,10 @@ struct GetOperationTest : Test, DistributorStripeTestUtil {
sendReply(idx, api::ReturnCode::OK, "", timestamp, false, condition_match);
}
+ void reply_with_trace(uint32_t idx, uint32_t timestamp, std::string trace_message) {
+ sendReply(idx, api::ReturnCode::OK, "", timestamp, false, true, std::move(trace_message));
+ }
+
void replyWithFailure() {
sendReply(LastCommand, api::ReturnCode::IO_FAILURE, "", 0);
}
@@ -682,6 +691,7 @@ void GetOperationTest::set_up_condition_match_get_operation() {
TestAndSetCondition my_cond("my_cool_condition");
auto msg = std::make_shared<api::GetCommand>(makeDocumentBucket(BucketId(0)), docId, document::NoFields::NAME);
msg->set_condition(my_cond);
+ msg->getTrace().setLevel(9); // FIXME a very tiny bit dirty to set this here >_>
start_operation(std::move(msg), api::InternalReadConsistency::Strong);
ASSERT_EQ("Get => 0,Get => 2,Get => 1", _sender.getCommands(true));
@@ -742,4 +752,21 @@ TEST_F(GetOperationTest, condition_match_result_is_aggregated_for_newest_replica
EXPECT_EQ(replica_of(api::Timestamp(500), bucketId, 2, true, false), *op->newest_replica());
}
+TEST_F(GetOperationTest, trace_is_aggregated_from_all_sub_replies_and_propagated_to_operation_reply) {
+ ASSERT_NO_FATAL_FAILURE(set_up_condition_match_get_operation());
+
+ ASSERT_NO_FATAL_FAILURE(reply_with_trace(0, 400, "foo"));
+ ASSERT_NO_FATAL_FAILURE(reply_with_trace(1, 500, "bar"));
+ ASSERT_NO_FATAL_FAILURE(reply_with_trace(2, 300, "baz"));
+
+ ASSERT_EQ(_sender.replies().size(), 1);
+ auto get_reply = sent_reply<api::GetReply>(0);
+
+ auto trace_str = get_reply->getTrace().toString();
+ EXPECT_THAT(trace_str, HasSubstr("foo"));
+ EXPECT_THAT(trace_str, HasSubstr("bar"));
+ EXPECT_THAT(trace_str, HasSubstr("baz"));
+}
+
+
}
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index 405f25f9359..ff375e5b902 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -10,20 +10,18 @@
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/state.h>
-#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/text/stringtokenizer.h>
#include <vespa/config/helper/configgetter.h>
+#include <gtest/gtest.h>
+#include <gmock/gmock.h>
-using std::shared_ptr;
using config::ConfigGetter;
using config::FileSpec;
using vespalib::string;
using namespace document;
-using namespace storage;
-using namespace storage::api;
-using namespace storage::lib;
using namespace std::literals::string_literals;
using document::test::makeDocumentBucket;
+using storage::api::TestAndSetCondition;
using namespace ::testing;
@@ -493,7 +491,8 @@ TEST_F(PutOperationTest, update_correct_bucket_on_remapped_put) {
{
std::shared_ptr<api::StorageCommand> msg2 = _sender.command(0);
std::shared_ptr<api::StorageReply> reply(msg2->makeReply().release());
- PutReply* sreply = (PutReply*)reply.get();
+ auto* sreply = dynamic_cast<api::PutReply*>(reply.get());
+ ASSERT_TRUE(sreply);
sreply->remapBucketId(document::BucketId(17, 13));
sreply->setBucketInfo(api::BucketInfo(1,2,3,4,5));
op->receive(_sender, reply);
@@ -690,6 +689,7 @@ void PutOperationTest::set_up_tas_put_with_2_inconsistent_replica_nodes(bool cre
auto put = createPut(doc);
put->setCondition(TestAndSetCondition("test.foo"));
put->set_create_if_non_existent(create);
+ put->getTrace().setLevel(9);
sendPut(std::move(put));
ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
}
@@ -821,4 +821,41 @@ TEST_F(PutOperationTest, conditional_put_no_replicas_case_with_create_set_acts_a
EXPECT_TRUE(put_n1->get_create_if_non_existent());
}
+TEST_F(PutOperationTest, trace_is_propagated_from_condition_probe_gets_ok_probe_case) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes());
+
+ ASSERT_EQ(sent_get_command(0)->getTrace().getLevel(), 9);
+ auto get_reply = make_get_reply(*sent_get_command(0), 50, false, true);
+ MBUS_TRACE(get_reply->getTrace(), 1, "a foo walks into a bar");
+
+ op->receive(_sender, get_reply);
+ op->receive(_sender, make_get_reply(*sent_get_command(1), 50, false, true));
+
+ ASSERT_EQ("Get => 1,Get => 0,Put => 1,Put => 0", _sender.getCommands(true));
+ sendReply(2);
+ sendReply(3);
+ ASSERT_EQ(_sender.replies().size(), 1);
+ auto put_reply = sent_reply<api::PutReply>(0);
+
+ auto trace_str = put_reply->getTrace().toString();
+ EXPECT_THAT(trace_str, HasSubstr("a foo walks into a bar"));
+}
+
+TEST_F(PutOperationTest, trace_is_propagated_from_condition_probe_gets_failed_probe_case) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes());
+
+ auto get_reply = make_get_reply(*sent_get_command(0), 50, false, false);
+ MBUS_TRACE(get_reply->getTrace(), 1, "a foo walks into a zoo");
+
+ op->receive(_sender, get_reply);
+ op->receive(_sender, make_get_reply(*sent_get_command(1), 50, false, false));
+
+ ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+ ASSERT_EQ(_sender.replies().size(), 1);
+ auto put_reply = sent_reply<api::PutReply>(0);
+
+ auto trace_str = put_reply->getTrace().toString();
+ EXPECT_THAT(trace_str, HasSubstr("a foo walks into a zoo"));
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
index 7f7e6b08c1f..9f7dbcaa132 100644
--- a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
@@ -16,6 +16,29 @@ LOG_SETUP(".distributor.operations.external.check_condition");
namespace storage::distributor {
+CheckCondition::Outcome::Outcome(api::ReturnCode error_code, vespalib::Trace trace) noexcept
+ : _error_code(std::move(error_code)),
+ _result(Result::HasError),
+ _trace(std::move(trace))
+{
+}
+
+CheckCondition::Outcome::Outcome(Result result, vespalib::Trace trace) noexcept
+ : _error_code(),
+ _result(result),
+ _trace(std::move(trace))
+{
+}
+
+CheckCondition::Outcome::Outcome(Result result) noexcept
+ : _error_code(),
+ _result(result),
+ _trace()
+{
+}
+
+CheckCondition::Outcome::~Outcome() = default;
+
CheckCondition::CheckCondition(Outcome known_outcome,
const DistributorBucketSpace& bucket_space,
const DistributorNodeContext& node_ctx,
@@ -36,6 +59,7 @@ CheckCondition::CheckCondition(const document::Bucket& bucket,
const DistributorBucketSpace& bucket_space,
const DistributorNodeContext& node_ctx,
PersistenceOperationMetricSet& metric,
+ uint32_t trace_level,
private_ctor_tag)
: _doc_id_bucket(bucket),
_bucket_space(bucket_space),
@@ -48,6 +72,7 @@ CheckCondition::CheckCondition(const document::Bucket& bucket,
// Side note: the BucketId provided to the GetCommand is ignored; GetOperation computes explicitly from the doc ID.
auto get_cmd = std::make_shared<api::GetCommand>(_doc_id_bucket, doc_id, document::NoFields::NAME);
get_cmd->set_condition(tas_condition);
+ get_cmd->getTrace().setLevel(trace_level);
_cond_get_op = std::make_shared<GetOperation>(_node_ctx, _bucket_space,
_bucket_space.getBucketDatabase().acquire_read_guard(),
std::move(get_cmd),
@@ -129,7 +154,8 @@ void CheckCondition::handle_internal_get_operation_reply(std::shared_ptr<api::St
if (reply->getResult().success()) {
if (_cond_get_op->any_replicas_failed()) {
_outcome.emplace(api::ReturnCode(api::ReturnCode::ABORTED,
- "One or more replicas failed during test-and-set condition evaluation"));
+ "One or more replicas failed during test-and-set condition evaluation"),
+ reply->steal_trace());
return;
}
const auto state_version_now = _bucket_space.getClusterState().getVersion();
@@ -143,13 +169,14 @@ void CheckCondition::handle_internal_get_operation_reply(std::shared_ptr<api::St
// explicitly edge-handled...!
_outcome.emplace(api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND,
"Bucket ownership or replica set changed between condition "
- "read and operation write phases"));
+ "read and operation write phases"),
+ reply->steal_trace());
} else {
auto maybe_newest = _cond_get_op->newest_replica();
- _outcome.emplace(newest_replica_to_outcome(maybe_newest));
+ _outcome.emplace(newest_replica_to_outcome(maybe_newest), reply->steal_trace());
}
} else {
- _outcome.emplace(reply->getResult());
+ _outcome.emplace(reply->getResult(), reply->steal_trace());
}
}
@@ -193,7 +220,8 @@ CheckCondition::create_if_inconsistent_replicas(const document::Bucket& bucket,
const documentapi::TestAndSetCondition& tas_condition,
const DistributorNodeContext& node_ctx,
const DistributorStripeOperationContext& op_ctx,
- PersistenceOperationMetricSet& metric)
+ PersistenceOperationMetricSet& metric,
+ uint32_t trace_level)
{
// TODO move this check to the caller?
if (!op_ctx.distributor_config().enable_condition_probing()) {
@@ -210,7 +238,7 @@ CheckCondition::create_if_inconsistent_replicas(const document::Bucket& bucket,
return {}; // Want write-repair, but one or more nodes are too old to use the feature
}
return std::make_shared<CheckCondition>(bucket, doc_id, tas_condition, bucket_space,
- node_ctx, metric, private_ctor_tag{});
+ node_ctx, metric, trace_level, private_ctor_tag{});
}
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.h b/storage/src/vespa/storage/distributor/operations/external/check_condition.h
index ea3bfff2e3e..2a659c55081 100644
--- a/storage/src/vespa/storage/distributor/operations/external/check_condition.h
+++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.h
@@ -60,17 +60,10 @@ public:
NotFound,
};
- explicit Outcome(api::ReturnCode error_code) noexcept
- : _error_code(std::move(error_code)),
- _result(Result::HasError)
- {
- }
-
- explicit Outcome(Result result) noexcept
- : _error_code(),
- _result(result)
- {
- }
+ Outcome(api::ReturnCode error_code, vespalib::Trace trace) noexcept;
+ Outcome(Result result, vespalib::Trace trace) noexcept;
+ explicit Outcome(Result result) noexcept;
+ ~Outcome();
[[nodiscard]] bool failed() const noexcept {
return _error_code.failed();
@@ -88,9 +81,18 @@ public:
return _result == Result::NotFound;
}
+ [[nodiscard]] const vespalib::Trace& trace() const noexcept {
+ return _trace;
+ }
+
+ [[nodiscard]] vespalib::Trace&& steal_trace() noexcept {
+ return std::move(_trace);
+ }
+
private:
api::ReturnCode _error_code;
Result _result;
+ vespalib::Trace _trace;
};
private:
const document::Bucket _doc_id_bucket;
@@ -113,6 +115,7 @@ public:
const DistributorBucketSpace& bucket_space,
const DistributorNodeContext& node_ctx,
PersistenceOperationMetricSet& metric,
+ uint32_t trace_level,
private_ctor_tag);
~CheckCondition();
@@ -121,7 +124,7 @@ public:
const std::shared_ptr<api::StorageReply>& reply);
void cancel(DistributorStripeMessageSender& sender);
- [[nodiscard]] const std::optional<Outcome>& maybe_outcome() const noexcept {
+ [[nodiscard]] std::optional<Outcome>& maybe_outcome() noexcept {
return _outcome;
}
@@ -132,7 +135,8 @@ public:
const documentapi::TestAndSetCondition& tas_condition,
const DistributorNodeContext& node_ctx,
const DistributorStripeOperationContext& op_ctx,
- PersistenceOperationMetricSet& metric);
+ PersistenceOperationMetricSet& metric,
+ uint32_t trace_level = 0); // TODO remove default value
private:
[[nodiscard]] bool replica_set_changed_after_get_operation() const;
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
index 72943eaf713..a261a898283 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
@@ -63,6 +63,7 @@ GetOperation::GetOperation(const DistributorNodeContext& node_ctx,
_newest_replica(),
_metric(metric),
_operationTimer(node_ctx.clock()),
+ _trace(_msg->getTrace().getLevel()),
_desired_read_consistency(desired_read_consistency),
_has_replica_inconsistency(false),
_any_replicas_failed(false)
@@ -156,7 +157,7 @@ GetOperation::onReceive(DistributorStripeMessageSender& sender, const std::share
LOG(debug, "Received %s", msg->toString(true).c_str());
- _msg->getTrace().addChild(getreply->steal_trace());
+ _trace.addChild(getreply->steal_trace()); // TODO sweet, sweet nectar of distributed tracing
bool allDone = true;
for (auto& response : _responses) {
for (uint32_t i = 0; i < response.second.size(); i++) {
@@ -247,6 +248,10 @@ GetOperation::sendReply(DistributorStripeMessageSender& sender)
const auto timestamp = (newest.is_tombstone ? api::Timestamp(0) : newest.timestamp);
auto repl = std::make_shared<api::GetReply>(*_msg, _doc, timestamp, !_has_replica_inconsistency);
repl->setResult(_returnCode);
+ if (!_trace.isEmpty()) {
+ _trace.setStrict(false);
+ repl->getTrace().addChild(std::move(_trace));
+ }
update_internal_metrics();
sender.sendReply(repl);
_msg.reset();
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
index 9d6b9f1986d..a6de9370dcb 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
@@ -88,28 +88,25 @@ private:
};
using GroupVector = std::vector<BucketChecksumGroup>;
+ using DbReplicaState = std::vector<std::pair<document::BucketId, uint16_t>>;
// Organize the different copies by bucket/checksum pairs. We should
// try to request GETs from each bucket and each different checksum
// within that bucket.
- std::map<GroupId, GroupVector> _responses;
-
- const DistributorNodeContext& _node_ctx;
- const DistributorBucketSpace &_bucketSpace;
-
- std::shared_ptr<api::GetCommand> _msg;
-
- api::ReturnCode _returnCode;
+ std::map<GroupId, GroupVector> _responses;
+ const DistributorNodeContext& _node_ctx;
+ const DistributorBucketSpace& _bucketSpace;
+ std::shared_ptr<api::GetCommand> _msg;
+ api::ReturnCode _returnCode;
std::shared_ptr<document::Document> _doc;
-
- std::optional<NewestReplica> _newest_replica;
-
- PersistenceOperationMetricSet& _metric;
- framework::MilliSecTimer _operationTimer;
- std::vector<std::pair<document::BucketId, uint16_t>> _replicas_in_db;
- api::InternalReadConsistency _desired_read_consistency;
- bool _has_replica_inconsistency;
- bool _any_replicas_failed;
+ std::optional<NewestReplica> _newest_replica;
+ PersistenceOperationMetricSet& _metric;
+ framework::MilliSecTimer _operationTimer;
+ DbReplicaState _replicas_in_db;
+ vespalib::Trace _trace;
+ api::InternalReadConsistency _desired_read_consistency;
+ bool _has_replica_inconsistency;
+ bool _any_replicas_failed;
void sendReply(DistributorStripeMessageSender& sender);
bool sendForChecksum(DistributorStripeMessageSender& sender, const document::BucketId& id, GroupVector& res);
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
index 96252c37844..952aeff0800 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
@@ -156,13 +156,13 @@ void PutOperation::start_conditional_put(DistributorStripeMessageSender& sender)
document::Bucket bucket(_msg->getBucket().getBucketSpace(), _doc_id_bucket_id);
_check_condition = CheckCondition::create_if_inconsistent_replicas(bucket, _bucket_space, _msg->getDocumentId(),
_msg->getCondition(), _node_ctx, _op_ctx,
- _temp_metric);
+ _temp_metric, _msg->getTrace().getLevel());
if (!_check_condition) {
start_direct_put_dispatch(sender);
} else {
// Inconsistent replicas; need write repair
_check_condition->start_and_send(sender);
- const auto& outcome = _check_condition->maybe_outcome(); // Might be done immediately
+ auto& outcome = _check_condition->maybe_outcome(); // Might be done immediately
if (outcome) {
on_completed_check_condition(*outcome, sender);
}
@@ -265,7 +265,7 @@ PutOperation::onReceive(DistributorStripeMessageSender& sender, const std::share
_tracker.receiveReply(sender, dynamic_cast<api::BucketInfoReply&>(*msg));
} else {
_check_condition->handle_reply(sender, msg);
- const auto& outcome = _check_condition->maybe_outcome();
+ auto& outcome = _check_condition->maybe_outcome();
if (!outcome) {
return; // Condition check not done yet
}
@@ -274,9 +274,12 @@ PutOperation::onReceive(DistributorStripeMessageSender& sender, const std::share
}
void
-PutOperation::on_completed_check_condition(const CheckCondition::Outcome& outcome,
+PutOperation::on_completed_check_condition(CheckCondition::Outcome& outcome,
DistributorStripeMessageSender& sender)
{
+ if (!outcome.trace().isEmpty()) {
+ _tracker.add_trace_tree_to_reply(outcome.steal_trace());
+ }
const bool effectively_matched = (outcome.matched_condition()
|| (outcome.not_found() && _msg->get_create_if_non_existent()));
if (effectively_matched) {
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
index d80e9cc00d7..6befb8d3e38 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
@@ -50,7 +50,7 @@ private:
void start_direct_put_dispatch(DistributorStripeMessageSender& sender);
void start_conditional_put(DistributorStripeMessageSender& sender);
- void on_completed_check_condition(const CheckCondition::Outcome& outcome,
+ void on_completed_check_condition(CheckCondition::Outcome& outcome,
DistributorStripeMessageSender& sender);
void insertDatabaseEntryAndScheduleCreateBucket(const OperationTargetList& copies, bool setOneActive,
const api::StorageCommand& originalCommand,
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
index 08b99cf89a8..a30663bde2f 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
@@ -58,6 +58,7 @@ PersistenceMessageTrackerImpl::fail(MessageSender& sender, const api::ReturnCode
if (_reply.get()) {
_reply->setResult(result);
updateMetrics();
+ transfer_trace_state_to_reply();
sender.sendReply(_reply);
_reply.reset();
}
@@ -222,10 +223,7 @@ PersistenceMessageTrackerImpl::sendReply(MessageSender& sender)
}
updateMetrics();
- if ( ! _trace.isEmpty()) {
- _trace.setStrict(false);
- _reply->getTrace().addChild(std::move(_trace));
- }
+ transfer_trace_state_to_reply();
sender.sendReply(_reply);
_reply = std::shared_ptr<api::BucketInfoReply>();
@@ -288,6 +286,15 @@ PersistenceMessageTrackerImpl::handlePersistenceReply(
}
void
+PersistenceMessageTrackerImpl::transfer_trace_state_to_reply()
+{
+ if (!_trace.isEmpty()) {
+ _trace.setStrict(false);
+ _reply->getTrace().addChild(std::move(_trace));
+ }
+}
+
+void
PersistenceMessageTrackerImpl::updateFromReply(
MessageSender& sender,
api::BucketInfoReply& reply,
@@ -318,4 +325,10 @@ PersistenceMessageTrackerImpl::updateFromReply(
}
}
+void
+PersistenceMessageTrackerImpl::add_trace_tree_to_reply(vespalib::Trace trace)
+{
+ _trace.addChild(std::move(trace));
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.h b/storage/src/vespa/storage/distributor/persistencemessagetracker.h
index ca330859259..923ecf45649 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.h
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.h
@@ -23,10 +23,12 @@ struct PersistenceMessageTracker {
virtual void queueCommand(api::BucketCommand::SP, uint16_t target) = 0;
virtual void flushQueue(MessageSender&) = 0;
virtual uint16_t handleReply(api::BucketReply& reply) = 0;
+ virtual void add_trace_tree_to_reply(vespalib::Trace trace) = 0;
};
-class PersistenceMessageTrackerImpl : public PersistenceMessageTracker,
- public MessageTracker
+class PersistenceMessageTrackerImpl final
+ : public PersistenceMessageTracker,
+ public MessageTracker
{
private:
using BucketInfoMap = std::map<document::Bucket, std::vector<BucketCopy>>;
@@ -92,12 +94,14 @@ private:
void updateFailureResult(const api::BucketInfoReply& reply);
void handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node);
void handlePersistenceReply(api::BucketInfoReply& reply, uint16_t node);
+ void transfer_trace_state_to_reply();
void queueCommand(std::shared_ptr<api::BucketCommand> msg, uint16_t target) override {
MessageTracker::queueCommand(std::move(msg), target);
}
void flushQueue(MessageSender& s) override { MessageTracker::flushQueue(s); }
uint16_t handleReply(api::BucketReply& r) override { return MessageTracker::handleReply(r); }
+ void add_trace_tree_to_reply(vespalib::Trace trace) override;
};
}