diff options
author | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-12-02 16:34:33 +0000 |
---|---|---|
committer | Tor Brede Vekterli <vekterli@verizonmedia.com> | 2020-12-02 16:34:33 +0000 |
commit | c56dc531930ed27eadd347646b2ab3ee18cd8e15 (patch) | |
tree | 7813d0010d0922628428e6288df85327fba703a8 | |
parent | 966bf9fc9c422638787c11dee6e74596d2acd699 (diff) |
Test edge case where a bucket is removed before starting deferred visitor
-rw-r--r-- | storage/src/tests/distributor/distributortestutil.h | 3 | ||||
-rw-r--r-- | storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp | 99 |
2 files changed, 76 insertions, 26 deletions
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h index 3dc71bcb433..a6bd9d5d84c 100644 --- a/storage/src/tests/distributor/distributortestutil.h +++ b/storage/src/tests/distributor/distributortestutil.h @@ -175,6 +175,9 @@ public: BucketDatabase::Entry getBucket(const document::BucketId& bId) const; std::vector<document::BucketSpace> getBucketSpaces() const; + + DistributorMessageSenderStub& sender() noexcept { return _sender; } + const DistributorMessageSenderStub& sender() const noexcept { return _sender; } protected: vdstestlib::DirConfig _config; std::unique_ptr<TestDistributorApp> _node; diff --git a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp index 76112b1c729..daa2ca94bb3 100644 --- a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp +++ b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp @@ -19,6 +19,14 @@ using document::BucketId; namespace storage::distributor { +namespace { + +Bucket default_bucket(BucketId id) { + return Bucket(document::FixedBucketSpaces::default_space(), id); +} + +} + struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil { document::TestDocMan _test_doc_man; VisitorOperation::Config _default_config; @@ -46,10 +54,6 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil { close(); } - static Bucket default_bucket(BucketId id) { - return Bucket(document::FixedBucketSpaces::default_space(), id); - } - std::shared_ptr<VisitorOperation> create_nested_visitor_op(bool valid_command = true) { auto cmd = std::make_shared<api::CreateVisitorCommand>( document::FixedBucketSpaces::default_space(), "reindexingvisitor", "foo", ""); @@ -88,37 +92,80 @@ TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_immediately_started_if_n ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true)); } +namespace { + +struct ConcurrentMutationFixture { + ReadForWriteVisitorOperationStarterTest& _test; + std::shared_ptr<api::StorageCommand> _mutation; + + explicit ConcurrentMutationFixture(ReadForWriteVisitorOperationStarterTest& test) : _test(test) {} + + void block_bucket_with_mutation() { + // Pending mutating op to same bucket, prevents visitor from starting + auto update = std::make_shared<document::DocumentUpdate>( + _test._test_doc_man.getTypeRepo(), + *_test._test_doc_man.getTypeRepo().getDocumentType("testdoctype1"), + document::DocumentId("id::testdoctype1:n=4:foo")); + auto update_cmd = std::make_shared<api::UpdateCommand>( + default_bucket(document::BucketId(0)), std::move(update), api::Timestamp(0)); + + Operation::SP mutating_op; + _test.getExternalOperationHandler().handleMessage(update_cmd, mutating_op); + ASSERT_TRUE(mutating_op); + _test._op_owner->start(mutating_op, OperationStarter::Priority(120)); + ASSERT_EQ("Update(BucketId(0x4400000000000004), id::testdoctype1:n=4:foo, timestamp 1) => 0", + _test.sender().getCommands(true, true)); + _mutation = _test.sender().command(0); + // Since pending message tracking normally happens in the distributor itself during sendUp, + // we have to emulate this and explicitly insert the sent message into the pending mapping. + _test.getDistributor().getPendingMessageTracker().insert(_mutation); + } + + void unblock_bucket() { + // Pretend update operation completed + auto update_reply = std::shared_ptr<api::StorageReply>(_mutation->makeReply()); + _test.getDistributor().getPendingMessageTracker().reply(*update_reply); + _test._op_owner->handleReply(update_reply); + } +}; + +} + TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_start_deferred_if_pending_ops_to_bucket) { + ConcurrentMutationFixture f(*this); auto op = create_rfw_op(create_nested_visitor_op(true)); - // Pending mutating op to same bucket, prevents visitor from starting - auto update = std::make_shared<document::DocumentUpdate>( - _test_doc_man.getTypeRepo(), - *_test_doc_man.getTypeRepo().getDocumentType("testdoctype1"), - document::DocumentId("id::testdoctype1:n=4:foo")); - auto update_cmd = std::make_shared<api::UpdateCommand>( - default_bucket(document::BucketId(0)), std::move(update), api::Timestamp(0)); - - Operation::SP mutating_op; - getExternalOperationHandler().handleMessage(update_cmd, mutating_op); - ASSERT_TRUE(mutating_op); - _op_owner->start(mutating_op, OperationStarter::Priority(120)); - ASSERT_EQ("Update(BucketId(0x4400000000000004), id::testdoctype1:n=4:foo, timestamp 1) => 0", - _sender.getCommands(true, true)); - // Since pending message tracking normally happens in the distributor itself during sendUp, - // we have to emulate this and explicitly insert the sent message into the pending mapping. - getDistributor().getPendingMessageTracker().insert(_sender.command(0)); + ASSERT_NO_FATAL_FAILURE(f.block_bucket_with_mutation()); _op_owner->start(op, OperationStarter::Priority(120)); // Nothing started yet ASSERT_EQ("", _sender.getCommands(true, false, 1)); - - // Pretend update operation completed - auto update_reply = std::shared_ptr<api::StorageReply>(_sender.command(0)->makeReply()); - getDistributor().getPendingMessageTracker().reply(*update_reply); - _op_owner->handleReply(update_reply); + ASSERT_NO_FATAL_FAILURE(f.unblock_bucket()); // Visitor should now be started! ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true, false, 1)); } +TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_bounced_if_bucket_removed_from_db_before_deferred_start) { + ConcurrentMutationFixture f(*this); + auto op = create_rfw_op(create_nested_visitor_op(true)); + ASSERT_NO_FATAL_FAILURE(f.block_bucket_with_mutation()); + + _op_owner->start(op, OperationStarter::Priority(120)); + // Nothing started yet + ASSERT_EQ("", _sender.getCommands(true, false, 1)); + + // Simulate that ownership of bucket has changed, or replica has gone down. + removeFromBucketDB(_sub_bucket); + ASSERT_NO_FATAL_FAILURE(f.unblock_bucket()); + + // No visitor should be sent to the content node + ASSERT_EQ("", _sender.getCommands(true, false, 1)); + // Instead, we should get a "bucket not found" transient error bounce back to the client. + EXPECT_EQ("CreateVisitorReply(last=BucketId(0x0000000000000000)) " + "ReturnCode(BUCKET_NOT_FOUND)," + "UpdateReply(id::testdoctype1:n=4:foo, BucketId(0x0000000000000000), " + "timestamp 1, timestamp of updated doc: 0) ReturnCode(NONE)", + _sender.getReplies(false, true)); +} + } |