summaryrefslogtreecommitdiffstats
path: root/storage
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2023-04-20 11:33:11 +0200
committerGitHub <noreply@github.com>2023-04-20 11:33:11 +0200
commitb78746c2f8cac3e87b79ca362acb57c0a5d9f4df (patch)
treeebb3354ad7870dab119b6692296535caf75c7f2c /storage
parentd525287130a57e0de1e0f89332d8bbf67481e528 (diff)
parent9c04f6c0508de1aeea5a163262ae14b1a0bb1faf (diff)
Merge pull request #26788 from vespa-engine/vekterli/add-condition-match-metadata-aggregation-for-newest-replica
Add condition support to distributor `GetOperation`
Diffstat (limited to 'storage')
-rw-r--r--storage/src/tests/distributor/getoperationtest.cpp137
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/getoperation.cpp9
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/newest_replica.cpp1
-rw-r--r--storage/src/vespa/storage/distributor/operations/external/newest_replica.h11
-rw-r--r--storage/src/vespa/storageapi/message/persistence.cpp12
5 files changed, 139 insertions, 31 deletions
diff --git a/storage/src/tests/distributor/getoperationtest.cpp b/storage/src/tests/distributor/getoperationtest.cpp
index 36a1495579f..6601fcabbeb 100644
--- a/storage/src/tests/distributor/getoperationtest.cpp
+++ b/storage/src/tests/distributor/getoperationtest.cpp
@@ -19,11 +19,11 @@
#include <vespa/vespalib/gtest/gtest.h>
#include <iomanip>
-using std::shared_ptr;
using config::ConfigGetter;
using config::FileSpec;
using document::test::makeDocumentBucket;
using document::BucketId;
+using documentapi::TestAndSetCondition;
using namespace ::testing;
namespace storage::distributor {
@@ -54,23 +54,28 @@ struct GetOperationTest : Test, DistributorStripeTestUtil {
op.reset();
}
- void sendGet(api::InternalReadConsistency consistency = api::InternalReadConsistency::Strong) {
- auto msg = std::make_shared<api::GetCommand>(makeDocumentBucket(BucketId(0)), docId, document::AllFields::NAME);
+ void start_operation(std::shared_ptr<api::GetCommand> cmd, api::InternalReadConsistency consistency) {
op = std::make_unique<GetOperation>(
node_context(), getDistributorBucketSpace(),
getDistributorBucketSpace().getBucketDatabase().acquire_read_guard(),
- msg, metrics().gets,
+ std::move(cmd), metrics().gets,
consistency);
op->start(_sender);
}
+ void sendGet(api::InternalReadConsistency consistency = api::InternalReadConsistency::Strong) {
+ auto msg = std::make_shared<api::GetCommand>(makeDocumentBucket(BucketId(0)), docId, document::AllFields::NAME);
+ start_operation(std::move(msg), consistency);
+ }
+
static constexpr uint32_t LastCommand = UINT32_MAX;
void sendReply(uint32_t idx,
api::ReturnCode::Result result,
std::string authorVal,
uint32_t timestamp,
- bool is_tombstone = false)
+ bool is_tombstone = false,
+ bool condition_matched = false)
{
if (idx == LastCommand) {
idx = _sender.commands().size() - 1;
@@ -91,7 +96,7 @@ struct GetOperationTest : Test, DistributorStripeTestUtil {
document::StringFieldValue(authorVal));
}
- auto reply = std::make_shared<api::GetReply>(*tmp, doc, timestamp, false, is_tombstone);
+ auto reply = std::make_shared<api::GetReply>(*tmp, doc, timestamp, false, is_tombstone, condition_matched);
reply->setResult(result);
op->receive(_sender, reply);
@@ -101,6 +106,10 @@ struct GetOperationTest : Test, DistributorStripeTestUtil {
sendReply(idx, api::ReturnCode::OK, "", tombstone_ts, true);
}
+ void reply_with_condition_match(uint32_t idx, uint32_t timestamp, bool condition_match) {
+ sendReply(idx, api::ReturnCode::OK, "", timestamp, false, condition_match);
+ }
+
void replyWithFailure() {
sendReply(LastCommand, api::ReturnCode::IO_FAILURE, "", 0);
}
@@ -147,6 +156,7 @@ struct GetOperationTest : Test, DistributorStripeTestUtil {
}
void do_test_read_consistency_is_propagated(api::InternalReadConsistency consistency);
+ void set_up_condition_match_get_operation();
};
GetOperationTest::GetOperationTest() = default;
@@ -154,8 +164,10 @@ GetOperationTest::~GetOperationTest() = default;
namespace {
-NewestReplica replica_of(api::Timestamp ts, const document::BucketId& bucket_id, uint16_t node, bool is_tombstone) {
- return NewestReplica::of(ts, bucket_id, node, is_tombstone);
+NewestReplica replica_of(api::Timestamp ts, const document::BucketId& bucket_id, uint16_t node,
+ bool is_tombstone, bool condition_matched)
+{
+ return NewestReplica::of(ts, bucket_id, node, is_tombstone, condition_matched);
}
}
@@ -177,7 +189,7 @@ TEST_F(GetOperationTest, simple) {
EXPECT_FALSE(op->any_replicas_failed());
EXPECT_TRUE(last_reply_had_consistent_replicas());
ASSERT_TRUE(op->newest_replica().has_value());
- EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 0, false), *op->newest_replica());
+ EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 0, false, false), *op->newest_replica());
}
TEST_F(GetOperationTest, ask_all_checksum_groups_if_inconsistent_even_if_trusted_replica_available) {
@@ -198,7 +210,7 @@ TEST_F(GetOperationTest, ask_all_checksum_groups_if_inconsistent_even_if_trusted
EXPECT_FALSE(op->any_replicas_failed());
EXPECT_FALSE(last_reply_had_consistent_replicas());
ASSERT_TRUE(op->newest_replica().has_value());
- EXPECT_EQ(replica_of(api::Timestamp(2), bucketId, 0, false), *op->newest_replica());
+ EXPECT_EQ(replica_of(api::Timestamp(2), bucketId, 0, false, false), *op->newest_replica());
}
TEST_F(GetOperationTest, ask_all_nodes_if_bucket_is_inconsistent) {
@@ -221,7 +233,7 @@ TEST_F(GetOperationTest, ask_all_nodes_if_bucket_is_inconsistent) {
EXPECT_FALSE(op->any_replicas_failed());
EXPECT_FALSE(last_reply_had_consistent_replicas());
ASSERT_TRUE(op->newest_replica().has_value());
- EXPECT_EQ(replica_of(api::Timestamp(2), bucketId, 1, false), *op->newest_replica());
+ EXPECT_EQ(replica_of(api::Timestamp(2), bucketId, 1, false, false), *op->newest_replica());
}
TEST_F(GetOperationTest, send_to_all_invalid_copies) {
@@ -289,7 +301,7 @@ TEST_F(GetOperationTest, inconsistent_split) {
EXPECT_FALSE(last_reply_had_consistent_replicas());
// Bucket with highest timestamp should be returned. In this case it's the one on node 0.
ASSERT_TRUE(op->newest_replica().has_value());
- EXPECT_EQ(replica_of(api::Timestamp(2), BucketId(16, 0x0593), 0, false), *op->newest_replica());
+ EXPECT_EQ(replica_of(api::Timestamp(2), BucketId(16, 0x0593), 0, false, false), *op->newest_replica());
}
TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found) {
@@ -333,7 +345,7 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_not_found_deleted) {
EXPECT_FALSE(op->any_replicas_failed());
EXPECT_FALSE(last_reply_had_consistent_replicas());
ASSERT_TRUE(op->newest_replica().has_value());
- EXPECT_EQ(replica_of(api::Timestamp(3), bucketId, 1, false), *op->newest_replica());
+ EXPECT_EQ(replica_of(api::Timestamp(3), bucketId, 1, false, false), *op->newest_replica());
}
TEST_F(GetOperationTest, multi_inconsistent_bucket) {
@@ -383,7 +395,7 @@ TEST_F(GetOperationTest, multi_inconsistent_bucket_fail) {
EXPECT_FALSE(last_reply_had_consistent_replicas());
ASSERT_TRUE(op->newest_replica().has_value());
// First send to node 2 fails, second is to node 3 which returned the highest timestamp
- EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 3, false), *op->newest_replica());
+ EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 3, false, false), *op->newest_replica());
}
TEST_F(GetOperationTest, return_not_found_when_bucket_not_in_db) {
@@ -423,7 +435,7 @@ TEST_F(GetOperationTest, not_found) {
// the caller may want to perform special logic if all replicas are in sync
// but are missing the document.
// FIXME make sure all callers are aware of this!
- EXPECT_EQ(replica_of(api::Timestamp(0), bucketId, 0, false), *op->newest_replica());
+ EXPECT_EQ(replica_of(api::Timestamp(0), bucketId, 0, false, false), *op->newest_replica());
}
TEST_F(GetOperationTest, not_found_on_subset_of_replicas_marks_get_as_inconsistent) {
@@ -468,7 +480,7 @@ TEST_F(GetOperationTest, resend_on_storage_failure) {
// Replica had read failure, but they're still in sync. An immutable Get won't change that fact.
EXPECT_TRUE(last_reply_had_consistent_replicas());
ASSERT_TRUE(op->newest_replica().has_value());
- EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 2, false), *op->newest_replica());
+ EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 2, false, false), *op->newest_replica());
}
TEST_F(GetOperationTest, storage_failure_of_out_of_sync_replica_is_tracked_as_inconsistent) {
@@ -485,7 +497,7 @@ TEST_F(GetOperationTest, storage_failure_of_out_of_sync_replica_is_tracked_as_in
EXPECT_TRUE(op->any_replicas_failed());
EXPECT_FALSE(last_reply_had_consistent_replicas());
ASSERT_TRUE(op->newest_replica().has_value());
- EXPECT_EQ(replica_of(api::Timestamp(3), bucketId, 2, false), *op->newest_replica());
+ EXPECT_EQ(replica_of(api::Timestamp(3), bucketId, 2, false, false), *op->newest_replica());
}
TEST_F(GetOperationTest, resend_on_storage_failure_all_fail) {
@@ -534,7 +546,7 @@ TEST_F(GetOperationTest, send_to_ideal_copy_if_bucket_in_sync) {
_sender.getLastReply());
EXPECT_TRUE(last_reply_had_consistent_replicas());
ASSERT_TRUE(op->newest_replica().has_value());
- EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 1, false), *op->newest_replica());
+ EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 1, false, false), *op->newest_replica());
}
TEST_F(GetOperationTest, multiple_copies_with_failure_on_local_node) {
@@ -565,7 +577,7 @@ TEST_F(GetOperationTest, multiple_copies_with_failure_on_local_node) {
EXPECT_TRUE(op->any_replicas_failed());
EXPECT_TRUE(last_reply_had_consistent_replicas());
ASSERT_TRUE(op->newest_replica().has_value());
- EXPECT_EQ(replica_of(api::Timestamp(3), BucketId(16, 0x0593), 2, false), *op->newest_replica());
+ EXPECT_EQ(replica_of(api::Timestamp(3), BucketId(16, 0x0593), 2, false, false), *op->newest_replica());
}
TEST_F(GetOperationTest, can_get_documents_when_all_replica_nodes_retired) {
@@ -610,7 +622,7 @@ TEST_F(GetOperationTest, replicas_considered_consistent_if_all_equal_tombstone_t
EXPECT_FALSE(op->any_replicas_failed());
EXPECT_TRUE(last_reply_had_consistent_replicas());
EXPECT_FALSE(last_reply_has_document());
- EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 0, true), *op->newest_replica());
+ EXPECT_EQ(replica_of(api::Timestamp(100), bucketId, 0, true, false), *op->newest_replica());
}
TEST_F(GetOperationTest, newer_tombstone_hides_older_document) {
@@ -628,7 +640,7 @@ TEST_F(GetOperationTest, newer_tombstone_hides_older_document) {
EXPECT_FALSE(op->any_replicas_failed());
EXPECT_FALSE(last_reply_had_consistent_replicas());
EXPECT_FALSE(last_reply_has_document());
- EXPECT_EQ(replica_of(api::Timestamp(200), bucketId, 1, true), *op->newest_replica());
+ EXPECT_EQ(replica_of(api::Timestamp(200), bucketId, 1, true, false), *op->newest_replica());
}
TEST_F(GetOperationTest, older_tombstone_does_not_hide_newer_document) {
@@ -646,7 +658,88 @@ TEST_F(GetOperationTest, older_tombstone_does_not_hide_newer_document) {
EXPECT_FALSE(op->any_replicas_failed());
EXPECT_FALSE(last_reply_had_consistent_replicas());
EXPECT_TRUE(last_reply_has_document());
- EXPECT_EQ(replica_of(api::Timestamp(200), bucketId, 0, false), *op->newest_replica());
+ EXPECT_EQ(replica_of(api::Timestamp(200), bucketId, 0, false, false), *op->newest_replica());
+}
+
+TEST_F(GetOperationTest, provided_condition_is_propagated_to_sent_gets) {
+ setClusterState("distributor:1 storage:1");
+ addNodesToBucketDB(bucketId, "0=123");
+
+ TestAndSetCondition my_cond("my_cool_condition");
+ auto msg = std::make_shared<api::GetCommand>(makeDocumentBucket(BucketId(0)), docId, document::NoFields::NAME);
+ msg->set_condition(my_cond);
+
+ start_operation(std::move(msg), api::InternalReadConsistency::Strong);
+ ASSERT_EQ("Get => 0", _sender.getCommands(true));
+ auto& cmd = dynamic_cast<const api::GetCommand&>(*_sender.command(0));
+ EXPECT_EQ(cmd.condition().getSelection(), my_cond.getSelection());
+}
+
+void GetOperationTest::set_up_condition_match_get_operation() {
+ setClusterState("distributor:1 storage:3");
+ addNodesToBucketDB(bucketId, "0=100,2=200,1=300");
+
+ TestAndSetCondition my_cond("my_cool_condition");
+ auto msg = std::make_shared<api::GetCommand>(makeDocumentBucket(BucketId(0)), docId, document::NoFields::NAME);
+ msg->set_condition(my_cond);
+ start_operation(std::move(msg), api::InternalReadConsistency::Strong);
+
+ ASSERT_EQ("Get => 0,Get => 2,Get => 1", _sender.getCommands(true));
+}
+
+TEST_F(GetOperationTest, condition_match_result_is_aggregated_for_newest_replica_mismatch_case) {
+ ASSERT_NO_FATAL_FAILURE(set_up_condition_match_get_operation());
+ // node 0 (send index 0) has an old doc without a match
+ // node 2 (send index 1) has an old tombstone without match
+ // node 1 (send index 2) has a new doc without a match
+ // Newest replica should reflect node 1's results
+ ASSERT_NO_FATAL_FAILURE(reply_with_condition_match(0, 200, false));
+ ASSERT_NO_FATAL_FAILURE(reply_with_tombstone(1, 100));
+ ASSERT_NO_FATAL_FAILURE(reply_with_condition_match(2, 300, false));
+
+ ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, timestamp 300) ReturnCode(NONE)",
+ _sender.getLastReply());
+ EXPECT_FALSE(op->any_replicas_failed());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
+ EXPECT_FALSE(last_reply_has_document());
+ EXPECT_EQ(replica_of(api::Timestamp(300), bucketId, 1, false, false), *op->newest_replica());
+}
+
+TEST_F(GetOperationTest, condition_match_result_is_aggregated_for_newest_replica_match_case) {
+ ASSERT_NO_FATAL_FAILURE(set_up_condition_match_get_operation());
+ // node 0 (send index 0) has a new doc with a match
+ // node 2 (send index 1) has an old tombstone without match
+ // node 1 (send index 2) has an old doc without a match
+ // Newest replica should reflect node 0's results
+ ASSERT_NO_FATAL_FAILURE(reply_with_condition_match(0, 400, true));
+ ASSERT_NO_FATAL_FAILURE(reply_with_tombstone(1, 300));
+ ASSERT_NO_FATAL_FAILURE(reply_with_condition_match(2, 200, false));
+
+ ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, timestamp 400) ReturnCode(NONE)",
+ _sender.getLastReply());
+ EXPECT_FALSE(op->any_replicas_failed());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
+ EXPECT_FALSE(last_reply_has_document());
+ EXPECT_EQ(replica_of(api::Timestamp(400), bucketId, 0, false, true), *op->newest_replica());
+}
+
+TEST_F(GetOperationTest, condition_match_result_is_aggregated_for_newest_replica_tombstone_case) {
+ ASSERT_NO_FATAL_FAILURE(set_up_condition_match_get_operation());
+ // node 0 (send index 0) has an old doc with a match
+ // node 2 (send index 1) has a new tombstone without match
+ // node 1 (send index 2) has an old doc without a match
+ // Newest replica should reflect node 2's results
+ ASSERT_NO_FATAL_FAILURE(reply_with_condition_match(0, 400, true));
+ ASSERT_NO_FATAL_FAILURE(reply_with_tombstone(1, 500));
+ ASSERT_NO_FATAL_FAILURE(reply_with_condition_match(2, 300, false));
+
+ // Timestamp 0 in reply signals "not found" to clients
+ ASSERT_EQ("GetReply(BucketId(0x0000000000000000), id:ns:text/html::uri, timestamp 0) ReturnCode(NONE)",
+ _sender.getLastReply());
+ EXPECT_FALSE(op->any_replicas_failed());
+ EXPECT_FALSE(last_reply_had_consistent_replicas());
+ EXPECT_FALSE(last_reply_has_document());
+ EXPECT_EQ(replica_of(api::Timestamp(500), bucketId, 2, true, false), *op->newest_replica());
}
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
index 41260115297..44148bdbf25 100644
--- a/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/getoperation.cpp
@@ -112,6 +112,9 @@ GetOperation::sendForChecksum(DistributorStripeMessageSender& sender, const docu
_msg->getFieldSet(), _msg->getBeforeTimestamp());
copyMessageSettings(*_msg, *command);
command->set_internal_read_consistency(_desired_read_consistency);
+ if (_msg->has_condition()) {
+ command->set_condition(_msg->condition());
+ }
LOG(spam, "Sending %s to node %d", command->toString(true).c_str(), res[best].copy.getNode());
@@ -175,9 +178,9 @@ GetOperation::onReceive(DistributorStripeMessageSender& sender, const std::share
if (!_newest_replica.has_value() || getreply->getLastModifiedTimestamp() > _newest_replica->timestamp) {
_returnCode = getreply->getResult();
assert(response.second[i].to_node != UINT16_MAX);
- _newest_replica = NewestReplica::of(getreply->getLastModifiedTimestamp(), bucket_id,
- send_state.to_node, getreply->is_tombstone());
- _doc = getreply->getDocument(); // May be empty (tombstones).
+ _newest_replica = NewestReplica::of(getreply->getLastModifiedTimestamp(), bucket_id, send_state.to_node,
+ getreply->is_tombstone(), getreply->condition_matched());
+ _doc = getreply->getDocument(); // May be empty (tombstones or metadata-only).
}
} else {
_any_replicas_failed = true;
diff --git a/storage/src/vespa/storage/distributor/operations/external/newest_replica.cpp b/storage/src/vespa/storage/distributor/operations/external/newest_replica.cpp
index 4632501d59b..cfc03882ed3 100644
--- a/storage/src/vespa/storage/distributor/operations/external/newest_replica.cpp
+++ b/storage/src/vespa/storage/distributor/operations/external/newest_replica.cpp
@@ -9,6 +9,7 @@ std::ostream& operator<<(std::ostream& os, const NewestReplica& nr) {
<< ", bucket_id " << nr.bucket_id
<< ", node " << nr.node
<< ", is_tombstone " << nr.is_tombstone
+ << ", condition_matched " << nr.condition_matched
<< ')';
return os;
}
diff --git a/storage/src/vespa/storage/distributor/operations/external/newest_replica.h b/storage/src/vespa/storage/distributor/operations/external/newest_replica.h
index 5fcb719aee2..94274747f30 100644
--- a/storage/src/vespa/storage/distributor/operations/external/newest_replica.h
+++ b/storage/src/vespa/storage/distributor/operations/external/newest_replica.h
@@ -18,23 +18,26 @@ struct NewestReplica {
document::BucketId bucket_id;
uint16_t node {UINT16_MAX};
bool is_tombstone {false};
+ bool condition_matched {false}; // Only relevant if a condition was initially sent
static NewestReplica of(api::Timestamp timestamp,
const document::BucketId& bucket_id,
uint16_t node,
- bool is_tombstone) noexcept {
- return {timestamp, bucket_id, node, is_tombstone};
+ bool is_tombstone,
+ bool condition_matched) noexcept {
+ return {timestamp, bucket_id, node, is_tombstone, condition_matched};
}
static NewestReplica make_empty() {
- return {api::Timestamp(0), document::BucketId(), 0, false};
+ return {api::Timestamp(0), document::BucketId(), 0, false, false};
}
bool operator==(const NewestReplica& rhs) const noexcept {
return ((timestamp == rhs.timestamp) &&
(bucket_id == rhs.bucket_id) &&
(node == rhs.node) &&
- (is_tombstone == rhs.is_tombstone));
+ (is_tombstone == rhs.is_tombstone) &&
+ (condition_matched == rhs.condition_matched));
}
bool operator!=(const NewestReplica& rhs) const noexcept {
return !(*this == rhs);
diff --git a/storage/src/vespa/storageapi/message/persistence.cpp b/storage/src/vespa/storageapi/message/persistence.cpp
index 1b09639fd9b..a8fa9a0bba1 100644
--- a/storage/src/vespa/storageapi/message/persistence.cpp
+++ b/storage/src/vespa/storageapi/message/persistence.cpp
@@ -202,7 +202,11 @@ GetCommand::getSummary() const
{
vespalib::asciistream stream;
stream << "Get(BucketId(" << vespalib::hex << getBucketId().getId() << "), " << _docId.toString()
- << ", beforetimestamp " << vespalib::dec << _beforeTimestamp << ')';
+ << ", beforetimestamp " << vespalib::dec << _beforeTimestamp;
+ if (has_condition()) {
+ stream << ", condition " << condition().getSelection();
+ }
+ stream << ')';
return stream.str();
}
@@ -211,7 +215,11 @@ GetCommand::getSummary() const
void
GetCommand::print(std::ostream& out, bool verbose, const std::string& indent) const
{
- out << "Get(" << getBucketId() << ", " << _docId << ")";
+ out << "Get(" << getBucketId() << ", " << _docId;
+ if (has_condition()) {
+ out << ", condition " << condition().getSelection();
+ }
+ out << ")";
if (verbose) {
out << " : ";
BucketCommand::print(out, verbose, indent);