author    | Tor Brede Vekterli <vekterli@yahooinc.com> | 2023-07-18 13:12:29 +0000
committer | Tor Brede Vekterli <vekterli@yahooinc.com> | 2023-08-18 13:32:45 +0000
commit    | 538d79ce480069322c04ca87436b80e0146e2dfa (patch)
tree      | 4ef0e03edced933286650b7e4e7ed713617c80c6 /storage/src/tests/distributor/putoperationtest.cpp
parent    | 5a2dd2d26e3c518e7c54ccd29c9be27e8accb7df (diff)
Implement edge-triggered distributor operation cancelling

Will be used to ensure that active operations do not mutate the
bucket database upon completion with stale entries for buckets
that the distributor may no longer be responsible for. Removes
the need for today's "always-on" implicit checks of node state
and bucket ownership on every single DB update; those
level-triggered checks are potentially ABA-susceptible, a
problem that edge-triggered cancellation avoids by design.

Cancellation cases are:

* Distributor ownership of the bucket has changed
* A subset of the target nodes has become unavailable

Note: cancellation is not yet wired in; this code is cold.
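
To make the intended semantics concrete, here is a minimal illustrative
sketch of an edge-triggered cancellation scope; it is not the CancelScope
implementation from this commit. The of_ownership_change() and
of_node_subset() factories mirror what the new tests below call;
fully_cancelled(), node_is_cancelled() and merge_from() are hypothetical
helper names chosen for the example.

#include <cstdint>
#include <unordered_set>

// Illustrative only: a value type recording which cancellation edges have
// fired for an operation. Ownership loss cancels everything; node-subset
// edges cancel only the listed content nodes.
class CancelScope {
    bool _ownership_lost = false;
    std::unordered_set<uint16_t> _cancelled_nodes;
public:
    static CancelScope of_ownership_change() {
        CancelScope scope;
        scope._ownership_lost = true;
        return scope;
    }
    static CancelScope of_node_subset(std::unordered_set<uint16_t> nodes) {
        CancelScope scope;
        scope._cancelled_nodes = std::move(nodes);
        return scope;
    }
    // Edges accumulate: a later cancellation can only widen the scope,
    // which is what makes repeated cancel() calls cumulative.
    void merge_from(const CancelScope& other) {
        _ownership_lost |= other._ownership_lost;
        _cancelled_nodes.insert(other._cancelled_nodes.begin(),
                                other._cancelled_nodes.end());
    }
    bool fully_cancelled() const { return _ownership_lost; }
    bool node_is_cancelled(uint16_t node) const {
        return _ownership_lost || (_cancelled_nodes.count(node) != 0);
    }
};

A cancelled operation keeps running until its outstanding replies arrive,
but consults the accumulated scope before acting on them; that is the
edge-triggered alternative to re-checking cluster state on every DB update.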
Diffstat (limited to 'storage/src/tests/distributor/putoperationtest.cpp')
-rw-r--r-- | storage/src/tests/distributor/putoperationtest.cpp | 155
1 file changed, 154 insertions(+), 1 deletion(-)
diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp
index 76b6741442e..431b7595571 100644
--- a/storage/src/tests/distributor/putoperationtest.cpp
+++ b/storage/src/tests/distributor/putoperationtest.cpp
@@ -6,6 +6,7 @@
 #include <vespa/storage/distributor/top_level_distributor.h>
 #include <vespa/storage/distributor/distributor_bucket_space.h>
 #include <vespa/storage/distributor/distributor_stripe.h>
+#include <vespa/storage/distributor/operations/cancel_scope.h>
 #include <vespa/storage/distributor/operations/external/putoperation.h>
 #include <vespa/storageapi/message/bucket.h>
 #include <vespa/storageapi/message/persistence.h>
@@ -208,6 +209,43 @@ TEST_F(PutOperationTest, failed_CreateBucket_removes_replica_from_db_and_sends_R
               _sender.getCommands(true, true, 4));
 }
+
+TEST_F(PutOperationTest, failed_CreateBucket_does_not_send_RequestBucketInfo_if_op_fully_canceled) {
+    setup_stripe(2, 2, "distributor:1 storage:2");
+
+    auto doc = createDummyDocument("test", "test");
+    sendPut(createPut(doc));
+
+    ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0", _sender.getCommands(true));
+
+    op->cancel(_sender, CancelScope::of_ownership_change());
+    sendReply(0, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 1
+
+    // DB is not touched (note: normally node 1 would be removed at the cancel-edge).
+    ASSERT_EQ("BucketId(0x4000000000008f09) : "
+              "node(idx=1,crc=0x1,docs=0/0,bytes=0/0,trusted=true,active=true,ready=false), "
+              "node(idx=0,crc=0x1,docs=0/0,bytes=0/0,trusted=true,active=false,ready=false)",
+              dumpBucket(operation_context().make_split_bit_constrained_bucket_id(doc->getId())));
+    // No new requests sent
+    ASSERT_EQ("", _sender.getCommands(true, true, 4));
+}
+
+TEST_F(PutOperationTest, failed_CreateBucket_does_not_send_RequestBucketInfo_for_cancelled_nodes) {
+    setup_stripe(2, 2, "distributor:1 storage:2");
+
+    auto doc = createDummyDocument("test", "test");
+    sendPut(createPut(doc));
+
+    ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0", _sender.getCommands(true));
+
+    op->cancel(_sender, CancelScope::of_node_subset({0}));
+    sendReply(0, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 1
+    sendReply(1, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 0
+
+    // Bucket info recheck only sent to node 1, as it's not cancelled
+    ASSERT_EQ("RequestBucketInfoCommand(1 buckets, super bucket BucketId(0x4000000000008f09). "
+              ") => 1", _sender.getCommands(true, true, 4));
+}
 
 TEST_F(PutOperationTest, send_inline_split_before_put_if_bucket_too_large) {
     setup_stripe(1, 1, "storage:1 distributor:1");
     auto cfg = make_config();
@@ -272,6 +310,26 @@ TEST_F(PutOperationTest, return_success_if_op_acked_on_all_replicas_even_if_buck
               _sender.getLastReply());
 }
+
+TEST_F(PutOperationTest, return_success_if_op_acked_on_all_replicas_even_if_operation_cancelled) {
+    setup_stripe(2, 2, "storage:2 distributor:1");
+    createAndSendSampleDocument(TIMEOUT);
+
+    ASSERT_EQ("Put(BucketId(0x4000000000001dd4), "
+              "id:test:testdoctype1::, timestamp 100, size 45) => 0,"
+              "Put(BucketId(0x4000000000001dd4), "
+              "id:test:testdoctype1::, timestamp 100, size 45) => 1",
+              _sender.getCommands(true, true));
+
+    op->cancel(_sender, CancelScope::of_ownership_change());
+
+    sendReply(0);
+    sendReply(1);
+
+    ASSERT_EQ("PutReply(id:test:testdoctype1::, BucketId(0x0000000000000000), "
+              "timestamp 100) ReturnCode(NONE)",
+              _sender.getLastReply());
+}
 
 TEST_F(PutOperationTest, storage_failed) {
     setup_stripe(2, 1, "storage:1 distributor:1");
 
@@ -491,7 +549,7 @@ TEST_F(PutOperationTest, update_correct_bucket_on_remapped_put) {
 
     {
         std::shared_ptr<api::StorageCommand> msg2 = _sender.command(0);
-        std::shared_ptr<api::StorageReply> reply(msg2->makeReply().release());
+        std::shared_ptr<api::StorageReply> reply(msg2->makeReply());
         auto* sreply = dynamic_cast<api::PutReply*>(reply.get());
         ASSERT_TRUE(sreply);
         sreply->remapBucketId(document::BucketId(17, 13));
@@ -511,6 +569,7 @@ TEST_F(PutOperationTest, update_correct_bucket_on_remapped_put) {
               dumpBucket(document::BucketId(17, 13)));
 }
 
+// TODO make this redundant through operation cancelling
 TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_state) {
     setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
 
@@ -535,6 +594,7 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_
               dumpBucket(operation_context().make_split_bit_constrained_bucket_id(doc->getId())));
 }
 
+// TODO make this redundant through operation cancelling
 TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending_state) {
     setup_stripe(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3");
 
@@ -568,6 +628,8 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending
 
 // TODO probably also do this for updates and removes
 // TODO consider if we should use the pending state verbatim for computing targets if it exists
+// TODO make this redundant through operation cancelling
+// ... actually; FIXME shouldn't the ExternalOperationHandler already cover this??
 TEST_F(PutOperationTest, put_is_failed_with_busy_if_target_down_in_pending_state) {
     setup_stripe(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3");
     auto doc = createDummyDocument("test", "test");
@@ -584,6 +646,65 @@ TEST_F(PutOperationTest, put_is_failed_with_busy_if_target_down_in_pending_state
               _sender.getLastReply(true));
 }
+
+TEST_F(PutOperationTest, db_not_updated_if_operation_cancelled_by_ownership_change) {
+    setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
+
+    auto doc = createDummyDocument("test", "uri");
+    auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId());
+    auto remap_bucket = BucketId(bucket.getUsedBits() + 1, bucket.getId());
+    addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t");
+
+    sendPut(createPut(doc));
+
+    ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true));
+
+    operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucket), {0, 1, 2});
+    op->cancel(_sender, CancelScope::of_ownership_change());
+
+    // Normally DB updates triggered by replies don't _create_ buckets in the DB, unless
+    // they're remapped buckets. Use a remapping to ensure we hit a create-if-missing DB path.
+    {
+        std::shared_ptr<api::StorageCommand> msg2 = _sender.command(0);
+        std::shared_ptr<api::StorageReply> reply(msg2->makeReply());
+        auto* sreply = dynamic_cast<api::PutReply*>(reply.get());
+        ASSERT_TRUE(sreply);
+        sreply->remapBucketId(remap_bucket);
+        sreply->setBucketInfo(api::BucketInfo(1,2,3,4,5));
+        op->receive(_sender, reply);
+    }
+
+    sendReply(1, api::ReturnCode::OK, api::BucketInfo(5, 6, 7));
+    sendReply(2, api::ReturnCode::OK, api::BucketInfo(7, 8, 9));
+
+    EXPECT_EQ("NONEXISTING", dumpBucket(bucket));
+    EXPECT_EQ("NONEXISTING", dumpBucket(remap_bucket));
+}
+
+TEST_F(PutOperationTest, individually_cancelled_nodes_are_not_updated_in_db) {
+    setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3");
+
+    auto doc = createDummyDocument("test", "uri");
+    auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId());
+    addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t");
+
+    sendPut(createPut(doc));
+    ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true));
+
+    // Simulate nodes 0 and 2 going down
+    operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucket), {0, 2});
+    // Cancelling shall be cumulative
+    op->cancel(_sender, CancelScope::of_node_subset({0}));
+    op->cancel(_sender, CancelScope::of_node_subset({2}));
+
+    sendReply(0, api::ReturnCode::OK, api::BucketInfo(5, 6, 7));
+    sendReply(1, api::ReturnCode::OK, api::BucketInfo(6, 7, 8));
+    sendReply(2, api::ReturnCode::OK, api::BucketInfo(9, 8, 7));
+
+    EXPECT_EQ("BucketId(0x4000000000000593) : "
+              "node(idx=1,crc=0x5,docs=6/6,bytes=7/7,trusted=true,active=false,ready=false)",
+              dumpBucket(bucket));
+}
 
 TEST_F(PutOperationTest, send_to_retired_nodes_if_no_up_nodes_available) {
     setup_stripe(Redundancy(2), NodeCount(2), "distributor:1 storage:2 .0.s:r .1.s:r");
 
@@ -761,6 +882,38 @@ TEST_F(PutOperationTest, failed_condition_probe_fails_op_with_returned_error) {
               _sender.getLastReply());
 }
+
+TEST_F(PutOperationTest, ownership_cancellation_during_condition_probe_fails_operation_on_probe_completion) {
+    ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes());
+
+    op->receive(_sender, make_get_reply(*sent_get_command(0), 0, false, false));
+    op->cancel(_sender, CancelScope::of_ownership_change());
+    op->receive(_sender, make_get_reply(*sent_get_command(1), 0, false, false));
+
+    ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+    ASSERT_EQ("PutReply(id:test:testdoctype1::test, "
+              "BucketId(0x0000000000000000), timestamp 100) "
+              "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: "
+              "Operation has been cancelled (likely due to a cluster state change))",
+              _sender.getLastReply());
+}
+
+TEST_F(PutOperationTest, replica_subset_cancellation_during_condition_probe_fails_operation_on_probe_completion) {
+    ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes());
+
+    op->receive(_sender, make_get_reply(*sent_get_command(0), 0, false, false));
+    // 1 of 2 nodes; we still abort after the read phase since we cannot possibly fulfill
+    // the write phase for all replicas.
+    op->cancel(_sender, CancelScope::of_node_subset({0}));
+    op->receive(_sender, make_get_reply(*sent_get_command(1), 0, false, false));
+
+    ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true));
+    ASSERT_EQ("PutReply(id:test:testdoctype1::test, "
+              "BucketId(0x0000000000000000), timestamp 100) "
+              "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: "
+              "Operation has been cancelled (likely due to a cluster state change))",
+              _sender.getLastReply());
+}
 
 TEST_F(PutOperationTest, create_flag_in_parent_put_is_propagated_to_sent_puts) {
     setup_stripe(Redundancy(2), NodeCount(2), "version:1 storage:2 distributor:1");
     auto doc = createDummyDocument("test", "test");
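
As a closing illustration of the behavior the new tests assert (cancelled
nodes are skipped when their replies arrive, and full cancellation
suppresses all DB writes), here is a hedged sketch of a reply-completion
path; apply_replies_to_db() and ReplyInfo are hypothetical names, and the
function reuses the illustrative CancelScope sketched after the commit
message above, not the real distributor code.

#include <cstdint>
#include <vector>

// Hypothetical reply summary; the real code works on api::PutReply and
// friends rather than a plain struct.
struct ReplyInfo {
    uint16_t node;     // content node index the reply came from
    bool     success;  // node-level result of the Put/CreateBucket
};

// Illustrative only: mirrors the invariants the tests check.
void apply_replies_to_db(const CancelScope& scope,
                         const std::vector<ReplyInfo>& replies) {
    if (scope.fully_cancelled()) {
        // Ownership changed: every entry we could write is presumed stale,
        // so the bucket DB is left entirely untouched and no bucket info
        // rechecks are scheduled.
        return;
    }
    for (const auto& reply : replies) {
        if (scope.node_is_cancelled(reply.node)) {
            // Node became unavailable after the request was sent; writing
            // its info back could resurrect a replica that no longer exists.
            continue;
        }
        if (reply.success) {
            // ... update the bucket DB entry for reply.node (elided) ...
        }
    }
}

Because the cumulative scope from repeated cancel() calls is consulted only
at reply completion, stale updates are rejected at a single well-defined
point instead of via implicit ownership checks on every DB write.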