about summary refs log tree commit diff stats
path: root/storage
diff options
context:
space:
mode:
author Tor Brede Vekterli <vekterli@verizonmedia.com> 2020-03-24 13:21:28 +0100
committer GitHub <noreply@github.com> 2020-03-24 13:21:28 +0100
commit 139e6f662bbc016dbccfda9d02cb53bbe468f31b (patch)
tree d284846643900e9f4d35fb9fc6dcd82f04a08dc1 /storage
parent adf7b03ebec1614fa9b0bba2a14042968bc876b5 (diff)
parent a607b174decc213f482e177627cfd6b35fa4aaaa (diff)
Merge pull request #12667 from vespa-engine/vekterli/add-metric-coverage-of-new-update-phases
Track metrics for new inconsistent update phases
Diffstat (limited to 'storage')
-rw-r--r-- storage/src/tests/distributor/twophaseupdateoperationtest.cpp 23
-rw-r--r-- storage/src/vespa/storage/distributor/distributormetricsset.cpp 1
-rw-r--r-- storage/src/vespa/storage/distributor/distributormetricsset.h 1
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp 29
-rw-r--r-- storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h 4
5 files changed, 54 insertions, 4 deletions
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
index f3ce4d92263..962ce085cb0 100644
--- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
+++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
@@ -1099,6 +1099,10 @@ TEST_F(ThreePhaseUpdateTest, full_document_get_sent_to_replica_with_highest_time
ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
reply_to_metadata_get(*cb, _sender, 0, 1000U);
reply_to_metadata_get(*cb, _sender, 1, 2000U);
+
+ auto& metrics = getDistributor().getMetrics().update_metadata_gets[documentapi::LoadType::DEFAULT];
+ EXPECT_EQ(1, metrics.ok.getValue()); // Technically tracks an entire operation covering multiple Gets.
+
// Node 1 has newest document version at ts=2000
ASSERT_EQ("Get => 1", _sender.getCommands(true, false, 2));
{
@@ -1116,6 +1120,9 @@ TEST_F(ThreePhaseUpdateTest, puts_are_sent_after_receiving_full_document_get) {
ASSERT_EQ("Get => 0", _sender.getCommands(true, false, 2));
replyToGet(*cb, _sender, 2, 2000U);
ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 3));
+
+ auto& metrics = getDistributor().getMetrics().update_gets[documentapi::LoadType::DEFAULT];
+ EXPECT_EQ(1, metrics.ok.getValue());
}
TEST_F(ThreePhaseUpdateTest, consistent_meta_get_timestamps_can_restart_in_fast_path) {
@@ -1294,6 +1301,22 @@ TEST_F(ThreePhaseUpdateTest, single_get_mbus_trace_is_propagated_to_reply) {
ASSERT_THAT(trace, HasSubstr("it is me, Leclerc! *lifts glasses*"));
}
+TEST_F(ThreePhaseUpdateTest, single_full_get_reply_received_after_close_is_no_op) {
+ auto cb = set_up_2_inconsistent_replicas_and_start_update();
+ ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true));
+ reply_to_metadata_get(*cb, _sender, 0, 0U);
+ reply_to_metadata_get(*cb, _sender, 1, 1000U);
+ ASSERT_EQ("Get => 1", _sender.getCommands(true, false, 2));
+ cb->onClose(_sender);
+ ASSERT_EQ("Update Reply", _sender.getLastReply(false));
+ // Operation closed prior to receiving Get. Note that we should not really get
+ // into this situation since the owner of the operation itself should clear
+ // any mappings associating the reply with the operation, but ensure we handle
+ // it gracefully anyway.
+ replyToGet(*cb, _sender, 2, 2000U);
+ ASSERT_EQ("", _sender.getCommands(true, false, 3)); // Nothing new sent.
+}
+
// XXX currently differs in behavior from content nodes in that updates for
// document IDs without explicit doctypes will _not_ be auto-failed on the
// distributor.
diff --git a/storage/src/vespa/storage/distributor/distributormetricsset.cpp b/storage/src/vespa/storage/distributor/distributormetricsset.cpp
index 98e96f9294f..8266aeb29cd 100644
--- a/storage/src/vespa/storage/distributor/distributormetricsset.cpp
+++ b/storage/src/vespa/storage/distributor/distributormetricsset.cpp
@@ -13,6 +13,7 @@ DistributorMetricSet::DistributorMetricSet(const metrics::LoadTypeSet& lt)
updates(lt, UpdateMetricSet(), this),
update_puts(lt, PersistenceOperationMetricSet("update_puts"), this),
update_gets(lt, PersistenceOperationMetricSet("update_gets"), this),
+ update_metadata_gets(lt, PersistenceOperationMetricSet("update_metadata_gets"), this),
removes(lt, PersistenceOperationMetricSet("removes"), this),
removelocations(lt, PersistenceOperationMetricSet("removelocations"), this),
gets(lt, PersistenceOperationMetricSet("gets"), this),
diff --git a/storage/src/vespa/storage/distributor/distributormetricsset.h b/storage/src/vespa/storage/distributor/distributormetricsset.h
index b5be72e8c14..d9c0711fd14 100644
--- a/storage/src/vespa/storage/distributor/distributormetricsset.h
+++ b/storage/src/vespa/storage/distributor/distributormetricsset.h
@@ -16,6 +16,7 @@ public:
metrics::LoadMetric<UpdateMetricSet> updates;
metrics::LoadMetric<PersistenceOperationMetricSet> update_puts;
metrics::LoadMetric<PersistenceOperationMetricSet> update_gets;
+ metrics::LoadMetric<PersistenceOperationMetricSet> update_metadata_gets;
metrics::LoadMetric<PersistenceOperationMetricSet> removes;
metrics::LoadMetric<PersistenceOperationMetricSet> removelocations;
metrics::LoadMetric<PersistenceOperationMetricSet> gets;
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index 706b7a447bf..4f49d89929f 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -29,12 +29,14 @@ TwoPhaseUpdateOperation::TwoPhaseUpdateOperation(
_updateMetric(metrics.updates[msg->getLoadType()]),
_putMetric(metrics.update_puts[msg->getLoadType()]),
_getMetric(metrics.update_gets[msg->getLoadType()]),
+ _metadata_get_metrics(metrics.update_metadata_gets[msg->getLoadType()]),
_updateCmd(std::move(msg)),
_updateReply(),
_manager(manager),
_bucketSpace(bucketSpace),
_sendState(SendState::NONE_SENT),
_mode(Mode::FAST_PATH),
+ _single_get_latency_timer(),
_fast_path_repair_source_node(0xffff),
_use_initial_cheap_metadata_fetch_phase(
_manager.getDistributor().getConfig().enable_metadata_only_fetch_phase_for_inconsistent_updates()),
@@ -217,9 +219,10 @@ TwoPhaseUpdateOperation::create_initial_safe_path_get_operation() {
LOG(debug, "Update(%s) safe path: sending Get commands with field set '%s' "
"and internal read consistency %s",
update_doc_id().c_str(), field_set, api::to_string(read_consistency));
+ auto& get_metric = (_use_initial_cheap_metadata_fetch_phase ? _metadata_get_metrics : _getMetric);
return std::make_shared<GetOperation>(
_manager, _bucketSpace, _bucketSpace.getBucketDatabase().acquire_read_guard(),
- get, _getMetric, read_consistency);
+ get, get_metric, read_consistency);
}
void
@@ -368,9 +371,7 @@ TwoPhaseUpdateOperation::handleSafePathReceive(DistributorMessageSender& sender,
// so we handle its reply separately.
if (_sendState == SendState::SINGLE_GET_SENT) {
assert(msg->getType() == api::MessageType::GET_REPLY);
- LOG(spam, "Received single full Get reply for '%s'", update_doc_id().c_str());
- addTraceFromReply(*msg);
- handleSafePathReceivedGet(sender, dynamic_cast<api::GetReply&>(*msg));
+ handle_safe_path_received_single_full_get(sender, dynamic_cast<api::GetReply&>(*msg));
return;
}
std::shared_ptr<Operation> callback = _sentMessageMap.pop(msg->getMsgId());
@@ -400,6 +401,25 @@ TwoPhaseUpdateOperation::handleSafePathReceive(DistributorMessageSender& sender,
}
}
+void TwoPhaseUpdateOperation::handle_safe_path_received_single_full_get(
+ DistributorMessageSender& sender,
+ api::GetReply& reply)
+{
+ LOG(spam, "Received single full Get reply for '%s'", update_doc_id().c_str());
+ if (_replySent) {
+ return; // Bail out; the operation has been concurrently closed.
+ }
+ addTraceFromReply(reply);
+ if (reply.getResult().success()) {
+ _getMetric.ok.inc();
+ } else {
+ _getMetric.failures.storagefailure.inc();
+ }
+ assert(_single_get_latency_timer.has_value());
+ _getMetric.latency.addValue(_single_get_latency_timer->getElapsedTimeAsDouble());
+ handleSafePathReceivedGet(sender, reply);
+}
+
void TwoPhaseUpdateOperation::handle_safe_path_received_metadata_get(
DistributorMessageSender& sender, api::GetReply& reply,
const std::optional<NewestReplica>& newest_replica,
@@ -443,6 +463,7 @@ void TwoPhaseUpdateOperation::handle_safe_path_received_metadata_get(
// Timestamps were not in sync, so we have to fetch the document from the highest
// timestamped replica, apply the update to it and then explicitly Put the result
// to all replicas.
+ _single_get_latency_timer.emplace(_manager.getClock());
document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), newest_replica->bucket_id);
LOG(debug, "Update(%s): sending single payload Get to %s on node %u (had timestamp %" PRIu64 ")",
update_doc_id().c_str(), bucket.toString().c_str(),
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
index 4a2f83010c7..2d8f3e8494d 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
@@ -7,6 +7,7 @@
#include <vespa/storage/distributor/operations/sequenced_operation.h>
#include <vespa/document/update/documentupdate.h>
#include <set>
+#include <optional>
namespace document {
class Document;
@@ -116,6 +117,7 @@ private:
api::GetReply&,
const std::optional<NewestReplica>&,
bool any_replicas_failed);
+ void handle_safe_path_received_single_full_get(DistributorMessageSender&, api::GetReply&);
void handleSafePathReceivedGet(DistributorMessageSender&, api::GetReply&);
void handleSafePathReceivedPut(DistributorMessageSender&, const api::PutReply&);
bool shouldCreateIfNonExistent() const;
@@ -136,6 +138,7 @@ private:
UpdateMetricSet& _updateMetric;
PersistenceOperationMetricSet& _putMetric;
PersistenceOperationMetricSet& _getMetric;
+ PersistenceOperationMetricSet& _metadata_get_metrics;
std::shared_ptr<api::UpdateCommand> _updateCmd;
std::shared_ptr<api::StorageReply> _updateReply;
DistributorComponent& _manager;
@@ -146,6 +149,7 @@ private:
mbus::TraceNode _trace;
document::BucketId _updateDocBucketId;
std::vector<std::pair<document::BucketId, uint16_t>> _replicas_at_get_send_time;
+ std::optional<framework::MilliSecTimer> _single_get_latency_timer;
uint16_t _fast_path_repair_source_node;
bool _use_initial_cheap_metadata_fetch_phase;
bool _replySent;