summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/twophaseupdateoperationtest.cpp17
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp42
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h5
3 files changed, 49 insertions, 15 deletions
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
index 5aa2a3e5662..876aa4a258c 100644
--- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
+++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp
@@ -351,6 +351,9 @@ TEST_F(TwoPhaseUpdateOperationTest, simple) {
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 90) ReturnCode(NONE)",
_sender.getLastReply(true));
+
+ EXPECT_EQ(metrics().updates.failures.notfound.getValue(), 0);
+ EXPECT_EQ(metrics().updates.failures.test_and_set_failed.getValue(), 0);
}
TEST_F(TwoPhaseUpdateOperationTest, non_existing) {
@@ -361,6 +364,8 @@ TEST_F(TwoPhaseUpdateOperationTest, non_existing) {
EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, BucketId(0x0000000000000000), "
"timestamp 0, timestamp of updated doc: 0) ReturnCode(NONE)",
_sender.getLastReply(true));
+
+ EXPECT_EQ(metrics().updates.failures.notfound.getValue(), 1);
}
TEST_F(TwoPhaseUpdateOperationTest, update_failed) {
@@ -741,6 +746,9 @@ TEST_F(TwoPhaseUpdateOperationTest, non_existing_with_auto_create) {
"timestamp 0, timestamp of updated doc: 200000000) "
"ReturnCode(NONE)",
_sender.getLastReply(true));
+
+ // "Not found" failure not counted when create: true is set, since the update itself isn't failed.
+ EXPECT_EQ(metrics().updates.failures.notfound.getValue(), 0);
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_fails_update_when_mismatching_timestamp_constraint) {
@@ -863,6 +871,9 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_mismatch_fails_with_tas_
"ReturnCode(TEST_AND_SET_CONDITION_FAILED, "
"Condition did not match document)",
_sender.getLastReply(true));
+
+ EXPECT_EQ(metrics().updates.failures.notfound.getValue(), 0);
+ EXPECT_EQ(metrics().updates.failures.test_and_set_failed.getValue(), 1);
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_match_sends_puts_with_updated_doc) {
@@ -928,6 +939,9 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_with_missing_doc_and_no_
"ReturnCode(TEST_AND_SET_CONDITION_FAILED, "
"Document did not exist)",
_sender.getLastReply(true));
+
+ EXPECT_EQ(metrics().updates.failures.notfound.getValue(), 0); // Not counted as "not found" failure when TaS is present
+ EXPECT_EQ(metrics().updates.failures.test_and_set_failed.getValue(), 1);
}
TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_with_missing_doc_and_auto_create_sends_puts) {
@@ -940,6 +954,9 @@ TEST_F(TwoPhaseUpdateOperationTest, safe_path_condition_with_missing_doc_and_aut
replyToGet(*cb, _sender, 0, 100, false);
replyToGet(*cb, _sender, 1, 110, false);
ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 2));
+
+ EXPECT_EQ(metrics().updates.failures.notfound.getValue(), 0); // Not counted as "not found" failure when we auto create
+ EXPECT_EQ(metrics().updates.failures.test_and_set_failed.getValue(), 0);
}
void
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
index 563c2d573cd..00c783d1eae 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.cpp
@@ -138,15 +138,16 @@ TwoPhaseUpdateOperation::transitionTo(SendState newState)
void
TwoPhaseUpdateOperation::ensureUpdateReplyCreated()
{
- if (!_updateReply.get()) {
- _updateReply = _updateCmd->makeReply();
+ if (!_updateReply) {
+ _updateReply = std::dynamic_pointer_cast<api::UpdateReply>(std::shared_ptr<api::StorageReply>(_updateCmd->makeReply()));
+ assert(_updateReply);
}
}
void
TwoPhaseUpdateOperation::sendReply(
DistributorStripeMessageSender& sender,
- std::shared_ptr<api::StorageReply>& reply)
+ std::shared_ptr<api::UpdateReply> reply)
{
assert(!_replySent);
reply->getTrace().addChild(std::move(_trace));
@@ -160,6 +161,12 @@ TwoPhaseUpdateOperation::sendReplyWithResult(
const api::ReturnCode& result)
{
ensureUpdateReplyCreated();
+ // This particular method is called when we synthesize our own UpdateReply,
+ // not when we take over an already produced one from an UpdateOperation.
+ // The latter will already increment the TaS metric implicitly.
+ if (result.getResult() == api::ReturnCode::Result::TEST_AND_SET_CONDITION_FAILED) {
+ _updateMetric.failures.test_and_set_failed.inc();
+ }
_updateReply->setResult(result);
sendReply(sender, _updateReply);
}
@@ -195,7 +202,7 @@ TwoPhaseUpdateOperation::startFastPathUpdate(DistributorStripeMessageSender& sen
transitionTo(SendState::UPDATES_SENT);
if (intermediate._reply.get()) {
- sendReply(sender, intermediate._reply);
+ sendReply(sender, std::dynamic_pointer_cast<api::UpdateReply>(intermediate._reply));
}
}
@@ -367,18 +374,26 @@ TwoPhaseUpdateOperation::handleFastPathReceive(DistributorStripeMessageSender& s
if (intermediate._reply.get()) {
assert(_sendState == SendState::UPDATES_SENT);
addTraceFromReply(*intermediate._reply);
- UpdateOperation& cb = static_cast<UpdateOperation&> (callbackOp);
+ auto& cb = dynamic_cast<UpdateOperation&>(callbackOp);
std::pair<document::BucketId, uint16_t> bestNode = cb.getNewestTimestampLocation();
-
- if (!intermediate._reply->getResult().success() ||
- bestNode.first == document::BucketId(0)) {
+ auto intermediate_update_reply = std::dynamic_pointer_cast<api::UpdateReply>(intermediate._reply);
+ assert(intermediate_update_reply);
+
+ if (!intermediate_update_reply->getResult().success() ||
+ bestNode.first == document::BucketId(0))
+ {
+ if (intermediate_update_reply->getResult().success() &&
+ (intermediate_update_reply->getOldTimestamp() == 0))
+ {
+ _updateMetric.failures.notfound.inc();
+ }
// Failed or was consistent
- sendReply(sender, intermediate._reply);
+ sendReply(sender, std::move(intermediate_update_reply));
} else {
LOG(debug, "Update(%s) fast path: was inconsistent!", update_doc_id().c_str());
- _updateReply = intermediate._reply;
+ _updateReply = std::move(intermediate_update_reply);
_fast_path_repair_source_node = bestNode.second;
document::Bucket bucket(_updateCmd->getBucket().getBucketSpace(), bestNode.first);
auto cmd = std::make_shared<api::GetCommand>(bucket, _updateCmd->getDocumentId(), document::AllFields::NAME);
@@ -535,7 +550,7 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorStripeMessageSende
document::Document::SP docToUpdate;
api::Timestamp putTimestamp = _op_ctx.generate_unique_timestamp();
- if (reply.getDocument().get()) {
+ if (reply.getDocument()) {
api::Timestamp receivedTimestamp = reply.getLastModifiedTimestamp();
if (!satisfiesUpdateTimestampConstraint(receivedTimestamp)) {
sendReplyWithResult(sender, api::ReturnCode(api::ReturnCode::OK,
@@ -556,6 +571,7 @@ TwoPhaseUpdateOperation::handleSafePathReceivedGet(DistributorStripeMessageSende
docToUpdate = createBlankDocument();
setUpdatedForTimestamp(putTimestamp);
} else {
+ _updateMetric.failures.notfound.inc();
sendReplyWithResult(sender, reply.getResult());
return;
}
@@ -644,7 +660,7 @@ void
TwoPhaseUpdateOperation::setUpdatedForTimestamp(api::Timestamp ts)
{
ensureUpdateReplyCreated();
- static_cast<api::UpdateReply&>(*_updateReply).setOldTimestamp(ts);
+ _updateReply->setOldTimestamp(ts);
}
std::shared_ptr<document::Document>
@@ -700,7 +716,7 @@ TwoPhaseUpdateOperation::onClose(DistributorStripeMessageSender& sender) {
auto candidateReply = std::move(intermediate._reply);
if (candidateReply && candidateReply->getType() == api::MessageType::UPDATE_REPLY) {
assert(_mode == Mode::FAST_PATH);
- sendReply(sender, candidateReply); // Sets _replySent
+ sendReply(sender, std::dynamic_pointer_cast<api::UpdateReply>(candidateReply)); // Sets _replySent
}
} else {
break;
diff --git a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
index aa59a457eb7..45dec86268f 100644
--- a/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
+++ b/storage/src/vespa/storage/distributor/operations/external/twophaseupdateoperation.h
@@ -16,6 +16,7 @@ namespace storage {
namespace api {
class UpdateCommand;
+class UpdateReply;
class CreateBucketReply;
class ReturnCode;
}
@@ -94,7 +95,7 @@ private:
static const char* stateToString(SendState);
void sendReply(DistributorStripeMessageSender&,
- std::shared_ptr<api::StorageReply>&);
+ std::shared_ptr<api::UpdateReply>);
void sendReplyWithResult(DistributorStripeMessageSender&, const api::ReturnCode&);
void ensureUpdateReplyCreated();
@@ -144,7 +145,7 @@ private:
PersistenceOperationMetricSet& _getMetric;
PersistenceOperationMetricSet& _metadata_get_metrics;
std::shared_ptr<api::UpdateCommand> _updateCmd;
- std::shared_ptr<api::StorageReply> _updateReply;
+ std::shared_ptr<api::UpdateReply> _updateReply;
const DistributorNodeContext& _node_ctx;
DistributorStripeOperationContext& _op_ctx;
const DocumentSelectionParser& _parser;