diff options
Diffstat (limited to 'storage/src/tests/distributor/twophaseupdateoperationtest.cpp')
-rw-r--r-- | storage/src/tests/distributor/twophaseupdateoperationtest.cpp | 169 |
1 files changed, 144 insertions, 25 deletions
diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp index da32225cde3..1907335545a 100644 --- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp +++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp @@ -4,16 +4,16 @@ #include <vespa/config/helper/configgetter.h> #include <vespa/document/base/testdocrepo.h> #include <vespa/document/fieldset/fieldsets.h> -#include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/fieldvalue/intfieldvalue.h> +#include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/document/update/arithmeticvalueupdate.h> -#include <vespa/storage/distributor/top_level_distributor.h> #include <vespa/storage/distributor/distributor_stripe.h> #include <vespa/storage/distributor/externaloperationhandler.h> #include <vespa/storage/distributor/operations/external/twophaseupdateoperation.h> +#include <vespa/storage/distributor/top_level_distributor.h> #include <vespa/storageapi/message/persistence.h> -#include <vespa/vespalib/gtest/gtest.h> +#include <gtest/gtest.h> #include <gmock/gmock.h> namespace storage::distributor { @@ -30,8 +30,9 @@ using namespace ::testing; struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { document::TestDocRepo _testRepo; std::shared_ptr<const DocumentTypeRepo> _repo; - const DocumentType* _doc_type; + const DocumentType* _doc_type{nullptr}; DistributorMessageSenderStub _sender; + BucketId _bucket_id{0x400000000000cac4}; TwoPhaseUpdateOperationTest(); ~TwoPhaseUpdateOperationTest() override; @@ -39,7 +40,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { void checkMessageSettingsPropagatedTo( const api::StorageCommand::SP& msg) const; - std::string getUpdatedValueFromLastPut(DistributorMessageSenderStub&); + static std::string getUpdatedValueFromLastPut(DistributorMessageSenderStub&); void SetUp() override { _repo = _testRepo.getTypeRepoSp(); @@ -57,20 +58,21 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { close(); } - void replyToMessage(Operation& callback, - DistributorMessageSenderStub& sender, - uint32_t index, - uint64_t oldTimestamp, - api::ReturnCode::Result result = api::ReturnCode::OK); + static void replyToMessage( + Operation& callback, + DistributorMessageSenderStub& sender, + uint32_t index, + uint64_t oldTimestamp, + api::ReturnCode::Result result = api::ReturnCode::OK); - void replyToPut( + static void replyToPut( Operation& callback, DistributorMessageSenderStub& sender, uint32_t index, api::ReturnCode::Result result = api::ReturnCode::OK, const std::string& traceMsg = ""); - void replyToCreateBucket( + static void replyToCreateBucket( Operation& callback, DistributorMessageSenderStub& sender, uint32_t index, @@ -85,7 +87,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { api::ReturnCode::Result result = api::ReturnCode::OK, const std::string& traceMsg = ""); - void reply_to_metadata_get( + static void reply_to_metadata_get( Operation& callback, DistributorMessageSenderStub& sender, uint32_t index, @@ -93,7 +95,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { api::ReturnCode::Result result = api::ReturnCode::OK, const std::string& trace_msg = ""); - void reply_to_get_with_tombstone( + static void reply_to_get_with_tombstone( Operation& callback, DistributorMessageSenderStub& sender, uint32_t index, @@ -148,11 +150,17 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { Timestamp highest_get_timestamp, Timestamp expected_response_timestamp); - std::shared_ptr<TwoPhaseUpdateOperation> set_up_2_inconsistent_replicas_and_start_update(bool enable_3phase = true) { - setup_stripe(2, 2, "storage:2 distributor:1"); + void do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(bool in_sync_replicas); + + void enable_3phase_updates(bool enable = true) { auto cfg = make_config(); - cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(enable_3phase); + cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(enable); configure_stripe(cfg); + } + + std::shared_ptr<TwoPhaseUpdateOperation> set_up_2_inconsistent_replicas_and_start_update(bool enable_3phase = true) { + setup_stripe(2, 2, "storage:2 distributor:1"); + enable_3phase_updates(enable_3phase); auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas. cb->start(_sender); return cb; @@ -199,13 +207,13 @@ TwoPhaseUpdateOperationTest::replyToPut( { std::shared_ptr<api::StorageMessage> msg2 = sender.command(index); auto& putc = dynamic_cast<PutCommand&>(*msg2); - std::unique_ptr<api::StorageReply> reply(putc.makeReply()); + std::shared_ptr<api::StorageReply> reply(putc.makeReply()); reply->setResult(api::ReturnCode(result, "")); + dynamic_cast<api::PutReply&>(*reply).setBucketInfo(api::BucketInfo(1,2,3,4,5)); if (!traceMsg.empty()) { MBUS_TRACE(reply->getTrace(), 1, traceMsg); } - callback.receive(sender, - std::shared_ptr<StorageReply>(reply.release())); + callback.receive(sender, reply); } void @@ -217,10 +225,9 @@ TwoPhaseUpdateOperationTest::replyToCreateBucket( { std::shared_ptr<api::StorageMessage> msg2 = sender.command(index); auto& putc = dynamic_cast<CreateBucketCommand&>(*msg2); - std::unique_ptr<api::StorageReply> reply(putc.makeReply()); + std::shared_ptr<api::StorageReply> reply(putc.makeReply()); reply->setResult(api::ReturnCode(result, "")); - callback.receive(sender, - std::shared_ptr<StorageReply>(reply.release())); + callback.receive(sender, reply); } void @@ -312,6 +319,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState, update->setCreateIfNonExistent(options._createIfNonExistent); document::BucketId id = operation_context().make_split_bit_constrained_bucket_id(update->getId()); + assert(id == _bucket_id); document::BucketId id2 = document::BucketId(id.getUsedBits() + 1, id.getRawId()); if (bucketState.length()) { @@ -554,6 +562,33 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_inconsiste _sender.getLastReply(true)); } +TEST_F(TwoPhaseUpdateOperationTest, fast_path_cancellation_transitively_cancels_nested_update_operation) { + setup_stripe(2, 2, "storage:2 distributor:1"); + enable_3phase_updates(); + auto op = sendUpdate("0=1/2/3,1=1/2/3"); + op->start(_sender); + + ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true)); + + replyToMessage(*op, _sender, 0, 110); + + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {1}); + op->cancel(_sender, CancelScope::of_node_subset({1})); + + replyToMessage(*op, _sender, 1, 110); + + // Client operation itself should return success since the update went through on all replica nodes + EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " + "BucketId(0x0000000000000000), " + "timestamp 0, timestamp of updated doc: 110) " + "ReturnCode(NONE)", + _sender.getLastReply(true)); + + EXPECT_EQ("BucketId(0x400000000000cac4) : " + "node(idx=0,crc=0x123,docs=1/1,bytes=100/100,trusted=true,active=false,ready=false)", + dumpBucket(_bucket_id)); +} + void TwoPhaseUpdateOperationTest::checkMessageSettingsPropagatedTo( const api::StorageCommand::SP& msg) const @@ -713,6 +748,38 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_gets_fail) { EXPECT_EQ(metrics().updates.failures.storagefailure.getValue(), 1); } +void TwoPhaseUpdateOperationTest::do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(bool in_sync_replicas) { + setup_stripe(2, 2, "storage:2 distributor:1"); + enable_3phase_updates(); + auto op = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().createIfNonExistent(true)); + op->start(_sender); + + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + replyToGet(*op, _sender, 0, 70); + + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {1}); + op->cancel(_sender, CancelScope::of_node_subset({1})); + + replyToGet(*op, _sender, 1, in_sync_replicas ? 70 : 80); + + EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " + "BucketId(0x0000000000000000), " + "timestamp 0, timestamp of updated doc: 0) " + "ReturnCode(BUCKET_NOT_FOUND, The update operation was cancelled due to a cluster " + "state change between executing the read and write phases of a write-repair update)", + _sender.getLastReply(true)); + + // TODO custom cancellation failure metric? +} + +TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion_consistent_case) { + do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(true); +} + +TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion_inconsistent_case) { + do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(false); +} + TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_apply_throws_exception) { setup_stripe(2, 2, "storage:2 distributor:1"); // Create update for wrong doctype which will fail the update. @@ -1214,6 +1281,59 @@ TEST_F(ThreePhaseUpdateTest, puts_are_sent_after_receiving_full_document_get) { EXPECT_EQ(1, m.ok.getValue()); } +TEST_F(ThreePhaseUpdateTest, update_fails_if_cancelled_between_metadata_gets_and_full_get) { + auto op = set_up_2_inconsistent_replicas_and_start_update(); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + reply_to_metadata_get(*op, _sender, 0, 2000); + reply_to_metadata_get(*op, _sender, 1, 1000); + ASSERT_EQ("Get => 0", _sender.getCommands(true, false, 2)); + + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {1}); + op->cancel(_sender, CancelScope::of_node_subset({1})); + + replyToGet(*op, _sender, 2, 2000U); + ASSERT_EQ("", _sender.getCommands(true, false, 3)); // No puts sent + + EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " + "BucketId(0x0000000000000000), " + "timestamp 0, timestamp of updated doc: 0) " + "ReturnCode(BUCKET_NOT_FOUND, The update operation was cancelled due to a cluster " + "state change between executing the read and write phases of a write-repair update)", + _sender.getLastReply(true)); + + // TODO cancellation metrics? +} + +TEST_F(ThreePhaseUpdateTest, fast_path_cancellation_transitively_cancels_nested_put_operation) { + auto op = set_up_2_inconsistent_replicas_and_start_update(); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + reply_to_metadata_get(*op, _sender, 0, 2000); + reply_to_metadata_get(*op, _sender, 1, 1000); + + ASSERT_EQ("Get => 0", _sender.getCommands(true, false, 2)); + replyToGet(*op, _sender, 2, 2000U); + + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {0}); + op->cancel(_sender, CancelScope::of_node_subset({0})); + + ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 3)); + replyToPut(*op, _sender, 3); + replyToPut(*op, _sender, 4); + + // Update itself is ACKed + EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " + "BucketId(0x0000000000000000), " + "timestamp 0, timestamp of updated doc: 2000) " + "ReturnCode(NONE)", + _sender.getLastReply(true)); + + // But cancelled replicas are not reintroduced into the bucket DB + EXPECT_EQ("BucketId(0x400000000000cac4) : " + "node(idx=1,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false)", + dumpBucket(_bucket_id)); +} + + TEST_F(ThreePhaseUpdateTest, consistent_meta_get_timestamps_can_restart_in_fast_path) { auto cb = set_up_2_inconsistent_replicas_and_start_update(); ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); @@ -1277,8 +1397,7 @@ TEST_F(ThreePhaseUpdateTest, update_failed_with_transient_error_code_if_replica_ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // 2 replicas, room for 1 more. cb->start(_sender); // Add new replica to deterministic test bucket after gets have been sent - BucketId bucket(0x400000000000cac4); // Always the same in the test. - addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3"); + addNodesToBucketDB(_bucket_id, "0=1/2/3,1=2/3/4,2=3/3/3"); Timestamp old_timestamp = 500; ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); |