diff options
Diffstat (limited to 'storage/src/tests/distributor')
6 files changed, 447 insertions, 34 deletions
diff --git a/storage/src/tests/distributor/check_condition_test.cpp b/storage/src/tests/distributor/check_condition_test.cpp index 757a9329ea6..617401dd271 100644 --- a/storage/src/tests/distributor/check_condition_test.cpp +++ b/storage/src/tests/distributor/check_condition_test.cpp @@ -5,6 +5,7 @@ #include <vespa/document/fieldset/fieldsets.h> #include <vespa/documentapi/messagebus/messages/testandsetcondition.h> #include <vespa/storage/distributor/node_supported_features.h> +#include <vespa/storage/distributor/operations/cancel_scope.h> #include <vespa/storage/distributor/operations/external/check_condition.h> #include <vespa/storage/distributor/persistence_operation_metric_set.h> #include <vespa/storageapi/message/persistence.h> @@ -227,6 +228,20 @@ TEST_F(CheckConditionTest, failed_gets_completes_check_with_error_outcome) { }); } +TEST_F(CheckConditionTest, check_fails_if_condition_explicitly_cancelled) { + test_cond_with_2_gets_sent([&](auto& cond) { + cond.handle_reply(_sender, make_matched_reply(0)); + cond.cancel(_sender, CancelScope::of_fully_cancelled()); + cond.handle_reply(_sender, make_matched_reply(1)); + }, [&](auto& outcome) { + EXPECT_FALSE(outcome.matched_condition()); + EXPECT_FALSE(outcome.not_found()); + EXPECT_TRUE(outcome.failed()); + EXPECT_EQ(outcome.error_code().getResult(), api::ReturnCode::ABORTED); + }); +} + +// TODO deprecate in favor of cancelling TEST_F(CheckConditionTest, check_fails_if_replica_set_changed_between_start_and_completion) { test_cond_with_2_gets_sent([&](auto& cond) { cond.handle_reply(_sender, make_matched_reply(0)); @@ -242,6 +257,7 @@ TEST_F(CheckConditionTest, check_fails_if_replica_set_changed_between_start_and_ }); } +// TODO deprecate in favor of cancelling TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start_and_completion_pending_transition_case) { test_cond_with_2_gets_sent([&](auto& cond) { cond.handle_reply(_sender, make_matched_reply(0)); @@ -255,6 +271,7 @@ TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start }); } +// TODO deprecate in favor of cancelling TEST_F(CheckConditionTest, check_fails_if_bucket_ownership_changed_between_start_and_completion_completed_transition_case) { test_cond_with_2_gets_sent([&](auto& cond) { cond.handle_reply(_sender, make_matched_reply(0)); diff --git a/storage/src/tests/distributor/garbagecollectiontest.cpp b/storage/src/tests/distributor/garbagecollectiontest.cpp index 9b5056f2066..b1cf1cbc636 100644 --- a/storage/src/tests/distributor/garbagecollectiontest.cpp +++ b/storage/src/tests/distributor/garbagecollectiontest.cpp @@ -113,9 +113,8 @@ struct GarbageCollectionOperationTest : Test, DistributorStripeTestUtil { ASSERT_EQ(entry->getNodeCount(), info.size()); EXPECT_EQ(entry->getLastGarbageCollectionTime(), last_gc_time); for (size_t i = 0; i < info.size(); ++i) { - EXPECT_EQ(info[i], entry->getNode(i)->getBucketInfo()) - << "Mismatching info for node " << i << ": " << info[i] << " vs " - << entry->getNode(i)->getBucketInfo(); + auto& node = entry->getNodeRef(i); + EXPECT_EQ(info[i], node.getBucketInfo()) << "Mismatching DB bucket info for node " << node.getNode(); } } @@ -172,6 +171,51 @@ TEST_F(GarbageCollectionOperationTest, replica_bucket_info_not_added_to_db_until EXPECT_EQ(70u, gc_removed_documents_metric()); // Use max of received metrics } +TEST_F(GarbageCollectionOperationTest, no_replica_bucket_info_added_to_db_if_operation_fully_canceled) { + auto op = create_op(); + op->start(_sender); + ASSERT_EQ(2, _sender.commands().size()); + + reply_to_nth_request(*op, 0, 1234, 70); + op->cancel(_sender, CancelScope::of_fully_cancelled()); + reply_to_nth_request(*op, 1, 4567, 60); + + // DB state is unchanged. Note that in a real scenario, the DB entry will have been removed + // as part of the ownership change, but there are already non-cancellation behaviors that + // avoid creating buckets from scratch in the DB if they do not exist, so just checking to + // see if the bucket exists or not risks hiding missing cancellation edge handling. + ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(250, 50, 300)}, 0)); + // However, we still update our metrics if we _did_ remove documents on one or more nodes + EXPECT_EQ(70u, gc_removed_documents_metric()); +} + +TEST_F(GarbageCollectionOperationTest, no_replica_bucket_info_added_to_db_for_cancelled_node) { + auto op = create_op(); + op->start(_sender); + ASSERT_EQ(2, _sender.commands().size()); + + reply_to_nth_request(*op, 0, 1234, 70); + op->cancel(_sender, CancelScope::of_node_subset({0})); + reply_to_nth_request(*op, 1, 4567, 60); + + // DB state is unchanged for node 0, changed for node 1 + ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(4567, 90, 500)}, 34)); +} + +TEST_F(GarbageCollectionOperationTest, node_cancellation_is_cumulative) { + auto op = create_op(); + op->start(_sender); + ASSERT_EQ(2, _sender.commands().size()); + + reply_to_nth_request(*op, 0, 1234, 70); + op->cancel(_sender, CancelScope::of_node_subset({0})); + op->cancel(_sender, CancelScope::of_node_subset({1})); + reply_to_nth_request(*op, 1, 4567, 60); + + // DB state is unchanged for both nodes + ASSERT_NO_FATAL_FAILURE(assert_bucket_db_contains({api::BucketInfo(250, 50, 300), api::BucketInfo(250, 50, 300)}, 0)); +} + TEST_F(GarbageCollectionOperationTest, gc_bucket_info_does_not_overwrite_later_sequenced_bucket_info_writes) { auto op = create_op(); op->start(_sender); @@ -363,6 +407,16 @@ TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_bucket_in receive_phase1_replies_and_assert_no_phase_2_started(); } +TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_operation_fully_cancelled_between_phases) { + _op->cancel(_sender, CancelScope::of_fully_cancelled()); + receive_phase1_replies_and_assert_no_phase_2_started(); +} + +TEST_F(GarbageCollectionOperationPhase1FailureTest, no_second_phase_if_operation_partially_cancelled_between_phases) { + _op->cancel(_sender, CancelScope::of_node_subset({0})); + receive_phase1_replies_and_assert_no_phase_2_started(); +} + TEST_F(GarbageCollectionOperationTest, document_level_write_locks_are_checked_and_held_if_acquired) { enable_two_phase_gc(); auto op = create_op(); diff --git a/storage/src/tests/distributor/putoperationtest.cpp b/storage/src/tests/distributor/putoperationtest.cpp index 76b6741442e..ee87fe84df6 100644 --- a/storage/src/tests/distributor/putoperationtest.cpp +++ b/storage/src/tests/distributor/putoperationtest.cpp @@ -6,6 +6,7 @@ #include <vespa/storage/distributor/top_level_distributor.h> #include <vespa/storage/distributor/distributor_bucket_space.h> #include <vespa/storage/distributor/distributor_stripe.h> +#include <vespa/storage/distributor/operations/cancel_scope.h> #include <vespa/storage/distributor/operations/external/putoperation.h> #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/persistence.h> @@ -208,6 +209,43 @@ TEST_F(PutOperationTest, failed_CreateBucket_removes_replica_from_db_and_sends_R _sender.getCommands(true, true, 4)); } +TEST_F(PutOperationTest, failed_CreateBucket_does_not_send_RequestBucketInfo_if_op_fully_canceled) { + setup_stripe(2, 2, "distributor:1 storage:2"); + + auto doc = createDummyDocument("test", "test"); + sendPut(createPut(doc)); + + ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0", _sender.getCommands(true)); + + op->cancel(_sender, CancelScope::of_fully_cancelled()); + sendReply(0, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 1 + + // DB is not touched (note: normally node 1 would be removed at the cancel-edge). + ASSERT_EQ("BucketId(0x4000000000008f09) : " + "node(idx=1,crc=0x1,docs=0/0,bytes=0/0,trusted=true,active=true,ready=false), " + "node(idx=0,crc=0x1,docs=0/0,bytes=0/0,trusted=true,active=false,ready=false)", + dumpBucket(operation_context().make_split_bit_constrained_bucket_id(doc->getId()))); + // No new requests sent + ASSERT_EQ("", _sender.getCommands(true, true, 4)); +} + +TEST_F(PutOperationTest, failed_CreateBucket_does_not_send_RequestBucketInfo_for_cancelled_nodes) { + setup_stripe(2, 2, "distributor:1 storage:2"); + + auto doc = createDummyDocument("test", "test"); + sendPut(createPut(doc)); + + ASSERT_EQ("Create bucket => 1,Create bucket => 0,Put => 1,Put => 0", _sender.getCommands(true)); + + op->cancel(_sender, CancelScope::of_node_subset({0})); + sendReply(0, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 1 + sendReply(1, api::ReturnCode::TIMEOUT, api::BucketInfo()); // CreateBucket to node 0 + + // Bucket info recheck only sent to node 1, as it's not cancelled + ASSERT_EQ("RequestBucketInfoCommand(1 buckets, super bucket BucketId(0x4000000000008f09). ) => 1", + _sender.getCommands(true, true, 4)); +} + TEST_F(PutOperationTest, send_inline_split_before_put_if_bucket_too_large) { setup_stripe(1, 1, "storage:1 distributor:1"); auto cfg = make_config(); @@ -272,6 +310,26 @@ TEST_F(PutOperationTest, return_success_if_op_acked_on_all_replicas_even_if_buck _sender.getLastReply()); } +TEST_F(PutOperationTest, return_success_if_op_acked_on_all_replicas_even_if_operation_cancelled) { + setup_stripe(2, 2, "storage:2 distributor:1"); + createAndSendSampleDocument(TIMEOUT); + + ASSERT_EQ("Put(BucketId(0x4000000000001dd4), " + "id:test:testdoctype1::, timestamp 100, size 45) => 0," + "Put(BucketId(0x4000000000001dd4), " + "id:test:testdoctype1::, timestamp 100, size 45) => 1", + _sender.getCommands(true, true)); + + op->cancel(_sender, CancelScope::of_fully_cancelled()); + + sendReply(0); + sendReply(1); + + ASSERT_EQ("PutReply(id:test:testdoctype1::, BucketId(0x0000000000000000), " + "timestamp 100) ReturnCode(NONE)", + _sender.getLastReply()); +} + TEST_F(PutOperationTest, storage_failed) { setup_stripe(2, 1, "storage:1 distributor:1"); @@ -491,7 +549,7 @@ TEST_F(PutOperationTest, update_correct_bucket_on_remapped_put) { { std::shared_ptr<api::StorageCommand> msg2 = _sender.command(0); - std::shared_ptr<api::StorageReply> reply(msg2->makeReply().release()); + std::shared_ptr<api::StorageReply> reply(msg2->makeReply()); auto* sreply = dynamic_cast<api::PutReply*>(reply.get()); ASSERT_TRUE(sreply); sreply->remapBucketId(document::BucketId(17, 13)); @@ -511,6 +569,7 @@ TEST_F(PutOperationTest, update_correct_bucket_on_remapped_put) { dumpBucket(document::BucketId(17, 13))); } +// TODO make this redundant through operation cancelling TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_state) { setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3"); @@ -535,6 +594,7 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_active_ dumpBucket(operation_context().make_split_bit_constrained_bucket_id(doc->getId()))); } +// TODO make this redundant through operation cancelling TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending_state) { setup_stripe(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3"); @@ -568,6 +628,8 @@ TEST_F(PutOperationTest, replica_not_resurrected_in_db_when_node_down_in_pending // TODO probably also do this for updates and removes // TODO consider if we should use the pending state verbatim for computing targets if it exists +// TODO make this redundant through operation cancelling +// ... actually; FIXME shouldn't the ExternalOperationHandler already cover this?? TEST_F(PutOperationTest, put_is_failed_with_busy_if_target_down_in_pending_state) { setup_stripe(Redundancy(3), NodeCount(4), "version:1 distributor:1 storage:3"); auto doc = createDummyDocument("test", "test"); @@ -584,6 +646,65 @@ TEST_F(PutOperationTest, put_is_failed_with_busy_if_target_down_in_pending_state _sender.getLastReply(true)); } +TEST_F(PutOperationTest, db_not_updated_if_operation_cancelled_by_ownership_change) { + setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3"); + + auto doc = createDummyDocument("test", "uri"); + auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId()); + auto remap_bucket = BucketId(bucket.getUsedBits() + 1, bucket.getId()); + addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t"); + + sendPut(createPut(doc)); + + ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true)); + + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucket), {0, 1, 2}); + op->cancel(_sender, CancelScope::of_fully_cancelled()); + + // Normally DB updates triggered by replies don't _create_ buckets in the DB, unless + // they're remapped buckets. Use a remapping to ensure we hit a create-if-missing DB path. + { + std::shared_ptr<api::StorageCommand> msg2 = _sender.command(0); + std::shared_ptr<api::StorageReply> reply(msg2->makeReply()); + auto* sreply = dynamic_cast<api::PutReply*>(reply.get()); + ASSERT_TRUE(sreply); + sreply->remapBucketId(remap_bucket); + sreply->setBucketInfo(api::BucketInfo(1,2,3,4,5)); + op->receive(_sender, reply); + } + + sendReply(1, api::ReturnCode::OK, api::BucketInfo(5, 6, 7)); + sendReply(2, api::ReturnCode::OK, api::BucketInfo(7, 8, 9)); + + EXPECT_EQ("NONEXISTING", dumpBucket(bucket)); + EXPECT_EQ("NONEXISTING", dumpBucket(remap_bucket)); +} + +TEST_F(PutOperationTest, individually_cancelled_nodes_are_not_updated_in_db) { + setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3"); + + auto doc = createDummyDocument("test", "uri"); + auto bucket = operation_context().make_split_bit_constrained_bucket_id(doc->getId()); + addNodesToBucketDB(bucket, "0=1/2/3/t,1=1/2/3/t,2=1/2/3/t"); + + sendPut(createPut(doc)); + ASSERT_EQ("Put => 1,Put => 2,Put => 0", _sender.getCommands(true)); + + // Simulate nodes 0 and 2 going down + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucket), {0, 2}); + // Cancelling shall be cumulative + op->cancel(_sender, CancelScope::of_node_subset({0})); + op->cancel(_sender, CancelScope::of_node_subset({2})); + + sendReply(0, api::ReturnCode::OK, api::BucketInfo(5, 6, 7)); + sendReply(1, api::ReturnCode::OK, api::BucketInfo(6, 7, 8)); + sendReply(2, api::ReturnCode::OK, api::BucketInfo(9, 8, 7)); + + EXPECT_EQ("BucketId(0x4000000000000593) : " + "node(idx=1,crc=0x5,docs=6/6,bytes=7/7,trusted=true,active=false,ready=false)", + dumpBucket(bucket)); +} + TEST_F(PutOperationTest, send_to_retired_nodes_if_no_up_nodes_available) { setup_stripe(Redundancy(2), NodeCount(2), "distributor:1 storage:2 .0.s:r .1.s:r"); @@ -761,6 +882,38 @@ TEST_F(PutOperationTest, failed_condition_probe_fails_op_with_returned_error) { _sender.getLastReply()); } +TEST_F(PutOperationTest, ownership_cancellation_during_condition_probe_fails_operation_on_probe_completion) { + ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes()); + + op->receive(_sender, make_get_reply(*sent_get_command(0), 0, false, false)); + op->cancel(_sender, CancelScope::of_fully_cancelled()); + op->receive(_sender, make_get_reply(*sent_get_command(1), 0, false, false)); + + ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true)); + ASSERT_EQ("PutReply(id:test:testdoctype1::test, " + "BucketId(0x0000000000000000), timestamp 100) " + "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: " + "Operation has been cancelled (likely due to a cluster state change))", + _sender.getLastReply()); +} + +TEST_F(PutOperationTest, replica_subset_cancellation_during_condition_probe_fails_operation_on_probe_completion) { + ASSERT_NO_FATAL_FAILURE(set_up_tas_put_with_2_inconsistent_replica_nodes()); + + op->receive(_sender, make_get_reply(*sent_get_command(0), 0, false, false)); + // 1 of 2 nodes; we still abort after the read phase since we cannot possibly fulfill + // the write phase for all replicas. + op->cancel(_sender, CancelScope::of_node_subset({0})); + op->receive(_sender, make_get_reply(*sent_get_command(1), 0, false, false)); + + ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true)); + ASSERT_EQ("PutReply(id:test:testdoctype1::test, " + "BucketId(0x0000000000000000), timestamp 100) " + "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: " + "Operation has been cancelled (likely due to a cluster state change))", + _sender.getLastReply()); +} + TEST_F(PutOperationTest, create_flag_in_parent_put_is_propagated_to_sent_puts) { setup_stripe(Redundancy(2), NodeCount(2), "version:1 storage:2 distributor:1"); auto doc = createDummyDocument("test", "test"); diff --git a/storage/src/tests/distributor/removeoperationtest.cpp b/storage/src/tests/distributor/removeoperationtest.cpp index d169c80a95d..3fad2c194a2 100644 --- a/storage/src/tests/distributor/removeoperationtest.cpp +++ b/storage/src/tests/distributor/removeoperationtest.cpp @@ -68,6 +68,7 @@ struct RemoveOperationTest : Test, DistributorStripeTestUtil { std::unique_ptr<api::StorageReply> reply(removec->makeReply()); auto* removeR = dynamic_cast<api::RemoveReply*>(reply.get()); removeR->setOldTimestamp(oldTimestamp); + removeR->setBucketInfo(api::BucketInfo(1,2,3,4,5)); callback.onReceive(_sender, std::shared_ptr<api::StorageReply>(reply.release())); } @@ -307,6 +308,45 @@ TEST_F(ExtRemoveOperationTest, failed_condition_probe_fails_op_with_returned_err _sender.getLastReply()); } +// Note: we don't exhaustively test cancellation edges here, as we assume that Put/Update/Remove ops +// share the same underlying PersistenceMessageTracker logic. See PutOperationTest for more tests. + +TEST_F(ExtRemoveOperationTest, cancellation_during_condition_probe_fails_operation_on_probe_completion) { + ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::INCONSISTENT)); + + reply_with(make_get_reply(0, 50, false, true)); + op->cancel(_sender, CancelScope::of_fully_cancelled()); + reply_with(make_get_reply(1, 50, false, true)); + + ASSERT_EQ("Get => 1,Get => 0", _sender.getCommands(true)); + EXPECT_EQ("RemoveReply(BucketId(0x0000000000000000), " + "id:test:test::uri, " + "timestamp 100, not found) " + "ReturnCode(ABORTED, Failed during write repair condition probe step. Reason: " + "Operation has been cancelled (likely due to a cluster state change))", + _sender.getLastReply()); +} + +TEST_F(ExtRemoveOperationTest, cancelled_nodes_are_not_updated_in_db) { + ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::CONSISTENT)); + ASSERT_EQ("Remove => 1,Remove => 0", _sender.getCommands(true)); + + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(bucketId), {1}); + op->cancel(_sender, CancelScope::of_node_subset({1})); + + replyToMessage(*op, 0, 50); + replyToMessage(*op, 1, 50); + + EXPECT_EQ("BucketId(0x4000000000000593) : " + "node(idx=0,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false)", + dumpBucket(bucketId)); + // Reply is still OK since the operation went through on the content nodes + ASSERT_EQ("RemoveReply(BucketId(0x0000000000000000), " + "id:test:test::uri, timestamp 100, removed doc from 50) ReturnCode(NONE)", + _sender.getLastReply()); + +} + TEST_F(ExtRemoveOperationTest, trace_is_propagated_from_condition_probe_gets_ok_probe_case) { ASSERT_NO_FATAL_FAILURE(set_up_tas_remove_with_2_nodes(ReplicaState::INCONSISTENT)); diff --git a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp index da32225cde3..1907335545a 100644 --- a/storage/src/tests/distributor/twophaseupdateoperationtest.cpp +++ b/storage/src/tests/distributor/twophaseupdateoperationtest.cpp @@ -4,16 +4,16 @@ #include <vespa/config/helper/configgetter.h> #include <vespa/document/base/testdocrepo.h> #include <vespa/document/fieldset/fieldsets.h> -#include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/fieldvalue/intfieldvalue.h> +#include <vespa/document/repo/documenttyperepo.h> #include <vespa/document/test/make_document_bucket.h> #include <vespa/document/update/arithmeticvalueupdate.h> -#include <vespa/storage/distributor/top_level_distributor.h> #include <vespa/storage/distributor/distributor_stripe.h> #include <vespa/storage/distributor/externaloperationhandler.h> #include <vespa/storage/distributor/operations/external/twophaseupdateoperation.h> +#include <vespa/storage/distributor/top_level_distributor.h> #include <vespa/storageapi/message/persistence.h> -#include <vespa/vespalib/gtest/gtest.h> +#include <gtest/gtest.h> #include <gmock/gmock.h> namespace storage::distributor { @@ -30,8 +30,9 @@ using namespace ::testing; struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { document::TestDocRepo _testRepo; std::shared_ptr<const DocumentTypeRepo> _repo; - const DocumentType* _doc_type; + const DocumentType* _doc_type{nullptr}; DistributorMessageSenderStub _sender; + BucketId _bucket_id{0x400000000000cac4}; TwoPhaseUpdateOperationTest(); ~TwoPhaseUpdateOperationTest() override; @@ -39,7 +40,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { void checkMessageSettingsPropagatedTo( const api::StorageCommand::SP& msg) const; - std::string getUpdatedValueFromLastPut(DistributorMessageSenderStub&); + static std::string getUpdatedValueFromLastPut(DistributorMessageSenderStub&); void SetUp() override { _repo = _testRepo.getTypeRepoSp(); @@ -57,20 +58,21 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { close(); } - void replyToMessage(Operation& callback, - DistributorMessageSenderStub& sender, - uint32_t index, - uint64_t oldTimestamp, - api::ReturnCode::Result result = api::ReturnCode::OK); + static void replyToMessage( + Operation& callback, + DistributorMessageSenderStub& sender, + uint32_t index, + uint64_t oldTimestamp, + api::ReturnCode::Result result = api::ReturnCode::OK); - void replyToPut( + static void replyToPut( Operation& callback, DistributorMessageSenderStub& sender, uint32_t index, api::ReturnCode::Result result = api::ReturnCode::OK, const std::string& traceMsg = ""); - void replyToCreateBucket( + static void replyToCreateBucket( Operation& callback, DistributorMessageSenderStub& sender, uint32_t index, @@ -85,7 +87,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { api::ReturnCode::Result result = api::ReturnCode::OK, const std::string& traceMsg = ""); - void reply_to_metadata_get( + static void reply_to_metadata_get( Operation& callback, DistributorMessageSenderStub& sender, uint32_t index, @@ -93,7 +95,7 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { api::ReturnCode::Result result = api::ReturnCode::OK, const std::string& trace_msg = ""); - void reply_to_get_with_tombstone( + static void reply_to_get_with_tombstone( Operation& callback, DistributorMessageSenderStub& sender, uint32_t index, @@ -148,11 +150,17 @@ struct TwoPhaseUpdateOperationTest : Test, DistributorStripeTestUtil { Timestamp highest_get_timestamp, Timestamp expected_response_timestamp); - std::shared_ptr<TwoPhaseUpdateOperation> set_up_2_inconsistent_replicas_and_start_update(bool enable_3phase = true) { - setup_stripe(2, 2, "storage:2 distributor:1"); + void do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(bool in_sync_replicas); + + void enable_3phase_updates(bool enable = true) { auto cfg = make_config(); - cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(enable_3phase); + cfg->set_enable_metadata_only_fetch_phase_for_inconsistent_updates(enable); configure_stripe(cfg); + } + + std::shared_ptr<TwoPhaseUpdateOperation> set_up_2_inconsistent_replicas_and_start_update(bool enable_3phase = true) { + setup_stripe(2, 2, "storage:2 distributor:1"); + enable_3phase_updates(enable_3phase); auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // Inconsistent replicas. cb->start(_sender); return cb; @@ -199,13 +207,13 @@ TwoPhaseUpdateOperationTest::replyToPut( { std::shared_ptr<api::StorageMessage> msg2 = sender.command(index); auto& putc = dynamic_cast<PutCommand&>(*msg2); - std::unique_ptr<api::StorageReply> reply(putc.makeReply()); + std::shared_ptr<api::StorageReply> reply(putc.makeReply()); reply->setResult(api::ReturnCode(result, "")); + dynamic_cast<api::PutReply&>(*reply).setBucketInfo(api::BucketInfo(1,2,3,4,5)); if (!traceMsg.empty()) { MBUS_TRACE(reply->getTrace(), 1, traceMsg); } - callback.receive(sender, - std::shared_ptr<StorageReply>(reply.release())); + callback.receive(sender, reply); } void @@ -217,10 +225,9 @@ TwoPhaseUpdateOperationTest::replyToCreateBucket( { std::shared_ptr<api::StorageMessage> msg2 = sender.command(index); auto& putc = dynamic_cast<CreateBucketCommand&>(*msg2); - std::unique_ptr<api::StorageReply> reply(putc.makeReply()); + std::shared_ptr<api::StorageReply> reply(putc.makeReply()); reply->setResult(api::ReturnCode(result, "")); - callback.receive(sender, - std::shared_ptr<StorageReply>(reply.release())); + callback.receive(sender, reply); } void @@ -312,6 +319,7 @@ TwoPhaseUpdateOperationTest::sendUpdate(const std::string& bucketState, update->setCreateIfNonExistent(options._createIfNonExistent); document::BucketId id = operation_context().make_split_bit_constrained_bucket_id(update->getId()); + assert(id == _bucket_id); document::BucketId id2 = document::BucketId(id.getUsedBits() + 1, id.getRawId()); if (bucketState.length()) { @@ -554,6 +562,33 @@ TEST_F(TwoPhaseUpdateOperationTest, fast_path_inconsistent_timestamps_inconsiste _sender.getLastReply(true)); } +TEST_F(TwoPhaseUpdateOperationTest, fast_path_cancellation_transitively_cancels_nested_update_operation) { + setup_stripe(2, 2, "storage:2 distributor:1"); + enable_3phase_updates(); + auto op = sendUpdate("0=1/2/3,1=1/2/3"); + op->start(_sender); + + ASSERT_EQ("Update => 0,Update => 1", _sender.getCommands(true)); + + replyToMessage(*op, _sender, 0, 110); + + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {1}); + op->cancel(_sender, CancelScope::of_node_subset({1})); + + replyToMessage(*op, _sender, 1, 110); + + // Client operation itself should return success since the update went through on all replica nodes + EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " + "BucketId(0x0000000000000000), " + "timestamp 0, timestamp of updated doc: 110) " + "ReturnCode(NONE)", + _sender.getLastReply(true)); + + EXPECT_EQ("BucketId(0x400000000000cac4) : " + "node(idx=0,crc=0x123,docs=1/1,bytes=100/100,trusted=true,active=false,ready=false)", + dumpBucket(_bucket_id)); +} + void TwoPhaseUpdateOperationTest::checkMessageSettingsPropagatedTo( const api::StorageCommand::SP& msg) const @@ -713,6 +748,38 @@ TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_safe_path_gets_fail) { EXPECT_EQ(metrics().updates.failures.storagefailure.getValue(), 1); } +void TwoPhaseUpdateOperationTest::do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(bool in_sync_replicas) { + setup_stripe(2, 2, "storage:2 distributor:1"); + enable_3phase_updates(); + auto op = sendUpdate("0=1/2/3,1=2/3/4", UpdateOptions().createIfNonExistent(true)); + op->start(_sender); + + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + replyToGet(*op, _sender, 0, 70); + + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {1}); + op->cancel(_sender, CancelScope::of_node_subset({1})); + + replyToGet(*op, _sender, 1, in_sync_replicas ? 70 : 80); + + EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " + "BucketId(0x0000000000000000), " + "timestamp 0, timestamp of updated doc: 0) " + "ReturnCode(BUCKET_NOT_FOUND, The update operation was cancelled due to a cluster " + "state change between executing the read and write phases of a write-repair update)", + _sender.getLastReply(true)); + + // TODO custom cancellation failure metric? +} + +TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion_consistent_case) { + do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(true); +} + +TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion_inconsistent_case) { + do_update_fails_if_cancelled_prior_to_safe_path_metadata_get_completion(false); +} + TEST_F(TwoPhaseUpdateOperationTest, update_fails_if_apply_throws_exception) { setup_stripe(2, 2, "storage:2 distributor:1"); // Create update for wrong doctype which will fail the update. @@ -1214,6 +1281,59 @@ TEST_F(ThreePhaseUpdateTest, puts_are_sent_after_receiving_full_document_get) { EXPECT_EQ(1, m.ok.getValue()); } +TEST_F(ThreePhaseUpdateTest, update_fails_if_cancelled_between_metadata_gets_and_full_get) { + auto op = set_up_2_inconsistent_replicas_and_start_update(); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + reply_to_metadata_get(*op, _sender, 0, 2000); + reply_to_metadata_get(*op, _sender, 1, 1000); + ASSERT_EQ("Get => 0", _sender.getCommands(true, false, 2)); + + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {1}); + op->cancel(_sender, CancelScope::of_node_subset({1})); + + replyToGet(*op, _sender, 2, 2000U); + ASSERT_EQ("", _sender.getCommands(true, false, 3)); // No puts sent + + EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " + "BucketId(0x0000000000000000), " + "timestamp 0, timestamp of updated doc: 0) " + "ReturnCode(BUCKET_NOT_FOUND, The update operation was cancelled due to a cluster " + "state change between executing the read and write phases of a write-repair update)", + _sender.getLastReply(true)); + + // TODO cancellation metrics? +} + +TEST_F(ThreePhaseUpdateTest, fast_path_cancellation_transitively_cancels_nested_put_operation) { + auto op = set_up_2_inconsistent_replicas_and_start_update(); + ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); + reply_to_metadata_get(*op, _sender, 0, 2000); + reply_to_metadata_get(*op, _sender, 1, 1000); + + ASSERT_EQ("Get => 0", _sender.getCommands(true, false, 2)); + replyToGet(*op, _sender, 2, 2000U); + + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bucket_id), {0}); + op->cancel(_sender, CancelScope::of_node_subset({0})); + + ASSERT_EQ("Put => 1,Put => 0", _sender.getCommands(true, false, 3)); + replyToPut(*op, _sender, 3); + replyToPut(*op, _sender, 4); + + // Update itself is ACKed + EXPECT_EQ("UpdateReply(id:ns:testdoctype1::1, " + "BucketId(0x0000000000000000), " + "timestamp 0, timestamp of updated doc: 2000) " + "ReturnCode(NONE)", + _sender.getLastReply(true)); + + // But cancelled replicas are not reintroduced into the bucket DB + EXPECT_EQ("BucketId(0x400000000000cac4) : " + "node(idx=1,crc=0x1,docs=2/4,bytes=3/5,trusted=true,active=false,ready=false)", + dumpBucket(_bucket_id)); +} + + TEST_F(ThreePhaseUpdateTest, consistent_meta_get_timestamps_can_restart_in_fast_path) { auto cb = set_up_2_inconsistent_replicas_and_start_update(); ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); @@ -1277,8 +1397,7 @@ TEST_F(ThreePhaseUpdateTest, update_failed_with_transient_error_code_if_replica_ auto cb = sendUpdate("0=1/2/3,1=2/3/4"); // 2 replicas, room for 1 more. cb->start(_sender); // Add new replica to deterministic test bucket after gets have been sent - BucketId bucket(0x400000000000cac4); // Always the same in the test. - addNodesToBucketDB(bucket, "0=1/2/3,1=2/3/4,2=3/3/3"); + addNodesToBucketDB(_bucket_id, "0=1/2/3,1=2/3/4,2=3/3/3"); Timestamp old_timestamp = 500; ASSERT_EQ("Get => 0,Get => 1", _sender.getCommands(true)); diff --git a/storage/src/tests/distributor/updateoperationtest.cpp b/storage/src/tests/distributor/updateoperationtest.cpp index d0ae31b9524..fefc88a27c2 100644 --- a/storage/src/tests/distributor/updateoperationtest.cpp +++ b/storage/src/tests/distributor/updateoperationtest.cpp @@ -11,7 +11,7 @@ #include <vespa/storageapi/message/bucket.h> #include <vespa/storageapi/message/persistence.h> #include <vespa/storageapi/message/state.h> -#include <vespa/vespalib/gtest/gtest.h> +#include <gtest/gtest.h> using config::ConfigGetter; using config::FileSpec; @@ -29,11 +29,14 @@ struct UpdateOperationTest : Test, DistributorStripeTestUtil { std::shared_ptr<const DocumentTypeRepo> _repo; const DocumentType* _html_type; + UpdateOperationTest() + : _repo(std::make_shared<DocumentTypeRepo>(*ConfigGetter<DocumenttypesConfig>:: + getConfig("config-doctypes", FileSpec("../config-doctypes.cfg")))), + _html_type(_repo->getDocumentType("text/html")) + { + } + void SetUp() override { - _repo.reset( - new DocumentTypeRepo(*ConfigGetter<DocumenttypesConfig>:: - getConfig("config-doctypes", FileSpec("../config-doctypes.cfg")))); - _html_type = _repo->getDocumentType("text/html"); createLinks(); } @@ -241,4 +244,31 @@ TEST_F(UpdateOperationTest, inconsistent_create_if_missing_updates_picks_largest EXPECT_EQ(2, m.diverging_timestamp_updates.getValue()); } +// Note: we don't exhaustively test cancellation edges here, as we assume that Put/Update/Remove ops +// share the same underlying PersistenceMessageTracker logic. See PutOperationTest for more tests. + +TEST_F(UpdateOperationTest, cancelled_nodes_are_not_updated_in_db) { + setup_stripe(Redundancy(3), NodeCount(3), "distributor:1 storage:3"); + + std::shared_ptr<UpdateOperation> op = sendUpdate("0=1/2/3,1=1/2/3,2=1/2/3"); + DistributorMessageSenderStub sender; + op->start(sender); + + ASSERT_EQ("Update => 0,Update => 1,Update => 2", sender.getCommands(true)); + + // Simulate nodes 0 and 2 going down + operation_context().remove_nodes_from_bucket_database(makeDocumentBucket(_bId), {0, 2}); + // Cancelling shall be cumulative + op->cancel(_sender, CancelScope::of_node_subset({0})); + op->cancel(_sender, CancelScope::of_node_subset({2})); + + replyToMessage(*op, sender, 0, 120); + replyToMessage(*op, sender, 1, 120); + replyToMessage(*op, sender, 2, 120); + + EXPECT_EQ("BucketId(0x400000000000cac4) : " + "node(idx=1,crc=0x2,docs=4/4,bytes=6/6,trusted=true,active=false,ready=false)", + dumpBucket(_bId)); +} + } |