aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorTor Brede Vekterli <vekterli@verizonmedia.com>2020-12-02 16:34:33 +0000
committerTor Brede Vekterli <vekterli@verizonmedia.com>2020-12-02 16:34:33 +0000
commitc56dc531930ed27eadd347646b2ab3ee18cd8e15 (patch)
tree7813d0010d0922628428e6288df85327fba703a8
parent966bf9fc9c422638787c11dee6e74596d2acd699 (diff)
Test edge case where a bucket is removed before starting deferred visitor
-rw-r--r--storage/src/tests/distributor/distributortestutil.h3
-rw-r--r--storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp99
2 files changed, 76 insertions, 26 deletions
diff --git a/storage/src/tests/distributor/distributortestutil.h b/storage/src/tests/distributor/distributortestutil.h
index 3dc71bcb433..a6bd9d5d84c 100644
--- a/storage/src/tests/distributor/distributortestutil.h
+++ b/storage/src/tests/distributor/distributortestutil.h
@@ -175,6 +175,9 @@ public:
BucketDatabase::Entry getBucket(const document::BucketId& bId) const;
std::vector<document::BucketSpace> getBucketSpaces() const;
+
+ DistributorMessageSenderStub& sender() noexcept { return _sender; }
+ const DistributorMessageSenderStub& sender() const noexcept { return _sender; }
protected:
vdstestlib::DirConfig _config;
std::unique_ptr<TestDistributorApp> _node;
diff --git a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
index 76112b1c729..daa2ca94bb3 100644
--- a/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
+++ b/storage/src/tests/distributor/read_for_write_visitor_operation_test.cpp
@@ -19,6 +19,14 @@ using document::BucketId;
namespace storage::distributor {
+namespace {
+
+Bucket default_bucket(BucketId id) {
+ return Bucket(document::FixedBucketSpaces::default_space(), id);
+}
+
+}
+
struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil {
document::TestDocMan _test_doc_man;
VisitorOperation::Config _default_config;
@@ -46,10 +54,6 @@ struct ReadForWriteVisitorOperationStarterTest : Test, DistributorTestUtil {
close();
}
- static Bucket default_bucket(BucketId id) {
- return Bucket(document::FixedBucketSpaces::default_space(), id);
- }
-
std::shared_ptr<VisitorOperation> create_nested_visitor_op(bool valid_command = true) {
auto cmd = std::make_shared<api::CreateVisitorCommand>(
document::FixedBucketSpaces::default_space(), "reindexingvisitor", "foo", "");
@@ -88,37 +92,80 @@ TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_immediately_started_if_n
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true));
}
+namespace {
+
+struct ConcurrentMutationFixture {
+ ReadForWriteVisitorOperationStarterTest& _test;
+ std::shared_ptr<api::StorageCommand> _mutation;
+
+ explicit ConcurrentMutationFixture(ReadForWriteVisitorOperationStarterTest& test) : _test(test) {}
+
+ void block_bucket_with_mutation() {
+ // Pending mutating op to same bucket, prevents visitor from starting
+ auto update = std::make_shared<document::DocumentUpdate>(
+ _test._test_doc_man.getTypeRepo(),
+ *_test._test_doc_man.getTypeRepo().getDocumentType("testdoctype1"),
+ document::DocumentId("id::testdoctype1:n=4:foo"));
+ auto update_cmd = std::make_shared<api::UpdateCommand>(
+ default_bucket(document::BucketId(0)), std::move(update), api::Timestamp(0));
+
+ Operation::SP mutating_op;
+ _test.getExternalOperationHandler().handleMessage(update_cmd, mutating_op);
+ ASSERT_TRUE(mutating_op);
+ _test._op_owner->start(mutating_op, OperationStarter::Priority(120));
+ ASSERT_EQ("Update(BucketId(0x4400000000000004), id::testdoctype1:n=4:foo, timestamp 1) => 0",
+ _test.sender().getCommands(true, true));
+ _mutation = _test.sender().command(0);
+ // Since pending message tracking normally happens in the distributor itself during sendUp,
+ // we have to emulate this and explicitly insert the sent message into the pending mapping.
+ _test.getDistributor().getPendingMessageTracker().insert(_mutation);
+ }
+
+ void unblock_bucket() {
+ // Pretend update operation completed
+ auto update_reply = std::shared_ptr<api::StorageReply>(_mutation->makeReply());
+ _test.getDistributor().getPendingMessageTracker().reply(*update_reply);
+ _test._op_owner->handleReply(update_reply);
+ }
+};
+
+}
+
TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_start_deferred_if_pending_ops_to_bucket) {
+ ConcurrentMutationFixture f(*this);
auto op = create_rfw_op(create_nested_visitor_op(true));
- // Pending mutating op to same bucket, prevents visitor from starting
- auto update = std::make_shared<document::DocumentUpdate>(
- _test_doc_man.getTypeRepo(),
- *_test_doc_man.getTypeRepo().getDocumentType("testdoctype1"),
- document::DocumentId("id::testdoctype1:n=4:foo"));
- auto update_cmd = std::make_shared<api::UpdateCommand>(
- default_bucket(document::BucketId(0)), std::move(update), api::Timestamp(0));
-
- Operation::SP mutating_op;
- getExternalOperationHandler().handleMessage(update_cmd, mutating_op);
- ASSERT_TRUE(mutating_op);
- _op_owner->start(mutating_op, OperationStarter::Priority(120));
- ASSERT_EQ("Update(BucketId(0x4400000000000004), id::testdoctype1:n=4:foo, timestamp 1) => 0",
- _sender.getCommands(true, true));
- // Since pending message tracking normally happens in the distributor itself during sendUp,
- // we have to emulate this and explicitly insert the sent message into the pending mapping.
- getDistributor().getPendingMessageTracker().insert(_sender.command(0));
+ ASSERT_NO_FATAL_FAILURE(f.block_bucket_with_mutation());
_op_owner->start(op, OperationStarter::Priority(120));
// Nothing started yet
ASSERT_EQ("", _sender.getCommands(true, false, 1));
-
- // Pretend update operation completed
- auto update_reply = std::shared_ptr<api::StorageReply>(_sender.command(0)->makeReply());
- getDistributor().getPendingMessageTracker().reply(*update_reply);
- _op_owner->handleReply(update_reply);
+ ASSERT_NO_FATAL_FAILURE(f.unblock_bucket());
// Visitor should now be started!
ASSERT_EQ("Visitor Create => 0", _sender.getCommands(true, false, 1));
}
+TEST_F(ReadForWriteVisitorOperationStarterTest, visitor_bounced_if_bucket_removed_from_db_before_deferred_start) {
+ ConcurrentMutationFixture f(*this);
+ auto op = create_rfw_op(create_nested_visitor_op(true));
+ ASSERT_NO_FATAL_FAILURE(f.block_bucket_with_mutation());
+
+ _op_owner->start(op, OperationStarter::Priority(120));
+ // Nothing started yet
+ ASSERT_EQ("", _sender.getCommands(true, false, 1));
+
+ // Simulate that ownership of bucket has changed, or replica has gone down.
+ removeFromBucketDB(_sub_bucket);
+ ASSERT_NO_FATAL_FAILURE(f.unblock_bucket());
+
+ // No visitor should be sent to the content node
+ ASSERT_EQ("", _sender.getCommands(true, false, 1));
+ // Instead, we should get a "bucket not found" transient error bounce back to the client.
+ EXPECT_EQ("CreateVisitorReply(last=BucketId(0x0000000000000000)) "
+ "ReturnCode(BUCKET_NOT_FOUND),"
+ "UpdateReply(id::testdoctype1:n=4:foo, BucketId(0x0000000000000000), "
+ "timestamp 1, timestamp of updated doc: 0) ReturnCode(NONE)",
+ _sender.getReplies(false, true));
+}
+
}