about summary refs log tree commit diff stats
path: root/storage/src/tests/distributor/putoperationtest.cpp
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@yahooinc.com>2023-07-18 13:12:29 +0000
committerTor Brede Vekterli <vekterli@yahooinc.com>2023-08-18 13:32:45 +0000
commit538d79ce480069322c04ca87436b80e0146e2dfa (patch)
tree4ef0e03edced933286650b7e4e7ed713617c80c6 /storage/src/tests/distributor/putoperationtest.cpp
parent5a2dd2d26e3c518e7c54ccd29c9be27e8accb7df (diff)
Implement edge-triggered distributor operation cancelling
Will be used for ensuring active operations do not mutate the bucket database upon completion with stale entries for buckets that may no longer be valid for that distributor to handle. Removes the need for today's "always-on" implicit checks for node state and bucket ownership upon every single DB update (which is potentially ABA-susceptible). Moving to edge-triggering is intentionally done to avoid ABA problems. Cancellation cases are: * Distributor ownership of bucket changed * Subset of target nodes has become unavailable Note: cancellation is not yet wired in; this code is cold.
Diffstat (limited to 'storage/src/tests/distributor/putoperationtest.cpp')
-rw-r--r--storage/src/tests/distributor/putoperationtest.cpp155
1 files changed, 154 insertions, 1 deletions
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index 76b6741442e..431b7595571 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -6,6 +6,7 @@
#include <vespa/storage/distributor/top_level_distributor.h>
#include <vespa/storage/distributor/distributor_bucket_space.h>
#include <vespa/storage/distributor/distributor_stripe.h>
+#include <vespa/storage/distributor/operations/cancel_scope.h>
#include <vespa/storage/distributor/operations/external/putoperation.h>
#include <vespa/storageapi/message/bucket.h>
#include <vespa/storageapi/message/persistence.h>
@@ -208,6 +209,43 @@ TEST_F(PutOperationTest, failed_CreateBucket_removes_replica_from_db_and_sends_R
_sender.getCommands(true, true, 4));
}
+TEST_F(PutOperationTest, failed_CreateBucket_does_not_send_RequestBucketInfo_if_op_fully_canceled) {
+ setup_stripe(2, 2, "distributor:1 storage:2");
+
+ auto doc = createDummyDocument("test", "test");
+ sendPut(createPut(doc));
+
+ ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0", _sender.getCommands(true));
+
+ op->cancel(_sender, CancelScope::of_ownership_change());
+ sendReply(0, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 1
+
+ // DB is not touched (note: normally node 1 would be removed at the cancel-edge).
+ ASSERT_EQ("BucketId(0x4000000000008f09) : "
+ "node(idx=1,crc=0x1,docs=0/0,bytes=0/0,trusted=true,active=true,ready=false), "
+ "node(idx=0,crc=0x1,docs=0/0,bytes=0/0,trusted=true,active=false,ready=false)",
+ dumpBucket(operation_context().make_split_bit_constrained_bucket_id(doc->getId())));
+ // No new requests sent
+ ASSERT_EQ("", _sender.getCommands(true, true, 4));
+}
+
+TEST_F(PutOperationTest, failed_CreateBucket_does_not_send_RequestBucketInfo_for_cancelled_nodes) {
+ setup_stripe(2, 2, "distributor:1 storage:2");
+
+ auto doc = createDummyDocument("test", "test");
+ sendPut(createPut(doc));
+
+ ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0", _sender.getCommands(true));
+
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ sendReply(0, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 1
+ sendReply(1, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 0
+
+ // Bucket info recheck only sent to node 1, as it's not cancelled
+ ASSERT_EQ("RequestBucketInfoCommand(1 buckets, super bucket BucketId(0x4000000000008f09). ) => 1",
+ _sender.getCommands(true, true, 4));
+}
+
TEST_F(PutOperationTest, send_inline_split_before_put_if_bucket_too_large) {
setup_stripe(1, 1, "storage:1 distributor:1");
auto cfg = make_config();
@@ -272,6 +310,26 @@ TEST_F(PutOperationTest, return_success_if_op_acked_on_all_replicas_even_if_buck
_sender.getLastReply());
}
+TEST_F(PutOperationTest, return_success_if_op_acked_on_all_replicas_even_if_operation_cancelled) {
+ setup_stripe(2, 2, "storage:2 distributor:1");
+ createAndSendSampleDocument(TIMEOUT);
+
+ ASSERT_EQ("Put(BucketId(0x4000000000001dd4), "
+ "id:test:testdoctype1::, timestamp 100, size 45) => 0,"
+ "Put(BucketId(0x4000000000001dd4), "
+ "id:test:testdoctype1::, timestamp 100, size 45) => 1",
+ _sender.getCommands(true, true));
+
+ op->cancel(_sender, CancelScope::of_ownership_change());
+
+ sendReply(0);
+ sendReply(1);
+
+ ASSERT_EQ("PutReply(id:test:testdoctype1::, BucketId(0x0000000000000000), "
+ "timestamp 100) ReturnCode(NONE)",
+ _sender.getLastReply());
+}
+
TEST_F(PutOperationTest, storage_failed) {
setup_stripe(2, 1, "storage:1 distributor:1");
@@ -491,7 +549,7 @@ TEST_F(PutOperationTest, update_correct_bucket_on_remapped_put) {
{
std::shared_ptr<api::StorageCommand> msg2 = _sender.command(0);
- std::shared_ptr<api::StorageReply> reply(msg2->makeReply().release());
+ std::shared_ptr<api::StorageReply> reply(msg2->makeReply());
auto* sreply = dynamic_cast<api::PutReply*>(reply.get());
ASSERT_TRUE(sreply);
sreply->remapBucketId(document::BucketId(17, 13));
@@ -511,6 +569,7 @@ TEST_F(PutOperationTest, update_correct_bucket_on_remapped_put) {
dumpBucket(document::BucketId(17, 13)));
}
+// TODO make this redundant through operation cancelling
TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_state) {
setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
@@ -535,6 +594,7 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_
dumpBucket(operation_context().make_split_bit_constrained_bucket_id(doc->getId())));
}
+// TODO make this redundant through operation cancelling
TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending_state) {
setup_stripe(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3");
@@ -568,6 +628,8 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending
// TODO probably also do this for updates and removes
// TODO consider if we should use the pending state verbatim for computing targets if it exists
+// TODO make this redundant through operation cancelling
+// ... actually; FIXME shouldn't the ExternalOperationHandler already cover this??
TEST_F(PutOperationTest, put_is_failed_with_busy_if_target_down_in_pending_state) {
setup_stripe(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3");
auto doc = createDummyDocument("test", "test");
@@ -584,6 +646,65 @@ TEST_F(PutOperationTest, put_is_failed_with_busy_if_target_down_in_pending_state
_sender.getLastReply(true));
}
+TEST_F(PutOperationTest, db_not_updated_if_operation_cancelled_by_ownership_change) {
+ setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
+
+ auto doc = createDummyDocument("test", "uri");
+ auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId());
+ auto remap_bucket = BucketId(bucket.getUsedBits() + 1, bucket.getId());
+ addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t");
+
+ sendPut(createPut(doc));
+
+ ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true));
+
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucket), {0, 1, 2});
+ op->cancel(_sender, CancelScope::of_ownership_change());
+
+ // Normally DB updates triggered by replies don't _create_ buckets in the DB, unless
+ // they're remapped buckets. Use a remapping to ensure we hit a create-if-missing DB path.
+ {
+ std::shared_ptr<api::StorageCommand> msg2 = _sender.command(0);
+ std::shared_ptr<api::StorageReply> reply(msg2->makeReply());
+ auto* sreply = dynamic_cast<api::PutReply*>(reply.get());
+ ASSERT_TRUE(sreply);
+ sreply->remapBucketId(remap_bucket);
+ sreply->setBucketInfo(api::BucketInfo(1,2,3,4,5));
+ op->receive(_sender, reply);
+ }
+
+ sendReply(1, api::ReturnCode::OK, api::BucketInfo(5, 6, 7));
+ sendReply(2, api::ReturnCode::OK, api::BucketInfo(7, 8, 9));
+
+ EXPECT_EQ("NONEXISTING", dumpBucket(bucket));
+ EXPECT_EQ("NONEXISTING", dumpBucket(remap_bucket));
+}
+
+TEST_F(PutOperationTest, individually_cancelled_nodes_are_not_updated_in_db) {
+ setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
+
+ auto doc = createDummyDocument("test", "uri");
+ auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId());
+ addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t");
+
+ sendPut(createPut(doc));
+ ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true));
+
+ // Simulate nodes 0 and 2 going down
+ operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucket), {0, 2});
+ // Cancelling shall be cumulative
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ op->cancel(_sender, CancelScope::of_node_subset({2}));
+
+ sendReply(0, api::ReturnCode::OK, api::BucketInfo(5, 6, 7));
+ sendReply(1, api::ReturnCode::OK, api::BucketInfo(6, 7, 8));
+ sendReply(2, api::ReturnCode::OK, api::BucketInfo(9, 8, 7));
+
+ EXPECT_EQ("BucketId(0x4000000000000593) : "
+ "node(idx=1,crc=0x5,docs=6/6,bytes=7/7,trusted=true,active=false,ready=false)",
+ dumpBucket(bucket));
+}
+
TEST_F(PutOperationTest, send_to_retired_nodes_if_no_up_nodes_available) {
setup_stripe(Redundancy(2), NodeCount(2),
"distributor:1 storage:2 .0.s:r .1.s:r");
@@ -761,6 +882,38 @@ TEST_F(PutOperationTest, failed_condition_probe_fails_op_with_returned_error) {
_sender.getLastReply());
}
+TEST_F(PutOperationTest, ownership_cancellation_during_condition_probe_fails_operation_on_probe_completion) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes());
+
+ op->receive(_sender, make_get_reply(*sent_get_command(0), 0, false, false));
+ op->cancel(_sender, CancelScope::of_ownership_change());
+ op->receive(_sender, make_get_reply(*sent_get_command(1), 0, false, false));
+
+ ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+ ASSERT_EQ("PutReply(id:test:testdoctype1::test, "
+ "BucketId(0x0000000000000000), timestamp 100) "
+ "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: "
+ "Operation has been cancelled (likely due to a cluster state change))",
+ _sender.getLastReply());
+}
+
+TEST_F(PutOperationTest, replica_subset_cancellation_during_condition_probe_fails_operation_on_probe_completion) {
+ ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes());
+
+ op->receive(_sender, make_get_reply(*sent_get_command(0), 0, false, false));
+ // 1 of 2 nodes; we still abort after the read phase since we cannot possibly fulfill
+ // the write phase for all replicas.
+ op->cancel(_sender, CancelScope::of_node_subset({0}));
+ op->receive(_sender, make_get_reply(*sent_get_command(1), 0, false, false));
+
+ ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+ ASSERT_EQ("PutReply(id:test:testdoctype1::test, "
+ "BucketId(0x0000000000000000), timestamp 100) "
+ "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: "
+ "Operation has been cancelled (likely due to a cluster state change))",
+ _sender.getLastReply());
+}
+
TEST_F(PutOperationTest, create_flag_in_parent_put_is_propagated_to_sent_puts) {
setup_stripe(Redundancy(2), NodeCount(2), "version:1 storage:2 distributor:1");
auto doc = createDummyDocument("test", "test");