aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorHenning Baldersheim <balder@yahoo-inc.com>2023-05-12 18:09:52 +0200
committerGitHub <noreply@github.com>2023-05-12 18:09:52 +0200
commit9ea2404553422d5029d96bba84f8916bc4f06343 (patch)
tree0c23684663cee7a04c729840bb9904069f95948f
parentaff2a48b41d3014883f450ddbcafe5df58f8cbe4 (diff)
parent47a72589eed7fd6b538345962127da87e3eb71c3 (diff)
Merge pull request #27092 from vespa-engine/vekterli/expose-reply-traces-from-get-ops-and-condition-probes
Wire MessageBus reply traces through conditional Put pipeline
-rw-r--r--storage/src/tests/distributor/check_condition_test.cpp25
-rw-r--r--storage/src/tests/distributor/distributor_stripe_test_util.h9
-rw-r--r--storage/src/tests/distributor/getoperationtest.cpp31
-rw-r--r--storage/src/tests/distributor/putoperationtest.cpp49
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/check_condition.cpp40
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/check_condition.h30
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.cpp7
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.h31
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.cpp11
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/putoperation.h2
-rw-r--r--storage/src/vespa/storage/distributor/persistencemessagetracker.cpp21
-rw-r--r--storage/src/vespa/storage/distributor/persistencemessagetracker.h8
12 files changed, 206 insertions, 58 deletions
diff --git a/storage/src/tests/distributor/check_condition_test.cpp b/storage/src/tests/distributor/check_condition_test.cpp
index d4b7d7019d7..1b5cede8af6 100644
--- a/storage/src/tests/distributor/check_condition_test.cpp
+++ b/storage/src/tests/distributor/check_condition_test.cpp
@@ -9,7 +9,8 @@
#include <vespa/storage/distributor/persistence_operation_metric_set.h>
#include <vespa/storageapi/message/persistence.h>
#include <tests/distributor/distributor_stripe_test_util.h>
-#include <vespa/vespalib/gtest/gtest.h>
+#include <gtest/gtest.h>
+#include <gmock/gmock.h>
using namespace ::testing;
@@ -27,6 +28,7 @@ public:
BucketId _bucket_id{16, 1234};
TestAndSetCondition _tas_cond{"foo or bar"};
PersistenceOperationMetricSet _metrics{"dummy_metrics", nullptr};
+ uint32_t _trace_level{5};
CheckConditionTest();
~CheckConditionTest() override;
@@ -52,7 +54,8 @@ public:
auto bucket = Bucket(FixedBucketSpaces::default_space(), _bucket_id);
assert(_bucket_id.contains(doc_bucket));
return CheckCondition::create_if_inconsistent_replicas(bucket, bucket_space, _doc_id, _tas_cond,
- node_context(), operation_context(), _metrics);
+ node_context(), operation_context(), _metrics,
+ _trace_level);
}
std::shared_ptr<api::GetCommand> sent_get_command(size_t idx) {
@@ -86,6 +89,12 @@ public:
return make_reply(*sent_get_command(cmd_idx), ts, true, false);
}
+ std::shared_ptr<api::GetReply> make_trace_reply(size_t cmd_idx, api::Timestamp ts, std::string trace_message) {
+ auto reply = make_reply(*sent_get_command(cmd_idx), ts, true, false);
+ MBUS_TRACE(reply->getTrace(), _trace_level, trace_message);
+ return reply;
+ }
+
std::shared_ptr<api::GetReply> make_failed_reply(size_t cmd_idx) {
auto reply = make_reply(*sent_get_command(cmd_idx), 0, false, false);
reply->setResult(api::ReturnCode(api::ReturnCode::ABORTED, "did a bork"));
@@ -149,6 +158,7 @@ TEST_F(CheckConditionTest, starting_sends_condition_probe_gets) {
EXPECT_EQ(cmd->condition(), _tas_cond);
EXPECT_EQ(cmd->getFieldSet(), NoFields::NAME);
EXPECT_EQ(cmd->internal_read_consistency(), api::InternalReadConsistency::Strong);
+ EXPECT_EQ(cmd->getTrace().getLevel(), _trace_level);
}
TEST_F(CheckConditionTest, condition_matching_completes_check_with_match_outcome) {
@@ -232,4 +242,15 @@ TEST_F(CheckConditionTest, check_fails_if_replica_set_changed_between_start_and_
});
}
+TEST_F(CheckConditionTest, nested_get_traces_are_propagated_to_outcome) {
+ test_cond_with_2_gets_sent([&](auto& cond) {
+ cond.handle_reply(_sender, make_trace_reply(0, 100, "hello"));
+ cond.handle_reply(_sender, make_trace_reply(1, 200, "world"));
+ }, [&](auto& outcome) {
+ auto trace_str = outcome.trace().toString();
+ EXPECT_THAT(trace_str, HasSubstr("hello"));
+ EXPECT_THAT(trace_str, HasSubstr("world"));
+ });
+}
+
}
diff --git a/storage/src/tests/distributor/distributor_stripe_test_util.h b/storage/src/tests/distributor/distributor_stripe_test_util.h
index 707418512f9..9963b2c96b4 100644
--- a/storage/src/tests/distributor/distributor_stripe_test_util.h
+++ b/storage/src/tests/distributor/distributor_stripe_test_util.h
@@ -220,6 +220,15 @@ public:
return cmd;
}
+ template <typename ReplyType>
+ requires std::is_base_of_v<api::StorageReply, ReplyType>
+ [[nodiscard]] std::shared_ptr<ReplyType> sent_reply(size_t idx) {
+ assert(idx < _sender.replies().size());
+ auto reply = std::dynamic_pointer_cast<ReplyType>(_sender.reply(idx));
+ assert(reply != nullptr);
+ return reply;
+ }
+
void config_enable_condition_probing(bool enable);
void tag_content_node_supports_condition_probing(uint16_t index, bool supported);
diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp
index 6601fcabbeb..600a4de462e 100644
--- a/storage/src/tests/distributor/getoperationtest.cpp
+++ b/storage/src/tests/distributor/getoperationtest.cpp
@@ -16,8 +16,9 @@
#include <vespa/storage/distributor/externaloperationhandler.h>
#include <vespa/storage/distributor/operations/external/getoperation.h>
#include <vespa/storageapi/message/persistence.h>
-#include <vespa/vespalib/gtest/gtest.h>
#include <iomanip>
+#include <gtest/gtest.h>
+#include <gmock/gmock.h>
using config::ConfigGetter;
using config::FileSpec;
@@ -75,7 +76,8 @@ struct GetOperationTest : Test, DistributorStripeTestUtil {
std::string authorVal,
uint32_t timestamp,
bool is_tombstone = false,
- bool condition_matched = false)
+ bool condition_matched = false,
+ std::string trace_msg = "")
{
if (idx == LastCommand) {
idx = _sender.commands().size() - 1;
@@ -98,6 +100,9 @@ struct GetOperationTest : Test, DistributorStripeTestUtil {
auto reply = std::make_shared<api::GetReply>(*tmp, doc, timestamp, false, is_tombstone, condition_matched);
reply->setResult(result);
+ if (!trace_msg.empty()) {
+ MBUS_TRACE(reply->getTrace(), 1, trace_msg);
+ }
op->receive(_sender, reply);
}
@@ -110,6 +115,10 @@ struct GetOperationTest : Test, DistributorStripeTestUtil {
sendReply(idx, api::ReturnCode::OK, "", timestamp, false, condition_match);
}
+ void reply_with_trace(uint32_t idx, uint32_t timestamp, std::string trace_message) {
+ sendReply(idx, api::ReturnCode::OK, "", timestamp, false, true, std::move(trace_message));
+ }
+
void replyWithFailure() {
sendReply(LastCommand, api::ReturnCode::IO_FAILURE, "", 0);
}
@@ -682,6 +691,7 @@ void GetOperationTest::set_up_condition_match_get_operation() {
TestAndSetCondition my_cond("my_cool_condition");
auto msg = std::make_shared<api::GetCommand>(makeDocumentBucket(BucketId(0)), docId, document::NoFields::NAME);
msg->set_condition(my_cond);
+ msg->getTrace().setLevel(9); // FIXME a very tiny bit dirty to set this here >_>
start_operation(std::move(msg), api::InternalReadConsistency::Strong);
ASSERT_EQ("Get => 0,Get => 2,Get => 1", _sender.getCommands(true));
@@ -742,4 +752,21 @@ TEST_F(GetOperationTest, condition_match_result_is_aggregated_for_newest_replica
EXPECT_EQ(replica_of(api::Timestamp(500), bucketId, 2, true, false), *op->newest_replica());
}
+TEST_F(GetOperationTest, trace_is_aggregated_from_all_sub_replies_and_propagated_to_operation_reply) {
+ ASSERT_NO_FATAL_FAILURE(set_up_condition_match_get_operation());
+
+ ASSERT_NO_FATAL_FAILURE(reply_with_trace(0, 400, "foo"));
+ ASSERT_NO_FATAL_FAILURE(reply_with_trace(1, 500, "bar"));
+ ASSERT_NO_FATAL_FAILURE(reply_with_trace(2, 300, "baz"));
+
+ ASSERT_EQ(_sender.replies().size(), 1);
+ auto get_reply = sent_reply<api::GetReply>(0);
+
+ auto trace_str = get_reply->getTrace().toString();
+ EXPECT_THAT(trace_str, HasSubstr("foo"));
+ EXPECT_THAT(trace_str, HasSubstr("bar"));
+ EXPECT_THAT(trace_str, HasSubstr("baz"));
+}
+
+
}
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index 405f25f9359..ff375e5b902 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -10,20 +10,18 @@
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/persistence.h>
#include <vespa/storageapi/message/state.h>
-#include <vespa/vespalib/gtest/gtest.h>
#include <vespa/vespalib/text/stringtokenizer.h>
#include <vespa/config/helper/configgetter.h>
+#include <gtest/gtest.h>
+#include <gmock/gmock.h>
-using std::shared_ptr;
using config::ConfigGetter;
using config::FileSpec;
using vespalib::string;
using namespace document;
-using namespace storage;
-using namespace storage::api;
-using namespace storage::lib;
using namespace std::literals::string_literals;
using document::test::makeDocumentBucket;
+using storage::api::TestAndSetCondition;
using namespace ::testing;
@@ -493,7 +491,8 @@ TEST_F(PutOperationTest, update_correct_bucket_on_remapped_put) {
{
std::shared_ptr<api::StorageCommand> msg2 = _sender.command(0);
std::shared_ptr<api::StorageReply> reply(msg2->makeReply().release());
- PutReply* sreply = (PutReply*)reply.get();
+ auto* sreply = dynamic_cast<api::PutReply*>(reply.get());
+ ASSERT_TRUE(sreply);
sreply->remapBucketId(document::BucketId(17, 13));
sreply->setBucketInfo(api::BucketInfo(1,2,3,4,5));
op->receive(_sender, reply);
@@ -690,6 +689,7 @@ void PutOperationTest::set_up_tas_put_with_2_inconsistent_replica_nodes(bool cre
auto put = createPut(doc);
put->setCondition(TestAndSetCondition("test.foo"));
put->set_create_if_non_existent(create);
+ put->getTrace().setLevel(9);
sendPut(std::move(put));
ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
}
@@ -821,4 +821,41 @@ TEST_F(PutOperationTest, conditional_put_no_replicas_case_with_create_set_acts_a
EXPECT_TRUE(put_n1->get_create_if_non_existent());
}
+TEST_F(PutOperationTest, trace_is_propagated_from_condition_probe_gets_ok_probe_case) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes());
+
+ ASSERT_EQ(sent_get_command(0)->getTrace().getLevel(), 9);
+ auto get_reply = make_get_reply(*sent_get_command(0), 50, false, true);
+ MBUS_TRACE(get_reply->getTrace(), 1, "a foo walks into a bar");
+
+ op->receive(_sender, get_reply);
+ op->receive(_sender, make_get_reply(*sent_get_command(1), 50, false, true));
+
+ ASSERT_EQ("Get => 1,Get => 0,Put => 1,Put => 0", _sender.getCommands(true));
+ sendReply(2);
+ sendReply(3);
+ ASSERT_EQ(_sender.replies().size(), 1);
+ auto put_reply = sent_reply<api::PutReply>(0);
+
+ auto trace_str = put_reply->getTrace().toString();
+ EXPECT_THAT(trace_str, HasSubstr("a foo walks into a bar"));
+}
+
+TEST_F(PutOperationTest, trace_is_propagated_from_condition_probe_gets_failed_probe_case) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes());
+
+ auto get_reply = make_get_reply(*sent_get_command(0), 50, false, false);
+ MBUS_TRACE(get_reply->getTrace(), 1, "a foo walks into a zoo");
+
+ op->receive(_sender, get_reply);
+ op->receive(_sender, make_get_reply(*sent_get_command(1), 50, false, false));
+
+ ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+ ASSERT_EQ(_sender.replies().size(), 1);
+ auto put_reply = sent_reply<api::PutReply>(0);
+
+ auto trace_str = put_reply->getTrace().toString();
+ EXPECT_THAT(trace_str, HasSubstr("a foo walks into a zoo"));
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
index 7f7e6b08c1f..9f7dbcaa132 100644
--- a/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.cpp
@@ -16,6 +16,29 @@ LOG_SETUP(".distributor.operations.external.check_condition");
namespace storage::distributor {
+CheckCondition::Outcome::Outcome(api::ReturnCode error_code, vespalib::Trace trace) noexcept
+ : _error_code(std::move(error_code)),
+ _result(Result::HasError),
+ _trace(std::move(trace))
+{
+}
+
+CheckCondition::Outcome::Outcome(Result result, vespalib::Trace trace) noexcept
+ : _error_code(),
+ _result(result),
+ _trace(std::move(trace))
+{
+}
+
+CheckCondition::Outcome::Outcome(Result result) noexcept
+ : _error_code(),
+ _result(result),
+ _trace()
+{
+}
+
+CheckCondition::Outcome::~Outcome() = default;
+
CheckCondition::CheckCondition(Outcome known_outcome,
const DistributorBucketSpace& bucket_space,
const DistributorNodeContext& node_ctx,
@@ -36,6 +59,7 @@ CheckCondition::CheckCondition(const document::Bucket& bucket,
const DistributorBucketSpace& bucket_space,
const DistributorNodeContext& node_ctx,
PersistenceOperationMetricSet& metric,
+ uint32_t trace_level,
private_ctor_tag)
: _doc_id_bucket(bucket),
_bucket_space(bucket_space),
@@ -48,6 +72,7 @@ CheckCondition::CheckCondition(const document::Bucket& bucket,
// Side note: the BucketId provided to the GetCommand is ignored; GetOperation computes explicitly from the doc ID.
auto get_cmd = std::make_shared<api::GetCommand>(_doc_id_bucket, doc_id, document::NoFields::NAME);
get_cmd->set_condition(tas_condition);
+ get_cmd->getTrace().setLevel(trace_level);
_cond_get_op = std::make_shared<GetOperation>(_node_ctx, _bucket_space,
_bucket_space.getBucketDatabase().acquire_read_guard(),
std::move(get_cmd),
@@ -129,7 +154,8 @@ void CheckCondition::handle_internal_get_operation_reply(std::shared_ptr<api::St
if (reply->getResult().success()) {
if (_cond_get_op->any_replicas_failed()) {
_outcome.emplace(api::ReturnCode(api::ReturnCode::ABORTED,
- "One or more replicas failed during test-and-set condition evaluation"));
+ "One or more replicas failed during test-and-set condition evaluation"),
+ reply->steal_trace());
return;
}
const auto state_version_now = _bucket_space.getClusterState().getVersion();
@@ -143,13 +169,14 @@ void CheckCondition::handle_internal_get_operation_reply(std::shared_ptr<api::St
// explicitly edge-handled...!
_outcome.emplace(api::ReturnCode(api::ReturnCode::BUCKET_NOT_FOUND,
"Bucket ownership or replica set changed between condition "
- "read and operation write phases"));
+ "read and operation write phases"),
+ reply->steal_trace());
} else {
auto maybe_newest = _cond_get_op->newest_replica();
- _outcome.emplace(newest_replica_to_outcome(maybe_newest));
+ _outcome.emplace(newest_replica_to_outcome(maybe_newest), reply->steal_trace());
}
} else {
- _outcome.emplace(reply->getResult());
+ _outcome.emplace(reply->getResult(), reply->steal_trace());
}
}
@@ -193,7 +220,8 @@ CheckCondition::create_if_inconsistent_replicas(const document::Bucket& bucket,
const documentapi::TestAndSetCondition& tas_condition,
const DistributorNodeContext& node_ctx,
const DistributorStripeOperationContext& op_ctx,
- PersistenceOperationMetricSet& metric)
+ PersistenceOperationMetricSet& metric,
+ uint32_t trace_level)
{
// TODO move this check to the caller?
if (!op_ctx.distributor_config().enable_condition_probing()) {
@@ -210,7 +238,7 @@ CheckCondition::create_if_inconsistent_replicas(const document::Bucket& bucket,
return {}; // Want write-repair, but one or more nodes are too old to use the feature
}
return std::make_shared<CheckCondition>(bucket, doc_id, tas_condition, bucket_space,
- node_ctx, metric, private_ctor_tag{});
+ node_ctx, metric, trace_level, private_ctor_tag{});
}
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/check_condition.h b/storage/src/vespa/storage/distributor/operations/external/check_condition.h
index ea3bfff2e3e..2a659c55081 100644
--- a/storage/src/vespa/storage/distributor/operations/external/check_condition.h
+++ b/storage/src/vespa/storage/distributor/operations/external/check_condition.h
@@ -60,17 +60,10 @@ public:
NotFound,
};
- explicit Outcome(api::ReturnCode error_code) noexcept
- : _error_code(std::move(error_code)),
- _result(Result::HasError)
- {
- }
-
- explicit Outcome(Result result) noexcept
- : _error_code(),
- _result(result)
- {
- }
+ Outcome(api::ReturnCode error_code, vespalib::Trace trace) noexcept;
+ Outcome(Result result, vespalib::Trace trace) noexcept;
+ explicit Outcome(Result result) noexcept;
+ ~Outcome();
[[nodiscard]] bool failed() const noexcept {
return _error_code.failed();
@@ -88,9 +81,18 @@ public:
return _result == Result::NotFound;
}
+ [[nodiscard]] const vespalib::Trace& trace() const noexcept {
+ return _trace;
+ }
+
+ [[nodiscard]] vespalib::Trace&& steal_trace() noexcept {
+ return std::move(_trace);
+ }
+
private:
api::ReturnCode _error_code;
Result _result;
+ vespalib::Trace _trace;
};
private:
const document::Bucket _doc_id_bucket;
@@ -113,6 +115,7 @@ public:
const DistributorBucketSpace& bucket_space,
const DistributorNodeContext& node_ctx,
PersistenceOperationMetricSet& metric,
+ uint32_t trace_level,
private_ctor_tag);
~CheckCondition();
@@ -121,7 +124,7 @@ public:
const std::shared_ptr<api::StorageReply>& reply);
void cancel(DistributorStripeMessageSender& sender);
- [[nodiscard]] const std::optional<Outcome>& maybe_outcome() const noexcept {
+ [[nodiscard]] std::optional<Outcome>& maybe_outcome() noexcept {
return _outcome;
}
@@ -132,7 +135,8 @@ public:
const documentapi::TestAndSetCondition& tas_condition,
const DistributorNodeContext& node_ctx,
const DistributorStripeOperationContext& op_ctx,
- PersistenceOperationMetricSet& metric);
+ PersistenceOperationMetricSet& metric,
+ uint32_t trace_level = 0); // TODO remove default value
private:
[[nodiscard]] bool replica_set_changed_after_get_operation() const;
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
index 72943eaf713..a261a898283 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
@@ -63,6 +63,7 @@ GetOperation::GetOperation(const DistributorNodeContext& node_ctx,
_newest_replica(),
_metric(metric),
_operationTimer(node_ctx.clock()),
+ _trace(_msg->getTrace().getLevel()),
_desired_read_consistency(desired_read_consistency),
_has_replica_inconsistency(false),
_any_replicas_failed(false)
@@ -156,7 +157,7 @@ GetOperation::onReceive(DistributorStripeMessageSender& sender, const std::share
LOG(debug, "Received %s", msg->toString(true).c_str());
- _msg->getTrace().addChild(getreply->steal_trace());
+ _trace.addChild(getreply->steal_trace()); // TODO sweet, sweet nectar of distributed tracing
bool allDone = true;
for (auto& response : _responses) {
for (uint32_t i = 0; i < response.second.size(); i++) {
@@ -247,6 +248,10 @@ GetOperation::sendReply(DistributorStripeMessageSender& sender)
const auto timestamp = (newest.is_tombstone ? api::Timestamp(0) : newest.timestamp);
auto repl = std::make_shared<api::GetReply>(*_msg, _doc, timestamp, !_has_replica_inconsistency);
repl->setResult(_returnCode);
+ if (!_trace.isEmpty()) {
+ _trace.setStrict(false);
+ repl->getTrace().addChild(std::move(_trace));
+ }
update_internal_metrics();
sender.sendReply(repl);
_msg.reset();
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.h b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
index 9d6b9f1986d..a6de9370dcb 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.h
@@ -88,28 +88,25 @@ private:
};
using GroupVector = std::vector<BucketChecksumGroup>;
+ using DbReplicaState = std::vector<std::pair<document::BucketId, uint16_t>>;
// Organize the different copies by bucket/checksum pairs. We should
// try to request GETs from each bucket and each different checksum
// within that bucket.
- std::map<GroupId, GroupVector> _responses;
-
- const DistributorNodeContext& _node_ctx;
- const DistributorBucketSpace &_bucketSpace;
-
- std::shared_ptr<api::GetCommand> _msg;
-
- api::ReturnCode _returnCode;
+ std::map<GroupId, GroupVector> _responses;
+ const DistributorNodeContext& _node_ctx;
+ const DistributorBucketSpace& _bucketSpace;
+ std::shared_ptr<api::GetCommand> _msg;
+ api::ReturnCode _returnCode;
std::shared_ptr<document::Document> _doc;
-
- std::optional<NewestReplica> _newest_replica;
-
- PersistenceOperationMetricSet& _metric;
- framework::MilliSecTimer _operationTimer;
- std::vector<std::pair<document::BucketId, uint16_t>> _replicas_in_db;
- api::InternalReadConsistency _desired_read_consistency;
- bool _has_replica_inconsistency;
- bool _any_replicas_failed;
+ std::optional<NewestReplica> _newest_replica;
+ PersistenceOperationMetricSet& _metric;
+ framework::MilliSecTimer _operationTimer;
+ DbReplicaState _replicas_in_db;
+ vespalib::Trace _trace;
+ api::InternalReadConsistency _desired_read_consistency;
+ bool _has_replica_inconsistency;
+ bool _any_replicas_failed;
void sendReply(DistributorStripeMessageSender& sender);
bool sendForChecksum(DistributorStripeMessageSender& sender, const document::BucketId& id, GroupVector& res);
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
index 96252c37844..952aeff0800 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.cpp
@@ -156,13 +156,13 @@ void PutOperation::start_conditional_put(DistributorStripeMessageSender& sender)
document::Bucket bucket(_msg->getBucket().getBucketSpace(), _doc_id_bucket_id);
_check_condition = CheckCondition::create_if_inconsistent_replicas(bucket, _bucket_space, _msg->getDocumentId(),
_msg->getCondition(), _node_ctx, _op_ctx,
- _temp_metric);
+ _temp_metric, _msg->getTrace().getLevel());
if (!_check_condition) {
start_direct_put_dispatch(sender);
} else {
// Inconsistent replicas; need write repair
_check_condition->start_and_send(sender);
- const auto& outcome = _check_condition->maybe_outcome(); // Might be done immediately
+ auto& outcome = _check_condition->maybe_outcome(); // Might be done immediately
if (outcome) {
on_completed_check_condition(*outcome, sender);
}
@@ -265,7 +265,7 @@ PutOperation::onReceive(DistributorStripeMessageSender& sender, const std::share
_tracker.receiveReply(sender, dynamic_cast<api::BucketInfoReply&>(*msg));
} else {
_check_condition->handle_reply(sender, msg);
- const auto& outcome = _check_condition->maybe_outcome();
+ auto& outcome = _check_condition->maybe_outcome();
if (!outcome) {
return; // Condition check not done yet
}
@@ -274,9 +274,12 @@ PutOperation::onReceive(DistributorStripeMessageSender& sender, const std::share
}
void
-PutOperation::on_completed_check_condition(const CheckCondition::Outcome& outcome,
+PutOperation::on_completed_check_condition(CheckCondition::Outcome& outcome,
DistributorStripeMessageSender& sender)
{
+ if (!outcome.trace().isEmpty()) {
+ _tracker.add_trace_tree_to_reply(outcome.steal_trace());
+ }
const bool effectively_matched = (outcome.matched_condition()
|| (outcome.not_found() && _msg->get_create_if_non_existent()));
if (effectively_matched) {
diff --git a/storage/src/vespa/storage/distributor/operations/external/putoperation.h b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
index d80e9cc00d7..6befb8d3e38 100644
--- a/storage/src/vespa/storage/distributor/operations/external/putoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/putoperation.h
@@ -50,7 +50,7 @@ private:
void start_direct_put_dispatch(DistributorStripeMessageSender& sender);
void start_conditional_put(DistributorStripeMessageSender& sender);
- void on_completed_check_condition(const CheckCondition::Outcome& outcome,
+ void on_completed_check_condition(CheckCondition::Outcome& outcome,
DistributorStripeMessageSender& sender);
void insertDatabaseEntryAndScheduleCreateBucket(const OperationTargetList& copies, bool setOneActive,
const api::StorageCommand& originalCommand,
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
index 08b99cf89a8..a30663bde2f 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.cpp
@@ -58,6 +58,7 @@ PersistenceMessageTrackerImpl::fail(MessageSender& sender, const api::ReturnCode
if (_reply.get()) {
_reply->setResult(result);
updateMetrics();
+ transfer_trace_state_to_reply();
sender.sendReply(_reply);
_reply.reset();
}
@@ -222,10 +223,7 @@ PersistenceMessageTrackerImpl::sendReply(MessageSender& sender)
}
updateMetrics();
- if ( ! _trace.isEmpty()) {
- _trace.setStrict(false);
- _reply->getTrace().addChild(std::move(_trace));
- }
+ transfer_trace_state_to_reply();
sender.sendReply(_reply);
_reply = std::shared_ptr<api::BucketInfoReply>();
@@ -288,6 +286,15 @@ PersistenceMessageTrackerImpl::handlePersistenceReply(
}
void
+PersistenceMessageTrackerImpl::transfer_trace_state_to_reply()
+{
+ if (!_trace.isEmpty()) {
+ _trace.setStrict(false);
+ _reply->getTrace().addChild(std::move(_trace));
+ }
+}
+
+void
PersistenceMessageTrackerImpl::updateFromReply(
MessageSender& sender,
api::BucketInfoReply& reply,
@@ -318,4 +325,10 @@ PersistenceMessageTrackerImpl::updateFromReply(
}
}
+void
+PersistenceMessageTrackerImpl::add_trace_tree_to_reply(vespalib::Trace trace)
+{
+ _trace.addChild(std::move(trace));
+}
+
}
diff --git a/storage/src/vespa/storage/distributor/persistencemessagetracker.h b/storage/src/vespa/storage/distributor/persistencemessagetracker.h
index ca330859259..923ecf45649 100644
--- a/storage/src/vespa/storage/distributor/persistencemessagetracker.h
+++ b/storage/src/vespa/storage/distributor/persistencemessagetracker.h
@@ -23,10 +23,12 @@ struct PersistenceMessageTracker {
virtual void queueCommand(api::BucketCommand::SP, uint16_t target) = 0;
virtual void flushQueue(MessageSender&) = 0;
virtual uint16_t handleReply(api::BucketReply& reply) = 0;
+ virtual void add_trace_tree_to_reply(vespalib::Trace trace) = 0;
};
-class PersistenceMessageTrackerImpl : public PersistenceMessageTracker,
- public MessageTracker
+class PersistenceMessageTrackerImpl final
+ : public PersistenceMessageTracker,
+ public MessageTracker
{
private:
using BucketInfoMap = std::map<document::Bucket, std::vector<BucketCopy>>;
@@ -92,12 +94,14 @@ private:
void updateFailureResult(const api::BucketInfoReply& reply);
void handleCreateBucketReply(api::BucketInfoReply& reply, uint16_t node);
void handlePersistenceReply(api::BucketInfoReply& reply, uint16_t node);
+ void transfer_trace_state_to_reply();
void queueCommand(std::shared_ptr<api::BucketCommand> msg, uint16_t target) override {
MessageTracker::queueCommand(std::move(msg), target);
}
void flushQueue(MessageSender& s) override { MessageTracker::flushQueue(s); }
uint16_t handleReply(api::BucketReply& r) override { return MessageTracker::handleReply(r); }
+ void add_trace_tree_to_reply(vespalib::Trace trace) override;
};
}